Skip to content

Commit

Permalink
Add av-signature to tags and metadata for each scan
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Gilmer committed Oct 14, 2019
1 parent cee96ae commit 17b63b8
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 48 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ the table below for reference.
| AV_DEFINITION_PATH | Path containing files at runtime | /tmp/clamav_defs | No |
| AV_SCAN_START_SNS_ARN | SNS topic ARN to publish notification about start of scan | | No |
| AV_SCAN_START_METADATA | The tag/metadata indicating the start of the scan | av-scan-start | No |
| AV_SIGNATURE_METADATA | The tag/metadata name representing file's AV type | av-type | No |
| AV_STATUS_CLEAN | The value assigned to clean items inside of tags/metadata | CLEAN | No |
| AV_STATUS_INFECTED | The value assigned to clean items inside of tags/metadata | INFECTED | No |
| AV_STATUS_METADATA | The tag/metadata name representing file's AV status | av-status | No |
Expand Down
18 changes: 16 additions & 2 deletions clamav.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ def md5_from_s3_tags(bucket, key):
return ""


# Turn ClamAV Scan output into a JSON formatted data object
def scan_output_to_json(output):
summary = {}
for line in output.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
summary[key] = value.strip()
return summary


def scan_file(path):
av_env = os.environ.copy()
av_env["LD_LIBRARY_PATH"] = CLAMAVLIB_PATH
Expand All @@ -150,10 +160,14 @@ def scan_file(path):
)
output = av_proc.communicate()[0]
print("clamscan output:\n%s" % output)

# Turn the output into a data source we can read
summary = scan_output_to_json(output)
signature = summary[path]
if av_proc.returncode == 0:
return AV_STATUS_CLEAN
return AV_STATUS_CLEAN, signature
elif av_proc.returncode == 1:
return AV_STATUS_INFECTED
return AV_STATUS_INFECTED, signature
else:
msg = "Unexpected exit code from clamscan: %s.\n" % av_proc.returncode
print(msg)
Expand Down
52 changes: 52 additions & 0 deletions clamav_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
# limitations under the License.

import re
import textwrap
import unittest

from clamav import RE_SEARCH_DIR
from clamav import scan_output_to_json
from common import AV_SIGNATURE_OK


class TestClamAV(unittest.TestCase):
Expand All @@ -36,3 +39,52 @@ def test_current_library_search_path(self):
"/usr/lib",
]
self.assertEqual(all_search_paths, expected_search_paths)

def test_scan_output_to_json_clean(self):
file_path = "/tmp/test.txt"
signature = AV_SIGNATURE_OK
output = textwrap.dedent(
"""\
Scanning {0}
{0}: {1}
----------- SCAN SUMMARY -----------
Known viruses: 6305127
Engine version: 0.101.4
Scanned directories: 0
Scanned files: 1
Infected files: 0
Data scanned: 0.00 MB
Data read: 0.00 MB (ratio 0.00:1)
Time: 80.299 sec (1 m 20 s)
""".format(
file_path, signature
)
)
summary = scan_output_to_json(output)
self.assertEqual(summary[file_path], signature)
self.assertEqual(summary["Infected files"], "0")

def test_scan_output_to_json_infected(self):
file_path = "/tmp/eicar.com.txt"
signature = "Eicar-Test-Signature FOUND"
output = textwrap.dedent(
"""\
Scanning {0}
{0}: {1}
{0}!(0): {1}
----------- SCAN SUMMARY -----------
Known viruses: 6305127
Engine version: 0.101.4
Scanned directories: 0
Scanned files: 1
Infected files: 1
Data scanned: 0.00 MB
Data read: 0.00 MB (ratio 0.00:1)
Time: 80.299 sec (1 m 20 s)
""".format(
file_path, signature
)
)
summary = scan_output_to_json(output)
self.assertEqual(summary[file_path], signature)
self.assertEqual(summary["Infected files"], "1")
3 changes: 3 additions & 0 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
AV_DEFINITION_PATH = os.getenv("AV_DEFINITION_PATH", "/tmp/clamav_defs")
AV_SCAN_START_SNS_ARN = os.getenv("AV_SCAN_START_SNS_ARN")
AV_SCAN_START_METADATA = os.getenv("AV_SCAN_START_METADATA", "av-scan-start")
AV_SIGNATURE_METADATA = os.getenv("AV_SIGNATURE_METADATA", "av-signature")
AV_SIGNATURE_OK = "OK"
AV_SIGNATURE_UNKNOWN = "UNKNOWN"
AV_STATUS_CLEAN = os.getenv("AV_STATUS_CLEAN", "CLEAN")
AV_STATUS_INFECTED = os.getenv("AV_STATUS_INFECTED", "INFECTED")
AV_STATUS_METADATA = os.getenv("AV_STATUS_METADATA", "av-status")
Expand Down
40 changes: 28 additions & 12 deletions display_infected.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@

import boto3

from common import AV_SIGNATURE_METADATA
from common import AV_SIGNATURE_OK
from common import AV_SIGNATURE_UNKNOWN
from common import AV_STATUS_METADATA
from common import AV_STATUS_CLEAN
from common import AV_STATUS_INFECTED


# Get all objects in an S3 bucket that are infected
def get_objects(s3_client, s3_bucket_name):
def get_objects_and_sigs(s3_client, s3_bucket_name):

s3_object_list = []

Expand All @@ -41,8 +45,11 @@ def get_objects(s3_client, s3_bucket_name):
for key in s3_list_objects_result["Contents"]:
key_name = key["Key"]
# Include only infected objects
if object_infected(s3_client, s3_bucket_name, key_name):
s3_object_list.append(key_name)
infected, av_signature = object_infected(
s3_client, s3_bucket_name, key_name
)
if infected:
s3_object_list.append((key_name, av_signature))

