Give more power to engines #3

Open
barmic opened this issue Dec 21, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

barmic commented Dec 21, 2024

Currently the engines are only functions. It could be useful to group more things under the engine concept.

As we can see here

def perform_engine_queries(observable, selected_engines, result):

an engine could probably have an id, a predicate, and a processor function.

Looking at the file https://github.com/stanfrbd/cyberbro/blob/113259e18df471e093599b99bc8074c0457d0662/utils/export.py, the export function could probably be given to this object as well.

To make this more concrete, I suggest something like this (it's not working code, feel free to rework it as you want):

import pycountry
import requests

class IpInfo:
  def __init__(self):
    self.id = 'ipinfo'

  def is_revelant(self, observable):
    return observable["type"] in ["IPv4", "IPv6"]

  def process(self, observable, secrets, PROXIES):
    return { self.id: query_ipinfo(observable["value"], secrets["ipinfo"], PROXIES) }

  def export(self, result):
    # The stored result may be None when the query failed
    ipinfo_data = result.get(self.id) or {}
    # "asn" is split on the first space into AS number and organization;
    # values without a space (e.g. "BOGON") have no organization part
    asn, _, org = (ipinfo_data.get("asn") or "").partition(' ')
    row = dict()
    row["ipinfo_cn"] = ipinfo_data.get("country_code")
    row["ipinfo_country"] = ipinfo_data.get("country_name")
    row["ipinfo_geo"] = ipinfo_data.get("geolocation")
    row["ipinfo_asn"] = asn or None
    row["ipinfo_org"] = org or None
    return row

def query_ipinfo(ip, API_KEY, PROXIES):
    """
    Queries the IP information from the ipinfo.io API.
    Args:
        ip (str): The IP address to query.
        API_KEY (str): The ipinfo.io API token.
        PROXIES (dict): The proxies passed to requests.
    Returns:
        dict: A dictionary containing the extracted information with the following keys:
            - ip (str): The IP address.
            - geolocation (str): The geolocation in the format "city, region".
            - country_code (str): The two-letter country code of the IP address.
            - country_name (str): The country name of the IP address.
            - hostname (str): The hostname associated with the IP address.
            - asn (str): The autonomous system number (ASN) and organization.
            - link (str): The ipinfo.io page for the IP address.
        None: If the response does not contain the 'ip' key, or if the request or the JSON parsing fails.
    """
    try:
        # Construct the URL for the IP info API
        url = f"https://ipinfo.io/{ip}/json?token={API_KEY}"
        
        # Make a GET request to the IP info API with proxies and SSL verification disabled
        response = requests.get(url, proxies=PROXIES, verify=False)
        
        # Parse the JSON response
        data = response.json()

        if "bogon" in data:
            return {"ip": ip, "geolocation": "", "country_code": "", "country_name": "", "hostname": "Private IP", "asn": "BOGON", "link": f"https://ipinfo.io/{ip}"}
        
        # Check if the response contains 'ip' key
        if 'ip' in data:
            # Extract relevant information from the response
            ip = data.get("ip", "Unknown")
            hostname = data.get("hostname", "Unknown")
            city = data.get("city", "Unknown")
            region = data.get("region", "Unknown")
            asn = data.get("org", "Unknown")
            country_code = data.get("country", "Unknown")
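            # The lookup can come back empty for an unknown country code
            country = pycountry.countries.get(alpha_2=country_code)
            country_name = country.name if country else "Unknown"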
            
            # Return the extracted information in a dictionary
            return {"ip": ip, "geolocation": f"{city}, {region}", "country_code": country_code, "country_name": country_name, "hostname": hostname, "asn": asn, "link": f"https://ipinfo.io/{ip}"}

    except Exception as e:
        print(e)
    # Always return None in case of failure
    return None

You initialize the engines somewhere

engines = [
  IpInfo(),
  ReverseDns(),
  #...
]

To analyse

result = dict()
for engine in engines:
  if engine.id in selected_engines and engine.is_revelant(observable):
    result.update(engine.process(observable, secrets, PROXIES))

And export

row = dict()
for engine in engines:
  if engine.id in selected_engines:
    row.update(engine.export(result))
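
The interface shared by the engines above could also be written down once, so that every engine is forced to provide the same pieces. The following is only a rough sketch under assumptions: the `Engine` base class, the `FakeIpInfo` stand-in (canned data, no network call), and the `analyse` / `export_row` helpers are hypothetical names that do not exist in the project. The method name `is_revelant` is kept as spelled in the example above.

```python
from abc import ABC, abstractmethod


class Engine(ABC):
    """Hypothetical base class: an id, a predicate, a processor and an exporter."""

    id: str

    @abstractmethod
    def is_revelant(self, observable: dict) -> bool:
        """Return True if this engine can handle the observable."""

    @abstractmethod
    def process(self, observable: dict, secrets: dict, proxies: dict) -> dict:
        """Return {engine id: engine result} for the observable."""

    @abstractmethod
    def export(self, result: dict) -> dict:
        """Return the export columns extracted from the analysis result."""


class FakeIpInfo(Engine):
    """Offline stand-in for IpInfo so the sketch runs without network access."""

    id = "ipinfo"

    def is_revelant(self, observable):
        return observable["type"] in ["IPv4", "IPv6"]

    def process(self, observable, secrets, proxies):
        # Canned data instead of a real query_ipinfo call
        return {self.id: {"ip": observable["value"], "country_code": "FR"}}

    def export(self, result):
        data = result.get(self.id) or {}
        return {"ipinfo_cn": data.get("country_code")}


def analyse(engines, selected_engines, observable, secrets, proxies):
    """Run every selected engine whose predicate accepts the observable."""
    result = {}
    for engine in engines:
        if engine.id in selected_engines and engine.is_revelant(observable):
            result.update(engine.process(observable, secrets, proxies))
    return result


def export_row(engines, selected_engines, result):
    """Merge the export columns of every selected engine into one row."""
    row = {}
    for engine in engines:
        if engine.id in selected_engines:
            row.update(engine.export(result))
    return row


engines = [FakeIpInfo()]
observable = {"type": "IPv4", "value": "1.1.1.1"}
result = analyse(engines, ["ipinfo"], observable, secrets={}, proxies={})
row = export_row(engines, ["ipinfo"], result)
```

With an abstract base class, forgetting one of the methods in a new engine fails when the engine is instantiated rather than in the middle of an analysis. A `typing.Protocol` would be a lighter alternative if inheritance is not wanted.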
@stanfrbd stanfrbd added the enhancement New feature or request label Dec 21, 2024
@stanfrbd
Owner

Hello,
thank you for bringing that up! It is very kind of you.
I will try to implement this when I have the time. I really appreciate your help on this topic!
