Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 95 additions & 215 deletions CryptoUnLocker.py
Original file line number Diff line number Diff line change
@@ -1,297 +1,177 @@
#!/usr/bin/env python
#!/usr/bin/env python3

import struct
import os
import argparse
import shutil
import sys
from collections import namedtuple
from datetime import datetime
import csv
import re
from collections import namedtuple
from datetime import datetime

from Crypto.Cipher import AES
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Hash import SHA
from Crypto.Hash import SHA1
from Crypto.Util.number import bytes_to_long


"""
CryptoLocker file structure:

0x14 bytes : SHA1 hash of '\x00'*4 + next 0x100 bytes of file.
0x100 bytes : AES key encrypted with RSA PKCS#1 v1.5:
0x2c bytes : AES key blob

remainder : file data encrypted with AES256-CBC with IV of 0x00

Key blob is a Microsoft PUBLICKEYSTRUC:
typedef struct _PUBLICKEYSTRUC {
BYTE bType;
BYTE bVersion;
WORD reserved;
ALG_ID aiKeyAlg;
} BLOBHEADER, PUBLICKEYSTRUC;

where:
bType = 0x08
bVersion = 0x02
reserved = 0
aiKeyAlg = 0x6610 (AES-256)

followed by a DWORD length of 0x20, and finally the 32 byte AES key.
"""

PUBLICKEYSTRUC = namedtuple('PUBLICKEYSTRUC', 'bType bVersion reserved aiKeyAlg')
RSAPUBKEY = namedtuple('RSAPUBKEY', 'magic bitlen pubexp')
PRIVATEKEYBLOB = namedtuple('PRIVATEKEYBLOB', 'modulus prime1 prime2 exponent1 exponent2 coefficient privateExponent')

PUBLICKEYSTRUC_s = struct.Struct('<bbHI')
RSAPUBKEY_s = struct.Struct('<4sII')

key_re = re.compile('-----BEGIN.*KEY-----\n(.*)\n-----END.*KEY-----', re.DOTALL)
key_re = re.compile(r'-----BEGIN.*KEY-----\n(.*)\n-----END.*KEY-----', re.DOTALL)


def subtract(a,b):
if a == None or b == None:
return None
else:
return ord(b)-ord(a)

class OutputLevel:
VerboseLevel, InfoLevel, WarnLevel, ErrorLevel = range(4)

class CryptoUnLocker(object):

class CryptoUnLocker:
def __init__(self):
self.keys = []

def loadKeyFromFile(self, fn):
d = open(fn, 'rb').read()

matches = key_re.match(d)
if matches:
self.loadKeyFromString(matches.group(0))
return

# fall through if the file does not contain a PEM encoded RSA key
# try the CryptImportKey Win32 file format
if self.CryptImportKey(d):
return

# Apparently a new version of CryptoLocker is adding what looks
# like a version number to the start of the RSA key format. Try
# skipping over the first four bytes of the file then interpreting
# the rest as an RSA private key.
if self.CryptImportKey(d[4:]):
return

# if we can't import the file, raise an exception
raise Exception("Could not parse a private key from file")
def load_key_from_paste(self):
print("Please paste your RSA private key below, including the -----BEGIN RSA PRIVATE KEY----- and -----END RSA PRIVATE KEY----- lines:")
key_paste = ""
while True:
try:
line = input()
if not line:
break
key_paste += line + "\n"
except EOFError:
break

# Try importing the key directly
self.load_key_from_string(key_paste)

def load_key_from_string(self, s):
try:
r = RSA.import_key(s)
self.keys.append(r)
print("Key successfully imported")
except ValueError as e:
print(f"Error importing key: {e}")

