# Usage examples:
# sudo python3 open_files.py --ScriptName open_files.py --id log4j_handlersV2 --filter-env LOG4J_FORMAT_MSG_NO_LOOKUPS=true --filter-name "log4j,LOG4J,spring-core" --filter-command "java,javaw" --manifest-path "META-INF/maven/org.apache.logging.log4j/log4j-core/pom.properties" --marker-path /var/opt/microsoft/mdatp/wdavedr/log4jMitigationApplied --collect-dirlist /log4j/core/lookup/JndiLookup.class,log4j-,spring-core-
# sudo python2 open_files.py --ScriptName open_files.py --id log4j_handlersV2 --filter-env LOG4J_FORMAT_MSG_NO_LOOKUPS=true --filter-name "log4j,LOG4J,spring-core" --filter-command "java,javaw" --manifest-path "META-INF/maven/org.apache.logging.log4j/log4j-core/pom.properties" --marker-path /var/opt/microsoft/mdatp/wdavedr/log4jMitigationApplied --collect-dirlist /log4j/core/lookup/JndiLookup.class,log4j-,spring-core-
# Remove the cached state file written by State (<resources dir>/cache/<id>.json):
# sudo rm /opt/microsoft/mdatp/resources/cache/log4j_handlersV2.json
from genericpath import isdir
import os
import re
import sys
import json
from datetime import datetime as dt
import zipfile
import string
import argparse
import traceback
import functools
import itertools
import subprocess as sb

py_version = sys.version_info

# `UTC` must be a tzinfo *instance* because it is passed to datetime.now().
# `timezone.utc` exists on every Python 3 version; Python 2 has no `timezone`
# class at all, so a minimal tzinfo is defined there instead.
try:
    from datetime import timezone
    UTC = timezone.utc
except ImportError:
    from datetime import timedelta, tzinfo

    class _UTC(tzinfo):
        """Fixed zero-offset tzinfo used when datetime.timezone is missing."""

        def utcoffset(self, when):
            return timedelta(0)

        def tzname(self, when):
            return "UTC"

        def dst(self, when):
            return timedelta(0)

    UTC = _UTC()

MAX_FILE_SIZE = 1024 * 1024  # 1MB
MANIFEST_OLD_PATH = "META-INF/MANIFEST.MF"


def take(n, l):
    """Return an iterator over at most the first `n` items of iterable `l`.

    A non-positive `n` yields nothing.
    """
    return itertools.islice(l, max(int(n), 0))


