import io, logging, re, sh, subprocess, glob, os, pprint, socket
from .merged_config import get_mdatp_config_allchannel
from pathlib import Path
from .audit_log_analyzer import AuditLogAnalyzer
from .ebpf_analyzer import EbpfAnalyzer
from .mdatp import mdatp
import json
import shutil
from .machine import os_details, machine
from mde_tools import constants
from .utils import collect_mde_conflicts, collect_running_conflicting_binaries,\
    collect_non_mdatp_auditd_rules, create_zip, run_with_output, command_exists,\
    collect_command_output_to_file, collect_file_to_file, run_sh_command, \
    remove_old_files, TempDirectoryManager, get_locale_file_path
import itertools
from .analyze_profiles import analyze_profile

# Registry of collector callables, keyed by display name; populated by the decorator below.
diagnostic_functions = dict()
log = logging.getLogger(constants.LOGGER_NAME)
os_info = os_details()


def register_diagnostic_collection_func(collection_name=None, platform=None, required_package=None, skip_for_e2e=None):
    """Register diagnostic collection function to execute upon diagnostic collection scenario.

    collection_name str: The name to put in the collection, defaults to function name
    platform str: Platform to run the data collection on. Should use constants, such as
        constants.LINUX_PLATFORM or MACOS_PLATFORM. Will default to all platforms if not provided

    Each function should output the following structure:
    {"crashes/": ["/tmp/crash1.log", "/tmp/crash2.log"]}
    Dictionary where each key should be the destination path in the output directory
    The value should be a path (or list of paths) of files to copy to the output directory.
    When creating files, it's better to create temporary files using
    `TempDirectoryManager.create_temp_file` so files will be created in
    non-persistent non-visible directories.
    """
    def decorator(func):
        # Use collection_name if supplied, otherwise use function name
        if required_package and not command_exists(required_package):
            log.warning(f'Skipping data collector function [{collection_name}] as {required_package} is not installed')
            return func
        # if certain diagnostic functions are not required for e2e tests, skip them
        if skip_for_e2e and os.environ.get('E2E_TEST', False):
            log.info(f'Skipping data collector function [{collection_name}] as skip E2E_TEST is set')
            return func
        if platform == os_info.platform or not platform:
            diagnostic_functions[collection_name if collection_name else func.__name__] = func
        return func
    return decorator


@register_diagnostic_collection_func('macOS defender bundle validity check', constants.MACOS_PLATFORM, required_package='mdatp')
def collect_macos_defender_bundle_validity(_args):
    """Dump the MDE app bundle's Info.plist plus codesign/spctl verdicts to one text file."""
    log.info(" Collect MDE bundle validity")
    output = io.StringIO()
    output.write(f"dump {constants.MAC_MDATP_APP_BUNDLE}/Contents/Info.plist\n")
    run_sh_command(sh.cat, f'{constants.MAC_MDATP_APP_BUNDLE}/Contents/Info.plist', _out=output, _err_to_out=True, _ok_code=[0, 1])
    output.write("\n\n")
    # Header now matches the flag actually passed below (-dvvvv, not -dvvvvv).
    output.write(f"codesign -dvvvv {constants.MAC_MDATP_APP_BUNDLE}\n")
    run_sh_command(sh.codesign, '-dvvvv', f'{constants.MAC_MDATP_APP_BUNDLE}', _out=output, _err_to_out=True, _ok_code=[0, 1])
    output.write("\n\n")
    output.write(f"spctl --assess --verbose=4 --type execute {constants.MAC_MDATP_APP_BUNDLE}\n")
    run_sh_command(sh.spctl, '--assess', '--verbose=4', '--type', 'execute', f'{constants.MAC_MDATP_APP_BUNDLE}', _out=output, _err_to_out=True, _ok_code=[0, 1])
    output.write("\n\n")
    output.write(f"codesign --verify --deep --verbose=4 {constants.MAC_MDATP_APP_BUNDLE}\n")
    run_sh_command(sh.codesign, '--verify', '--deep', '--verbose=4', f'{constants.MAC_MDATP_APP_BUNDLE}', _out=output, _err_to_out=True, _ok_code=[0, 1])
    lines = output.getvalue().split('\n')
    temp_path = TempDirectoryManager.create_temp_file(prefix='mde_bundle_validity', suffix='.txt')
    with open(temp_path, 'w', encoding='utf-8') as writer:
        for line in lines:
            writer.write(line)
            writer.write("\n")
    return {'mde_bundle_validity.txt': temp_path}


@register_diagnostic_collection_func('MDE Diagnostic')
def collect_mde_diagnostic(_args):
    """Produce the MDE diagnostic zip via `mdatp`, falling back to zipping log dirs manually."""
    def fallback_logs():
        # Manually zip the well-known log/config directories when mdatp can't do it for us.
        dirs = list({constants.LOG_DIR[os_info.platform],
                     os.path.dirname(constants.WDAV_STATE[os_info.platform]),
                     os.path.dirname(constants.WDAV_CFG[os_info.platform]),
                     os.path.dirname(constants.MDATP_MANAGED[os_info.platform])})
        files = list()
        if os_info.platform == constants.LINUX_PLATFORM:
            if mdatp.opt_log_dir_exists():
                log.info('Collecting logs from: ' + constants.LOG_DIR_OPT)
                dirs.append(constants.LOG_DIR_OPT)
            # NOTE(review): assumed these config paths are collected for all Linux hosts,
            # not only when the opt log dir exists — confirm original nesting.
            files.append(constants.MDE_PATH_CONFIG)
            files.append(constants.MDE_PATH_FALLBACK)
        diagnostics_zip = TempDirectoryManager.create_temp_file(prefix='diagnostics', suffix='.zip')
        create_zip(diagnostics_zip, path=dirs, files=files, retain_dir_tree=True, recursive=True)
        return diagnostics_zip

    remove_old_files(folder_path=constants.WDAV_DIAGNOSTIC_PATH, files_to_keep=3)
    if not command_exists('mdatp'):
        # if mdatp is not installed, we still want to try to collect logs
        log.info('mdatp command is not found, try to collect MDE log...')
        diagnostic = fallback_logs()
    else:
        diagnostic = mdatp.collect_logs(copy_to_collection=False)
        if not diagnostic:
            log.warning('Failed to create MDE diagnostic zip using the mdatp command. Generating manually.')
            diagnostic = fallback_logs()
    log.info('Successfully created MDE diagnostic zip')
    return {'mde_diagnostic.zip': diagnostic}


# TODO: Soon MDATP health will be part of diagnostics, we can remove after this change
@register_diagnostic_collection_func('MDE Health', required_package='mdatp')
def collect_mde_health(_args):
    """Capture `mdatp health` output into health.txt."""
    health_data = mdatp.health_data()
    if not health_data:
        log.warning("Failed to collect MDE health")
        return
    health_path = TempDirectoryManager.create_temp_file(prefix='mde_health', suffix='.txt')
    with open(health_path, 'w', encoding='utf-8') as writer:
        writer.write(health_data)
    return {'health.txt': health_path}


@register_diagnostic_collection_func('MDE Health Features', required_package='mdatp')
def collect_mde_health_features(_args):
    """Capture MDE health feature details into health_details_features.txt."""
    health_features_data = mdatp.health_features_data()
    if not health_features_data:
        log.warning("Failed to collect MDE health features")
        return
    health_features_path = TempDirectoryManager.create_temp_file(prefix='mde_health_features', suffix='.txt')
    with open(health_features_path, 'w', encoding='utf-8') as writer:
        writer.write(health_features_data)
    return {'health_details_features.txt': health_features_path}


@register_diagnostic_collection_func('MDE Permissions', required_package='mdatp')
def collect_mde_Permissions(_args):
    """Capture `mdatp health --details permissions` output into permissions.txt."""
    health_permissions_data = mdatp.health_permissions_data()
    if not health_permissions_data:
        log.warning("Failed to collect MDE health --details permissions")
        return
    health_permission_path = TempDirectoryManager.create_temp_file(prefix='mde_health_permissions', suffix='.txt')
    with open(health_permission_path, 'w', encoding='utf-8') as writer:
        writer.write(health_permissions_data)
    return {'permissions.txt': health_permission_path}


@register_diagnostic_collection_func('Groups', constants.MACOS_PLATFORM)
def collect_groups(_args):
    """Dump every directory-services group record (dscl) into groups.txt."""
    output = io.StringIO()
    run_sh_command(sh.dscl, '.', '-list', '/Groups', _out=output)
    lines = output.getvalue().split('\n')
    groups_path = TempDirectoryManager.create_temp_file(prefix='groups', suffix='.txt')
    with open(groups_path, 'w', encoding='utf-8') as writer:
        for name in lines:
            if not name:
                # split('\n') yields a trailing empty entry; no group to read for it.
                continue
            try:
                output1 = io.StringIO()
                run_sh_command(sh.dscl, '.', '-read', f'/Groups/{name}', _out=output1)
                writer.write(output1.getvalue())
                writer.write("-------------------------\n")
            except sh.ErrorReturnCode as e:
                log.warning(f"Command failed with exit code {e.exit_code}")
                log.warning(f"Output: {e.stdout.decode('utf-8')}")
                log.warning(f"Error: {e.stderr.decode('utf-8')}")
            except UnicodeDecodeError as e:
                log.warning(f"UnicodeDecodeError: {e}")
    return {'groups.txt': groups_path}


# @register_diagnostic_collection_func('MDE antivirus-engine-pool-content', required_package='mdatp')
# def collect_mde_antivirus_engine_pool_content(_args):
#     collect_engine_pool_content_data = mdatp.collect_engine_pool_content()
#     if not collect_engine_pool_content_data:
#         log.warning("Failed to collect MDE diagnostic antivirus-engine-pool-content --time 10")
#         return
#     collect_engine_pool_content_path = TempDirectoryManager.create_temp_file(prefix='mde_engine_pool_content', suffix='.txt')
#     print(collect_engine_pool_content_path)
#     with open(collect_engine_pool_content_path, 'w', encoding='utf-8') as writer:
#         writer.write(collect_engine_pool_content_data)
#     return {'engine_core_pool_content.txt': collect_engine_pool_content_path}