def CryptImportKey(self, d):
def crypt_import_key(self, d):
publickeystruc = PUBLICKEYSTRUC._make(PUBLICKEYSTRUC_s.unpack_from(d))
if publickeystruc.bType == 7 and publickeystruc.bVersion == 2 and publickeystruc.aiKeyAlg == 41984:
rsapubkey = RSAPUBKEY._make(RSAPUBKEY_s.unpack_from(d[8:]))
if rsapubkey.magic == 'RSA2':
bitlen8 = rsapubkey.bitlen/8
bitlen16 = rsapubkey.bitlen/16
PRIVATEKEYBLOB_s = struct.Struct('%ds%ds%ds%ds%ds%ds%ds' % (bitlen8, bitlen16, bitlen16, bitlen16, bitlen16, bitlen16, bitlen8))
privatekey = PRIVATEKEYBLOB._make(map(bytes_to_long, PRIVATEKEYBLOB_s.unpack_from(d[20:])))

r = RSA.construct((privatekey.modulus, long(rsapubkey.pubexp), privatekey.privateExponent,
privatekey.prime1, privatekey.prime2))
if rsapubkey.magic == b'RSA2':
bitlen8 = rsapubkey.bitlen // 8
bitlen16 = rsapubkey.bitlen // 16
private_key_blob_s = struct.Struct(f'{bitlen8}s{bitlen16}s{bitlen16}s{bitlen16}s{bitlen16}s{bitlen16}s{bitlen8}s')
privatekey = PRIVATEKEYBLOB._make(map(bytes_to_long, private_key_blob_s.unpack_from(d[20:])))
r = RSA.construct((privatekey.modulus, rsapubkey.pubexp, privatekey.privateExponent,
privatekey.prime1, privatekey.prime2))
self.keys.append(r)
return True

return False

def loadKeyFromString(self, s):
r = RSA.importKey(s)
self.keys.append(r)

def isCryptoLocker(self, fn):
file_header = open(fn, 'rb').read(0x114)
def is_cryptolocker(self, fn):
with open(fn, 'rb') as f:
file_header = f.read(0x114)
if len(file_header) != 0x114:
return False
return SHA1.new(b'\x00' * 4 + file_header[0x14:0x114]).digest() == file_header[:0x14]

# validate that the header is correct
header_hash = SHA.new('\x00'*4 + file_header[0x14:0x114])
return header_hash.digest() == file_header[:0x14]

def guessIfWiped(self, fn):
file_header = open(fn, 'rb').read(64)
if len(file_header) != 64:
return False

lst = map(subtract, file_header[:32:2], file_header[1:32:2])
return not lst or [lst[0]]*len(lst) == lst

def decryptFile(self, fn):
aes_key = None

def decrypt_file(self, fn):
with open(fn, 'rb') as fp:
file_header = fp.read(0x114)

if len(file_header) != 0x114:
raise Exception("Not a CryptoLocker file")

for rsa_key in self.keys:
aes_key = self.retrieveAESKey(rsa_key, file_header)
if aes_key:
break

aes_key = next((self.retrieve_aes_key(r, file_header) for r in self.keys if self.retrieve_aes_key(r, file_header)), None)
if not aes_key:
raise Exception("Could not find the private key for this CryptoLocker file")

# read the remaining data and decrypt with the AES key
d = fp.read()
a = AES.new(aes_key, mode=AES.MODE_CBC, IV='\x00'*16)
d = a.decrypt(d)
d = d[:-ord(d[-1])]
cipher = AES.new(aes_key, AES.MODE_CBC, b'\x00' * 16)
decrypted_data = cipher.decrypt(fp.read())
return decrypted_data.rstrip(decrypted_data[-1:])

return d

def retrieveAESKey(self, r, file_header):
# we have to reverse the bytes in the header to conform with the CryptoAPI
# CryptDecrypt function.
file_header = file_header[0x14:0x114]
file_header = file_header[::-1]

# decrypt the AES key blob
c = PKCS1_v1_5.new(r)
sentinel = '\x00' * 16
blob = c.decrypt(file_header, sentinel)
def retrieve_aes_key(self, r, file_header):
file_header = file_header[0x14:0x114][::-1]
cipher = PKCS1_v1_5.new(r)
blob = cipher.decrypt(file_header, None)
if blob and len(blob) >= 0x2c and blob[:4] == b'\x08\x02\x00\x00':
return blob[0x0c:0x0c+32]
return None

