"""Host-machine helpers: platform detection, downloads, log parsing and event generation."""

import calendar
import errno
import json
import logging
import os
import platform
import re
import select
import socket
import tempfile
import threading
import time
import urllib.request  # plain `import urllib` does not guarantee the `request` submodule is loaded

import distro as distro_info

from .utils import DEBUG_MODE, ONBOARDING_PACKAGE, ONBOARDING_SCRIPTS, COLLECTION_DIR, error
from .utils import run, run_with_output, wait, OFFBOARDING_PACKAGE, DIY_URL_MACOS, DIY_URL_LINUX, DIY_PACKAGE_MACOS, DIY_PACKAGE_LINUX, LINUX_DIY_SCRIPT
from mde_tools import constants

log = logging.getLogger(constants.LOGGER_NAME)


class os_details:
    """Platform, distribution, package manager and version of the current host."""

    platform = None
    package_manager = None
    version = None
    distro = None

    def __init__(self):
        (self.platform, self.distro, self.package_manager,
         self.version, self.build) = self._identify_platform()
        log.debug(f"platform: [{self.platform}]")
        log.debug(f"distribution: [{self.distro}] version: [{self.version}]")
        log.debug(f"package manager: [{self.package_manager}]")

    def _identify_platform(self):
        """Return ``(platform, distro, package_manager, version, build)``.

        Raises:
            NotImplementedError: if the host is neither Darwin nor Linux.
        """
        system = platform.system()

        if system == 'Darwin':
            sw_vers = run_with_output("sw_vers")
            version = self._find_regex(sw_vers, r'ProductVersion:\s+(\S+)')
            build = self._find_regex(sw_vers, r'BuildVersion:\s+(\S+)')
            return (constants.MACOS_PLATFORM, "Darwin", "brew", version, build)

        if system == "Linux":
            package_mapping = {
                "debian": "apt",     # ubuntu [deb]
                "fedora": "yum",     # rhel, centos, oracle [rpm]
                "sles": "zypper",    # sles [rpm]
                "centos": "yum",
            }
            version = distro_info.version(best=True)
            # distro.like() is a space-separated list (e.g. "rhel centos fedora"),
            # so it must be split before testing membership.
            candidates = set((distro_info.like() or "").split())
            candidates.add(distro_info.id())
            for like, package_manager in package_mapping.items():
                if like in candidates:
                    return (constants.LINUX_PLATFORM, distro_info.name(), package_manager, version, "")
            return (constants.LINUX_PLATFORM, distro_info.name(), None, version, "")

        raise NotImplementedError(f"unknown platform: {system}")

    def _find_regex(self, text, expression):
        """Return the first match of ``expression`` in ``text``, or None.

        When the pattern has several groups, the groups of the first match
        are concatenated.
        """
        if text is None:
            return None
        matches = re.findall(expression, text)
        if not matches:
            return None
        return ''.join(matches[0])

    def is_big_sur_and_up(self) -> bool:
        """Return True when the major OS version is 11 or higher on a non-Linux platform."""
        # Compare by value: `is` on strings only works by interning accident.
        if self.platform == constants.LINUX_PLATFORM:
            return False
        if not self.version:
            return False
        major_ver = self.version.split(".")[0]
        try:
            return int(major_ver) >= 11
        except ValueError:
            # Non-numeric version string: cannot be confirmed as 11+.
            return False


class package:
    """Description of a system package as reported by the package manager."""

    name = None
    version = None
    installed = False
    source = None

    def __init__(self, name, version = None, is_installed = False, source = None):
        self.name = name
        self.version = version
        self.installed = is_installed
        self.source = source

    def __repr__(self):
        return f'Package name: {self.name}, version: {self.version}, installed? {self.installed}, source: {self.source}'