@register_diagnostic_collection_func('sw_vers Information', constants.MACOS_PLATFORM)
def collect_macos_sw_vers(_args):
    """Record macOS version information (sw_vers)."""
    output = io.StringIO()
    run_sh_command(sh.sw_vers, _out=output)
    syslog_path = TempDirectoryManager.create_temp_file(prefix='sw_vers', suffix='.txt')
    with open(syslog_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'sw_vers.txt': syslog_path}


@register_diagnostic_collection_func('sysctl Information', constants.MACOS_PLATFORM)
def collect_sysctl_information(_args):
    """Record all kernel state variables (sysctl -a)."""
    output = io.StringIO()
    run_sh_command(sh.sysctl, '-a', _out=output)
    path = TempDirectoryManager.create_temp_file(prefix='sysctl', suffix='.txt')
    with open(path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'sysctl.txt': path}


@register_diagnostic_collection_func('taskinfo Information', constants.MACOS_PLATFORM)
def collect_taskinfo_information(_args):
    """Record `taskinfo --threads`; on failure the error text is written to the file instead."""
    files = {}
    path = TempDirectoryManager.create_temp_file(prefix='taskinfo', suffix='.txt')
    files.update({'taskinfo.txt': path})
    try:
        output = io.StringIO()
        run_sh_command(sh.taskinfo, '--threads', _out=output)
        with open(path, 'w', encoding='utf-8') as writer:
            writer.write(output.getvalue())
    except Exception as e:
        # Best-effort: still return a file containing the failure reason.
        with open(path, 'w', encoding='utf-8') as writer:
            writer.write(f"e: {e}")
    return files


@register_diagnostic_collection_func('console Information', constants.MACOS_PLATFORM)
def collect_console_information(_args):
    """Record the console-user state via `scutil`; on failure write the error instead."""
    files = {}
    path = TempDirectoryManager.create_temp_file(prefix='console', suffix='.txt')
    files.update({'console.txt': path})
    try:
        input_data = "show State:/Users/ConsoleUser"
        output = io.StringIO()
        run_sh_command(sh.scutil, _in=input_data, _out=output)
        with open(path, 'w', encoding='utf-8') as writer:
            writer.write(output.getvalue())
    except Exception as e:
        with open(path, 'w', encoding='utf-8') as writer:
            writer.write(f"e: {e}")
    return files


@register_diagnostic_collection_func('macOS Syslog', constants.MACOS_PLATFORM, skip_for_e2e=True)
def collect_macos_syslog(_args):
    """Capture the last hour of unified log output."""
    log.info(" Collect last 1h syslog")
    output = io.StringIO()
    run_sh_command(sh.log, 'show', '--debug', '--info', '--style', 'compact', '--color', 'none', '-last', '1h', _out=output)
    syslog_path = TempDirectoryManager.create_temp_file(prefix='syslog', suffix='.txt')
    with open(syslog_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'syslog.txt': syslog_path}


@register_diagnostic_collection_func('macOS BTM', constants.MACOS_PLATFORM)
def collect_macos_dumpbtm(_args):
    """Capture background task management state (sfltool dumpbtm)."""
    log.info(" Collect sytem BTM")
    output = io.StringIO()
    run_sh_command(sh.sfltool, 'dumpbtm', _out=output)
    # Fixed temp-file prefix typo ('bumpbtm' -> 'dumpbtm'); output key is unchanged.
    log_path = TempDirectoryManager.create_temp_file(prefix='sfltool_dumpbtm', suffix='.txt')
    with open(log_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'sfltool_dumpbtm.txt': log_path}


@register_diagnostic_collection_func('macOS wdavdaemon process vmmap', constants.MACOS_PLATFORM, required_package='mdatp', skip_for_e2e=True)
def collect_macos_wdavdaemon_process_vmmap(_args):
    """Zip vmmap output for MDE daemon processes."""
    log.info(" Collect vmmap of MDE processes")
    temp = TempDirectoryManager.create_temp_dir(prefix="mde_process_vmmap")
    mdatp.vmmap_wdavdaemon_processes(temp)
    zip_file_path = shutil.make_archive(temp, 'zip', temp)
    shutil.rmtree(temp)
    return {"mde_process_vmmap.zip": zip_file_path}


@register_diagnostic_collection_func('macOS process sampling', constants.MACOS_PLATFORM, required_package='mdatp', skip_for_e2e=True)
def collect_macos_edr_process_sampling(_args):
    """Zip CPU sampling output for MDE EDR processes."""
    log.info(" Collect sampling of MDE EDR processes")
    temp = TempDirectoryManager.create_temp_dir(prefix="mde_process_sampling")
    mdatp.sample_edr_processes(temp)
    zip_file_path = shutil.make_archive(temp, 'zip', temp)
    shutil.rmtree(temp)
    return {"mde_process_sampling.zip": zip_file_path}


@register_diagnostic_collection_func('macOS netext debug syslog', constants.MACOS_PLATFORM, skip_for_e2e=True)
def collect_macos_netext_debug_syslog(args):
    """Stream live unified-log output for the network extension for args.length seconds."""
    try:
        time_to_collect = args.length
    except AttributeError:
        time_to_collect = 30  # default
    log.info(f" Collect {time_to_collect}s netext debug syslog")
    output = io.StringIO()
    run_sh_command(sh.log, 'stream', '--debug', '--info', '--style', 'compact', '--color', 'none',
                   '--timeout', f'{time_to_collect}s', '-predicate', 'process CONTAINS "netext"', _out=output)
    syslog_path = TempDirectoryManager.create_temp_file(prefix='syslog_netext', suffix='.txt')
    with open(syslog_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'syslog_netext_30s_realtime.txt': syslog_path}


