Skip to content

Commit

Permalink
Merge branch 'main' into production
Browse files Browse the repository at this point in the history
  • Loading branch information
Saied Kazemi committed Nov 20, 2023
2 parents 243dcc4 + 73ca154 commit c781ead
Show file tree
Hide file tree
Showing 21 changed files with 2,760 additions and 2,612 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"
- uses: dioptra-io/setup-poetry-action@v1
- name: Start services
run: docker compose up -d -t 0 traefik clickhouse minio postgres redis
- name: Install caracal
run: |
sudo curl -L -o /usr/bin/caracal https://github.com/dioptra-io/caracal/releases/download/v0.15.1/caracal-linux-amd64
sudo chmod +x /usr/bin/caracal
- name: Install package
run: poetry install
- name: Run tests
Expand Down Expand Up @@ -44,7 +48,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"
- uses: dioptra-io/setup-poetry-action@v1
- name: Install package
run: poetry install
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# 🕸️ Iris — An open-source internet measurement platform

[![Coverage](https://img.shields.io/codecov/c/github/dioptra-io/iris?logo=codecov&logoColor=white)](https://app.codecov.io/gh/dioptra-io/iris)
[![Tests](https://img.shields.io/github/workflow/status/dioptra-io/iris/Tests?logo=github&label=tests)](https://github.com/dioptra-io/iris/actions/workflows/tests.yml)
[![Tests](https://img.shields.io/github/actions/workflow/status/dioptra-io/iris/tests.yml?logo=github&label=tests)](https://github.com/dioptra-io/iris/actions/workflows/tests.yml)

Iris is a system to coordinate complex network measurements from multiple vantage points.
Think of it as a project similar to [CAIDA Ark](https://www.caida.org/projects/ark/) or [RIPE Atlas](https://atlas.ripe.net), with the following features:
Expand Down
4 changes: 4 additions & 0 deletions dockerfiles/iris-agent.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ ENV PYTHONUNBUFFERED=1
RUN apt-get update \
&& apt-get install --no-install-recommends --yes \
ca-certificates \
curl \
libpq5 \
mtr \
python3 \
tzdata \
&& rm -rf /var/lib/apt/lists/*

RUN curl -L https://github.com/dioptra-io/caracal/releases/download/v0.15.1/caracal-linux-amd64 > /usr/bin/caracal \
&& chmod +x /usr/bin/caracal

WORKDIR /app

COPY iris iris
Expand Down
7 changes: 7 additions & 0 deletions docs/dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ To be able to access them from your own machine, you need to add the following e
127.0.0.1 traefik.docker.localhost
```

You also need caracal in your $PATH if you intend to run Iris locally:
```bash
# Use caracal-macos-amd64 for macOS
curl -L https://github.com/dioptra-io/caracal/releases/download/v0.15.1/caracal-linux-amd64 > /usr/bin/caracal
chmod +x /usr/bin/caracal
```

## Running Iris

### Locally
Expand Down
147 changes: 81 additions & 66 deletions iris/agent/backend/caracal.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import os
import shlex
import signal
from asyncio.subprocess import create_subprocess_shell
from logging import LoggerAdapter
from multiprocessing import Manager, Process
from pathlib import Path

from pycaracal import prober, set_log_level

from iris.agent.settings import AgentSettings
from iris.commons.models import MeasurementRoundRequest
from iris.commons.redis import Redis
Expand All @@ -22,58 +23,61 @@ async def caracal_backend(
This is the default and reference backend for Iris.
It uses `caracal <https://github.com/dioptra-io/caracal>`_ for sending the probes.
"""
with Manager() as manager:
probing_statistics = manager.dict() # type: ignore
prober_process = Process(
target=probe,
args=(
settings,
probes_filepath,
results_filepath,
request.round.number,
request.batch_size,
request.probing_rate,
probing_statistics,
),

prober = asyncio.create_task(
probe(
settings,
logger,
probes_filepath,
results_filepath,
request.round.number,
request.batch_size,
request.probing_rate,
)
prober_process.start()
cancelled = await watch_cancellation(
)

watcher = asyncio.create_task(
watch_cancellation(
redis,
prober_process,
request.measurement_uuid,
settings.AGENT_UUID,
settings.AGENT_STOPPER_REFRESH,
)
probing_statistics = dict(probing_statistics)
)

return None if cancelled else probing_statistics
done, pending = await asyncio.wait(
[prober, watcher], return_when=asyncio.FIRST_COMPLETED
)
if watcher in done:
# Measurement was cancelled
prober.cancel()
return None

return prober.result()


async def watch_cancellation(
redis: Redis,
process: Process,
measurement_uuid: str,
agent_uuid: str,
interval: float,
) -> bool:
"""Kill the prober process if the measurement request is deleted."""
while process.is_alive():
while True:
if not await redis.get_request(measurement_uuid, agent_uuid):
process.kill()
return True
await asyncio.sleep(interval)
return False


def probe(
async def probe(
settings: AgentSettings,
logger: LoggerAdapter,
probes_filepath: Path,
results_filepath: Path,
round_number: int,
batch_size: int | None,
probing_rate: int,
probing_statistics: dict,
) -> None:
) -> dict:
"""Probing interface."""
# Cap the probing rate if superior to the maximum probing rate
measurement_probing_rate = (
Expand All @@ -82,45 +86,56 @@ def probe(
else settings.AGENT_MAX_PROBING_RATE
)

# This set the log level of the C++ logger (spdlog).
# This allows the logs to be filtered in C++ (fast)
# before being forwarded to the (slower) Python logger.
set_log_level(settings.AGENT_CARACAL_LOGGING_LEVEL)
if probes_filepath.suffix == ".zst":
input_cmd = f"zstd -cd {shlex.quote(str(probes_filepath))}"
else:
input_cmd = f"cat {shlex.quote(str(probes_filepath))}"

# Prober configuration
config = prober.Config()
config.set_output_file_csv(str(results_filepath))
if results_filepath.suffix == ".zst":
output_cmd = f"zstd -c > {shlex.quote(str(results_filepath))}"
else:
output_cmd = f"tee > {shlex.quote(str(results_filepath))}"

config.set_probing_rate(measurement_probing_rate)
config.set_rate_limiting_method(settings.AGENT_CARACAL_RATE_LIMITING_METHOD.value)
config.set_sniffer_wait_time(settings.AGENT_CARACAL_SNIFFER_WAIT_TIME)
config.set_integrity_check(settings.AGENT_CARACAL_INTEGRITY_CHECK)
config.set_meta_round(str(round_number))
caracal_cmd = [
"caracal",
f"--meta-round {shlex.quote(str(round_number))}",
f"--probing-rate {shlex.quote(str(measurement_probing_rate))}",
]

if batch_size:
config.set_batch_size(batch_size)

if settings.AGENT_CARACAL_EXCLUDE_PATH is not None:
config.set_prefix_excl_file(str(settings.AGENT_CARACAL_EXCLUDE_PATH))

prober_stats, sniffer_stats, pcap_stats = prober.probe(config, str(probes_filepath))

# Populate the statistics
# TODO: Implement __dict__ in pycaracal.
probing_statistics["probes_read"] = prober_stats.read
probing_statistics["packets_sent"] = prober_stats.sent
probing_statistics["packets_failed"] = prober_stats.failed
probing_statistics["filtered_low_ttl"] = prober_stats.filtered_lo_ttl
probing_statistics["filtered_high_ttl"] = prober_stats.filtered_hi_ttl
probing_statistics["filtered_prefix_excl"] = prober_stats.filtered_prefix_excl
probing_statistics[
"filtered_prefix_not_incl"
] = prober_stats.filtered_prefix_not_incl

probing_statistics["packets_received"] = sniffer_stats.received_count
probing_statistics[
"packets_received_invalid"
] = sniffer_stats.received_invalid_count
probing_statistics["pcap_received"] = pcap_stats.received
probing_statistics["pcap_dropped"] = pcap_stats.dropped
probing_statistics["pcap_interface_dropped"] = pcap_stats.interface_dropped
caracal_cmd.append(f"--batch-size {shlex.quote(str(batch_size))}")

if exclude_path := settings.AGENT_CARACAL_EXCLUDE_PATH:
caracal_cmd.append(
f"--filter-from-prefix-file-excl {shlex.quote(str(exclude_path))}"
)

if not settings.AGENT_CARACAL_INTEGRITY_CHECK:
caracal_cmd.append("--no-integrity-check")

cmd = f"{input_cmd} | {' '.join(caracal_cmd)} | {output_cmd}"
logger.info("Running %s", cmd)

process = await create_subprocess_shell(cmd, preexec_fn=os.setsid)
try:
await process.wait()
except asyncio.CancelledError:
logger.info("Terminating pid %s", process.pid)
os.killpg(os.getpgid(process.pid), signal.SIGKILL)

# These statistics have been lost when migrating from pycaracal to caracal.
# TODO: Re-implement them.
return {
"probes_read": 0,
"packets_sent": 0,
"packets_failed": 0,
"filtered_low_ttl": 0,
"filtered_high_ttl": 0,
"filtered_prefix_excl": 0,
"filtered_prefix_not_incl": 0,
"packets_received": 0,
"packets_received_invalid": 0,
"pcap_received": 0,
"pcap_dropped": 0,
"pcap_interface_dropped": 0,
}
2 changes: 1 addition & 1 deletion iris/agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket
import time

import aioredis
from redis import asyncio as aioredis
import psutil

from iris import __version__
Expand Down
10 changes: 0 additions & 10 deletions iris/agent/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,12 @@
from iris.commons.settings import CommonSettings


class RateLimitingMethod(str, Enum):
auto = "auto"
active = "active"
sleep = "sleep"
none = "none"


class AgentSettings(CommonSettings):
"""Agent specific settings."""

AGENT_BACKEND: Literal["atlas", "caracal"] = "caracal"

AGENT_CARACAL_EXCLUDE_PATH: Path = Path("statics/excluded_prefixes")
AGENT_CARACAL_RATE_LIMITING_METHOD: RateLimitingMethod = RateLimitingMethod.auto
AGENT_CARACAL_SNIFFER_WAIT_TIME: int = 5
AGENT_CARACAL_LOGGING_LEVEL: int = logging.INFO
AGENT_CARACAL_INTEGRITY_CHECK: bool = True

AGENT_UUID: str = str(uuid4())
Expand Down
4 changes: 3 additions & 1 deletion iris/api/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@

if __name__ == "__main__":
# Equivalent to `python -m gunicorn --worker-class iris.api.uvicorn iris.api.main:app`.
sys.argv.extend(["--worker-class", "iris.api.uvicorn.Worker", "iris.api.main:app"])
sys.argv.extend(
["--worker-class", "iris.api.uvicorn.Worker", "iris.api.main:make_app"]
)
run()
Loading

0 comments on commit c781ead

Please sign in to comment.