#!/usr/bin/python3 import argparse import datetime import os import sys import shutil import typing import zipfile import tempfile import stat import sh from .report import xml_report_root, xml_functions from .diagnostic import diagnostic_functions, SystemMonitor from .connectivity_test import perform_test from .machine import os_details, machine from .exclude import exclude from .certinfocollection import certinfocollection from .rate_limiter import rate_limiter from .skip_faulty_rules import skip_faulty_rules from .pii_disclaimer import present_disclaimer from . import installation_report from . import perf from . import constants, logger from .mdatp import mdatp from .perf_trace import perf_trace from .utils import command_exists from .spike import observe_cpu_mem_spikes # Add module directory to import paths array from lxml import etree from enum import Enum from . import filesystem as fs from .mdatp import mdatp from . import SCRIPT_VERSION from .utils import log, run, run_with_output class PrintHelp(Enum): NONE = 0 MAIN = 1 CONNECTIVITYTEST = 2 global log def find_rootlogger_basefilename(): """Finds the root logger base filename """ log_file = None for h in log.__dict__['handlers']: if h.__class__.__name__ == 'FileHandler': log_file = h.baseFilename break elif h.__class__.__name__ == 'TimedRotatingFileHandler': log_file = h.baseFilename break return log_file def export_report_folder(files_to_copy: typing.Dict[str, typing.Union[str, typing.List[str]]], output_path: str): print(files_to_copy) # Create report folder os.makedirs(output_path, exist_ok=True, mode=stat.S_IRUSR) for path_in_dir, file_to_copy in files_to_copy.items(): dst_in_folder_path = os.path.join(output_path, path_in_dir) # Create necessary dirs if not exists os.makedirs(dst_in_folder_path, exist_ok=True) if isinstance(file_to_copy, list): for f in file_to_copy: shutil.copy2(f, dst_in_folder_path) else: shutil.copy2(file_to_copy, dst_in_folder_path) # Export log baseFilename = find_rootlogger_basefilename() shutil.copyfile(baseFilename, os.path.join(output_path, 'log.txt')) if not constants.IS_COMPILED_AS_BINARY: # Export XML etree.ElementTree(xml_report_root).write(os.path.join(output_path, 'mde.xml'), pretty_print=True) # Add events.xml file to directory shutil.copyfile(constants.XML_EVENTS_PATH, os.path.join(output_path, constants.XML_FILENAME)) # Export HTML xslt = etree.parse(constants.XSLT_REPORT_PATH) with open(os.path.join(output_path, 'report.html'), 'wb') as writer: writer.write(etree.tostring(etree.XSLT(xslt)(xml_report_root), pretty_print=True)) log.info(f'Folder created at: {output_path}') def process_file(file_to_copy, path_in_zip, zip_writer): if file_to_copy is None or not (os.path.isfile(file_to_copy) or os.path.isdir(file_to_copy)): return os.chmod(file_to_copy, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) zip_writer.write(file_to_copy, arcname=path_in_zip if os.path.basename(path_in_zip) else os.path.join(os.path.dirname(path_in_zip), os.path.basename(file_to_copy))) def export_report_archive(files_and_data_to_copy: typing.Dict[str, typing.Union[str, typing.List[str]]], output_path: str): print(f"[outfiles] => {files_and_data_to_copy}") with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zip_writer: for path_in_zip, file_to_copy in files_and_data_to_copy.items(): if isinstance(file_to_copy, list): for f in file_to_copy: process_file(f, path_in_zip, zip_writer) else: # Save the file as the given name (key of the dictionary) or if only directory exists save it as the original filename process_file(file_to_copy, path_in_zip, zip_writer) # Export log baseFilename = find_rootlogger_basefilename() os.chmod(baseFilename, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) zip_writer.write(baseFilename, arcname='log.txt') # Export XML zip_writer.writestr('mde.xml', etree.tostring(xml_report_root, pretty_print=True)) # Currently parsing XSLT crashing in our static binary if not constants.IS_COMPILED_AS_BINARY and not os.environ.get("E2E_TEST", False): # Add events.xml file to zip os.chmod(constants.XML_EVENTS_PATH, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) zip_writer.write(constants.XML_EVENTS_PATH, arcname=constants.XML_FILENAME) # Export HTML xslt = etree.parse(constants.XSLT_REPORT_PATH) zip_writer.writestr('report.html', etree.tostring(etree.XSLT(xslt)(xml_report_root), pretty_print=True)) os.chmod(output_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) log.info(f'Archive created at: {output_path}') # clean up for path_in_zip, file_to_copy in files_and_data_to_copy.items(): if not isinstance(file_to_copy, list): if file_to_copy is not None and '/tmp/mde_support_' in file_to_copy: dir_path = os.path.dirname(file_to_copy) if os.path.exists(dir_path): shutil.rmtree(dir_path) def collect_diagnostic(args): log.info('[MDE Diagnostic]') files_dict = {} for func_name, func in diagnostic_functions.items(): try: log.info(f' Collecting {func_name}') func_result = func(args) # Update files_dict only if function returned a non-empty valid dict if isinstance(func_result, dict) and func_result: log.info(f' Adding {", ".join(func_result.keys())} to report directory') files_dict.update(func_result) except Exception as e: log.error(f" Diagnostics collection encountered an issue at function {func_name} - {str(e)}") return files_dict def generate_report_xml(): log.info('[Report Generator]') for func_name, func in xml_functions.items(): try: log.debug(f' Generating {func_name}') func() except Exception as e: log.error(f" Report generator encountered an issue at function {func_name} - {str(e)}") def log_tool_info(): log.info(f'XMDEClientAnalyzer Version: {SCRIPT_VERSION}') def period(duration): if duration.lower().endswith('d'): return datetime.timedelta(days=int(duration[:-1])) elif duration.lower().endswith('h'): return datetime.timedelta(hours=int(duration[:-1])) elif duration.lower().endswith('m'): return datetime.timedelta(minutes=int(duration[:-1])) raise argparse.ArgumentTypeError('Wrong time duration. Allowed xd(for days), xh(for hours), xm(for minutes)') def delay_range_limited_int_type(arg): try: value = int(arg) except ValueError: raise argparse.ArgumentTypeError(f"Must be a base-10 integer: {arg}") if value < 0 or value > 2880: raise argparse.ArgumentTypeError(f"The delay argument must be between 1 and 2880 inclusive: {arg}") return value def get_parser(): parser = argparse.ArgumentParser(description='MDE Diagnostics Tool') parser.add_argument('--output', '-o', type=str, help='Output path to export report') parser.add_argument('--outdir', type=str, help='Directory where diagnostics file will be generated.') parser.add_argument('--no-zip', '-nz', action='store_true', help='If set a directory will be created instead of an archive file.') parser.add_argument('--force', '-f', action='store_true', help='Will overwrite if output directory exists.') parser.add_argument('--diagnostic', '-d', action='store_true', help='Collect extensive machine diagnostic information.') parser.add_argument('--skip-mdatp', action='store_true', help='Skip any mdatp command. Use this when the mdatp command is unresponsive.') parser.add_argument('--bypass-disclaimer', action='store_true', help='Do not display disclaimer banner.') parser.add_argument('--interactive', '-i', action='store_true', help='Interactive diagnostic,') parser.add_argument('--delay', '-dd', type=delay_range_limited_int_type, help='Delay diagnostic by how many minutes (0~2880), use this to wait for more debug logs before it collects.') parser.add_argument('--mdatp-log', choices={'error', 'warning', 'info', 'debug', 'verbose', 'trace'}, help='Set MDATP log level. If you use interactive or delay mode, the log level will set to debug automatically, and reset after 48h.') parser.add_argument('--max-log-size', type=int, help='Maximum log file size in MB before rotating(Will restart mdatp).') parser.set_defaults(performance=False) parser.set_defaults(installation=False) parser.set_defaults(exclude=False) parser.set_defaults(trace=False) parser.set_defaults(ratelimit=False) parser.set_defaults(skipfaultyrules=False) parser.set_defaults(connectivitytest=False) parser.set_defaults(certinfocollection=False) parser.set_defaults(observespikes=False) subparsers = parser.add_subparsers() certinfocollection_subparser = subparsers.add_parser("certinfocollection", help='Collect cert information: Subject name and Hashes') certinfocollection_subparser.add_argument(dest='target_file', help='target directory to store installed certs information') perf_subparser = subparsers.add_parser("performance", help='Collect extensive machine performance tracing for analysis of a performance scenario that can be reproduced on demand') perf_subparser.add_argument('--frequency', type=int, default=3000, help="profile at this frequency") perf_subparser.add_argument('--length', type=int, default=10, help="length of time to collect (in seconds)") installation_subparser = subparsers.add_parser("installation", help='Collect different installation/onboarding reports') installation_subparser.add_argument('-d', '--distro', help="Check for distro support", action="store_true") installation_subparser.add_argument('-m', '--min-requirement', help="Check for the system info against offical minimum requirements", action="store_true") installation_subparser.add_argument('-e', '--external-dep', help="Check for externel package dependency", action="store_true") installation_subparser.add_argument('-c', '--connectivity', help="Check for connectivity for services used by MDE", action="store_true") installation_subparser.add_argument('-a', '--all', help="Run all checks", action="store_true") installation_subparser.add_argument("-o", '--onboarding-script', type=str, help="Path to onboarding script") installation_subparser.add_argument("-g", '--geo', type=str, help="Geo string to test ") exclude_subparser = subparsers.add_parser("exclude", help="Exclude specific process(es) from audit-d monitoring.") exclude_subparser.add_argument("-a", "--arch", help="cpu architecture, used in arch specific syscalls. available: 32, 64. default: 64", type=str, metavar='<32/64>') exclude_subparser.add_argument("-e", "--exe", help="exclude by executable name, i.e: bash", action='append', metavar='') exclude_subparser.add_argument("-p", "--pid", help="exclude by process id, i.e: 911", type=int, action='append', metavar='') exclude_subparser.add_argument("-d", "--dir", help="exclude by target path, i.e: /var/foo/bar", action='append', metavar='') exclude_subparser.add_argument("-x", "--exe_dir", help="exclude by executable path and target path, i.e: /bin/bash /var/foo/bar", action='append', nargs=2, metavar=('','')) exclude_subparser.add_argument("-q", "--queue", help="set dispatcher q_depth size", type=int, metavar='') exclude_subparser.add_argument("-r", "--remove", help="remove exclusion file", action='store_true') exclude_subparser.add_argument("-s", "--stat", help="get statistics about common executables", action='store_true') exclude_subparser.add_argument("-l", "--list", help="list auditd rules", action='store_true') exclude_subparser.add_argument("-o", "--override", help="Override the existing auditd exclusion rules file for mdatp", action='store_true') exclude_subparser.add_argument("-c", "--syscall", help="exclude all process of the given syscall", action='append', metavar='') rate_limit_subparser = subparsers.add_parser("ratelimit", help="Set the rate limit for auditd events. Rate limit will update the limits for auditd events for all the applications using auditd, which could impact applications other than MDE.") rate_limit_subparser.add_argument("-e", "--enable", help="enable/disable the rate limit with default values", type=str, metavar='') rate_limit_subparser.add_argument("-r", "--rate", help=argparse.SUPPRESS, type=int, metavar='') skip_faulty_rules_subparser = subparsers.add_parser("skipfaultyrules", help="Continue loading rules in spite of an error. This summarizes the results of loading the rules. The exit code will not be success if any rule fails to load.") skip_faulty_rules_subparser.add_argument("-e", "--enable", help="enable/disable loading of rules in spite of an error.", default='true', choices={'true', 'false'}) tracing_subparser = subparsers.add_parser('trace',help='Use OS tracing facilities to record Defender performance traces.', epilog='On Linux, lttng needs to be installed') tracing_subparser.add_argument('--length', default=500, help='Length of time to record the trace (in seconds).', type=int) tracing_subparser.add_argument('--mask', default=18446744073709551615, help='Mask to select with event to trace. Defaults to all') spikes_subparser = subparsers.add_parser("observespikes", help='Collect the process logs in case of spike or mdatp crash') spikes_subparser.add_argument('--upload', action='store_true', help='Upload to azure storage account') spikes_subparser.add_argument('--account-name', help='Azure storage account name') spikes_subparser.add_argument('--account-key', help='Azure storage account key') spikes_subparser.add_argument('--container-name', help='Azure storage container name') spikes_subparser.add_argument('--duration', type=period, help='Monitor for duration(ex: 1d, 6h, 2m') spikes_subparser.add_argument('--sampling-rate', type=int, default=5, help='Monitoring frequncy rate in seconds') spikes_subparser.add_argument('--mem', type=int, default=250000, help='Memory threshold in KB') spikes_subparser.add_argument('--cpu', type=int, default=50, help='CPU threshold in percentage') connectivitytest_subparser = subparsers.add_parser("connectivitytest", help="Perform connectivity test for MDE") connectivitytest_subparser.add_argument("-o", '--onboarding-script', type=str, help="Path to onboarding script") connectivitytest_subparser.add_argument("-g", '--geo', type=str, help="Geo string to test ") perf_subparser.set_defaults(performance=True) installation_subparser.set_defaults(installation=True) exclude_subparser.set_defaults(exclude=True) certinfocollection_subparser.set_defaults(certinfocollection=True) rate_limit_subparser.set_defaults(ratelimit=True) tracing_subparser.set_defaults(trace=True) skip_faulty_rules_subparser.set_defaults(skipfaultyrules=True) spikes_subparser.set_defaults(observespikes=True) connectivitytest_subparser.set_defaults(connectivitytest=True) return { 'parser': parser, 'connectivitytest_subparser': connectivitytest_subparser } def mandatory_args(args): if not (args.diagnostic or args.performance or args.installation or args.exclude or args.trace or args.ratelimit or args.skipfaultyrules or args.connectivitytest or args.observespikes or args.certinfocollection): return PrintHelp.MAIN if args.connectivitytest and not (args.geo or args.onboarding_script): return PrintHelp.CONNECTIVITYTEST PrintHelp.NONE #exclude should not run with perf or diagnostics def mutually_excluded(args): return not (args.exclude and (args.performance or args.diagnostic)) #rate limit should not run with perf or diagnostics def mutually_rate_limit(args): return not (args.ratelimit and (args.performance or args.diagnostic)) #skip faulty rules should not run with perf or diagnostics def mutually_skip_faulty_rules(args): return not (args.skipfaultyrules and (args.performance or args.diagnostic)) #installation report should not run with diagnostics def mutually_installation_report(args): return not (args.installation and (args.performance or args.diagnostic)) def main(): parsers = get_parser() parser = parsers['parser'] args = parser.parse_args() help = mandatory_args(args) if help == PrintHelp.MAIN: parser.print_help() parser.exit() elif help == PrintHelp.CONNECTIVITYTEST: parsers['connectivitytest_subparser'].print_help() parser.exit() if args.outdir: os.environ['TMPDIR'] = args.outdir global log log = logger.set_logger(tempfile.mkstemp(prefix=f'mde_tool_log_{datetime.datetime.now(datetime.timezone.utc).strftime("%Y_%m_%d_%H_%M_%S")}', suffix='.log')[1], constants.LOGGER_NAME) log_tool_info() if args.skip_mdatp or 'SKIP_MDATP' in os.environ: os.environ['SKIP_MDATP'] = "True" log.warning("Skip mdatp command, as SKIP_MDATP is set.") if not mutually_excluded(args): parser.error('Performance/Diagnostics and Exclude should be used individually') if not mutually_rate_limit(args): parser.error('Performance/Diagnostics and Rate limit should be used individually') if not mutually_skip_faulty_rules(args): parser.error('Performance/Diagnostics and Skip Faulty Rules should be used individually') if not mutually_installation_report(args): parser.error('Performance/Diagnostics and Installation should be used individually') if not args.output: args.output = os.path.join(tempfile.gettempdir(), f'{datetime.datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}_output') output_path = args.output if args.no_zip else args.output + '.zip' if args.trace: if os_details().platform == constants.LINUX_PLATFORM: log.info("Linux performance tracing is not supported.") sys.exit(0) trace_path = perf_trace(args) if args.no_zip: export_report_folder(trace_path, output_path) else: export_report_archive(trace_path, output_path) return if args.connectivitytest: perform_test(args.geo, args.onboarding_script) sys.exit(0) # Verify privileges: if os.geteuid() != 0 and not args.trace: parser.error('Please run this tool using `sudo`') if args.exclude: if os_details().platform != constants.LINUX_PLATFORM: parser.error('Exclude is currently supported only on Linux') if args.certinfocollection: if os_details().platform != constants.LINUX_PLATFORM: parser.error('certinfocollection is currently supported only on Linux') if args.ratelimit: if os_details().platform != constants.LINUX_PLATFORM: parser.error('Rate limit is currently supported only on Linux') if args.skipfaultyrules: if os_details().platform != constants.LINUX_PLATFORM: parser.error('Skipping faulty rules is currently supported only on Linux') if args.installation: if os_details().platform != constants.LINUX_PLATFORM: parser.error('Installation report is currently supported only on Linux') if command_exists('mdatp'): mdatp.lazy_fix_log_folder_issue() if args.observespikes: if args.upload and args.account_key is None and args.account_name is None and args.container_name is None: parser.error("Required --account-name, --account-key, --container-name with --upload") if os_details().platform != constants.LINUX_PLATFORM: parser.error('Observing memory or cpu spikes is currently supported only on Linux') try: observe_cpu_mem_spikes(args) except Exception as e: log.error(f"Exception while observing cpu or memory spikes {e}") return if not args.bypass_disclaimer: if not present_disclaimer(): return log.info("Setup log level of MDE") with mdatp.LogManager(args.interactive, args.delay, args.mdatp_log, args.max_log_size), SystemMonitor() as system_monitor: system_monitor.info() files_dict = dict() if args.diagnostic or args.performance: if os.path.exists(output_path) and not args.force: parser.error('Chosen path already exists, please select non-existing path to export') if args.performance: #TODO: add it to checked prerequisites? if os_details().platform == constants.LINUX_PLATFORM: if not command_exists('perf'): parser.error('perf is not installed') files_dict["perf_benchmark.tar.gz"] = perf.capture_on_linux(secs=args.length, frequency=args.frequency) else: files_dict["perf_benchmark.tar.gz"] = perf.capture_on_macos(secs=args.length, frequency=args.frequency) diag_files_dict = collect_diagnostic(args) files_dict.update(diag_files_dict) if os_details().platform == constants.LINUX_PLATFORM: files_dict['installation_report.json'] = installation_report.capture_installation_report(args) files_dict.update(system_monitor.stop()) # Generate Report XML generate_report_xml() # Export report if args.no_zip: export_report_folder(files_dict, output_path) else: export_report_archive(files_dict, output_path) elif args.installation: if os.path.exists(output_path) and not args.force: parser.error('Chosen path already exists, please select non-existing path to export') log.info('[Installation Report]') files_dict['installation_report.json'] = installation_report.capture_installation_report(args) if args.no_zip: export_report_folder(files_dict, output_path) else: export_report_archive(files_dict, output_path) elif args.ratelimit: rate_limiter(args) elif args.skipfaultyrules: skip_faulty_rules(args) elif args.certinfocollection: certinfocollection(args) else: exclude(args)