from collections import defaultdict
import io
import re
import os
import platform
from subprocess import check_output, Popen, STDOUT, CalledProcessError, call
import time
import shlex
import psutil
import logging
import datetime
from datetime import timedelta
import zipfile
import functools
import sh
import shutil
import stat
import decorator
import tempfile
import pwd
import grp

from . import constants

log = logging.getLogger(constants.LOGGER_NAME)

# Matches connectivity-test lines such as: "Testing connection with <host> ... [OK]"
CONNECTIVITY_LINE_REGEX = re.compile(r'^Testing connection with (.+) \.\.\. \[(.+)\]$')
MDATP_KEY = re.compile("-F key=mdatp")

SUPPORTED_GEOS = ["EUS", "CUS", "UKS", "WEU", "NEU", "CAN", "CH", "EAU", "SAU"]

# Known third-party security products and the process names that identify them.
CONFLICTING_AGENTS = {
    "mcafee": {"masvc", "macmnsvc", "cmdagent", "macompatsvc", "MFEcma"},
    # fix: "cbagentd" previously had a leading space and could never match a process name
    "carbon black": {"cbagentd", "cbdaemon", "cbosxsensorservice", "cbsensor"},
    "symantec": {"rtvscand"},
    "sophos": {"savconfig", "savdctl", "savdstatus", "savlog", "savscan",
               "savsetup", "savupdate", "savscand"},
    "panda": {"pcop"},
    "trendmicro": {"ds_agent"},
    "crowdstrike": {"falcon-sensor"}
}

# Binaries known to conflict with MDE, mapped to their process name.
CONFLICTING_BINARIES = {
    "/usr/bin/adclient": "adclient",
    "/usr/local/sbin/haproxy": "haproxy"
}

# Environment variables only ever hold strings, so only string forms can match
# (the original also listed 1/True, which os.environ can never produce).
DEBUG_MODE = os.environ.get('DEBUG') in ("1", "True", "true")
SKIP_KNOWN_ISSUES = os.environ.get('SKIP_KNOWN_ISSUES') in ("1", "True", "true")

# -- Paths ----
ONBOARDING_PACKAGE = 'WindowsDefenderATPOnboardingPackage'
OFFBOARDING_PACKAGE = f'WindowsDefenderATPOffboardingPackage_valid_until_{(datetime.datetime.now() + timedelta(days=30)).strftime("%Y-%m-%d")}'
DIY_PACKAGE_MACOS = 'MDATP MacOS DIY.zip'
DIY_PACKAGE_LINUX = 'MDE Linux DIY.zip'
LINUX_DIY_SCRIPT = 'mde_linux_edr_diy.sh'
ONBOARDING_SCRIPTS = ['WindowsDefenderATPOnboarding.py',              # legacy name
                      'MicrosoftDefenderATPOnboardingMacOs.py',       # macOS
                      'MicrosoftDefenderATPOnboardingLinuxServer.py']  # Linux
COLLECTION_DIR = 'mdatp'
APP_COMPAT_JSON = 'results_for_pytest.json'

# -- Strings ----
GLOBAL_CAPPING_MSG = "Global capping status changed, patternSequence:{pattern_sequence}, isLimitReached:{limit_reached}"

# -- URLs ----
DIY_URL_MACOS = "https://aka.ms/mdatpmacosdiy"
DIY_URL_LINUX = "https://aka.ms/linuxdiy"
EICAR_TEST_URL = "https://www.eicar.org/download/eicar.com.txt"


def modify_ld_path():
    """Return a copy of os.environ with the bundled-binary LD_LIBRARY_PATH undone.

    When running as a compiled binary on Linux, the bootloader overrides
    LD_LIBRARY_PATH and stores the original under LD_LIBRARY_PATH_ORIG;
    restore it (or drop the override) so spawned processes see the system
    library path.
    """
    env = dict(os.environ)
    if constants.IS_COMPILED_AS_BINARY and platform.system() == 'Linux':
        lp_key = 'LD_LIBRARY_PATH'
        lp_orig = env.get(lp_key + '_ORIG')
        if lp_orig is not None:
            env[lp_key] = lp_orig
        else:
            env.pop(lp_key, None)
    return env