class Jar:
    """Lazy reader for the manifest and file listing of a zip-format archive.

    Reads the module-level `args` namespace (manifest_path, manifest_keys,
    dirlist) that is assigned in the `__main__` section.
    """

    # Manifest attribute prefixes collected by _parse_manifest.
    MANIFEST_FIELDS = ('Specification-Version', 'Specification-Title',
                       'Specification-Vendor', 'Implementation-Version',
                       'Implementation-Title', 'Implementation-Vendor')

    def __init__(self, path):
        self.path = path
        # None means "not read yet"; an empty result is a valid cached value.
        self._manifest = None
        self._dirlist = None

    def _parse_manifest(self, lines):
        """Yield (key, value) pairs extracted from manifest `lines`.

        The first line starting with ``version=`` is reported under the key
        'Version'. Every line starting with one of MANIFEST_FIELDS is split on
        its first ':' so values that contain ':' (URLs, for example) survive;
        matching lines without any ':' are skipped.
        """
        version_indication = "version="
        for line in lines:
            if line.startswith(version_indication):
                yield 'Version', line[len(version_indication):].strip()
                break

        for line in lines:
            if not line.startswith(self.MANIFEST_FIELDS):
                continue
            key, sep, value = line.partition(':')
            if sep:
                yield key.strip(), value.strip()

    def _open(self):
        """Open self.path as a ZipFile; raise ValueError if it is not a zip."""
        if not zipfile.is_zipfile(self.path):
            raise ValueError("path is not a zip file: {}".format(self.path))
        return zipfile.ZipFile(self.path)

    def _read_dirlist(self):
        """Return the archive member names matching any args.dirlist regex.

        Names are matched lower-cased, de-duplicated, and kept in archive order.
        """
        with self._open() as zf:
            names = zf.namelist()
        seen = set()
        matches = []
        for name in names:
            if name in seen:
                continue
            seen.add(name)
            lowered = name.lower()
            if any(r.search(lowered) for r in args.dirlist):
                matches.append(name)
        return matches

    def _get_manifest_path(self, zf):
        """Return args.manifest_path if present in `zf`, else the classic
        META-INF/MANIFEST.MF path if present, else None."""
        names = set(zf.namelist())
        for path in [args.manifest_path, MANIFEST_OLD_PATH]:
            if path in names:
                return path
        return None

    def _read_manifest(self, throw_on_error=False):
        """Read and parse the manifest, filtered by args.manifest_keys.

        Returns a dict (empty when no manifest exists). Errors are written to
        stderr and produce {} unless `throw_on_error` is true, in which case
        they are re-raised.
        """
        try:
            with self._open() as zf:
                manifest_path = self._get_manifest_path(zf)
                if not manifest_path:
                    # Not found manifest file
                    return {}

                manifest_info = zf.getinfo(manifest_path)
                if manifest_info.file_size > MAX_FILE_SIZE:
                    raise IOError("manifest file is too big")

                with zf.open(manifest_path) as f:
                    # Bound each readline so one huge line cannot exhaust memory.
                    readline_f = functools.partial(f.readline, MAX_FILE_SIZE)
                    # Undecodable bytes are replaced rather than failing the read.
                    manifest_lines = [x.decode('utf-8', 'replace').strip()
                                      for x in iter(readline_f, b'')]
                    manifest = self._parse_manifest(manifest_lines)
                    return dict(
                        (k, v) for k, v in manifest
                        if not args.manifest_keys
                        or any(m.search(k.lower()) for m in args.manifest_keys))
        except Exception:
            sys.stderr.write("error while reading manifest of '{}': {}\n".format(
                self.path, traceback.format_exc()))
            if throw_on_error:
                raise
            return {}

    def manifest(self, throw_on_error=False):
        """Return the (cached) manifest dict.

        An empty cached result is re-read when `throw_on_error` is true so the
        underlying error, if any, can still be raised to the caller.
        """
        if self._manifest is None or (throw_on_error and not self._manifest):
            self._manifest = self._read_manifest(throw_on_error)
        return self._manifest

    def dirlist(self):
        """Return the (cached) list of matching member names; [] on error."""
        if self._dirlist is None:
            try:
                self._dirlist = self._read_dirlist()
            except Exception as e:
                sys.stderr.write("error reading dirlist: {}\n".format(e))
                self._dirlist = []
        return self._dirlist


class FatalArgumentError(Exception):
    """Argument error that makes the script exit with code 2."""
    pass