# retrieve key from file_header
(bType, bVersion, reserved, aiKeyAlg, keyLen) = struct.unpack('<BBHII', blob[:0xc])
if bType == 0x08 and bVersion == 0x02 and reserved == 0 and \
aiKeyAlg == 0x6610 and keyLen == 32:
aes_key = blob[0x0c:0x0c+32]
return aes_key
else:
return None

class CryptoUnLockerProcess(object):
class CryptoUnLockerProcess:
def __init__(self, args, unlocker):
self.args = args
self.unlocker = unlocker
self.csvfp = None
self.csv = None

def doit(self):
if self.args.csvfile:
self.csvfp = open(self.args.csvfile,'wb')
self.csv = csv.writer(self.csvfp)
self.csvfp = open(args.csvfile, 'w', newline='') if args.csvfile else None
self.csv = csv.writer(self.csvfp) if self.csvfp else None
if self.csv:
self.csv.writerow(['Timestamp', 'Filename', 'Message'])

keyfiles = []
if self.args.keyfile:
keyfiles = [self.args.keyfile]
elif self.args.keydir:
keyfiles = [os.path.join(self.args.keydir, fn) for fn in os.listdir(self.args.keydir)]

for fn in keyfiles:
try:
self.unlocker.loadKeyFromFile(fn)
self.output(OutputLevel.VerboseLevel, fn, "Successfully loaded key file")
except Exception, e:
self.output(OutputLevel.ErrorLevel, fn, "Unsuccessful loading key file: %s" % e.message)
def doit(self):
# Prompt user to paste RSA private key
self.unlocker.load_key_from_paste()

if not len(self.unlocker.keys) and not self.args.detect:
self.output(OutputLevel.ErrorLevel, '', 'No key files were successfully loaded. Exiting.')
if not self.unlocker.keys and not self.args.detect:
self.output(OutputLevel.ErrorLevel, '', 'No keys were successfully loaded. Exiting.')
return 1

if self.args.recursive:
for root, dirs, files in os.walk(self.args.encrypted_filenames[0]):
for fn in files:
self.processFile(root, fn)
else:
for fn in self.args.encrypted_filenames:
self.processFile('', fn)

for root, _, files in os.walk(self.args.encrypted_filenames[0]) if self.args.recursive else [(None, None, self.args.encrypted_filenames)]:
for fn in files:
self.process_file(root or '', fn)
return 0

def processFile(self, pathname, fn):
if fn.endswith('.bak'):
# skip backup files
return

def process_file(self, pathname, fn):
fullpath = os.path.join(pathname, fn)

try:
if self.unlocker.guessIfWiped(fullpath):
self.output(OutputLevel.VerboseLevel, fullpath, "File appears wiped")
return
elif not self.unlocker.isCryptoLocker(fullpath):
if not self.unlocker.is_cryptolocker(fullpath):
self.output(OutputLevel.VerboseLevel, fullpath, "Not a CryptoLocker file")
return
else:
if self.args.detect:
self.output(OutputLevel.InfoLevel, fullpath, "Potential CryptoLocker file")
return
except Exception, e:
self.output(OutputLevel.ErrorLevel, fullpath, "Unsuccessful opening file: %s" % e.message)
return

try:
decrypted_file = self.unlocker.decryptFile(fullpath)

decrypted_file = self.unlocker.decrypt_file(fullpath)
self.output(OutputLevel.InfoLevel, fullpath, "Successfully decrypted file")