new_env = modify_ld_path()

# Define the common sh options used by every sh-based command in this module.
common_sh_options = {
    '_encoding': 'utf-8',
    '_decode_errors': 'ignore',
    '_err_to_out': True,
    '_env': new_env,
    '_timeout': 120
}


def collect_command_output_to_file(base_dir, prefix, filename, command):
    """Run `command` and write its combined stdout/stderr to <base_dir>/<prefix>.txt.

    Best-effort: if the command cannot run, the failure note is written to the
    file instead. Returns {filename: written_file_path}.
    """
    output = io.StringIO()
    try:
        cmd = sh.Command(command)
        cmd(**common_sh_options, _out=output)
    except Exception:  # deliberate best-effort collection; record instead of raising
        output.write(f"Could not run command: {command}")
    file_path = os.path.join(base_dir, f'{prefix}.txt')
    with open(file_path, 'w') as writer:
        writer.write(output.getvalue())
    return {filename: file_path}


# Define a wrapper function to run sh commands with common options
def run_sh_command(command, *args, **kwargs):
    """Run an `sh` command with the common options, logging (never raising) failures.

    Returns the command's result, or None when the command is missing, times
    out, or fails for any other reason.
    """
    # Merge common options with the specific options for this command
    all_options = {**common_sh_options, **kwargs}
    # fix: '_out' is optional — plain indexing raised KeyError inside the handlers
    out = all_options.get('_out')
    try:
        return command(*args, **all_options)
    except sh.CommandNotFound:
        log.warning(f"Command '{command}' not found.")
        if out:
            out.write(f"[ERROR]: Command '{command}' not found.")
    except sh.TimeoutException:
        log.warning(f"Command '{command} {args}' timed out after {all_options['_timeout']} seconds.")
        if out:
            out.write(f"[ERROR]: Command '{command} {args}' timed out after {all_options['_timeout']} seconds.")
    except Exception as e:
        log.warning(f'Could not run command {command} exception: {e}')
        if out:
            out.write(f'[ERROR]: Could not run command {command} exception: {e}')
    return None


def collect_file_to_file(base_dir, prefix, src_file, dst_file):
    """Copy src_file into base_dir as <prefix><extension-of-dst_file>.

    If src_file is missing or the copy fails, a note is written to the target
    file instead. Returns {dst_file: temp_path}.
    """
    file_extension = os.path.splitext(dst_file)[1]  # includes the leading '.', if any
    # fix: was f'{prefix}.{file_extension}', which produced a double dot ("prefix..log")
    temp_path = os.path.join(base_dir, f'{prefix}{file_extension}')
    with open(temp_path, 'w') as writer:
        try:
            if os.path.exists(src_file):
                sh.cp(src_file, temp_path, _env=new_env)
            else:
                writer.write(f'{src_file} does not exist.')
        except Exception:  # best-effort: leave the note in the output file
            writer.write(f"Could not run get {src_file}")
    return {dst_file: temp_path}


def remove_old_files(folder_path, files_to_keep):
    """Delete all but the `files_to_keep` newest files in folder_path (best-effort)."""
    try:
        # Get list of files in the folder with their creation times
        files = [(os.path.join(folder_path, f), os.path.getctime(os.path.join(folder_path, f)))
                 for f in os.listdir(folder_path)
                 if os.path.isfile(os.path.join(folder_path, f))]
        # Sort files by creation time, newest first
        files.sort(key=lambda x: x[1], reverse=True)
        # Keep only the specified number of newest files.
        # fix: the original reassigned `files_to_keep` to this list and then
        # compared `len(files) > files_to_keep` (int vs list -> TypeError that
        # the bare except swallowed), so no file was ever removed.
        newest_files = files[:files_to_keep]
        if len(files) > files_to_keep:
            log.info(f"remove old files in: {folder_path} and keep last {files_to_keep} files.")
        # Iterate over files in the folder
        for file_path, creation_time in files:
            # Check if the file should be deleted
            if (file_path, creation_time) not in newest_files:
                os.remove(file_path)
    except Exception:  # best-effort cleanup; never propagate
        return None