class State:
    """Context manager that merges the previous run's cached payload into the
    current one on entry and writes the payload back to the cache on exit."""

    # Payload sections carried over for a stale handler, paired with the
    # handler field that keys each section.
    _MERGED_SECTIONS = (('environ', 'pid'), ('manifest', 'name'),
                        ('dirlist', 'name'), ('ports', 'pid'),
                        ('target_software', 'pid'))

    def __init__(self, name, payload):
        self.payload = payload
        self.merge_cached_content = args.merge_cached_content
        self.path = self._get_location(name)

    @staticmethod
    def _resources_dir():
        """Return the first existing known resources directory.

        Raises ValueError when none of the known directories exists.
        """
        for path in ["/opt/microsoft/mdatp/resources",
                     "/Library/Application Support/Microsoft/Defender/"]:
            if os.path.exists(path):
                return path
        raise ValueError("platform not supported: {}".format(sys.platform))

    @staticmethod
    def _get_location(name):
        """Return <resources dir>/cache/<name>.json, creating the cache dir."""
        basedir = os.path.join(State._resources_dir(), "cache")
        if not os.path.exists(basedir):
            os.mkdir(basedir)
        return os.path.join(basedir, name) + ".json"

    def __enter__(self):
        if not self.merge_cached_content or not os.path.exists(self.path):
            return self

        with open(self.path, 'r') as f:
            try:
                old_payload = json.load(f)
            except Exception:
                # An unreadable or corrupt cache is treated as empty.
                old_payload = {}
        if not isinstance(old_payload, dict):
            old_payload = {}

        current_handles = set(h['name'] for h in self.payload['handlers'])

        for handler in old_payload.get('handlers', []):
            name, pid = handler['name'], handler['pid']
            # Skip handlers that are still open now, or whose file is gone.
            if name in current_handles or not os.path.exists(name):
                continue

            handler['active'] = False
            self.payload['handlers'].append(handler)

            keys = {'name': name, 'pid': pid}
            for section, key_field in self._MERGED_SECTIONS:
                key = keys[key_field]
                if section in old_payload and key in old_payload[section]:
                    self.payload[section][key] = old_payload[section][key]
        return self

    def __exit__(self, *args, **kwargs):
        with open(self.path, 'w') as f:
            json.dump(self.payload, f, separators=(',', ':'))


def validate_state_id(dest):
    """Return `dest` if it only contains letters, digits, '.' and '_'.

    Raises ValueError otherwise. `\\Z` (not `$`) is used so that a trailing
    newline is rejected as well.
    """
    if not re.match(r'[A-Za-z0-9._]+\Z', dest):
        raise ValueError("state id invalid: {}".format(dest))
    return dest


def validate_command_name(dest):
    """Split `dest` on commas and return the stripped command names.

    Raises ValueError if any name is not purely alphanumeric; the names are
    later interpolated into a shell command line.
    """
    commands = [x.strip() for x in dest.split(",")]
    for command in commands:
        if not re.match(r'[A-Za-z0-9]+\Z', command):
            raise ValueError("command name invalid: {}".format(command))
    return commands


def split_by_comma(dest):
    """Split `dest` on commas and strip whitespace around each part."""
    return [x.strip() for x in dest.split(",")]


def split_and_compile(dest):
    """Split `dest` on commas and compile each part as a regular expression."""
    return [re.compile(r) for r in split_by_comma(dest)]


def validate_limit(dest):
    """Parse `dest` as an int; raise ValueError if it is smaller than 1."""
    limit = int(dest)
    if limit < 1:
        raise ValueError("limit is too small")
    return limit


def parse_bool(dest):
    """Convert a command line string to a bool.

    '', '0', 'false', 'no', 'off', 'n' and 'f' (case-insensitive) are False;
    any other value is True. Plain `type=bool` would treat "False" as True.
    """
    return dest.strip().lower() not in ('', '0', 'false', 'no', 'off', 'n', 'f')