if not self.args.dry_run:
# Ensure destination directory exists only if 'destdir' is provided
if self.args.destdir:
destdir = os.path.join(self.args.destdir, pathname)
if not os.path.exists(destdir):
os.makedirs(destdir)
open(os.path.join(destdir, fn), 'wb').write(decrypted_file)
dest_path = os.path.join(self.args.destdir, pathname) # Full destination path
os.makedirs(dest_path, exist_ok=True) # Create the directory if it doesn't exist
else:
shutil.copy2(fullpath, fullpath + ".bak")
open(os.path.join(pathname, fn), 'wb').write(decrypted_file)
except Exception, e:
self.output(OutputLevel.ErrorLevel, fullpath, "Unsuccessful decrypting file: %s" % e.message)
dest_path = pathname # Use the current path if 'destdir' is not provided

# Add '.decrypted' to the filename
decrypted_filename = f"{os.path.splitext(fn)[0]}.decrypted{os.path.splitext(fn)[1]}"

# Write decrypted file to destination with '.decrypted' suffix
with open(os.path.join(dest_path, decrypted_filename), 'wb') as f:
f.write(decrypted_file)
except Exception as e:
self.output(OutputLevel.ErrorLevel, fullpath, f"Unsuccessful decrypting file: {str(e)}")


def output(self, level, fn, msg):
if level == OutputLevel.VerboseLevel and not self.args.verbose:
return

if self.csv:
self.csv.writerow([datetime.now(), fn, msg])
print(f"{'[+] ' if level == OutputLevel.InfoLevel else '[-] '}{msg}: {fn}")

icon = '[.]'
if level == OutputLevel.InfoLevel:
icon = '[+]'
elif level > OutputLevel.InfoLevel:
icon = '[-]'

if fn:
sys.stderr.write('%s %s: %s\n' % (icon, msg, fn))
else:
sys.stderr.write('%s %s\n' % (icon, msg))
sys.stderr.flush()

def main():
parser = argparse.ArgumentParser(description='Decrypt CryptoLocker encrypted files.')
group = parser.add_mutually_exclusive_group(required=True)

group.add_argument('--keyfile', action='store', dest='keyfile',
help='File containing the private key, or the EXE file provided for decryption')
group.add_argument('--keydir', action='store', dest='keydir',
help='Directory containing any number of private keys; the appropriate private key will be used during the decryption process')
group.add_argument('--detect', action='store_true', dest='detect', help="Don't try to decrypt; just find files that may be CryptoLockered")

parser.add_argument('-r', action='store_true', dest='recursive', help="Recursively search subdirectories")
parser.add_argument('-v', action='store_true', dest='verbose', help="Verbose output")
parser.add_argument('--dry-run', action='store_true', dest='dry_run', help="Don't actually write decrypted files")
parser.add_argument('-o', action='store', dest='destdir', help='Copy all decrypted files to an output directory, mirroring the source path')
parser.add_argument('--csv', action='store', dest='csvfile', help='Output to a CSV file')
parser.add_argument('--detect', action='store_true', help='Detect CryptoLocker files without decrypting')
parser.add_argument('-r', '--recursive', action='store_true', help='Recursive search') # Add this line
parser.add_argument('-v', action='store_true', help='Verbose output')
parser.add_argument('--dry-run', action='store_true', help='Dry run mode')
parser.add_argument('-o', dest='destdir', help='Output directory')
parser.add_argument('--csv', dest='csvfile', help='CSV output file')
parser.add_argument('encrypted_filenames', nargs='+')
return CryptoUnLockerProcess(parser.parse_args(), CryptoUnLocker()).doit()

parser.add_argument('encrypted_filenames', nargs="+")

results = parser.parse_args()
unlocker = CryptoUnLocker()
processor = CryptoUnLockerProcess(results, unlocker)

return processor.doit()

if __name__ == '__main__':
sys.exit(main())

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# CryptoLocker Encrypted File Format
# CryptoLocker Encrypted File Format - Updated with Python3

Each file encrypted by CryptoLocker is encrypted with a unique AES-256
key. The unique symmetric key is then encrypted with the public RSA-2048
Expand Down
Loading