def run(cmd, validate_exit_code=False, timeout=200, verbose=True):  # pylint: disable=unused-argument
    """Run a shell command.

    With validate_exit_code=False: run once and return True iff the exit code is 0.
    With validate_exit_code=True: retry until success or `timeout` seconds
    elapse; return True on success, False on timeout.
    """
    start_time = time.time()
    log.debug(f'running [{cmd}]')
    if not validate_exit_code:
        return 0 == call(cmd, shell=True, env=modify_ld_path())
    while start_time + timeout > time.time():
        exit_code = call(cmd, shell=True, env=modify_ld_path())
        if exit_code == 0:
            return True
        log.info(f"run command: '{cmd}' finished with exit_code > 0")
        log.debug(f"run command: '{cmd}' finished with exit_code > 0, exit_code: {exit_code}, time elapsed: {time.time()- start_time} seconds")
    return False  # fix: previously fell off the end and implicitly returned None

# check_output does not support pipe out of the box
MAX_OUTPUT_LEN = 256  # truncation length for command output echoed to the log


def run_with_output(cmd: str, timeout_in_sec: int = 5, return_stdout_on_err=False, verbose=True):
    """Run `cmd` (split with shlex) and return its decoded, stripped output.

    Returns None on timeout or unexpected failure. On a non-zero exit code,
    returns the captured output when `return_stdout_on_err` is set, else None.
    """
    log.debug(f'running_with_output [{cmd}]')
    try:
        bytes_output = check_output(shlex.split(cmd), stderr=STDOUT, timeout=timeout_in_sec, env=modify_ld_path())
        output = bytes_output.decode('utf8', 'ignore').strip()
        log.debug(f"output [{output[:MAX_OUTPUT_LEN]}{'...' if len(output)>MAX_OUTPUT_LEN else ''}]")
        return output
    # Incase the exit code of the called process is non zero
    except CalledProcessError as non_zero_error:
        output = non_zero_error.output.decode('utf8', 'ignore').strip() if non_zero_error.output else ""
        stderr = non_zero_error.stderr.decode('utf8', 'ignore').strip() if non_zero_error.stderr else ""
        if verbose:
            log.warning(f"Executing failed with return code: {non_zero_error.returncode}")
            log.warning(f"output [{output[:MAX_OUTPUT_LEN]}{'...' if len(output)>MAX_OUTPUT_LEN else ''}]")
            log.warning(f"stderr [{stderr[:MAX_OUTPUT_LEN]}{'...' if len(stderr)>MAX_OUTPUT_LEN else ''}]")
        return output if return_stdout_on_err else None
    except Exception as e:
        log.error(f'run failed {e}')
        return None


def run_and_get_pid(cmd):
    """Spawn `cmd`, immediately terminate it, and return its pid.

    NOTE(review): terminating right after spawn looks intentional (pid probe),
    but confirm against callers.
    """
    proc = Popen(cmd.split(' '), env=modify_ld_path())
    proc.terminate()
    return proc.pid


def convert_to_timestamp(time_str):
    """Convert a '%Y-%m-%dT%H:%M:%S[.frac]' string to a Unix timestamp (local time)."""
    return time.mktime(time.strptime(time_str.split('.')[0], '%Y-%m-%dT%H:%M:%S'))


def wait(time_sec, reason, verbose=True):
    """Sleep `time_sec` seconds, logging the reason when verbose."""
    if verbose:
        log.info(f"[SLEEP] [{time_sec}sec] " + reason)
    time.sleep(time_sec)


def trace(msg):
    """Log `msg` at debug level."""
    log.debug(msg)


def error(msg):
    """Log an error and return False, so callers can write `return error(...)`."""
    log.error(msg)
    return False


def print_title_and_measure_time(f):
    """Decorator: log START and PASSED/FAILED with elapsed time around `f`.

    On failure, also logs the current top CPU consumers and re-raises.
    """
    print(f"decorating {f.__name__}")

    def wrapper(f, *args, **kwargs):
        log.info(f"-- {f.__name__} START --")
        start_time = time.time()
        try:
            # fix: the wrapped function's return value was previously discarded
            result = f(*args, **kwargs)
            elapsed_time = time.time() - start_time
            log.info(f"-- {f.__name__} PASSED [{elapsed_time:.2f}sec] --")
            return result
        except Exception:
            elapsed_time = time.time() - start_time
            log.error(f"-- {f.__name__} FAILED [{elapsed_time:.2f}sec] --")
            top_n = top_cpu_consumers()
            if top_n is None:
                top_n = "Could not retrieve top cpu consumers"
            log.info(f"top cpu consumers:\n{top_n}")
            raise  # fix: bare raise preserves the original traceback ('raise e' re-rooted it)

    wrapper.__name__ = f.__name__
    return decorator.decorator(wrapper, f)