def parse_args():
    """Parse the command line and return the argparse namespace.

    Exits through parser.error() when --id or --filter-command is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--ScriptName", dest='script_name')
    parser.add_argument("--id", type=validate_state_id)
    parser.add_argument("--limit", type=validate_limit, default=100, help='limit results. defaults to 100.')
    parser.add_argument("--limit-files", type=validate_limit, default=2000, help='limit results. defaults to 2000.')
    parser.add_argument("--filter-command", type=validate_command_name, dest='lsof_commands', help='commands to filter')
    parser.add_argument("--filter-name", type=split_and_compile, default=[], help='filter handler name')
    parser.add_argument("--filter-env", type=split_and_compile, default=[], help='filter env vars')
    parser.add_argument("--manifest-path", help='manifest path to read')
    parser.add_argument("--marker-path", dest='marker_path', default=None, help='marker file path to check if mitigation was applied')
    parser.add_argument("--collect-manifest-keys", dest='manifest_keys', type=split_and_compile, default=[], help='filter manifest keys')
    parser.add_argument("--collect-dirlist", dest='dirlist', type=split_and_compile, default=[], help='filter filenames')
    parser.add_argument("--merge-cached-content", type=parse_bool, default=True)

    args = parser.parse_args()

    # Replace base path of marker-path with MDE_STATE_PATH if the environment variable is set
    state_path = os.getenv("MDE_STATE_PATH")
    if state_path and args.marker_path:
        original_base = "/var/opt/microsoft/mdatp"
        if args.marker_path.startswith(original_base):
            args.marker_path = args.marker_path.replace(original_base, state_path, 1)

    if not all([args.id, args.lsof_commands]):
        parser.error("you must pass --id and --filter-command")

    return args


def _run_cmd(cmd):
    """Run shell command `cmd` and yield its stdout lines, stripped.

    The pipe is closed and the child reaped when the generator finishes or is
    closed early, so no zombie process or open descriptor is left behind.
    """
    p = sb.Popen(cmd, stdout=sb.PIPE, bufsize=1, universal_newlines=True, shell=True)
    try:
        for line in p.stdout:
            yield line.strip()
    finally:
        p.stdout.close()
        p.wait()


def grab_environ(pid, filter_env):
    """Yield the whitespace-separated tokens that `ps eww` prints for `pid`,
    keeping only those matching any regex in `filter_env` (all if empty)."""
    # use 'ps' to grab the env variables of given pid
    cmd = "ps eww -o command {} | tr ' ' '\n' | tail -n+3".format(int(pid))
    for line in _run_cmd(cmd):
        if not filter_env or any(x.search(line) for x in filter_env):
            yield line


def grab_ports(pid):
    """Yield dict(type, proto, addr) for each network file lsof lists for `pid`."""
    for line in list_open_file_handles('-nPi', '-p {}'.format(pid)):
        yield dict(type=line['type'], proto=line['node'], addr=line['name'])


def is_zip_supported_format(name):
    """Return True if `name` ends with .jar, .war or .ear (case-sensitive)."""
    return name.endswith(('.jar', '.war', '.ear'))


def list_open_file_handles(*flags):
    """Run lsof restricted to args.lsof_commands and yield one dict per row.

    Keys are the lower-cased lsof column names, plus 'timestamp' (UTC) and
    'active' (always True).
    """
    # run lsof with given parameters, take only first N handles (ignore the rest)
    cmd = 'lsof {0} -a -c {1} 2>/dev/null'.format(' '.join(flags), ' -c '.join(args.lsof_commands))
    handles_gen = _run_cmd(cmd)

    # if lsof returned output, the first line would be the column names
    column_names = next(handles_gen, "").strip().lower().split()
    if not column_names:
        return

    # iterate each handler & parse it
    for handler_line in handles_gen:
        ts = dt.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
        # NOTE(review): rows are split on whitespace, so blank columns or names
        # containing spaces misalign the fields; kept as-is to preserve output.
        handler = dict(zip(column_names, handler_line.split()), timestamp=ts, active=True)
        yield handler


def lsof_entries():
    """Yield the lsof handlers that have a non-empty 'name' field."""
    # iterate each handler & parse it
    for handler in list_open_file_handles():
        # handler name must exist
        if not handler.get('name'):
            continue
        yield handler


def capping_lsof_entries(handles_list):
    """Cap `handles_list` to args.limit_files entries and return a list.

    Handles whose name matches args.filter_name take priority; the remaining
    space is filled with the other handles (candidates for nested jars).
    """
    limit = args.limit_files
    if len(handles_list) <= limit:
        return handles_list

    required_handles = []
    handles_to_check_dirlist = []
    for h in handles_list:
        if not args.filter_name or any(x.search(h['name'].lower()) for x in args.filter_name):
            required_handles.append(h)
        else:
            handles_to_check_dirlist.append(h)

    # we prefer to give priority to files we need according to filter-name arg
    # and then we add the files to check for nested jars
    if len(required_handles) >= limit:
        return list(take(limit, required_handles))
    free_space = limit - len(required_handles)
    return required_handles + list(take(free_space, handles_to_check_dirlist))


def get_tomcat_webapps_folders(catalina_base):
    """Return the appBase folder of every <Host> in <catalina_base>/conf/server.xml.

    Falls back to ['webapps'] when the file is missing, cannot be parsed, or
    declares no hosts. A <Host> without an appBase attribute contributes
    'webapps', which is Tomcat's documented default.
    """
    default_webapps_folders = ['webapps']
    conf_path = os.path.join(catalina_base, "conf", "server.xml")
    if not os.path.exists(conf_path):
        return default_webapps_folders

    try:
        import xml.etree.ElementTree as ET
        conf = ET.parse(conf_path)
        folders = [host_elem.get("appBase") or 'webapps'
                   for host_elem in conf.getroot().findall(".//Host")]
        return folders if folders else default_webapps_folders
    except Exception:
        return default_webapps_folders


def parse_key_value_lines(lines):
    """Return a dict built from 'key=value' lines, split on the first '='.

    Lines without '=' are ignored; values may themselves contain '='.
    """
    pairs = {}
    for line in lines:
        key, sep, value = line.partition('=')
        if sep:
            pairs[key] = value
    return pairs


def resolve_against_cwd(pid, path):
    """Return `path` normalised against the working directory of `pid`.

    Absolute paths need no lookup. Returns None when the working directory
    cannot be read (no /proc, or the process has exited).
    """
    if os.path.isabs(path):
        return os.path.normpath(path)
    try:
        working_dir = os.readlink("/proc/{}/cwd".format(pid))
    except OSError:
        return None
    return os.path.normpath(os.path.join(working_dir, path))


def get_jars_from_tomcat_webapps(catalina_jar_handle):
    """Return synthetic handles for the webapp archives of a Tomcat process.

    Each returned handle is a copy of `catalina_jar_handle` with 'name'
    replaced by a .war file in a webapps folder or by an entry of an unpacked
    webapp's WEB-INF/lib directory.
    """
    # If /proc doesn't exist, this is probably a Mac
    if not os.path.exists('/proc'):
        return []

    pid = catalina_jar_handle['pid']
    env = parse_key_value_lines(grab_environ(pid, split_and_compile('-Dcatalina.base')))
    catalina_base = env.get('-Dcatalina.base')
    if not catalina_base:
        return []

    catalina_base = resolve_against_cwd(pid, catalina_base)
    if not catalina_base:
        return []

    webapp_libs = []
    for web_apps_relative_folder in get_tomcat_webapps_folders(catalina_base):
        webapps_dir = os.path.join(catalina_base, web_apps_relative_folder)
        if not os.path.exists(webapps_dir):
            continue

        for webapp in os.listdir(webapps_dir):
            webapp_path = os.path.join(webapps_dir, webapp)
            if os.path.isfile(webapp_path) and webapp_path.endswith('.war'):
                lib_handle = catalina_jar_handle.copy()
                lib_handle['name'] = webapp_path
                webapp_libs.append(lib_handle)
            elif os.path.isdir(webapp_path):
                libs_dir = os.path.join(webapp_path, 'WEB-INF', 'lib')
                if not os.path.exists(libs_dir):
                    continue

                for lib in os.listdir(libs_dir):
                    # We don't have the actual handle, so copy the original handle and replace the path
                    lib_handle = catalina_jar_handle.copy()
                    lib_handle['name'] = os.path.join(libs_dir, lib)
                    webapp_libs.append(lib_handle)
    return webapp_libs


def grab_target_software(pid):
    """Return the manifest of <catalina.home>/lib/catalina.jar for `pid`.

    Returns None when the process has no -Dcatalina.home token, its working
    directory cannot be resolved, or the jar does not exist.
    """
    env = parse_key_value_lines(grab_environ(pid, split_and_compile('-Dcatalina.home')))
    catalina_home = env.get('-Dcatalina.home')
    if not catalina_home:
        return None

    catalina_home = resolve_against_cwd(pid, catalina_home)
    if not catalina_home:
        return None

    catalina_jar_path = os.path.join(catalina_home, 'lib', 'catalina.jar')
    if not os.path.exists(catalina_jar_path):
        return None

    return Jar(catalina_jar_path).manifest()


def main():
    """Collect open handles, archive contents, env, ports and manifests for the
    selected commands, print the payload as JSON and persist it through State.

    Raises ValueError when not run as root.
    """
    if os.geteuid() != 0:
        raise ValueError("script needs to be run as root")

    # Check if marker-path exists if provided
    mitigation_marked = False
    if args.marker_path:
        mitigation_marked = os.path.exists(args.marker_path)

    # keep all file handles we need and files that contains jars we need (uber-jars)
    handles = list(capping_lsof_entries(list(lsof_entries())))

    # Tomcat is special because it doesn't keep open handles on jars loaded for webapps.
    # Identify Tomcat by its main component called Catalina, look up its webapps folder,
    # and enumerate the libraries.
    tomcat_webapp_libs = []
    for h in handles:
        if h['name'].lower().endswith("catalina.jar"):
            tomcat_webapp_libs = tomcat_webapp_libs + get_jars_from_tomcat_webapps(h)
    handles = handles + tomcat_webapp_libs

    jars = dict((h['name'], Jar(h['name'])) for h in handles if is_zip_supported_format(h['name']))
    dirlist = dict((name, jar.dirlist()) for name, jar in jars.items() if jar.dirlist())

    # keep only relevant handles (the required file or contains a required files)
    filtered_handles = []
    for h in handles:
        if not args.filter_name or any(x.search(h['name'].lower()) for x in args.filter_name):
            filtered_handles.append(h)
            continue

        # required nested jars (we filter specific keys according to collect-dirlist arg)
        if h['name'] in dirlist:
            filtered_handles.append(h)

    # Reuse the Jar objects created above so each archive is opened once.
    filtered_jars = dict((h['name'], jars.get(h['name']) or Jar(h['name'])) for h in filtered_handles)
    pids = set(h['pid'] for h in filtered_handles)
    environ = dict((pid, list(grab_environ(pid, args.filter_env))) for pid in pids)
    ports = dict((pid, list(take(args.limit, grab_ports(pid)))) for pid in pids)
    target_software = dict((pid, grab_target_software(pid)) for pid in pids)

    # read only manifest of file names according to args.filter_name
    manifest = dict((name, jar.manifest()) for name, jar in filtered_jars.items()
                    if any(x.search(name.lower()) for x in args.filter_name) and jar.manifest())

    payload = dict(scriptVersion=1,
                   cmdline=" ".join(sys.argv[1:]),
                   handlers=filtered_handles,
                   ports=ports,
                   environ=environ,
                   dirlist=dirlist,
                   manifest=manifest,
                   mitigation_marked=mitigation_marked,
                   target_software=target_software,
                   java_version=dict())

    with State(name=args.id, payload=payload):
        print(json.dumps(payload))


if __name__ == "__main__":
    try:
        args = parse_args()
        sys.exit(main())
    # exit code 1 is python syntax error, use 2 for argument error and 3 for runtime exception
    except FatalArgumentError:
        sys.exit(2)
    except SystemExit:
        # Keep the requested exit code: 0 on success and --help, and argparse's
        # own 2 for argument errors instead of reporting them as code 3.
        raise
    except BaseException:
        sys.stderr.write(traceback.format_exc())
        sys.exit(3)