@register_diagnostic_collection_func('macOS epsext debug syslog', constants.MACOS_PLATFORM, skip_for_e2e=True)
def collect_macos_epsext_debug_syslog(args):
    """Stream live epsext log output plus the last 48h of epsext info-level log entries."""
    try:
        time_to_collect = args.length
    except AttributeError:
        time_to_collect = 30  # default
    log.info(f" Collect {time_to_collect}s epsext debug syslog")
    output = io.StringIO()
    run_sh_command(sh.log, 'stream', '--debug', '--info', '--style', 'compact', '--color', 'none',
                   '--timeout', f'{time_to_collect}s', '-predicate', 'process CONTAINS "epsext"', _out=output)
    syslog_path = TempDirectoryManager.create_temp_file(prefix='syslog_epsext', suffix='.txt')
    with open(syslog_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    output1 = io.StringIO()
    # BSD date: compute "now minus 48 hours" as the --start timestamp.
    date_command = ["date", "-v", "-48H", "+%Y-%m-%d %H:%M:%S"]
    date_output = subprocess.check_output(date_command).decode().strip()
    run_sh_command(sh.log, 'show', '--start', date_output, '--info', '--color', 'none',
                   '--predicate', 'process == "epsext"', _out=output1)
    syslog2d_path = TempDirectoryManager.create_temp_file(prefix='syslog_epsext', suffix='.txt')
    with open(syslog2d_path, 'w', encoding='utf-8') as writer:
        writer.write(output1.getvalue())
    return {'syslog_epsext_30s_realtime.txt': syslog_path,
            'syslog_epsext_2day_info.txt': syslog2d_path}


@register_diagnostic_collection_func('MDE Crashes Information', required_package='mdatp', skip_for_e2e=True)
def collect_mde_crashes(_args):
    """Collect crash dumps/logs for MDE processes into the crashes/ output directory."""
    crashes = mdatp.collect_crash_logs(
        ["wdavdaemon", "telemetryd", "telemetryd_v2", "epsext", "netext", "osqueryi", "user_context"],
        copy_to_collection=False)
    if not crashes:
        log.info('No crash dumps or logs found')
        return
    return {'crashes/': crashes}


@register_diagnostic_collection_func('Process Information')
def collect_process_information(_args):
    """Snapshot the process table (ps) into process_information.txt."""
    fp_flag = "w"
    process_data = io.StringIO()
    if os_info.platform == constants.LINUX_PLATFORM:
        run_sh_command(sh.ps, "axo", "pid,ppid,user,%cpu,%mem,vsz,rss,tty,stat,start,time,command", _out=process_data)
    else:
        # On macOS ps output is captured as bytes, so write the file in binary mode.
        process_data = io.BytesIO()
        fp_flag = "wb"
        run_sh_command(sh.ps, "auxww", "-o", "pid,ppid,user,%cpu,%mem,vsz,rss,tty,stat,start,time,command", _out=process_data)
    processes_data_path = TempDirectoryManager.create_temp_file(prefix='processes_info', suffix='.txt')
    with open(processes_data_path, fp_flag) as writer:
        writer.write(process_data.getvalue())
    return {'process_information.txt': processes_data_path}


@register_diagnostic_collection_func('Launchd Information', constants.MACOS_PLATFORM)
def collect_launchd_information(_args):
    """Capture several launchctl views (dumpstate, list, per-domain print) as separate files."""
    files = {}
    commands = [
        {'command': ["dumpstate"], 'suffix': "dumpstate"},
        {'command': ["list"], 'suffix': "list_0"},
        {'command': ["print", "gui/501"], 'suffix': "print_gui_501"},
        {'command': ["print", "user/501"], 'suffix': "print_user_501"},
        {'command': ["print", "system"], 'suffix': "print_system"},
        {'command': ["print-disabled", "system"], 'suffix': "print_disabled_system"},
    ]
    for item in commands:
        try:
            command = item['command']
            suffix = item['suffix']
            output = io.StringIO()
            run_sh_command(sh.launchctl, *command, _out=output)
            # Fix: prefix was missing the f-string marker, producing literal '{suffix}'.
            path = TempDirectoryManager.create_temp_file(prefix=f'launchctl_{suffix}', suffix='.txt')
            with open(path, 'w', encoding='utf-8') as writer:
                writer.write(output.getvalue())
            files.update({f'launchctl_{suffix}.txt': path})
        except Exception as e:
            log.info(f"collect launchd exception: {e}")
    return files


@register_diagnostic_collection_func('lsappinfo Information', constants.MACOS_PLATFORM)
def collect_lsappinfo_information(_args):
    """Capture `lsappinfo list` output."""
    files = {}
    commands = [
        {'command': ["list"], 'suffix': "list"},
    ]
    for item in commands:
        try:
            command = item['command']
            suffix = item['suffix']
            output = io.StringIO()
            run_sh_command(sh.lsappinfo, *command, _out=output)
            # Fix: prefix was missing the f-string marker, producing literal '{suffix}'.
            path = TempDirectoryManager.create_temp_file(prefix=f'lsappinfo_{suffix}', suffix='.txt')
            with open(path, 'w', encoding='utf-8') as writer:
                writer.write(output.getvalue())
            files.update({f'lsappinfo_{suffix}.txt': path})
        except Exception as e:
            log.info(f"collect lsappinfo exception: {e}")
    return files


@register_diagnostic_collection_func('Proc Directory', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_proc_directory(_args):
    """Dump /proc/<pid>/maps for every running mdatp process."""
    # create tmp file for output
    processes_data_path = TempDirectoryManager.create_temp_file(prefix='proc_directory_info', suffix='.txt')
    # get mdatp pids
    mdatp_processes = mdatp.get_mdatp_processes()
    mdatp_pids = [x[0] for x in mdatp_processes]
    # iterate over pids and collect /proc/$pid/*
    with open(processes_data_path, 'w', encoding='utf-8') as writer:
        for pid in mdatp_pids:
            proc_maps = io.StringIO()
            run_sh_command(sh.cat, f"/proc/{pid}/maps", _out=proc_maps)
            writer.write(proc_maps.getvalue())
            writer.write("------------------------------------------------\n\n")
    return {'proc_directory_info.txt': processes_data_path}


@register_diagnostic_collection_func('Extra Service Configurations', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_extra_service_config_directory(_args):
    """Zip the systemd drop-in config directory for the mdatp service, if present and non-empty."""
    directory = Path(constants.SERVICE_EXTRA_CONFIG_DIR)
    if not directory.exists():
        return
    log_files = [file for file in directory.iterdir() if file.is_file()]
    if not log_files:
        return
    service_config_zip = TempDirectoryManager.create_temp_file(prefix='service_extra_config', suffix='.zip')
    create_zip(service_config_zip, path=directory, zipdir='service_extra_config', recursive=True)
    return {'service_extra_config.zip': service_config_zip}


@register_diagnostic_collection_func('AuditD information', constants.LINUX_PLATFORM, required_package='auditd')
def collect_auditd_information(_args):
    """Aggregate auditd version/status/config/rules plus filtered syslog lines into one file."""
    audit_version = mdatp.auditd_version()
    audit_status = mdatp.auditd_status(full=True)
    audit_conf = '\n\n'.join([mdatp.auditctl_status(), mdatp.auditd_conf(), mdatp.audispd_conf()])
    # NOTE(review): 'messsages' below looks like a typo but is kept verbatim — it is a
    # literal match keyword against mdatp.auditd_syslog() output; confirm before changing.
    keywords = ['auditd', 'augenrules', 'auditctl', 'sudo cat /var/log/syslog',
                'sudo cat /var/log/messsages', '==========']
    lines = mdatp.auditd_syslog().split('\n')
    audit_loaded_rules = mdatp.auditd_loaded_rules()
    audit_deployed_rules = mdatp.auditd_deployed_rules()
    audit_deployed_plugins = mdatp.auditd_deployed_plugins()
    audisp_deployed_rules = mdatp.audisp_deployed_rules()
    audisp_deployed_plugins = mdatp.audisp_deployed_plugins()
    audit_syslog = '\n'.join([line for line in lines if any([expr in line for expr in keywords])])
    non_mdatp_rules = collect_non_mdatp_auditd_rules(audit_loaded_rules)
    if not audit_status:
        log.warning("Failed to collect AuditD information")
        return
    # TODO: analyse conflicting rules
    auditd_data_path = TempDirectoryManager.create_temp_file(prefix='auditd_info', suffix='.txt')
    with open(auditd_data_path, 'w', encoding='utf-8') as writer:
        writer.write('\n\n'.join([audit_version, audit_status, audit_conf, audit_loaded_rules,
                                  non_mdatp_rules, audit_deployed_rules, audit_deployed_plugins,
                                  audisp_deployed_rules, audisp_deployed_plugins, audit_syslog]))
    return {'auditd_info.txt': auditd_data_path}


@register_diagnostic_collection_func('AuditD analysis', constants.LINUX_PLATFORM, required_package='auditd')
def collect_auditd_log(_args):
    """Run the audit log analyzer over /var/log/audit and also zip the raw logs."""
    directory = Path('/var/log/audit/')
    log_files = [file for file in directory.iterdir() if file.is_file()]
    analyzer = AuditLogAnalyzer()
    # Loop variable renamed from `log`, which shadowed the module-level logger.
    for log_file in log_files:
        analyzer.analyze(log_file)
    auditd_data_path = TempDirectoryManager.create_temp_file(prefix='auditd_log', suffix='.txt')
    analyzer.write_to(auditd_data_path)
    auditd_logs_zip = TempDirectoryManager.create_temp_file(prefix='auditd_logs', suffix='.zip')
    create_zip(auditd_logs_zip, path=directory, zipdir='auditd_logs')
    return {'auditd_log_analysis.txt': auditd_data_path, 'auditd_logs.zip': auditd_logs_zip}


@register_diagnostic_collection_func('Ebpf Info', constants.LINUX_PLATFORM)
def collect_ebpf_info(_args):
    """Collect kernel config/enabled-functions for eBPF plus zips of the syscall trace dirs."""
    analyzer = EbpfAnalyzer()
    analyzer.collect_kernel_configurations()
    analyzer.collect_enabled_functions()
    ebpf_syscalls_zip = TempDirectoryManager.create_temp_file(prefix='ebpf_syscalls', suffix='.zip')
    # NOTE(review): 'ebpf_sycalls' zipdir name is kept as-is (possible typo) — downstream
    # consumers may rely on the existing name inside the archive.
    create_zip(ebpf_syscalls_zip, path=constants.EBPF_SYSCALLS, zipdir='ebpf_sycalls', recursive=True, retain_dir_tree=True)
    ebpf_raw_syscalls_zip = TempDirectoryManager.create_temp_file(prefix='ebpf_raw_syscalls', suffix='.zip')
    create_zip(ebpf_raw_syscalls_zip, path=constants.EBPF_RAW_SYSCALLS, zipdir='ebpf_raw_syscalls', recursive=True, retain_dir_tree=True)
    return {'ebpf_kernel_config.txt': analyzer.kernel_configurations,
            'ebpf_enabled_func.txt': analyzer.enabled_functions,
            'ebpf_syscalls.zip': ebpf_syscalls_zip,
            'ebpf_raw_syscalls.zip': ebpf_raw_syscalls_zip}


@register_diagnostic_collection_func('eBPF Maps Info', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_ebpf_maps_info(_args):
    """Capture eBPF map data from the analyzer into ebpf_maps_info.txt."""
    ebpf_maps_data = EbpfAnalyzer().get_ebpf_maps_data()
    if not ebpf_maps_data:
        log.warning("Failed to collect ebpf maps info")
        return
    ebpf_maps_path = TempDirectoryManager.create_temp_file(prefix='ebpf_maps_info', suffix='.txt')
    with open(ebpf_maps_path, 'w', encoding='utf-8') as writer:
        writer.write(ebpf_maps_data)
    return {'ebpf_maps_info.txt': ebpf_maps_path}


@register_diagnostic_collection_func('Collecting syslog/messages', constants.LINUX_PLATFORM)
def collect_syslog(_args):
    """Zip rotated syslog (Debian) or messages (RHEL) files from /var/log."""
    base_log_file = None
    if os.path.exists('/var/log/syslog'):
        base_log_file = '/var/log/syslog'
    elif os.path.exists('/var/log/messages'):
        base_log_file = '/var/log/messages'
    else:
        raise RuntimeError('Neither [/var/log/syslog] nor [/var/log/messages] exists')
    filename = os.path.basename(base_log_file)
    # Fix: the temp prefix and return key previously contained a garbled literal
    # placeholder instead of the detected base filename.
    logs_zip = TempDirectoryManager.create_temp_file(prefix=f'{filename}', suffix='.zip')
    create_zip(logs_zip, path=os.path.dirname(base_log_file), prefix_pattern=filename, zipdir=filename + 's')
    return {f'{filename}s.zip': logs_zip}


@register_diagnostic_collection_func('MDE Conflicting Processes', constants.LINUX_PLATFORM)
def collect_mde_conflicting_agents(_args):
    """Report conflicting security agents and conflicting binaries from auditd rules."""
    conflicting_processes_file = TempDirectoryManager.create_temp_file(prefix='conflicting_processes', suffix='.txt')
    conflicts = "No Known Conflicts"
    conflicting_agents, _ = collect_mde_conflicts()
    conflicting_binaries = []
    if os_info.platform == constants.LINUX_PLATFORM:
        if command_exists('auditd'):
            conflicting_binaries = collect_running_conflicting_binaries(mdatp.auditd_loaded_rules())
        else:
            log.warning('Not able to collect conflicting binaries as auditd is not installed')
    if len(conflicting_agents) > 0 or len(conflicting_binaries) > 0:
        # Fix: str.join raised TypeError when conflicting_binaries was a list;
        # stringify each section before joining.
        sections = [
            conflicting_agents if isinstance(conflicting_agents, str)
            else '\n'.join(str(item) for item in conflicting_agents),
            conflicting_binaries if isinstance(conflicting_binaries, str)
            else '\n'.join(str(item) for item in conflicting_binaries),
        ]
        conflicts = '\n\n'.join(sections)
    with open(conflicting_processes_file, 'w', encoding='utf-8') as writer:
        writer.write(conflicts)
    return {'conflicting_processes_information.txt': conflicting_processes_file}


@register_diagnostic_collection_func('MDE Exclusions', required_package='mdatp')
def collect_mde_exclusions(_args):
    """Capture the configured MDE exclusions into exclusions.txt."""
    exclusions_data = mdatp.get_exclusions()
    if not exclusions_data:
        log.warning("Failed to collect MDE exclusions")
        return
    exclusions_path = TempDirectoryManager.create_temp_file(prefix='mde_exclusions', suffix='.txt')
    with open(exclusions_path, 'w', encoding='utf-8') as writer:
        writer.write(exclusions_data)
    return {'exclusions.txt': exclusions_path}


@register_diagnostic_collection_func('MDE Definitions Details', required_package='mdatp')
def collect_mde_definitions_details(_args):
    """Capture antivirus definitions details into definitions.txt."""
    definitions_data = mdatp.definitions_data()
    if not definitions_data:
        log.warning("Failed to collect MDE Definitions details")
        return
    definitions_path = TempDirectoryManager.create_temp_file(prefix='mde_definitions', suffix='.txt')
    with open(definitions_path, 'w', encoding='utf-8') as writer:
        writer.write(definitions_data)
    return {'definitions.txt': definitions_path}


@register_diagnostic_collection_func('MDE Directories List', required_package='mdatp')
def collect_mde_dir_information(_args):
    """Recursively list MDE directories and (on Linux) stat the log directory chain."""
    output = io.StringIO()
    mde_dirs = mdatp.get_mde_directories()
    for mde_dir in mde_dirs:
        run_sh_command(sh.ls, "-lR", mde_dir, _out=output, _iter=True)
    if os_info.platform == constants.LINUX_PLATFORM:
        if mdatp.opt_log_dir_exists():
            try:
                run_sh_command(sh.stat, constants.LOG_DIR_OPT, _out=output)  # pylint: disable=not-callable
            except Exception as e:
                output.write(f"Could not stat opt log dir: {e}")
        try:
            run_sh_command(sh.stat, "/var", _out=output)  # pylint: disable=not-callable
            run_sh_command(sh.stat, "/var/log", _out=output)  # pylint: disable=not-callable
            run_sh_command(sh.stat, "/var/log/microsoft", _out=output)  # pylint: disable=not-callable
            run_sh_command(sh.stat, "/var/log/microsoft/mdatp", _out=output)  # pylint: disable=not-callable
        except Exception:
            # Narrowed from a bare except; intent is best-effort stat collection.
            output.write("Could not stat log dirs")
    mde_dirs_path = TempDirectoryManager.create_temp_file(prefix='mde_directories', suffix='.txt')
    with open(mde_dirs_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'mde_directories.txt': mde_dirs_path}


@register_diagnostic_collection_func('Disk Usage')
def collect_disk_usage_information(_args):
    """Capture `df -h`, plus diskutil views on macOS."""
    output = io.StringIO()
    run_sh_command(sh.df, "-h", _out=output)
    disk_usage_path = TempDirectoryManager.create_temp_file(prefix='disk_usage', suffix='.txt')
    with open(disk_usage_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    files = {}
    files.update({'disk_usage.txt': disk_usage_path})
    if os_info.platform == constants.MACOS_PLATFORM:
        commands = [
            {'command': ["apfs", "list"], 'suffix': "apfs_list"},
            {'command': ["apfs", "listUsers", "/"], 'suffix': "apfs_listusers"},
            {'command': ["list"], 'suffix': "list"},
            {'command': ["info", "-all"], 'suffix': "info"},
        ]
        for item in commands:
            try:
                command = item['command']
                suffix = item['suffix']
                output = io.StringIO()
                run_sh_command(sh.diskutil, *command, _out=output)
                path = TempDirectoryManager.create_temp_file(prefix=f'diskutil_{suffix}', suffix='.txt')
                with open(path, 'w', encoding='utf-8') as writer:
                    writer.write(output.getvalue())
                files.update({f'diskutil_{suffix}.txt': path})
            except Exception as e:
                log.warning(f'diskutil {item} exception: {e}')
    return files


@register_diagnostic_collection_func('MDE User Info', required_package='mdatp')
def collect_mde_user_information(_args):
    """Record `id` output for the MDE service account (mdatp on Linux, _mdatp on macOS)."""
    if os_info.platform == constants.LINUX_PLATFORM:
        user = 'mdatp'
    else:
        user = '_mdatp'
    output = io.StringIO()
    run_sh_command(sh.id, user, _out=output)
    mde_user_path = TempDirectoryManager.create_temp_file(prefix='mde_user', suffix='.txt')
    with open(mde_user_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'mde_user.txt': mde_user_path}


@register_diagnostic_collection_func('MDE Definitions Mount Point', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_mde_definitions_mount_points(_args):
    """Record the filesystem mount backing the definitions database directory."""
    output = io.StringIO()
    mde_definitions = mdatp.get_database_root()
    output.write('Definitions mount point:\n')
    run_sh_command(sh.findmnt, "-n", "--target", mde_definitions, _out=output)
    mde_definitions_mount_path = TempDirectoryManager.create_temp_file(prefix='mde_definitions_mount', suffix='.txt')
    with open(mde_definitions_mount_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'mde_definitions_mount.txt': mde_definitions_mount_path}


@register_diagnostic_collection_func('MDE Service Status', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_mde_service_status(_args):
    """Record `service mdatp status` output."""
    output = io.StringIO()
    run_sh_command(sh.service, "mdatp", "status", _out=output)
    service_status_path = TempDirectoryManager.create_temp_file(prefix='service_status', suffix='.txt')
    with open(service_status_path, 'w', encoding='utf-8') as writer:
        writer.write(output.getvalue())
    return {'service_status.txt': service_status_path}


@register_diagnostic_collection_func('MDE Service File', constants.LINUX_PLATFORM, required_package='mdatp')
def collect_mde_service_file(_args):
    """Copy the mdatp systemd unit file (deb or rpm location) into the collection."""
    abs_path = ''
    if os.path.exists(constants.MDATP_SERVICE_PATH_DEB):
        abs_path = constants.MDATP_SERVICE_PATH_DEB
    elif os.path.exists(constants.MDATP_SERVICE_PATH_RPM):
        abs_path = constants.MDATP_SERVICE_PATH_RPM
    if not os.path.exists(abs_path):
        # Fix: previously fell through to `return {...: service_file_path}` with the
        # variable never assigned, raising UnboundLocalError instead of skipping.
        log.warning('Not able to get mdatp.service file')
        return
    service_file_path = TempDirectoryManager.create_temp_file(prefix='service_file', suffix='.txt')
    run_sh_command(sh.cp, abs_path, service_file_path)
    return {'service_file.txt': service_file_path}
@register_diagnostic_collection_func('Hardware Information') def collect_hardware_information(_args): output = io.StringIO() if os_info.platform == constants.MACOS_PLATFORM: run_sh_command(sh.ioreg, "-l", "-w 0", _out=output) else: run_sh_command(sh.lshw, _out=output) # Not installed by default on RHEL, may throw. hardware_info_path = TempDirectoryManager.create_temp_file(prefix=f'hardware_info', suffix='.txt') with open(hardware_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'hardware_info.txt': hardware_info_path} @register_diagnostic_collection_func(f'system_profiler Information', constants.MACOS_PLATFORM, skip_for_e2e=True) def collect_system_profiler_information(_args): types = [ 'SPBluetoothDataType', 'SPUSBDataType', 'SPThunderboltDataType', 'SPInstallHistoryDataType', 'SPApplicationsDataType', 'SPLogsDataType', 'SPFrameworksDataType', 'SPExtensionsDataType', 'SPDeveloperToolsDataType', 'SPSoftwareDataType', 'SPConfigurationProfileDataType', 'SPStorageDataType', 'SPManagedClientDataType', 'SPPowerDataType', 'SPFirewallDataType', 'SPDisabledSoftwareDataType', 'SPNetworkVolumeDataType', 'SPDiagnosticsDataType', ] dict = {} for data_type in types: output = io.StringIO() run_sh_command(sh.system_profiler, f"{data_type}", _out=output) output_path = TempDirectoryManager.create_temp_file(prefix=f'system_profiler_{data_type}', suffix='.txt') with open(output_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) dict.update({f'system_profiler_{data_type}.txt': output_path}) return dict @register_diagnostic_collection_func('profiles Info', constants.MACOS_PLATFORM) def collect_profiles_information(_args): output = io.StringIO() output1 = io.StringIO() output_path = TempDirectoryManager.create_temp_file(prefix=f'profiles', suffix='.xml') result_output_path = TempDirectoryManager.create_temp_file(prefix=f'profiles_analyze_result', suffix='.txt') run_sh_command(sh.profiles, 'show', '-output', 'stdout-xml', 
_out=output) with open(output_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) mobileconfig_template_path = TempDirectoryManager.create_temp_file(prefix=f'mdatp', suffix='.mobileconfig') mobileconfig_url = constants.MDATP_MOBILECONFIG_URL try: log.info(f"downloading mobileconfig file from {mobileconfig_url}...") if not machine.download_file(mobileconfig_url, filename=mobileconfig_template_path, timeout_sec=10): log.error(f"Failed to download mobileconfig file from {mobileconfig_url} try embedded mobileconfig file") with open(mobileconfig_template_path, 'w', encoding='utf-8') as writer: writer.write(constants.MDATP_MOBILECONFIG_CONTENT) except Exception as e: log.error(f"Failed to download mobileconfig file from {mobileconfig_url}: {e} try embedded mobileconfig file") with open(mobileconfig_template_path, 'w', encoding='utf-8') as writer: writer.write(constants.MDATP_MOBILECONFIG_CONTENT) if os.path.exists(mobileconfig_template_path): log.debug(f"mobileconfig file downloaded to [{mobileconfig_template_path}]") try: analyze_profile(output_path, mobileconfig_template_path, output1) except Exception as e: log.warning(f'analyze_profile exception: {e}') output1.write(f"analyze_profile exception: {e}\n") else: log.warning(f"mobileconfig file not found at [{mobileconfig_template_path}]") output1.write(f"mobileconfig file not found.\n") try: with open(result_output_path, 'w') as writer: writer.write(output1.getvalue()) except Exception as e: log.warning(f'analyze_profile exception: {e}') with open(result_output_path, 'w') as writer: writer.write(f"e: {e}\n") return {'profiles.xml': output_path, 'profiles_analyze_result.txt': result_output_path, 'mdatp.mobileconfig': mobileconfig_template_path} @register_diagnostic_collection_func('MDMOverrides Information', constants.MACOS_PLATFORM) def collect_mdmoverrides_information(_args): return collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/Library/Application 
Support/com.apple.TCC/MDMOverrides.plist', 'MDMOverrides.plist') @register_diagnostic_collection_func('firewall Information', constants.MACOS_PLATFORM) def collect_firewall_information(_args): return collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/Library/Preferences/com.apple.alf.plist', 'com.apple.alf.plist') @register_diagnostic_collection_func('Mount Information') def collect_mount_information(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'mount', 'mount.txt', 'mount') @register_diagnostic_collection_func('Uname Information') def collect_uname_information(_args): output = io.StringIO() run_sh_command(sh.uname, "-a", _out=output) uname_path = TempDirectoryManager.create_temp_file(prefix=f'uname', suffix='.txt') with open(uname_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'uname.txt': uname_path} @register_diagnostic_collection_func('Memory Information') def collect_memory_information(_args): if os_info.platform == 'macOS': command = "memory_pressure" else: command = "free" return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'memory', 'memory.txt', command) @register_diagnostic_collection_func('Meminfo command') def collect_meminfo(_args): output = io.StringIO() if os_info.platform == 'macOS': run_sh_command(sh.vm_stat, _out=output) else: run_sh_command(sh.cat, "/proc/meminfo", _out=output) memory_path = TempDirectoryManager.create_temp_file(prefix=f'meminfo', suffix='.txt') with open(memory_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'meminfo.txt': memory_path} @register_diagnostic_collection_func('CPU Information') def collect_cpu_information(_args): output = io.StringIO() if os_info.platform == 'macOS': run_sh_command(sh.grep, run_sh_command(sh.sysctl, '-a'), 'machdep.cpu', _out=output) else: run_sh_command(sh.lscpu, _out=output) cpuinfo_path = TempDirectoryManager.create_temp_file(prefix=f'cpuinfo', suffix='.txt') 
with open(cpuinfo_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'cpuinfo.txt': cpuinfo_path} @register_diagnostic_collection_func('Linux namespace information', constants.LINUX_PLATFORM) def collect_namespace_information(_args): output = io.StringIO() run_sh_command(sh.lsns, '--output-all', _out=output) lsns_path = TempDirectoryManager.create_temp_file(prefix=f'lsns_info', suffix='.txt') with open(lsns_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'lsns_info.txt': lsns_path} @register_diagnostic_collection_func('MDE Open File Descriptors Information', required_package='mdatp') def collect_file_descriptors_information(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'lsof', 'lsof.txt', 'lsof') @register_diagnostic_collection_func('SELinux Status Information', constants.LINUX_PLATFORM) def collect_sestatus_information(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'sestatus', 'sestatus.txt', 'sestatus') @register_diagnostic_collection_func('lsmod Information', constants.LINUX_PLATFORM) def collect_lsmod_information(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'lsmod', 'lsmod.txt', 'lsmod') @register_diagnostic_collection_func('dmesg Information', constants.LINUX_PLATFORM) def collect_dmesg_information(_args): output = io.StringIO() try: run_sh_command(sh.dmesg, "-T", _out=output) except: output.write("Could not run dmesg -T command") dmesg_info_path = TempDirectoryManager.create_temp_file(prefix=f'dmesg_info', suffix='.txt') with open(dmesg_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'dmesg.txt': dmesg_info_path} @register_diagnostic_collection_func('kernel lockdown Info', constants.LINUX_PLATFORM) def collect_kernel_lockdown_information(_args): output = io.StringIO() try: run_sh_command(sh.cat, "/sys/kernel/security/lockdown", _out=output) except: 
output.write("Could not run cat /sys/kernel/security/lockdown") kernel_lockdown_info_path = TempDirectoryManager.create_temp_file(prefix=f'kernel_lockdown_info', suffix='.txt') with open(kernel_lockdown_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'kernel_lockdown.txt': kernel_lockdown_info_path} @register_diagnostic_collection_func('System Extensions Information', constants.MACOS_PLATFORM) def collect_system_extensions_information(_args): if re.match("10.14.*", os_info.version) is not None: return files = {} ## csrutil_status output = io.StringIO() run_sh_command(sh.csrutil, 'status', _out=output) path = TempDirectoryManager.create_temp_file(prefix=f'csrutil_status', suffix='.txt') with open(path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) files.update({f'csrutil_status.txt': path}) commands = [ {'command': ["list"], 'suffix': "list"}, {'command': ["diagnose"], 'suffix': "diagnose"}, ] for item in commands: try: command = item['command'] suffix = item['suffix'] output = io.StringIO() run_sh_command(sh.systemextensionsctl, *command, _out=output) path = TempDirectoryManager.create_temp_file(prefix=f'systemextensionsctl_{suffix}', suffix='.txt') with open(path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) files.update({f'systemextensionsctl_{suffix}.txt': path}) except Exception as e: log.warning(f'systemextensionsctl {item} exception: {e}') return files @register_diagnostic_collection_func('machine info commands') def collect_machine_info(_args): diag_functions = [('top','-l','1','-o','cpu'), ('nettop','-l','1'), ('fs_usage','-t','1')] if (os_info.platform == 'macOS') else [('top','-c', '-n','1','b', '-w', '512')] results = {} for func in diag_functions: output = io.StringIO() run_sh_command(sh.Command(func[0]), *func[1:], _out=output) path = TempDirectoryManager.create_temp_file(prefix=f'{func[0]}', suffix='.txt') with open(path, 'wt', encoding='utf-8') as f: 
f.write(output.getvalue()) results[f'{func[0]}.txt'] = f.name return results @register_diagnostic_collection_func('TCC DB Information', constants.MACOS_PLATFORM, skip_for_e2e=True) def collect_tcc_db_information(_args): if re.match("10.14.*", os_info.version) is not None: # we don't need to get this on Mojave return output = io.StringIO() sql_string = "service,client,client_type,allowed" if os_info.is_big_sur_and_up(): sql_string = "service,client,client_type,auth_value,auth_reason" try: run_sh_command(sh.sqlite3, '/Library/Application Support/com.apple.TCC/TCC.db', 'select ' + sql_string + ' from access', _out=output) except: output.write("Unable to query tcc.db") tcc_db_path = TempDirectoryManager.create_temp_file(prefix=f'tcc_db', suffix='.txt') with open(tcc_db_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'tcc_db_access.txt': tcc_db_path} # sometime the system_profiler command force wdavdaemon process to quit # @register_diagnostic_collection_func('Mac system profiler information', constants.MACOS_PLATFORM) # def collect_mac_system_profiler(_args): # if args.system_profiler is not None: # print("args.system_profiler is set") # output = io.StringIO() # run_sh_command(sh.system_profiler, "-json", _out=output) # system_profiler_path = TempDirectoryManager.create_temp_file(prefix=f'system_profiler', suffix='.json') # with open(system_profiler_path, 'w', encoding='utf-8') as writer: # writer.write(output.getvalue()) # return {'system_profiler.txt': system_profiler_path} # The leaks command will cause process performance issue, even crash. 
Skip this for now # @register_diagnostic_collection_func('Memory Leaks Information', constants.MACOS_PLATFORM, required_package='mdatp') # def collect_memory_leaks_information(_args): # output = io.StringIO() # processes = mdatp.get_mdatp_processes() # for process in processes: # pid, command = process # if not "ext" in command and not "wdavdaemon privileged" in command: #querying leaks on privileged daemon renders the machine unusable # output.write(f"Leaks output for process '{command}':\n") # run_sh_command(sh.leaks, pid, _out=output, _timeout=300, _iter=True) # memory_leaks_path = TempDirectoryManager.create_temp_file(prefix=f'memory_leaks', suffix='.txt') # with open(memory_leaks_path, 'w', encoding='utf-8') as writer: # writer.write(output.getvalue()) # return {'memory_leaks.txt': memory_leaks_path} @register_diagnostic_collection_func('rtp statistics', required_package='mdatp') def collect_rtp_statistics(_args): rtp_stats = mdatp.rtp_statistics() rtp_path = TempDirectoryManager.create_temp_file(prefix=f'rtp_statistics', suffix='.txt') with open(rtp_path, 'w', encoding='utf-8') as writer: writer.write(rtp_stats) return {'rtp_statistics.txt': rtp_path} @register_diagnostic_collection_func('libc Information', constants.LINUX_PLATFORM) def collect_libc_info(_args): # On debian the library is called libc6, on rhel its glibc libc_package_info = [machine.query_installed_package('libc6'), machine.query_installed_package('glibc')] log_path = TempDirectoryManager.create_temp_file(prefix=f'libc_info', suffix='.txt') with open(log_path, 'w', encoding='utf-8') as writer: for package in libc_package_info: if package.installed == True: writer.write(str(package)) return {'libc_info.txt': log_path} @register_diagnostic_collection_func('Uptime Information') def collect_uptime_info(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'uptime_info', 'uptime_info.txt', 'uptime') @register_diagnostic_collection_func('Last Information') def 
collect_macos_last(_args): return collect_command_output_to_file(TempDirectoryManager.get_temp_dir(), 'last_info', 'last_info.txt', 'last') @register_diagnostic_collection_func('pmset Information', constants.MACOS_PLATFORM) def collect_pmset_log(_args): output = io.StringIO() log_path = TempDirectoryManager.create_temp_file(prefix=f'pmset', suffix='.log') with open(log_path, 'w', encoding='utf-8') as writer: writer.write("pmset is invoking all non-blocking -g arguments\n") subcmds = [ '', 'live', 'custom', 'cap', 'sched', 'ups', 'adapter', 'ps', ['ps', '-xml'], 'accps', 'rawbatt', 'therm', 'assertions', 'sysload', 'useractivity', 'log', 'history', 'historydetailed', 'hidnull', 'userclients', 'uuid', 'rtc', 'getters', 'powerstate', 'stats', 'systemstate', ] for subcmd in subcmds: output = io.StringIO() if run_sh_command(sh.pmset, '-g', subcmd, _out=output) != None: with open(log_path, 'a', encoding='utf-8') as writer: writer.write(f"\nINVOKE: pmset -g {subcmd}\n") writer.write(output.getvalue()) writer.write("\n") return {'pmset_everything.txt': log_path} @register_diagnostic_collection_func('misc logs', constants.MACOS_PLATFORM) def collect_misc_logs(_args): dict = {} dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/private/var/log/jamf.log', 'jamf.log')) dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/private/var/log/appfirewall.log', 'appfirewall.log')) dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/private/var/log/kernel-shutdown.log', 'kernel-shutdown.log')) dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/private/var/log/com.apple.xpc.launchd/launchd.log', 'xpc_launchd.log')) dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/etc/resolv.conf', 'resolv.conf')) # check if Library/CS/falcon_logs exist if os.path.exists('/Library/CS/falcon_logs/'): falcon_logs_zip = TempDirectoryManager.create_temp_file(prefix=f'falcon_logs', 
suffix='.zip') create_zip(falcon_logs_zip, path='/Library/CS/falcon_logs', zipdir='falcon_logs') dict.update({'falcon_logs.zip' : falcon_logs_zip}) if os.path.exists('/Library/Logs/Sophos Anti-Virus.log'): dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/Library/Logs/Sophos Anti-Virus.log', 'sophose_av.log')) if os.path.exists('/Library/Logs/McAfeeSecurity.log'): dict.update(collect_file_to_file(TempDirectoryManager.get_temp_dir(), "", '/Library/Logs/Sophos Anti-Virus.log', 'sophose_av.log')) if os.path.exists('/Library/Bitdefender/AVP/Logs/'): bd_logs_zip = TempDirectoryManager.create_temp_file(prefix=f'bitdefender_logs', suffix='.zip') create_zip(bd_logs_zip, path='/Library/Bitdefender/AVP/Logs/', zipdir='bitdefender_logs') dict.update({'bitdefender_logs.zip' : bd_logs_zip}) return dict @register_diagnostic_collection_func('lsregister Information', constants.MACOS_PLATFORM) def collect_lsregister_logs(_args): dict = {} log_path = TempDirectoryManager.create_temp_file(prefix='lsregister', suffix='.log') dict.update({'lsregister.txt' : log_path}) try: lsregister_path = '/System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/LaunchServices.framework/Versions/A/Support/lsregister' output = io.StringIO() run_sh_command(sh.Command(lsregister_path), '-dump', _out=output) with open(log_path, 'a', encoding='utf-8') as writer: writer.write(output.getvalue()) except Exception as e: with open(log_path, 'a', encoding='utf-8') as writer: writer.write(f"Exception: {e}\n") writer.write("\n") return dict @register_diagnostic_collection_func('Locale Information') def collect_locale_information(_args): output = io.StringIO() if os_info.platform == constants.LINUX_PLATFORM: output.write('localectl status:\n') run_sh_command(sh.localectl, "status",_out=output) output.write('\nlocale:\n') run_sh_command(sh.locale, _out=output) output.write('\nlocale -c charmap:\n') run_sh_command(sh.locale, "-c", "charmap", _out=output) 
output.write('\nlocale -a:\n') run_sh_command(sh.locale, "-a", _out=output) output.write('\nlocale -m:\n') run_sh_command(sh.locale, "-m", _out=output) # Get the appropriate locale file path locale_file = get_locale_file_path() if locale_file is None: output.write("No locale file found in standard locations\n") else: try: output.write(f'\nContents of {locale_file}:\n') with open(locale_file, 'r', encoding='utf-8') as file: output.write(file.read()) except Exception as e: log.error(f"An error occurred while accessing {locale_file}: {e}") locale_info_path = TempDirectoryManager.create_temp_file(prefix='locale_info', suffix='.txt') with open(locale_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'locale_info.txt': locale_info_path} @register_diagnostic_collection_func('/tmp files owned by group:mdatp', constants.LINUX_PLATFORM, required_package='mdatp') def collect_tmp_file_owned_by_mdatp(_args): output = io.StringIO() #sudo find /tmp -group mdatp | xargs du -sh | sort -rh sh.sort(sh.xargs(sh.find('/tmp', '-group', 'mdatp'), 'du', '-sh'), '-rh', _out=output) tmp_files_owned_by_mdatp = TempDirectoryManager.create_temp_file(prefix='tmp_files_owned_by_mdatp', suffix='.txt') with open(tmp_files_owned_by_mdatp, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'tmp_files_owned_by_mdatp.txt': tmp_files_owned_by_mdatp} @register_diagnostic_collection_func('MDATP configurations', required_package='mdatp') def collect_merged_config(_args): output = get_mdatp_config_allchannel() if not output: log.warning("Failed to collect MDE MERGED config") return mdatp_config_path = TempDirectoryManager.create_temp_file(prefix='merged_config', suffix='.txt') for each in output: with open(mdatp_config_path, 'a', encoding='utf-8') as writer: writer.write(f"{each['description']}\n") writer.write(f"{each['title']}\n") writer.write(f"{each['filepath']}\n") if "value" in each: writer.write('\n') writer.write(json.dumps(each['value'], 
indent=4)) elif "fileerror" in each: writer.write(f"Error: {each['fileerror']}") writer.write('\n\n\n') return {'mdatp_config.txt': mdatp_config_path} @register_diagnostic_collection_func('Enginedb files', required_package='mdatp') def collect_enginedb_file(_args): db_files = dict() def _check_n_copy(f): abs_path = os.path.join(constants.ENGINEDB_DIR, f) if os.path.exists(abs_path): tmp_file = TempDirectoryManager.create_temp_file(prefix=f'{f}', suffix=f".{f.split('.')[-1]}") run_sh_command(sh.cp, abs_path, tmp_file) db_files[f] = tmp_file _check_n_copy('mpenginedb.db') _check_n_copy('mpenginedb.db-wal') _check_n_copy('mpenginedb.db-shm') if not db_files: log.warning('No enginedb files exist') return db_files @register_diagnostic_collection_func('Linux iptables rules', constants.LINUX_PLATFORM) def collect_iptables_rules(_args): output = io.StringIO() output.write('iptables -L -nv:\n') run_sh_command(sh.iptables, "-L", "-n", "-v", _out=output) output.write('iptables -L -nv -t nat:\n') run_sh_command(sh.iptables, "-L", "-n", "-v", "-t", "nat", _out=output) output.write('iptables -L -nv -t mangle:\n') run_sh_command(sh.iptables, "-L", "-n", "-v", "-t", "mangle", _out=output) output.write('iptables -L -nv -t raw:\n') run_sh_command(sh.iptables, "-L", "-n", "-v", "-t", "raw", _out=output) iptables_path = TempDirectoryManager.create_temp_file(prefix='iptables_rules', suffix='.txt') with open(iptables_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'iptables_rules.txt': iptables_path} @register_diagnostic_collection_func('Network information', constants.LINUX_PLATFORM) def collect_network_info_linux(_args): output = io.StringIO() network_info_path = TempDirectoryManager.create_temp_file(prefix='network_info', suffix='.txt') output.write('ip link show:\n') run_sh_command(sh.ip, "link", "show", _out=output) output.write('ip address show:\n') run_sh_command(sh.ip, "address", "show", _out=output) output.write('ip route show:\n') 
run_sh_command(sh.ip, "route", "show", _out=output) output.write('ip rule show:\n') run_sh_command(sh.ip, "rule", "show", _out=output) output.write('nft list ruleset:\n') run_sh_command(sh.nft, "list", "ruleset", _out=output) output.write('cat /etc/iproute2/rt_tables:\n') run_sh_command(sh.cat, "/etc/iproute2/rt_tables", _out=output) with open(network_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'network_info.txt': network_info_path} @register_diagnostic_collection_func('Network information', constants.MACOS_PLATFORM, required_package='mdatp') def collect_network_info_mac(_args): output = io.StringIO() network_info_path = TempDirectoryManager.create_temp_file(prefix='network_info', suffix='.txt') output.write('scutil --dns\n') run_sh_command(sh.scutil, "--dns", _out=output) output.write('cat /etc/hosts:\n') run_sh_command(sh.cat, "/etc/hosts", _out=output) connectivity_test = mdatp.run_connectivity_test() if not connectivity_test: log.warning('Failed to collect MDE connectivity test') return output.write('\nrun mdatp connectivity test:\n') output.write(connectivity_test) output.write('\n\nPF flushed rules\n') output.write('\n\npfctl -vvs rules\n') run_sh_command(sh.pfctl, "-vvs", "rules", _out=output) output.write('\n\npfctl -vvs rules -a com.microsoft/MDEP\n') run_sh_command(sh.pfctl, "-vvs", "rules", "-a", "com.microsoft/MDEP" , _out=output) output.write('\n\ncat /etc/pf.conf:\n') run_sh_command(sh.cat, "/etc/pf.conf", _out=output) with open(network_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'network_info.txt': network_info_path} @register_diagnostic_collection_func('System spindump', constants.MACOS_PLATFORM) def collect_spindump_mac(_args): output = io.StringIO() system_spindump_path = TempDirectoryManager.create_temp_file(prefix='system_spindump', suffix='.txt') system_spindump_log_path = TempDirectoryManager.create_temp_file(prefix='system_spindump_log', suffix='.txt') 
output.write('spindump execution log:\n') output.write('For the actual results, open the system_spindump.txt file\n') run_sh_command(sh.spindump, "-o", f"{system_spindump_path}", _out=output, _timeout=300, _iter=True) with open(system_spindump_log_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) results = {} if os.path.exists(system_spindump_path): results['system_spindump.txt'] = system_spindump_path if os.path.exists(system_spindump_log_path): results['system_spindump_log.txt'] = system_spindump_log_path return results @register_diagnostic_collection_func('Sysctl information', constants.LINUX_PLATFORM) def collect_sysctl_info(_args): output = io.StringIO() sysctl_info_path = TempDirectoryManager.create_temp_file(prefix='sysctl_info', suffix='.txt') output.write('sysctl -a:\n') run_sh_command(sh.sysctl, "-a", _out=output) with open(sysctl_info_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'sysctl_info.txt': sysctl_info_path} @register_diagnostic_collection_func('Hostname diagnostics information', constants.LINUX_PLATFORM) def collect_hostname_diagnostics_information(_args): output = io.StringIO() output.write("hostname:\n") run_sh_command(sh.hostname, _out=output) output.write("hostname -A:\n") run_sh_command(sh.hostname, "-A", _out=output) output.write("dnsdomainname:\n") run_sh_command(sh.dnsdomainname, _out=output) output.write("dnshostname:\n") run_sh_command(sh.dnshostname, _out=output) output.write("domainname:\n") run_sh_command(sh.domainname, "-A", _out=output) output.write("cat /etc/hostname:\n") run_sh_command(sh.cat, "/etc/hostname", _out=output) output.write("cat /etc/resolv.conf:\n") run_sh_command(sh.cat, "/etc/resolv.conf", _out=output) output.write("cat /etc/hosts:\n") run_sh_command(sh.cat, "/etc/hosts", _out=output) output.write("cat /etc/nsswitch.conf:\n") run_sh_command(sh.cat, "/etc/nsswitch.conf", _out=output) output.write("getent:\n") run_sh_command(sh.getent, "ahosts", 
run_sh_command(sh.hostname).strip(), _out=output) output.write("Collecting getaddrinfo information:\n") for info in socket.getaddrinfo(host=socket.gethostname(), port=None, family=socket.AF_INET, flags=socket.AI_CANONNAME): output.write("Entry: " + str(info) + "\n") hostname_diag_path = TempDirectoryManager.create_temp_file(prefix='hostname_diag', suffix='.txt') with open(hostname_diag_path, 'w', encoding='utf-8') as writer: writer.write(output.getvalue()) return {'hostname_diagnostics.txt': hostname_diag_path} @register_diagnostic_collection_func('MDE Event statistics', required_package='mdatp') def collect_event_statistics(_args): event_statistics = mdatp.collect_event_statistics() if not event_statistics: log.warning('Failed to collect MDE event statistics') return mde_event_statistics = TempDirectoryManager.create_temp_file(prefix='mde_event_statistics', suffix='.txt') with open(mde_event_statistics, 'w', encoding='utf-8') as writer: writer.write(event_statistics) return {'mde_event_statistics.txt' : mde_event_statistics} @register_diagnostic_collection_func('MDE eBPF statistics(Linux platform)', constants.LINUX_PLATFORM, required_package='mdatp') def collect_ebpf_statistics(_args): ebpf_statistics = mdatp.collect_ebpf_statistics() if not ebpf_statistics: log.warning('Failed to collect MDE eBPF statistics') return mde_ebpf_statistics = TempDirectoryManager.create_temp_file(prefix='mde_ebpf_statistics', suffix='.txt') with open(mde_ebpf_statistics, 'w', encoding='utf-8') as writer: writer.write(ebpf_statistics) return {'mde_ebpf_statistics.txt' : mde_ebpf_statistics} @register_diagnostic_collection_func('Kernel logs', constants.LINUX_PLATFORM) def collect_kernel_logs(_args): def file_lt_100MB(absolute_file_path): size = round(os.path.getsize(absolute_file_path)/(1024*1024),2) #Bytes to MB if size > 100: log.warning(f'Not collecting file [{absolute_file_path}] because of larger size [{size}] MB') return False return True kernel_logs_zip = 
TempDirectoryManager.create_temp_file(prefix='kernel_logs', suffix='.zip') create_zip(kernel_logs_zip, path='/var/log', prefix_pattern='kern.log', zipdir='kernel_logs', predicate=file_lt_100MB) return {'kernel_logs.zip' : kernel_logs_zip} @register_diagnostic_collection_func('MDC logs', constants.LINUX_PLATFORM) def collect_MDC_logs(_args): all_mde_dirs = glob.glob(constants.MDC_CONFIG) if not all_mde_dirs: log.warning('MDE.Linux Extension folder doesn\'t exist. Going ahead as non-MDC') return latest_mde_dir = max(all_mde_dirs, key=os.path.getctime) latest_mde_config = os.path.join(latest_mde_dir, 'HandlerEnvironment.json') log_dir = json.load(open(latest_mde_config))[0]['handlerEnvironment']['logFolder'] zip_created = False mdc_log_zip = TempDirectoryManager.create_temp_file(prefix='mdc_logs', suffix='.zip') if os.path.exists(log_dir): zip_created = create_zip(mdc_log_zip, path=log_dir, zipdir='mdc_logs/mde_logs') if zip_created: status_dir = os.path.join(latest_mde_dir, 'status') if os.path.exists(status_dir): create_zip(mdc_log_zip, path=status_dir, zipdir='mdc_logs/mdc_status', mode='a') else: log.warning(f'MDC status dir [{status_dir}] doesnt exist.') state_file = os.path.join(latest_mde_dir, 'state.json') if os.path.exists(state_file): create_zip(mdc_log_zip, files=[state_file], zipdir='mdc_logs', mode='a') else: log.warning(f'MDC state file [{state_file}] doesnt exist.') config_dir = os.path.join(latest_mde_dir, 'config') if os.path.exists(config_dir): create_zip(mdc_log_zip, path=config_dir, zipdir='mdc_logs/mdc_config', mode='a') else: log.warning(f'MDC config dir [{config_dir}] doesnt exist.') return {'mdc_log.zip' : mdc_log_zip} @register_diagnostic_collection_func('Netext Config', constants.MACOS_PLATFORM, required_package='mdatp') def collect_netext_config(_args): netext_config = "" if os.path.exists(constants.NETEXT_CONFIG_FILE_PATH): netext_config = run_with_output(f"\"{constants.NETEXT_CONFIG_FILE_PATH}\"", timeout_in_sec=20) if not netext_config: 
log.warning("Failed to collect MDE netext_config") netext_config = "Failed to collect MDE netext_config" else: log.warning('netext_config tool not found') netext_config = "netext_config tool not found" netext_config_path = TempDirectoryManager.create_temp_file(prefix='mde_netext_config', suffix='.txt') with open(netext_config_path, 'w', encoding='utf-8') as writer: writer.write(netext_config) return {'netext_config.txt': netext_config_path} @register_diagnostic_collection_func('threat list', required_package='mdatp') def collect_threat_list(_args): threat_list = mdatp.threat_list() if threat_list is None: return {} threat_list_path = TempDirectoryManager.create_temp_file(prefix='threat_list', suffix='.txt') with open(threat_list_path, 'w', encoding='utf-8') as writer: writer.write(threat_list) return {'threat_list.txt': threat_list_path} class SystemMonitor(): def __init__(self): self.out_file = TempDirectoryManager.create_temp_file(prefix='top_output', suffix='.txt') self.summary_file = TempDirectoryManager.create_temp_file(prefix='top_summary', suffix='.txt') self.outlier_file = TempDirectoryManager.create_temp_file(prefix='top_outlier', suffix='.txt') self.cpu_key = '%CPU' self.user_key = 'USER' self.pid_key = 'PID' if os_info.platform == constants.MACOS_PLATFORM: self.mem_key = 'MEM' self.command = ['top', '-l', '0', '-s', '5', '-o', self.mem_key] self.mem_converter = lambda x: str(self._bytes_to_mb(x)) + 'M' else: self.mem_key = '%MEM' self.command = ["top", "-b", '-d', '5', '-w', '512', '-o', self.mem_key] self.mem_converter = lambda x: x self.process = None def __enter__(self): #TODO Use the same logic at all the places where system call is made. 
if constants.IS_COMPILED_AS_BINARY and os_info.platform == constants.LINUX_PLATFORM: env = dict(os.environ) lp_key = 'LD_LIBRARY_PATH' lp_orig = env.get(lp_key + '_ORIG') if lp_orig is not None: env[lp_key] = lp_orig else: env.pop(lp_key, None) self.process = subprocess.Popen(self.command, stdout=open(self.out_file, 'w'), env=env) else: self.process = subprocess.Popen(self.command, stdout=open(self.out_file, 'w')) return self def __exit__(self, exc_type, exc_value, exc_tb): if self.process: self.stop() def info(self): log.info(f'Top Command output: [{self.out_file}]') log.info(f'Top Command Summary: [{self.summary_file}]') log.info(f'Top Command Outliers: [{self.outlier_file}]') def monitor(self): subprocess.run(self.command) def _to_bytes(self, size): if isinstance(size, int) or isinstance(size, float): return float(size) if size.endswith('+') or size.endswith('-'): return self._to_bytes(size[:-1]) if size.endswith('B'): return float(size[:-1]) if size.endswith('K'): return 1024*float(size[:-1]) if size.endswith('M'): return self._to_bytes(str(1024*float(size[:-1]))+ 'K') if size.endswith('G'): return self._to_bytes(str(1024*float(size[:-1]))+ 'M') return float(size) def _bytes_to_mb(self, size): return round(size/(1024*1024), 2) def analyse_process_data(self, processed_data): # pre-process values processed_data = [{**d, self.mem_key: self._to_bytes(d[self.mem_key])} for d in processed_data] processed_data = [{**d, self.cpu_key: float(d[self.cpu_key])} for d in processed_data] sorted_data = sorted(processed_data, key=lambda x: (x['command'], x['PID'])) grouped_by_process = itertools.groupby(sorted_data, key=lambda x: (x['command'], x['PID'])) grouped_by_process_data = {} for key, group in grouped_by_process: group_list = [] for item in group: group_list.append(item) grouped_by_process_data[key] = group_list non_outliers, outliers = self.remove_outliers(grouped_by_process_data) with open(self.outlier_file, 'w', encoding='utf-8') as writer: for key in outliers: 
# Continuation of analyse_process_data on the mangled line below:
# writes each outlier group and its rows to self.outlier_file, flattens the
# non-outlier groups, sorts by (MEM, CPU) then by (CPU, MEM) descending, and
# assembles a text summary with the two top-10 lists (JSON-per-line). When
# the `mdatp` CLI exists, the rows whose PID appears in
# mdatp.get_mdatp_processes() are appended as "MDATP owned process";
# otherwise the literal "MDATP is not installed" is used.
# NOTE(review): after each sort of non_outliers_flat, get_top_n_unique is
# called with `processed_data` (unsorted, outliers included) rather than
# `non_outliers_flat` — this looks like a bug that defeats the outlier
# removal; confirm against get_top_n_unique (its body is truncated below).
# Also here: start of stop(): kills and waits the `top` process, collapses
# runs of spaces to commas in each captured line, parses them with
# process_data, then tries analyse_process_data; a ModuleNotFoundError
# (outlier analysis needs an optional module) triggers the basic fallback
# summary that continues on the next line.
writer.write(f'{key}\n') for item in outliers[key]: writer.write(f'{item}\n') non_outliers_flat = [item for sublist in non_outliers.values() for item in sublist] non_outliers_flat.sort(key=lambda data: (data[self.mem_key], data[self.cpu_key]), reverse = True) top_10_mem = self.get_top_n_unique(processed_data, 10) non_outliers_flat.sort(key=lambda data: (data[self.cpu_key], data[self.mem_key]), reverse = True) top_10_cpu = self.get_top_n_unique(processed_data, 10) summary = "Top 10 CPU Consumer:\n\n" summary += '\n'.join(json.dumps(p) for p in top_10_cpu) summary += '\n\n' + "*"*20 + "\n" summary += "Top 10 MEM Consumer:\n\n" summary += '\n'.join(json.dumps(p) for p in top_10_mem) mdatp_owned = "" if command_exists('mdatp'): mdatp_processes = [] mdatp_processes = mdatp.get_mdatp_processes() mdatp_pids = [x[0] for x in mdatp_processes] for p in non_outliers_flat: if p[self.pid_key] in mdatp_pids: mdatp_owned += json.dumps(p) mdatp_owned += "\n" else: mdatp_owned = "MDATP is not installed" summary += '\n\n' + "*"*20 + "\n" summary += "MDATP owned process:\n\n" summary += mdatp_owned return summary def stop(self): try: self.process.kill() self.process.wait() raw_data = list(map(lambda x : re.sub(' +', ',', x.strip()), open(self.out_file).readlines())) processed_data = self.process_data(raw_data) summary = '' try: summary = self.analyse_process_data(processed_data) except ModuleNotFoundError as ex: log.warning(f"Exception => {ex}.
# Fallback path of stop() (continues the warning message opened on the
# previous line): when outlier analysis is unavailable, processed_data is
# sorted in place by memory then by CPU (values normalized via _to_bytes
# because, per the original comments, macOS reports absolute sizes while
# Linux reports percentages) and the two top-10 lists are pretty-printed
# with the "(may have outliers)" caveat. Any other analysis exception is
# logged and swallowed so the raw `top` output is still returned. On
# success stop() writes the summary, clears self.process, and returns the
# three collected file paths; if anything in the outer try fails, only the
# raw top output path is returned (on the next line).
# NOTE(review): open(self.summary_file, "w").write(summary) never closes
# the handle — a `with` block would be safer.
Outliers will not be removed and only basic data will be provided") #On MacOS, MEM is in absolute terms like B, KB, MB, GB #Whereas on Linux its in percentage processed_data.sort(key=lambda data: (self._to_bytes(data[self.mem_key]), self._to_bytes(data[self.cpu_key])), reverse = True) top_10_mem = self.get_top_n_unique(processed_data, 10) processed_data.sort(key=lambda data: (self._to_bytes(data[self.cpu_key]), self._to_bytes(data[self.mem_key])), reverse = True) top_10_cpu = self.get_top_n_unique(processed_data, 10) summary += "Top 10 CPU Consumer(may have outliers):\n\n" + pprint.pformat(top_10_cpu) summary += '\n\n' + "*"*20 + "\n" + "Top 10 MEM Consumer(may have outliers):\n\n" + pprint.pformat(top_10_mem) except Exception as ex: log.warning(f"Exception => {ex}") open(self.summary_file, "w").write(summary) self.process = None return {'top_output.txt': self.out_file, 'top_summary.txt':self.summary_file, 'top_outliers.txt':self.outlier_file} except Exception as e: log.error(f"Couldn't run analysis on top output.
# NOTE(review): the source text is TRUNCATED in this region — the fragment
# "while len(top_n_unique) < n and i high_whisker:" is not valid Python: the
# body of get_top_n_unique's loop and the quantile/whisker computation of
# remove_outliers are missing. Do not assume behavior beyond what is visible;
# recover the missing code from version control before editing this span.
# Visible intent only:
# - get_top_n_unique(data, n): collects up to n entries, deduplicating via
#   the `added` set while walking `data` with index i.
# - remove_outliers(...): per group, splits rows into outliers_array (each
#   copy tagged 'outlier_type' = 'First quantile' when below low_whisker,
#   else 'Fourth quantile') and non_outliers_array, keeping only non-empty
#   buckets; returns (non_outliers, outliers).
# - process_data(raw_data): parses the comma-collapsed `top` output produced
#   by stop(). A header line supplies the column indices for
#   %CPU/MEM/USER/PID/COMMAND; subsequent data rows are re-assembled so a
#   COMMAND containing spaces is rejoined (extra tokens shift later column
#   indices by len(tokens) - len(header_tokens)); PID values are stripped of
#   a trailing '*'. A blank line (Linux) or a leading 'Processes:' token
#   (macOS) ends one top iteration and re-arms header detection; the second
#   COMMAND index ('COMMAND' + 1) and reading_data flag close on the final
#   line. Returns the list of parsed row dicts.
Exception => {e}") return {'top_output.txt': self.out_file} def get_top_n_unique(self, data, n): top_n_unique = list() added = set() i = 0 while len(top_n_unique) < n and i high_whisker: outlier_row = row.copy() outlier_row['outlier_type'] = 'First quantile' if row[subkey] < low_whisker else 'Fourth quantile' outliers_array.append(outlier_row) else: non_outliers_array.append(row) if len(outliers_array) > 0: outliers[key] = outliers_array if len(non_outliers_array) > 0: non_outliers[key] = non_outliers_array return non_outliers, outliers def process_data(self, raw_data): header_tokens = [] process_data = [] cpu_index = 0 mem_index = 0 user_index = 0 pid_index = 0 command_index = (0,0) reading_data = False itr = iter(range(len(raw_data))) for i in itr: data = raw_data[i] if reading_data: if len(data) == 0: #[Linux] Checking new line to detect next top run reading_data = False continue tokens = data.split(',') if tokens[0] == 'Processes:': #[MacOS] No new line after the output, so checking the header token reading_data = False continue process_data.append({'command':' '.join(tokens[command_index[0]: command_index[1] + len(tokens) - len(header_tokens)]), #Command/process name may have spaces self.cpu_key: tokens[cpu_index + (0 if command_index[0] > cpu_index else len(tokens) - len(header_tokens))], self.mem_key: tokens[mem_index + (0 if command_index[0] > mem_index else len(tokens) - len(header_tokens))], self.user_key: tokens[user_index + (0 if command_index[0] > user_index else len(tokens) - len(header_tokens))], self.pid_key: tokens[pid_index + (0 if command_index[0] > pid_index else len(tokens) - len(header_tokens))].strip('*')}) else: if len(data) == 0: i = next(itr) data = raw_data[i] header_tokens = data.split(',') cpu_index = header_tokens.index(self.cpu_key) mem_index = header_tokens.index(self.mem_key) user_index = header_tokens.index(self.user_key) pid_index = header_tokens.index(self.pid_key) command_index = (header_tokens.index('COMMAND'),
header_tokens.index('COMMAND') + 1) reading_data = True return process_data