diff --git a/log4j-finder.py b/log4j-finder.py index 8ca4cc8..9a23a76 100755 --- a/log4j-finder.py +++ b/log4j-finder.py @@ -20,6 +20,7 @@ # import os import io +import re import sys import time import zipfile @@ -64,21 +65,33 @@ p.lower() for p in [ "JndiManager.class", + "JndiLookup.class", + "JMSAppender.class", ] ] # Known BAD MD5_BAD = { # JndiManager.class (source: https://github.com/nccgroup/Cyber-Defence/blob/master/Intelligence/CVE-2021-44228/modified-classes/md5sum.txt) - "04fdd701809d17465c17c7e603b1b202": "log4j 2.9.0 - 2.11.2", - "21f055b62c15453f0d7970a9d994cab7": "log4j 2.13.0 - 2.13.3", + "e816a3aef55c711f9118c4310812f0b4": "log4j 1.2.4 - 1.2.5", + "b249ccaf49cc542d30a8ba58d9168dd6": "log4j 1.2.6 - 1.2.7", + "905cbcf34cb8bcbff41d49850825d44b": "log4j 1.2.8", + "b249ccaf49cc542d30a8ba58d9168dd6": "log4j 1.2.9", + "685284cd73dabe660e6ab92835c902f4": "log4j 1.2.11", + "45857e7767d0af0ee2773ce69a9b77fb": "log4j 1.2.12", + "6df11afef01bbc20b1862977da8dd0e3": "log4j 1.2.13 - 1.2.14", + "057abb2f43d712e8b2c519f1f5684a94": "log4j 1.2.15", + "abbf972ad55b21cb813ffb82c65c4239": "log4j 1.2.16", + "aa189ba43b50b4cd95f60473929b3009": "log4j 1.2.17", + "6b15f42c333ac39abacfeeeb18852a44": "log4j 2.1 - 2.3", + "8b2260b1cce64144f6310876f94b1638": "log4j 2.4 - 2.5", "3bd9f41b89ce4fe8ccbf73e43195a5ce": "log4j 2.6 - 2.6.2", "415c13e7c8505fb056d540eac29b72fa": "log4j 2.7 - 2.8.1", + "a193703904a3f18fb3c90a877eb5c8a7": "log4j 2.8.2", + "04fdd701809d17465c17c7e603b1b202": "log4j 2.9.0 - 2.11.2", "5824711d6c68162eb535cc4dbf7485d3": "log4j 2.12.0 - 2.12.1", "102cac5b7726457244af1f44e54ff468": "log4j 2.12.2", - "6b15f42c333ac39abacfeeeb18852a44": "log4j 2.1 - 2.3", - "8b2260b1cce64144f6310876f94b1638": "log4j 2.4 - 2.5", - "a193703904a3f18fb3c90a877eb5c8a7": "log4j 2.8.2", + "21f055b62c15453f0d7970a9d994cab7": "log4j 2.13.0 - 2.13.3", "f1d630c48928096a484e4b95ccb162a0": "log4j 2.14.0 - 2.14.1", # 2.15.0 vulnerable to Denial of Service attack (source: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-45046) "5d253e53fa993e122ff012221aa49ec3": "log4j 2.15.0", @@ -94,6 +107,21 @@ HOSTNAME = platform.node() +class JARinfo: + + kind = 'JAR' + + def __init__(self, version, parents, classinfos): + self.version = version + self.parents = parents + self.classinfos = classinfos + +class Classinfo: + + def __init__(self, zpath, md5): + self.zpath = zpath + self.md5 = md5 + def md5_digest(fobj): """Calculate the MD5 digest of a file object.""" @@ -151,26 +179,37 @@ def scantree(path, stats=None, exclude=None): def iter_jarfile(fobj, parents=None, stats=None): """ - Yields (zfile, zinfo, zpath, parents) for each file in zipfile that matches `FILENAMES` or `JAR_EXTENSIONS` (recursively) + Yields JARinfo for each file in zipfile that matches `FILENAMES` or `JAR_EXTENSIONS` (recursively) """ parents = parents or [] try: with zipfile.ZipFile(fobj) as zfile: - for zinfo in zfile.infolist(): - # log.debug(zinfo.filename) - zpath = Path(zinfo.filename) - if zpath.name.lower() in FILENAMES: - yield (zinfo, zfile, zpath, parents) - elif zpath.name.lower().endswith(JAR_EXTENSIONS): - zfobj = zfile.open(zinfo.filename) + ignoreentry = True + + getversion = lambda x : parse_meta(zfile.open(x[0])) if any(x) else "unknown" + version = getversion([zi for zi in zfile.filelist if zi.filename.endswith('pom.properties')]) + + getclassinfo= lambda x : Classinfo( Path(x[0].filename).as_posix(), md5_digest(zfile.open(x[0])) ) if any(x) else None + + classinfos = [ getclassinfo([zi for zi in zfile.filelist if zi.filename.lower().endswith(filename.lower())]) for filename in FILENAMES ] + classinfos = [ classinfo for classinfo in classinfos if classinfo != None ] + + if any(classinfos): + yield JARinfo(version, parents, classinfos) + + jars = [ zi for zi in zfile.filelist if zi.filename.lower().endswith(JAR_EXTENSIONS) ] + if any(jars): + for jar in jars: + zfobj = zfile.open(jar.filename) try: # Test if we can open the zfobj without errors, fallback to BytesIO otherwise # see https://github.com/fox-it/log4j-finder/pull/22 zipfile.ZipFile(zfobj) except zipfile.BadZipFile as e: - log.debug(f"Got {zinfo}: {e}, falling back to BytesIO") - zfobj = io.BytesIO(zfile.open(zinfo.filename).read()) - yield from iter_jarfile(zfobj, parents=parents + [zpath]) + log.debug(f"Got {jar}: {e}, falling back to BytesIO") + zfobj = io.BytesIO(zfile.open(jar.filename).read()) + yield from iter_jarfile(zfobj, parents=parents + [jar.filename]) + except IOError as e: log.debug(f"{fobj}: {e}") except zipfile.BadZipFile as e: @@ -179,6 +218,7 @@ def iter_jarfile(fobj, parents=None, stats=None): # RuntimeError: File 'encrypted.zip' is encrypted, password required for extraction log.debug(f"{fobj}: {e}") +#region colors def red(s): if NO_COLOR: @@ -215,8 +255,9 @@ def bold(s): return s return f"\033[1m{s}\033[0m" +#endregion -def check_vulnerable(fobj, path_chain, stats, has_jndilookup=True): +def check_vulnerable(jarinfo: JARinfo, stats): """ Test if fobj matches any of the known bad or known good MD5 hashes. Also prints message if fobj is vulnerable or known good or unknown. @@ -225,25 +266,57 @@ def check_vulnerable(fobj, path_chain, stats, has_jndilookup=True): indicate it was patched according to https://logging.apache.org/log4j/2.x/security.html using: zip -q -d log4j-core-*.jar org/apache/logging/log4j/core/lookup/JndiLookup.class """ - md5sum = md5_digest(fobj) - first_path = bold(path_chain.pop(0)) - path_chain = " -> ".join(str(p) for p in [first_path] + path_chain) - comment = collections.ChainMap(MD5_BAD, MD5_GOOD).get(md5sum, "Unknown MD5") + + first_path = bold(jarinfo.parents.pop(0)) + path_chain = " -> ".join(str(p) for p in [first_path] + jarinfo.parents) + color_map = {"vulnerable": red, "good": green, "patched": cyan, "unknown": yellow} - if md5sum in MD5_BAD: - status = "vulnerable" if has_jndilookup else "patched" - elif md5sum in MD5_GOOD: - status = "good" - else: - status = "unknown" + + comment = f"META-INF VersionId: {jarinfo.version} contains classes: " + for classinfo in jarinfo.classinfos: + comment += " " + collections.ChainMap(MD5_BAD, MD5_GOOD).get(classinfo.md5, "Unknown MD5") + f" ({classinfo.zpath}) |" + comment = comment.strip(" |").strip(" ") + + status = "unknown" + + if jarinfo.version.startswith('unknown'): + if any([ ci.md5 for ci in jarinfo.classinfos if ci.md5 in MD5_BAD.keys()]): + status = "vulnerable" #if has_jndilookup else "patched" + elif any( [ ci.md5 for ci in jarinfo.classinfos if ci.md5 in MD5_GOOD.keys() ] ): + status = "good" + if jarinfo.version.startswith('1.2'): + if any([ ci.md5 for ci in jarinfo.classinfos if ci.md5 in MD5_BAD.keys()]): + status = "vulnerable" #if has_jndilookup else "patched" + elif classinfo.md5 in MD5_GOOD: + status = "good" + if jarinfo.version.startswith('2.'): + if any([ ci.md5 for ci in jarinfo.classinfos if ci.md5 in MD5_BAD.keys()]): + if any([ ci.zpath for ci in jarinfo.classinfos if ci.zpath.lower().endswith("JndiLookup.class".lower()) ]): + status = "vulnerable" #if has_jndilookup else "patched" + else: + status = "patched" + elif any( [ ci.md5 for ci in jarinfo.classinfos if ci.md5 in MD5_GOOD.keys() ] ): + status = "good" + stats[status] += 1 color = color_map.get(status, red) now = datetime.datetime.utcnow().replace(microsecond=0) hostname = magenta(HOSTNAME) status = bold(color(status.upper())) - md5sum = color(md5sum) + md5sum = color(classinfo.md5) comment = bold(color(comment)) - print(f"[{now}] {hostname} {status}: {path_chain} [{md5sum}: {comment}]") + print(f"[{now}]\t{hostname}\t{status}:\t{path_chain}\t[{md5sum}: {comment}]") + + +def parse_meta(zfile): + meta=zfile.read().decode("utf-8") + if ('groupId=org.apache.logging.log4j' in meta and 'artifactId=log4j-core' in meta) or ('groupId=log4j' in meta and 'artifactId=log4j' in meta): + try: + return re.search('version=*(.+)\\ngroup',meta).group(1) + except: + pass + else: + return "unknown" def print_summary(stats): @@ -321,40 +394,39 @@ def main(): hostname = magenta(HOSTNAME) if not args.no_banner and not args.quiet: - print(FIGLET) + print(FIGLET) + for directory in args.path: now = datetime.datetime.utcnow().replace(microsecond=0) + if not args.quiet: print(f"[{now}] {hostname} Scanning: {directory}") + for p in iter_scandir(directory, stats=stats, exclude=args.exclude): + + # Handle unpacked .class files if p.name.lower() in FILENAMES: stats["scanned"] += 1 log.info(f"Found file: {p}") with p.open("rb") as fobj: # If we find JndiManager, we also check if JndiLookup.class exists - has_lookup = True if p.name.lower().endswith("JndiManager.class".lower()): + classinfos = [ Classinfo(str(p), md5_digest(fobj)) ] lookup_path = p.parent.parent / "lookup/JndiLookup.class" - has_lookup = lookup_path.exists() - check_vulnerable(fobj, [p], stats, has_lookup) + if lookup_path.exists(): + with lookup_path.open("rb") as fobj2: + classinfos += [ Classinfo(str(lookup_path), md5_digest(fobj2)) ] + check_vulnerable(JARinfo("unknown", [str(lookup_path)], classinfos), stats) + + # Handle JAR files. if p.suffix.lower() in JAR_EXTENSIONS: try: log.info(f"Found jar file: {p}") stats["scanned"] += 1 - for (zinfo, zfile, zpath, parents) in iter_jarfile( - p.resolve().open("rb"), parents=[p.resolve()] - ): - log.info(f"Found zfile: {zinfo} ({parents}") - with zfile.open(zinfo.filename) as zf: - # If we find JndiManager.class, we also check if JndiLookup.class exists - has_lookup = True - if zpath.name.lower().endswith("JndiManager.class".lower()): - lookup_path = zpath.parent.parent / "lookup/JndiLookup.class" - try: - has_lookup = zfile.open(lookup_path.as_posix()) - except KeyError: - has_lookup = False - check_vulnerable(zf, parents + [zpath], stats, has_lookup) + + for jarinfo in iter_jarfile(p.resolve().open("rb"), parents=[p.resolve()]): + check_vulnerable(jarinfo, stats) + except IOError as e: log.debug(f"{p}: {e}")