def skip_known_issues():
    """Return the SKIP_KNOWN_ISSUES flag parsed from the environment at import."""
    return SKIP_KNOWN_ISSUES


def parse_connectivity_test(connectivity_result: str):
    """Parse connectivity-test output into per-category lists of success booleans.

    Categories: 'edr_cnc' (hosts containing 'winatp'), 'edr_cyber' (hosts
    containing 'events'), 'av' (everything else).
    """
    results = defaultdict(list)
    for line in connectivity_result.split('\n'):
        regex_extraction = CONNECTIVITY_LINE_REGEX.findall(line)
        if regex_extraction:
            host, status = regex_extraction[0]
            status = status == 'OK'
            # EDR C&C
            if 'winatp' in host:
                results['edr_cnc'].append(status)
            # EDR Cyber
            elif 'events' in host:
                results['edr_cyber'].append(status)
            # AV
            else:
                results['av'].append(status)
    return results


def retrieve_event_id_for_connectivity_results(status_array, good_id, warn_id, error_id):
    """Map connectivity booleans to an OS-prefixed event id (all OK / some OK / none OK)."""
    os_prefix = '2' if platform.system() == 'Darwin' else '3'
    if all(status_array):
        return f'{os_prefix}{good_id}'
    elif any(status_array):
        return f'{os_prefix}{warn_id}'
    else:
        return f'{os_prefix}{error_id}'


def retrieve_event_id_for_processes(status_array, good_id, error_id):
    """Map process-health booleans to an OS-prefixed event id (all OK vs any failure)."""
    os_prefix = '2' if platform.system() == 'Darwin' else '3'
    if all(status_array):
        return f'{os_prefix}{good_id}'
    else:
        return f'{os_prefix}{error_id}'


def translate_process_counter_to_string(process_count):
    """Translate a process count into a status string: 0 -> Down, 1 -> Running, else Error."""
    if process_count == 0:
        return 'Down'
    elif process_count == 1:
        return 'Running'
    else:
        return 'Error'


def top_cpu_consumers(n=5):
    """Return the top `n` processes by CPU/memory (ps | sort | head), or None on failure."""
    try:
        # Run the ps command
        ps_output = sh.ps("axo", "pid,pcpu,pmem,comm")
    except sh.ErrorReturnCode as e:
        log.error(f"top_cpu_consumers, Failed to run ps command: {e}")
        return None
    try:
        # Sort the output
        sort_output = sh.sort(ps_output, "-nrk", "2,3")
    except sh.ErrorReturnCode as e:
        log.error(f"top_cpu_consumers, Failed to sort ps output: {e}")
        return None
    try:
        # Get the top n lines (fix: removed unreachable trailing `return None`)
        return sh.head(sort_output, "-n", str(n))
    except sh.ErrorReturnCode as e:
        log.error(f"top_cpu_consumers, Failed to get top {n} lines: {e}")
        return None


def collect_mde_conflicts():
    """Detect running processes of known conflicting security products.

    Returns (report, orgs): `report` is a titled string when conflicts exist
    (otherwise an empty list — kept as-is for backward compatibility), and
    `orgs` is the set of vendors with at least one running agent.
    """
    conflicting_agents = []
    conflicting_orgs = set()
    running_processes = dict()
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            running_processes[proc.info['name']] = proc.info
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass  # process vanished or is inaccessible — skip it
    for org, agents in CONFLICTING_AGENTS.items():
        for agent in agents:
            if agent in running_processes:
                conflicting_agent_data = running_processes[agent]
                log.info(f"org: {org}, agent: {agent} is running: {conflicting_agent_data}")
                conflicting_agents.append(f"{org} - {agent}:\n{conflicting_agent_data}")
                conflicting_orgs.add(org)
    if len(conflicting_agents) > 0:
        conflicting_agents_string = 'conflicting security tools'
        conflicting_agents = '\n'.join([conflicting_agents_string, '=' * len(conflicting_agents_string), '\n\n'.join(conflicting_agents)])
    return conflicting_agents, conflicting_orgs