return s3_object_list

Expand All @@ -51,13 +58,22 @@ def get_objects(s3_client, s3_bucket_name):
def object_infected(s3_client, s3_bucket_name, key_name):
s3_object_tags = s3_client.get_object_tagging(Bucket=s3_bucket_name, Key=key_name)
if "TagSet" not in s3_object_tags:
return False
return False, None
tags = {}
for tag in s3_object_tags["TagSet"]:
if tag["Key"] == AV_STATUS_METADATA:
if tag["Value"] == AV_STATUS_INFECTED:
return True
return False
return False
tags[tag["Key"]] = tag["Value"]

print(tags)
if tags.get(AV_STATUS_METADATA, "") == AV_STATUS_CLEAN:
return False, None

if AV_SIGNATURE_METADATA in tags and tags[AV_SIGNATURE_METADATA] != AV_SIGNATURE_OK:
return True, tags[AV_SIGNATURE_METADATA]

if tags.get(AV_STATUS_METADATA, "") == AV_STATUS_INFECTED:
return True, AV_SIGNATURE_UNKNOWN

return False, None


def main(s3_bucket_name):
Expand All @@ -71,9 +87,9 @@ def main(s3_bucket_name):
sys.exit(1)

# Scan the objects in the bucket
s3_object_list = get_objects(s3_client, s3_bucket_name)
for key_name in s3_object_list:
print("Infected: {}/{}".format(s3_bucket_name, key_name))
s3_object_and_sigs_list = get_objects_and_sigs(s3_bucket_name)
for (key_name, av_signature) in s3_object_and_sigs_list:
print("Infected: {}/{}, {}".format(s3_bucket_name, key_name, av_signature))


if __name__ == "__main__":
Expand Down
70 changes: 62 additions & 8 deletions display_infected_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import botocore.session
from botocore.stub import Stubber

from common import AV_SIGNATURE_METADATA
from common import AV_SIGNATURE_OK
from common import AV_SIGNATURE_UNKNOWN
from common import AV_STATUS_CLEAN
from common import AV_STATUS_METADATA
from common import AV_STATUS_INFECTED
from display_infected import get_objects
from display_infected import get_objects_and_sigs


class TestDisplayInfected(unittest.TestCase):
Expand Down Expand Up @@ -53,7 +56,8 @@ def setUp(self):
"list_objects_v2", list_objects_v2_response, list_objects_v2_expected_params
)

def test_get_objects_infected(self):
def test_get_objects_and_sigs_infected_with_sig_unknown(self):
signature = AV_SIGNATURE_UNKNOWN

get_object_tagging_response = {
"VersionId": "abc123",
Expand All @@ -70,11 +74,61 @@ def test_get_objects_infected(self):
)

with self.stubber:
s3_object_list = get_objects(self.s3_client, self.s3_bucket_name)
expected_object_list = ["test.txt"]
s3_object_list = get_objects_and_sigs(self.s3_client, self.s3_bucket_name)
expected_object_list = [("test.txt", signature)]
self.assertEqual(s3_object_list, expected_object_list)

def test_get_objects_clean(self):
def test_get_objects_and_sigs_infected_with_sig(self):
signature = "Eicar-Test-Signature FOUND"

get_object_tagging_response = {
"VersionId": "abc123",
"TagSet": [
{"Key": AV_STATUS_METADATA, "Value": AV_STATUS_INFECTED},
{"Key": AV_SIGNATURE_METADATA, "Value": signature},
],
}
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": "test.txt",
}
self.stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)

with self.stubber:
s3_object_list = get_objects_and_sigs(self.s3_client, self.s3_bucket_name)
expected_object_list = [("test.txt", signature)]
self.assertEqual(s3_object_list, expected_object_list)

def test_get_objects_and_sigs_infected_with_sig_ok(self):
signature = AV_SIGNATURE_OK

get_object_tagging_response = {
"VersionId": "abc123",
"TagSet": [
{"Key": AV_STATUS_METADATA, "Value": AV_STATUS_CLEAN},
{"Key": AV_SIGNATURE_METADATA, "Value": signature},
],
}
get_object_tagging_expected_params = {
"Bucket": self.s3_bucket_name,
"Key": "test.txt",
}
self.stubber.add_response(
"get_object_tagging",
get_object_tagging_response,
get_object_tagging_expected_params,
)

with self.stubber:
s3_object_list = get_objects_and_sigs(self.s3_client, self.s3_bucket_name)
expected_object_list = []
self.assertEqual(s3_object_list, expected_object_list)

def test_get_objects_and_sigs_clean(self):

get_object_tagging_response = {
"VersionId": "abc123",
Expand All @@ -91,11 +145,11 @@ def test_get_objects_clean(self):
)

with self.stubber:
s3_object_list = get_objects(self.s3_client, self.s3_bucket_name)
s3_object_list = get_objects_and_sigs(self.s3_client, self.s3_bucket_name)
expected_object_list = []
self.assertEqual(s3_object_list, expected_object_list)

def test_get_objects_unscanned(self):
def test_get_objects_and_sigs_unscanned(self):

get_object_tagging_response = {"VersionId": "abc123", "TagSet": []}
get_object_tagging_expected_params = {
Expand All @@ -109,6 +163,6 @@ def test_get_objects_unscanned(self):
)

with self.stubber:
s3_object_list = get_objects(self.s3_client, self.s3_bucket_name)
s3_object_list = get_objects_and_sigs(self.s3_client, self.s3_bucket_name)
expected_object_list = []
self.assertEqual(s3_object_list, expected_object_list)
Loading

0 comments on commit 17b63b8

Please sign in to comment.