class machine:
    """Static helpers acting on the local machine; results are cached on the class."""

    download_folder = None
    username = None
    hostname = None
    platform = None
    distro = None
    os_version = None
    package_manager = None
    start_time = time.time()  # evaluated once, when the class body is executed
    temp_folder = None

    @staticmethod
    def get_start_time():
        """Return the timestamp recorded when this class was defined."""
        return machine.start_time

    @staticmethod
    def get_platform():
        """Return the platform name, detecting and caching OS details on first use."""
        if machine.platform is None:
            details = os_details()
            machine.platform = details.platform
            machine.package_manager = details.package_manager
            machine.os_version = details.version
            machine.distro = details.distro
        return machine.platform

    @staticmethod
    def is_installed(appname):
        """Return True when ``which <appname>`` produces output (is not None)."""
        return run_with_output(f"which {appname}") is not None

    def is_connected(self, url='http://google.com'):
        """Return True if ``url`` can be opened, False on any failure."""
        try:
            # Close the response instead of leaking the connection.
            with urllib.request.urlopen(url):
                return True
        except Exception:
            # Best-effort probe: any failure means "not connected".
            return False

    @staticmethod
    def create_multiple_process_events(no_of_events):
        """Run ``ls`` on ``no_of_events`` distinct (non-existent) paths, 10 ms apart."""
        for event_number in range(no_of_events):
            run(f"ls ./test_{event_number+1} &> /dev/null", False)
            time.sleep(0.01)

    @staticmethod
    def create_multiple_file_events(no_of_events):
        """Copy the ``ls`` binary ``no_of_events`` times into the temp folder, then remove the copies.

        Raises:
            Exception: if no ``ls`` binary is found in the known locations.
        """
        # Both candidates must be absolute; a relative 'usr/bin/ls' would be
        # resolved against the current working directory.
        possible_paths = [path for path in ('/bin/ls', '/usr/bin/ls') if os.path.exists(path)]
        if not possible_paths:
            raise Exception("could not find executable binary")
        exec_path = possible_paths[0]
        temp_dir = machine.get_temp_folder()
        for event_number in range(no_of_events):
            dest_path = os.path.join(temp_dir, f"test_{event_number+1}.ls")
            run(f"cp {exec_path} {dest_path}", False)
            time.sleep(0.01)
        run(f"rm {temp_dir}/test_*")

    @staticmethod
    def create_multiple_network_events(no_of_events, timeout = 60):
        """Create IPv6 connection events until ``no_of_events`` succeed or ``timeout`` seconds pass.

        Returns:
            True when exactly ``no_of_events`` connections were counted.
        """
        start_time = time.time()

        def connect_event():
            # Returns 1 when the connection call completed without raising.
            try:
                ipv6_ip = '2a02:26f0:12f:2a3::356e 80'
                machine.create_ipv6_conn(ipv6_ip)
                return 1
            except Exception as ex:
                log.warning(f"failed to complete events: {ex}")
                return 0

        connection_events = 0
        while connection_events < no_of_events and (time.time() - start_time) < timeout:
            connection_events += connect_event()

        elapsed_time = time.time() - start_time
        log.debug(f"{connection_events} network events created [{elapsed_time:.2f}s]")
        return connection_events == no_of_events

    @staticmethod
    def create_multiple_network_events2(end):
        """Start non-blocking IPv4 connects to 192.168.1.1..``end`` on port 8080 and print the outcome."""

        def connect_non_blocking(ip, port):
            # Returns the socket while the connect is in flight, or None on a hard failure.
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setblocking(False)
            try:
                s.connect((ip, port))
            except OSError as e:
                if e.errno != errno.EINPROGRESS:
                    s.close()  # do not leak the descriptor on failure
                    return None
            return s

        def is_connection_complete(sock):
            # Zero timeout: only reports connections that are already writable.
            _readable, writable, _exceptional = select.select([], [sock], [], 0)
            return sock in writable

        def check_connections(ip_list, port):
            results = {}

            def check_single_connection(ip):
                sock = connect_non_blocking(ip, port)
                if sock is None:
                    results[ip] = False
                    return
                try:
                    results[ip] = is_connection_complete(sock)
                finally:
                    sock.close()  # closed whether or not the connect completed

            threads = []
            for ip in ip_list:
                thread = threading.Thread(target=check_single_connection, args=(ip,))
                thread.start()
                threads.append(thread)

            for thread in threads:
                thread.join()

            return results

        ip_list = [f"192.168.1.{i}" for i in range(1, end + 1)]
        port_number = 8080

        results = check_connections(ip_list, port_number)

        for ip, success in results.items():
            if success:
                print(f"Connection to {ip} successful.")
            else:
                print(f"Connection to {ip} failed.")

    @staticmethod
    def trigger_mac_login_event():
        """Send an ``scutil`` notify for ``State:/Users/ConsoleUser`` and return the run result."""
        return run('sudo scutil <<< "notify State:/Users/ConsoleUser"')

    @staticmethod
    def get_sense_guid(logs):
        """Return the value following ``senseGuid:`` from the logs (or rotated logs), or None."""
        for logs_dir in [logs, os.path.join(logs, 'rotated')]:
            sense_guid_line = machine._get_last_line_from_logs(logs_dir, "senseGuid:")
            if sense_guid_line is not None:
                return machine._parse_from_json(sense_guid_line, "senseGuid:")
        return None

    @staticmethod
    def find_messages_in_log(logs_dir, key):
        """Return log lines containing ``key`` that are newer than the start time, or None."""
        lines = machine._get_last_line_from_logs(logs_dir, key, get_all_lines=True)
        if lines is None:
            return None

        messages_array = []
        for line in lines:
            time_since_epoch = machine._parse_time_from_log_message(line)
            if time_since_epoch is None:
                continue
            if time_since_epoch > machine.start_time:
                messages_array.append(line)

        return messages_array if messages_array else None

    @staticmethod
    def _parse_time_from_log_message(line):
        """Return the epoch seconds of the timestamp in the third ``[...]`` group, or None.

        Expected timestamp format: ``[2020-04-28 05:14:25.659656 UTC]``.
        """
        time_string_list = re.findall(r'\[.+?\]', line)
        # The timestamp is the third bracketed field; fewer fields means an unparsable line.
        if len(time_string_list) < 3:
            log.info("failed to parse datetime from log mesaage")
            return None

        timestamp = time_string_list[2][1:-1].split('.')[0]
        try:
            time_obj = time.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            log.info("failed to parse datetime from log mesaage")
            return None

        # The timestamp is UTC, so convert with timegm; mktime() would treat it as
        # local time and subtracting altzone is only right while DST is in effect.
        return float(calendar.timegm(time_obj))

    @staticmethod
    def last_log_message_time(logs_dir, key):
        """Return the parsed time of the last log line containing ``key``, or None."""
        line = machine._get_last_line_from_logs(logs_dir, key)
        if line is None:
            return None
        return machine._parse_time_from_log_message(line)

    @staticmethod
    def verify_msg_in_logs(logs_dir, message, ref_time=None, timeout_sec = 5):
        """Poll the logs until ``message`` appears with a time at or after ``ref_time``.

        ``ref_time`` defaults to the recorded start time. Returns False when
        ``timeout_sec`` elapses first.
        """
        if ref_time is None:
            ref_time = machine.get_start_time()

        log.info(f"Looking for [{message}] in logs")
        start_time = time.time()
        while time.time() - start_time < timeout_sec:
            last_log_msg_time = machine.last_log_message_time(logs_dir, message)
            if last_log_msg_time is None:
                wait(5, "retry: waiting for msg to appear in log")
                continue

            delta = last_log_msg_time - ref_time
            log.debug(f"last message time: {last_log_msg_time}. delta = {delta}")

            if delta < 0:
                wait(5, "retry: waiting for updated msg to appear in log")
                continue
            return True
        return False

    @staticmethod
    def get_hostname():
        """Return the cached hostname, querying the OS on first use."""
        if machine.hostname is None:
            platform = machine.get_platform()
            if platform == 'macOS':
                machine.hostname = run_with_output("scutil --get LocalHostName")
            if platform == 'Linux':
                machine.hostname = run_with_output("/bin/hostname")
            log.info(f"Hostname: [{machine.hostname}]")
        return machine.hostname

    @staticmethod
    def get_username():
        """Return the cached output of ``whoami`` as a string."""
        if machine.username is None:
            machine.username = str(run_with_output("whoami"))
        return machine.username

    @staticmethod
    def create_executable(output_filename):
        """Copy the ``curl`` binary found by ``which`` to ``output_filename``."""
        input_file = run_with_output("which curl")
        return run(f"cp {input_file} {output_filename}", validate_exit_code=True)

    @staticmethod
    def create_ipv6_conn(ipv6_ip):
        """Open an IPv6 connection with ``nc``; True when the command output is empty."""
        # NOTE(review): `ipv6_ip` is ignored and a fixed address is used. Left as-is
        # because changing the target alters what the tests connect to - confirm intent.
        return run_with_output("nc 2a00:a040:0:6::d437:ba4a 443", timeout_in_sec=120) == ''

    @staticmethod
    def create_ipv4_conn(ipv4_ip):
        """Run ``curl`` against ``ipv4_ip`` and return its output."""
        return run_with_output(f"curl {ipv4_ip}", timeout_in_sec=120)

    @staticmethod
    def download_diy_package(timeout_sec=30):
        """Download and unzip the DIY package; on Linux also execute the unzipped script.

        Returns:
            False when the download, unzip or script run fails, otherwise a truthy result.
        """
        # TODO: move out of browser class
        platform = machine.get_platform()
        if platform == "Linux":
            diy_url = DIY_URL_LINUX
            diy_package = DIY_PACKAGE_LINUX
        else:
            diy_url = DIY_URL_MACOS
            diy_package = DIY_PACKAGE_MACOS
        log.debug(f"diy url: [{diy_url}]")
        machine.download_file(diy_url, filename=diy_package)

        log.debug("downloading...")
        # Initialised up front so a non-positive timeout cannot leave it unbound.
        package_downloaded = False
        start_time = time.time()
        while time.time() - start_time < timeout_sec:
            package_downloaded = machine.package_downloaded(diy_package)
            if package_downloaded:
                break
            wait(1, "retrying to download DIY package")

        if not package_downloaded:
            return False

        log.debug("package downloaded")
        unzipped = machine.unzip_package(diy_package)
        log.debug(f"package unzipped: [{unzipped}]")

        if platform == "macOS":
            return unzipped is not None
        elif platform != "Linux":
            return False

        # Linux: nothing to execute if the unzip failed.
        if unzipped is None:
            return False
        log.info(f"executing script: {unzipped}")
        success = run(f"bash {unzipped}")
        log.debug(f"download_diy_package success: {success}")
        run(f'rm "{unzipped}"')
        return success

    @staticmethod
    def _find_brew_curl():
        """Return the path of the newest curl under /usr/local/Cellar/curl, or None."""
        try:
            log.info("trying to use newest curl version from brew MacOS")
            versions = os.listdir('/usr/local/Cellar/curl/')
            if not versions:
                return None
            # Numeric sort of dotted versions; a non-numeric part raises ValueError.
            latest_version = max(versions, key=lambda s: list(map(int, s.split('.'))))
            new_curl_path = f"/usr/local/Cellar/curl/{latest_version}/bin/curl"
            if os.path.exists(new_curl_path):
                log.debug(f"new_curl_path: {new_curl_path}")
                return new_curl_path
        except (OSError, ValueError):
            log.warning("could not find new curl version, use default.")
        return None

    @staticmethod
    def download_file(url, filename=None, timeout_sec=30):
        """Download ``url`` with curl into the download folder and return the run result.

        An absolute ``filename`` overrides the target folder. On macOS a
        brew-installed curl is preferred when one is found.
        """
        download_folder = machine.get_download_folder()
        if filename is not None and os.path.isabs(filename):
            download_folder = os.path.dirname(filename)
            filename = os.path.basename(filename)

        curl_binary = "curl"
        if machine.get_platform() == 'macOS':
            # macOS curl differs from the Linux one, so a brew-installed version
            # is used where available to keep the generated events comparable.
            curl_binary = machine._find_brew_curl() or curl_binary

        if filename is None:
            command = f'''cd {download_folder} && {curl_binary} --connect-timeout {timeout_sec} -L -O "{url}"'''
        else:
            command = f'''cd {download_folder} && {curl_binary} --connect-timeout {timeout_sec} -L -o "{filename}" "{url}"'''

        log.debug(f"curl command: {command}")
        success = run(command)
        log.info(f"file downloaded: {filename} [{'ok' if success else 'fail'}]")
        return success

    @staticmethod
    def file_exists_in_download_folder(filename):
        """Return True if ``filename`` exists inside the download folder."""
        full_name = os.path.join(machine.get_download_folder(), filename)
        return os.path.exists(full_name)

    @staticmethod
    def run_process(command):
        """Run ``command`` and return the run result."""
        return run(command)

    @staticmethod
    def get_temp_folder():
        """Return the temp folder, defaulting to ``tempfile.gettempdir()``."""
        if machine.temp_folder is None:
            machine.temp_folder = tempfile.gettempdir()
        return machine.temp_folder

    @staticmethod
    def set_temp_folder(temp_folder):
        """Override the temp folder used by this class."""
        machine.temp_folder = temp_folder
        log.info(f"temp folder set: [{machine.temp_folder}]")

    @staticmethod
    def get_collection_folder():
        """Return the collection directory path inside the temp folder."""
        return os.path.join(machine.get_temp_folder(),COLLECTION_DIR)

    @staticmethod
    def copy_to_collection_folder(filename):
        """Copy ``filename`` into the collection folder, creating the folder if needed."""
        collection_folder = machine.get_collection_folder()
        log.debug(f"os.path.exists(collection_folder) {os.path.exists(collection_folder)}")
        if not os.path.exists(collection_folder) and not run(f"sudo mkdir {collection_folder}"):
            log.error("cannot create temp directory")
            return False
        log.debug(f"os.path.exists(collection_folder) {collection_folder}, {os.path.exists(collection_folder)} after creation")
        if not run(f"sudo cp '{filename}' '{collection_folder}'"):
            log.error(f"cannot copy {filename} to collection folder")
            return False
        return True

    @staticmethod
    def copy_dir_to_collection_folder(folder_path, target_name):
        """Recursively copy ``folder_path`` into the collection folder as ``target_name``."""
        collection_folder = machine.get_collection_folder()
        if not os.path.exists(collection_folder) and not run(f"sudo mkdir {collection_folder}"):
            log.error("cannot create temp directory")
            return False
        if not run(f"sudo cp -r '{folder_path}' '{collection_folder}/{target_name}'"):
            log.error(f"cannot copy {folder_path} to collection folder")
            return False
        return True

    @staticmethod
    def move_file_to_target_folder(file_path, target_folder):
        """Move ``file_path`` into ``target_folder`` with sudo; return success."""
        if not run(f"sudo mv '{file_path}' '{target_folder}'"):
            log.error(f"cannot move {file_path} to {target_folder}")
            return False
        return True

    @staticmethod
    def get_download_folder():
        """Return the cached download folder for the current platform."""
        if machine.download_folder is None:
            platform = machine.get_platform()
            if platform == 'macOS':
                machine.download_folder = os.path.join("/","Users",machine.get_username(),"Downloads")
            elif platform == 'Linux':
                machine.download_folder = run_with_output("xdg-user-dir DOWNLOAD")
            log.debug(f"Download folder: [{machine.download_folder}]")
        return machine.download_folder

    @staticmethod
    def change_permissions(filename):
        """Set mode 666 on ``filename`` with sudo."""
        return run(f"sudo chmod 666 {filename}")

    @staticmethod
    def get_logs_dir():
        """Return the product log directory for the current platform.

        Raises:
            Exception: if the platform is neither macOS nor Linux.
        """
        platform = machine.get_platform()
        if platform == 'macOS':
            return '/Library/Logs/Microsoft/mdatp'
        if platform == 'Linux':
            return '/var/log/microsoft/mdatp'
        raise Exception("unknown platform")

    @staticmethod
    def get_process_ids(proc_names, excluded_processes=None):
        """Return the distinct PIDs (as strings) of ``ps aux`` lines matching ``proc_names``.

        Lines matching any entry of ``excluded_processes`` are filtered out.
        """
        excluded_processes = [] if excluded_processes is None else excluded_processes
        pids = set()
        for proc_name in proc_names:
            command = f"ps aux | grep {proc_name} | grep -v grep"
            for excluded_proc in excluded_processes:
                command = command + f" | grep -v {excluded_proc}"
            with os.popen(command) as pipe:
                result = pipe.read()
            for line in result.strip().split('\n'):
                parts = line.split()
                if len(parts) > 1:
                    pids.add(parts[1])  # second `ps aux` column is the PID
        return list(pids)

    @staticmethod
    def _check_duplicates_process(output):
        """Return ``output`` keeping only the first line seen for each PID."""
        seen_pids = set()
        new_output = []
        for process in output:
            fields = process.split()
            if len(fields) < 2:
                continue  # no PID column to de-duplicate on
            pid = fields[1]
            if pid in seen_pids:
                continue
            seen_pids.add(pid)
            new_output.append(process)
        return new_output

    @staticmethod
    def get_process_info(process_names):
        """Return the ``ps aux`` lines matching ``process_names``, one per PID, newline-joined."""
        output = []
        for proc_name in process_names:
            with os.popen(f"ps aux | grep {proc_name} | grep -v grep") as pipe:
                process_data = pipe.read().strip().split("\n")
            for process in process_data:
                if process and process not in output:
                    output.append(process)
        new_output = machine._check_duplicates_process(output)
        return "\n".join(new_output)

    @staticmethod
    def unzip_package(package_name,timeout = 30):
        """Unzip the downloaded package and return the first inflated file name, or None.

        ``timeout`` is retained for backward compatibility; the unzip command is
        run synchronously, so there is nothing to wait for afterwards.
        """
        file_name = machine.get_package_filename(package_name)
        if not file_name:
            log.debug(f"package was not found in download folder [{package_name}]")
            return None

        download_folder = machine.get_download_folder()
        unzip_command = f'unzip -o "{os.path.join(download_folder, file_name)}" -d {download_folder}'
        log.info(unzip_command)
        unzip_output = run_with_output(unzip_command, verbose=True)
        if unzip_output is None:
            log.debug(f"unzip produced no output for [{file_name}]")
            return None

        inflated = re.findall("inflating: (.+?)(?:\n|$)", unzip_output)
        if not inflated:
            log.debug(f"no inflated file reported for [{file_name}]")
            return None

        script_name = inflated[0].strip()
        log.debug(f"script name: {script_name}")
        return script_name

    @staticmethod
    def get_package_filename(package_filename, timeout=15):
        """Wait up to ``timeout`` seconds for a matching file modified after the start time.

        Returns:
            The file name inside the download folder, or None on timeout.
        """
        start_time = machine.get_start_time()
        end_time = time.time() + timeout
        while time.time() < end_time:
            download_folder = machine.get_download_folder()
            for filename in os.listdir(download_folder):
                if ".crdownload" in filename:
                    continue
                try:
                    # take the package that was created after the test had started
                    file_create_time = os.path.getmtime(os.path.join(download_folder, filename))
                except OSError:
                    # File vanished between listdir() and getmtime().
                    continue
                if package_filename in filename and file_create_time > start_time:
                    return filename
            wait(1, "wait for package to download")
        return None

    @staticmethod
    def clean_downloads_folder():
        """Remove ``*DefenderATP*`` entries from the download folder."""
        run(f"rm -rf {os.path.join(machine.get_download_folder(), '*DefenderATP*')}")

    @staticmethod
    def package_downloaded(package_filename, timeout = 120):
        """Return True when a matching package file shows up within ``timeout`` seconds."""
        return machine.get_package_filename(package_filename,timeout) is not None

    @staticmethod
    def delete_package_files(zip_package,script_name):
        """Remove the zip package and the script from the download folder."""
        run(f"rm -rf {os.path.join(machine.get_download_folder(), zip_package)}")
        run(f"rm -rf {os.path.join(machine.get_download_folder(), script_name)}")

    @staticmethod
    def delete_diy_files():
        """Remove ``*DIY*`` files and ``__MACOSX`` from ``~/Downloads``."""
        run("rm -rf ~/Downloads/*DIY*")
        run("rm -rf ~/Downloads/__MACOSX")

    @staticmethod
    def _get_onboarding_script():
        """Return the last entry of ONBOARDING_SCRIPTS present in the download folder, or None."""
        files = os.listdir(machine.get_download_folder())
        onboarding_script = None
        for script in ONBOARDING_SCRIPTS:
            if script in files:
                onboarding_script = script
        return onboarding_script

    @staticmethod
    def _get_last_line_from_logs(logs_dir, expression, get_all_lines=False):
        """Return the last log line containing ``expression`` (or all such lines), or None.

        All ``microsoft_defender_enterprise*`` files in ``logs_dir`` are
        concatenated into a temp file, which is always removed afterwards.
        """
        temp_filename = os.path.join(machine.get_temp_folder(), "config_temp")

        # read all enterprise logs into a single file
        run(f"sudo cat {logs_dir}/microsoft_defender_enterprise* > {temp_filename}", False)
        try:
            # read lines from combined log
            output = run_with_output(f"cat {temp_filename}", verbose=False)
        finally:
            # delete temp file even when the read failed
            run(f"rm -rf {temp_filename}", False)

        if output is None:
            return None

        lines = [line for line in output.split('\n') if expression in line]
        if not lines:
            return None
        return lines if get_all_lines else lines[-1]

    @staticmethod
    def get_crash_dump_dir():
        """Return the crash dump directory for the current platform.

        Raises:
            Exception: if the platform is neither macOS nor Linux.
        """
        platform = machine.get_platform()
        if platform == 'macOS':
            return '/Library/logs/DiagnosticReports'
        if platform == 'Linux':
            return '/var/crash'
        raise Exception("unknown platform")

    @staticmethod
    def _parse_from_json(line, key):
        """Return the text after ``key`` up to the next comma or end of line, or None."""
        matches = re.findall(f'{key}(.*?)(?:,|$)', line)
        if not matches:
            return None
        return matches[0]

    @staticmethod
    def query_installed_package(package_name):
        """Query dpkg or rpm for ``package_name`` and return a ``package`` object.

        When the distribution is not recognised or the query output cannot be
        parsed, a ``package`` holding only the name is returned.
        """
        curr_distro_likes = distro_info.like().split(' ')
        curr_distro_likes.append(distro_info.id())

        if "debian" in curr_distro_likes:
            command = f"dpkg-query -W -f=\'\\{{\"Name\":\"${{binary:Package}}\", \"Version\": \"${{Version}}\", \"Status\": \"${{db:Status-Status}}\", \"Source\": \"${{Source}}\"}}\' {package_name}"
        elif any(like in ("sles", "centos", "fedora", "rhel") for like in curr_distro_likes):
            command = f"rpm -qa --qf \'\\{{ \"Name\":\"%{{NAME}}\", \"Version\": \"%{{VERSION}}-%{{RELEASE}}\", \"Status\": \"installed\", \"Source\": \"%{{SOURCEPACKAGE}}\"\\}}\' {package_name}"
        else:
            log.info(f'Could not fetch package details for package {package_name}')
            return package(package_name)

        result = run_with_output(cmd=command, verbose=False)
        try:
            res_dict = json.loads(result)
            return package(name = res_dict['Name'],
                           version = res_dict["Version"],
                           is_installed = res_dict["Status"] == "installed",
                           source = res_dict['Source'])
        except (TypeError, ValueError, KeyError):
            # No output, malformed JSON or missing fields: report the bare package name.
            return package(package_name)