def confilicting_orgs():
    """Return a comma-separated list of conflicting vendors, or '' on error.

    NOTE: the misspelled name is preserved for backward compatibility.
    """
    try:
        _, conflicting_orgs = collect_mde_conflicts()
        return ", ".join(org for org in conflicting_orgs)
    except Exception as e:
        log.exception(f"Conflicting_orgs raised an exception {e}")
        # fix: previously returned [] here, inconsistent with the str success path
        return ""


def collect_conflicting_binaries(audit_rules):
    """Return a comma-separated list of installed conflicting binaries whose
    processes are not excluded by the given auditd rules."""
    conflicting_binaries = set()
    for binary in CONFLICTING_BINARIES.keys():
        if os.path.exists(binary):
            conflicting_binaries.add(binary)
    return ', '.join(binary for binary in conflicting_binaries
                     if not is_process_excluded(CONFLICTING_BINARIES[binary], audit_rules))


def is_process_excluded(process, audit_rules):
    """True when auditd rules exclude `process` by exe path, or exclude every running pid.

    NOTE(review): when pgrep finds nothing, run_with_output returns None which
    str() turns into the literal 'None' — confirm this is intended.
    """
    exclusion_rules_by_name = f"-a (?:exit,never|never,exit).* -F exe={process}"
    process_pid_list = str(run_with_output(f"pgrep {process}")).splitlines()
    return re.search(exclusion_rules_by_name, audit_rules) or all(
        re.search(r'-a (?:exit,never|never,exit).* -F pid=' + pid, audit_rules)
        for pid in process_pid_list)


def collect_running_conflicting_binaries(audit_rules):
    """Return collect_conflicting_binaries output wrapped with a titled header (or '')."""
    conflicting_binaries = collect_conflicting_binaries(audit_rules)
    if len(conflicting_binaries) > 0:
        conflicting_binaries_string = 'conflicting binaries'
        conflicting_binaries = '\n'.join([conflicting_binaries_string, '=' * len(conflicting_binaries_string), conflicting_binaries])
    return conflicting_binaries


def collect_non_mdatp_auditd_rules(auditd_rules):
    """Return non-mdatp auditd rules (skipping the first 2 header lines), titled, or ''."""
    rules_list = auditd_rules.splitlines()[2:]
    output_rules = '\n'.join(e for e in rules_list if not MDATP_KEY.search(e))
    if len(output_rules) > 0:
        non_mdatp_rules_string = 'non-mdatp rules'
        non_mdatp_rules = '\n'.join([non_mdatp_rules_string, '=' * len(non_mdatp_rules_string), output_rules])
    else:
        non_mdatp_rules = ""
    return non_mdatp_rules


def get_time_string(time=None):
    """Format `time` (default: current UTC time) as YYYY_MM_DD_HH_MM_SS.

    fix: the default was previously evaluated once at import, so every
    no-argument call returned the module-load timestamp.
    """
    if time is None:
        time = datetime.datetime.now(datetime.timezone.utc)
    return time.strftime("%Y_%m_%d_%H_%M_%S")


