|
| 1 | +import os |
| 2 | +import requests |
| 3 | +import time |
| 4 | +from pathlib import Path |
| 5 | +from datetime import datetime as dt |
| 6 | +from selenium import webdriver |
| 7 | +from selenium.webdriver.common.by import By |
| 8 | +from selenium.webdriver.common.keys import Keys |
| 9 | +from selenium.webdriver.support.ui import WebDriverWait |
| 10 | +from selenium.webdriver.support import expected_conditions as EC |
| 11 | +from rich.progress import Progress |
| 12 | + |
class VirusExchangeScraper:
    """Scrape and download malware samples from virus.exchange.

    A Selenium-driven Chrome session is used to log in and read the sample
    listing; the sample archives themselves are fetched with ``requests``.

    The scraper can be used as a context manager so the browser is always
    shut down, even when an error occurs::

        with VirusExchangeScraper() as scraper:
            scraper.login(email, password)
            scraper.download_samples(scraper.get_samples_data())
    """

    # (connect, read) timeouts in seconds, so a stalled server cannot hang
    # the whole run.
    DOWNLOAD_TIMEOUT = (10, 60)
    # Size of the pieces streamed to disk; avoids holding a whole archive
    # in memory.
    DOWNLOAD_CHUNK_SIZE = 64 * 1024

    def __init__(self):
        """Start a Chrome session and set up a 10-second explicit wait."""
        print("init")
        self.driver = webdriver.Chrome()
        self.login_url = "https://virus.exchange/users/log_in"
        self.samples_url = "https://virus.exchange/samples"
        self.wait = WebDriverWait(self.driver, 10)

    def __enter__(self):
        """Return the scraper itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Quit the browser on leaving the ``with`` block.

        Returns ``None`` so any in-flight exception keeps propagating.
        """
        self.close()

    def login(self, email, password):
        """Submit the login form and open the samples page.

        Args:
            email: Value typed into the ``user[email]`` field.
            password: Value typed into the ``user[password]`` field.

        Returns:
            Always ``1`` (kept for backward compatibility). The method does
            not verify that the credentials were accepted.
        """
        from selenium.common.exceptions import TimeoutException

        print('login')
        self.driver.get(self.login_url)
        email_field = self.wait.until(
            EC.presence_of_element_located((By.NAME, "user[email]"))
        )
        password_field = self.driver.find_element(By.NAME, "user[password]")
        form_url = self.driver.current_url
        email_field.send_keys(email)
        password_field.send_keys(password)
        password_field.send_keys(Keys.RETURN)

        # Give the form submission a chance to complete before navigating
        # away; otherwise the next page load can race the login request.
        # This is best effort: on timeout we carry on exactly as before.
        try:
            self.wait.until(EC.url_changes(form_url))
        except TimeoutException:
            print("Login did not redirect within the wait window; continuing anyway")

        self.driver.get(self.samples_url)
        return 1

    def get_samples_data(self):
        """Collect the samples shown on the currently loaded listing page.

        Waits for the list items to be present, then reads the hash text and
        the download link of each one. Items without a download link are
        reported and skipped.

        Returns:
            A list of ``{"sha256": ..., "download_link": ...}`` dicts.
        """
        from selenium.common.exceptions import NoSuchElementException

        print('getting samples')
        # The wait returns the matched elements once at least one is present.
        sample_elements = self.wait.until(
            EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR, "li.relative.flex.items-center")
            )
        )

        samples = []
        for element in sample_elements:
            sha256 = element.find_element(
                By.CSS_SELECTOR, "h2 a span.whitespace-nowrap"
            ).text.strip()
            try:
                download_link = element.find_element(
                    By.CSS_SELECTOR, "a[download]"
                ).get_attribute("href")
            except NoSuchElementException:
                # Only a missing link means "not ready"; any other error is
                # a real failure and must not be silently swallowed.
                print(f"Sample with SHA256 {sha256} is not yet ready. Skipping...")
                continue
            samples.append({"sha256": sha256, "download_link": download_link})

        return samples

    def download_samples(self, samples):
        """Download every sample into the ``Downloaded-Malwares`` directory.

        Files are named ``malware_<first 6 hash chars>_<YYYY-MM-DD>.zip``.
        A sample whose target file already exists is skipped. A failed
        download is reported and does not stop the remaining downloads.

        Args:
            samples: Iterable of dicts with ``sha256`` and ``download_link``
                keys, as produced by :meth:`get_samples_data`.
        """
        print('downloading samples')
        download_dir = Path("Downloaded-Malwares")
        download_dir.mkdir(exist_ok=True)
        date_str = dt.now().strftime("%Y-%m-%d")

        with Progress() as progress:
            task = progress.add_task("Downloading samples...", total=len(samples))

            for sample in samples:
                sha256_hash = sample['sha256']
                file_name = f"malware_{sha256_hash[:6]}_{date_str}.zip"
                file_path = download_dir / file_name

                if not file_path.exists():
                    self._download_file(
                        sample['download_link'], file_path, sha256_hash[:6]
                    )

                # Advance for every processed sample (downloaded, skipped or
                # failed) so the bar always reaches completion.
                progress.update(task, advance=1)

    def _download_file(self, url, file_path, label):
        """Stream *url* to *file_path*; return True on success.

        The data is written to a sibling ``.part`` file and renamed only
        after the transfer finished, so an interrupted download is never
        mistaken for a complete file by the "already exists" check.
        """
        partial_path = file_path.with_name(file_path.name + ".part")
        try:
            with requests.get(
                url, stream=True, timeout=self.DOWNLOAD_TIMEOUT
            ) as response:
                if response.status_code != 200:
                    print(f"Failed to download {label}")
                    return False
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(
                        chunk_size=self.DOWNLOAD_CHUNK_SIZE
                    ):
                        f.write(chunk)
        except requests.RequestException as exc:
            print(f"Failed to download {label}: {exc}")
            if partial_path.exists():
                partial_path.unlink()
            return False

        partial_path.replace(file_path)
        return True

    def close(self):
        """Quit the browser and end the WebDriver session."""
        self.driver.quit()
0 commit comments