def create_zip(zip_file, *, path=None, files=None, zipdir='', prefix_pattern='',
               suffix_pattern='', retain_dir_tree=False, recursive=False,
               predicate=None, mode='w'):
    """Create (or append to, via `mode`) a zip archive.

    Sources: a directory (or list of directories) walked optionally recursively,
    filtered by filename prefix/suffix and an optional full-path `predicate`,
    and/or an explicit file list. Entries land under `zipdir`, flattened unless
    `retain_dir_tree` is set. Files that fail to zip are counted and logged
    (first 5 listed). Raises RuntimeError when neither source is given.
    """
    if path is None and files is None:
        raise RuntimeError('Pass either path or files list')
    collected_files = []
    if path:
        def filter_predicate(f):
            return f.startswith(prefix_pattern) and f.endswith(suffix_pattern)

        def reduce_routine(paths):
            for path_entry in paths:
                all_files = os.walk(path_entry, topdown=True)
                if not recursive:
                    # only the top level of the walk
                    all_files = [next(all_files)]
                for root, _, names in all_files:
                    for f in names:
                        if filter_predicate(f) and (predicate is None or predicate(os.path.join(root, f))):
                            yield os.path.join(root, f)

        if not isinstance(path, list):
            path = [path]
        collected_files.extend(reduce_routine(path))
    if files:
        if not isinstance(files, list):
            files = [files]
        collected_files.extend(files)
    with zipfile.ZipFile(zip_file, mode, zipfile.ZIP_DEFLATED) as zipfile_handle:
        unzipped_files = []
        zipped_file_count = 0
        for f in collected_files:
            try:
                if retain_dir_tree:
                    zipfile_handle.write(f, os.path.join(zipdir, f.lstrip('/')))
                else:
                    zipfile_handle.write(f, os.path.join(zipdir, os.path.basename(f)))
                zipped_file_count += 1
            except Exception:
                unzipped_files.append(f)
                continue
        max_error_output_files = 5
        total_unzipped_files_count = len(unzipped_files)
        unzipped_files = unzipped_files[:min(total_unzipped_files_count, max_error_output_files)]
        if len(unzipped_files) > 0:
            log.info(f"Unable to zip {total_unzipped_files_count} out of total {zipped_file_count + total_unzipped_files_count} files.")
            log.info(f"Unzipped files : {unzipped_files}. Output truncated to {max_error_output_files} files.")
    return True


def command_exists(command_name):
    """True when `command_name` resolves on PATH."""
    return shutil.which(command_name) is not None


def is_newer_kernel(current_kernel, minimum_kernel):
    """Return True when `current_kernel` >= `minimum_kernel` (e.g. '5.4.0-100').

    The dotted mainline part is compared first; ties are broken by the numeric
    prefix of the '-extra' part. A release candidate ('-rcN') of the same
    mainline counts as older.
    """
    def fill_missing_version_numbers(version):
        # Pad in place to 3 components so lists compare element-wise.
        for _ in range(0, 3 - len(version)):
            version.append(0)

    def sanitize_extra_version(extra):
        # Convert leading numeric components to ints; truncate at the first
        # non-numeric component.
        for i in range(0, len(extra)):
            if not str(extra[i]).isnumeric():
                return extra[:i]
            else:
                extra[i] = int(extra[i])
        return extra

    current_version_tokens = current_kernel.split('-')
    minimum_version_tokens = minimum_kernel.split('-')
    current_mainline_kernel = list(map(int, current_version_tokens[0].split('.')))
    minimum_mainline_kernel = list(map(int, minimum_version_tokens[0].split('.')))
    rc = False
    if len(current_version_tokens) > 1 and current_version_tokens[1].startswith('rc'):
        rc = True
    fill_missing_version_numbers(current_mainline_kernel)
    fill_missing_version_numbers(minimum_mainline_kernel)
    if current_mainline_kernel < minimum_mainline_kernel:
        return False
    elif current_mainline_kernel > minimum_mainline_kernel:
        return True
    elif rc:
        # same mainline, but current is a release candidate -> treat as older
        return False
    current_extra = sanitize_extra_version(current_version_tokens[1].split('.')) if len(current_version_tokens) > 1 else []
    minimum_extra = sanitize_extra_version(minimum_version_tokens[1].split('.')) if len(minimum_version_tokens) > 1 else []
    fill_missing_version_numbers(current_extra)
    fill_missing_version_numbers(minimum_extra)
    return current_extra >= minimum_extra


def bytes_to_gb(kb):
    """Convert a byte count to GiB rounded to 2 decimals (the `kb` name is historical — it takes bytes)."""
    return round(kb / (1024 ** 3), 2)


def get_disk_space():
    """Return total/used/free space of '/' in GiB."""
    total, used, free = shutil.disk_usage('/')
    total_gb = bytes_to_gb(total)
    used_gb = bytes_to_gb(used)
    free_gb = bytes_to_gb(free)
    return {'total_gb': total_gb, 'used_gb': used_gb, 'free_gb': free_gb}


def get_memory():
    """Return total and available system memory in GiB."""
    mem_info = psutil.virtual_memory()
    available_gb = bytes_to_gb(mem_info.available)
    total_gb = bytes_to_gb(mem_info.total)
    return {'total_gb': total_gb, 'available_gb': available_gb}


def get_cpu_cores():
    """Return the number of physical (non-logical) CPU cores."""
    cpu_cores = psutil.cpu_count(logical=False)
    return {'count': cpu_cores}


def get_distro_info():
    """Return /etc/os-release parsed into a dict ({} on error); cached after first success."""
    if "distro_info" in get_distro_info.__dict__:
        return get_distro_info.distro_info
    try:
        with open('/etc/os-release') as f:
            lines = f.readlines()
        distro_info = {}
        for line in lines:
            # Check if the line contains '=' before splitting
            if '=' in line:
                key, value = line.strip().split('=', 1)
                distro_info[key] = value.strip('"')
        get_distro_info.distro_info = distro_info
        return distro_info
    except Exception as e:
        log.error(f" Error retrieving distro information: {e}")
        return {}


def get_permission_string(path):
    """Return the octal permission string of `path` (e.g. '755'), or None on error."""
    try:
        mode = os.stat(path).st_mode     # Get mode (permissions)
        permissions = stat.S_IMODE(mode)  # Mask to get permission bits
        return oct(permissions)[-3:]      # Return last three digits (e.g., 777, 755, etc.)
    except Exception:
        return None


def get_owner_and_group(path):
    """Return (owner_name, group_name) for `path`, or (None, None) on error."""
    try:
        # Get the stat result of the file/directory
        stat_info = os.stat(path)
        # Get the user ID (UID) and group ID (GID)
        uid = stat_info.st_uid
        gid = stat_info.st_gid
        # Get the owner (username) and group name
        owner = pwd.getpwuid(uid).pw_name
        group = grp.getgrgid(gid).gr_name
        return owner, group
    except Exception:
        return None, None


def get_locale_file_path():
    """Get the locale file path by checking common locations; None when absent."""
    for path in constants.LOCALE_FILE_PATHS:
        if os.path.exists(path):
            return path
    return None


class TempDirectoryManager:
    """Lazily creates one world-accessible temporary working directory per run."""

    # cached path of the session temp directory (created on first use)
    _temp_directory_path = None

    @classmethod
    def get_temp_dir(cls):
        """Create (once) and return the session temp directory under $TMPDIR."""
        if cls._temp_directory_path is None:
            if 'TMPDIR' not in os.environ:
                os.environ['TMPDIR'] = '/tmp'
            TMPDIR = os.path.abspath(os.environ['TMPDIR'])
            cls._temp_directory_path = tempfile.mkdtemp(prefix=f'mde_support_{get_time_string()}_', dir=TMPDIR)
            # 0o777: read/write/execute for everyone, so unprivileged helpers can use it
            os.chmod(cls._temp_directory_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            log.info(f'Temporary directory created at: {cls._temp_directory_path}')
        return cls._temp_directory_path

    @classmethod
    def create_temp_file(cls, prefix, suffix):
        """Return a path (not created) for a file named <prefix><suffix> in the temp dir."""
        base_dir = cls.get_temp_dir()
        return os.path.join(base_dir, f'{prefix}{suffix}')

    @classmethod
    def create_temp_dir(cls, prefix):
        """Create and return a subdirectory named `prefix` inside the temp dir."""
        base_dir = cls.get_temp_dir()
        path = os.path.join(base_dir, f'{prefix}')
        os.makedirs(path, exist_ok=True)
        return path


class LinuxPackageVersionExtractor():
    """Extracts upstream version numbers from distro package version strings."""

    @staticmethod
    def debian_standard_version(version_string):
        # e.g. 2.31-0ubuntu9.16 -> 2.31
        return version_string.split('-')[0]

    @staticmethod
    def rpm_standard_version(version_string):
        # e.g. glibc-2.17-326.el7_9.3.x86_64 -> 2.17
        return version_string.split('-')[1]