diff --git a/CHANGELOG.md b/CHANGELOG.md index a046004c2..71c9877b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,12 +3,23 @@ ## next release ### Breaking ### Features +### Improvements +### Bugfix + +## 13.1.0 +### Features * `pre_detector` now normalizes timestamps with configurable parameters timestamp_field, source_format, source_timezone and target_timezone * `pre_detector` now writes tags in failure cases * `ProcessingWarnings` now can write `tags` to the event +* add `timeout` parameter to logprep http generator to set the timeout in seconds for requests +* add primitive rate limiting to `http_input` connector ### Improvements + +* switch to `uvloop` as default loop for the used threaded http uvicorn server +* switch to `httptools` as default http implementation for the used threaded http uvicorn server + ### Bugfix * remove redundant chart features for mounting secrets diff --git a/Dockerfile b/Dockerfile index fb36e9c64..f5e2f8a12 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,10 +8,10 @@ ARG no_proxy ADD . /logprep WORKDIR /logprep +RUN python -m pip install --upgrade pip wheel setuptools>=72.2.0 RUN python -m venv /opt/venv # Make sure we use the virtualenv: ENV PATH="/opt/venv/bin:$PATH" -RUN python -m pip install --upgrade pip wheel setuptools RUN if [ "$LOGPREP_VERSION" = "dev" ]; then pip install .;\ elif [ "$LOGPREP_VERSION" = "latest" ]; then pip install git+https://github.com/fkie-cad/Logprep.git@latest; \ diff --git a/README.md b/README.md index bf7d2371c..bf5fc3c16 100644 --- a/README.md +++ b/README.md @@ -232,9 +232,8 @@ Details about the rule language and how to write rules for the processors can be ## Getting Started -For installation instructions see: https://logprep.readthedocs.io/en/latest/getting_started.html#installation -For execution instructions see: https://logprep.readthedocs.io/en/latest/getting_started.html#run-logprep - +For installation instructions see: https://logprep.readthedocs.io/en/latest/installation.html +For execution instructions see: https://logprep.readthedocs.io/en/latest/user_manual/execution.html ### Reload the Configuration diff --git a/examples/exampledata/config/grafana/dashboards/logprep-http-input.json b/examples/exampledata/config/grafana/dashboards/logprep-http-input.json new file mode 100644 index 000000000..e28781ac7 --- /dev/null +++ b/examples/exampledata/config/grafana/dashboards/logprep-http-input.json @@ -0,0 +1,2056 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 7, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 19, + "panels": [], + "title": "Average Behavior Over Selected Timeframe", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Anzahl der Events, die Logprep schafft aus kafka sich abzuholen. Ereigniss pro Sekunde je Logprep Prozess.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 2, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 300 + }, + { + "color": "green", + "value": 500 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 3, + "x": 0, + "y": 1 + }, + "id": 142, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(rate(logprep_number_of_processed_events_total{component=\"input\"}[1m])) / sum(count by(pod) ((logprep_config_refresh_interval)) -1 )", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Durchsatz / Prozess", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Processing times of input connector. ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 3, + "x": 3, + "y": 1 + }, + "id": 38, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "avg(increase(logprep_processing_time_per_event_sum{component=\"input\"}[$__range])/increase(logprep_processing_time_per_event_count{component=\"input\"}[$__range]) > 0)", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Input Processing Times", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Duration of all connector and pipeline processing times combined.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 6, + "y": 1 + }, + "id": 20, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(avg by (component)((increase(logprep_processing_time_per_event_sum{component=~\"input|output|pipeline\"}[$__range])/increase(logprep_processing_time_per_event_count{component=~\"input|output|pipeline\"}[$__range]) > 0)))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Total Processing Times", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Stellt die Ratio von eingehenden und ausgehenden Logs dar. Im Falle der Verwendung des Selective Extractors muss beachtet werden, dass immer doppelt so viele Logs ausgehen geschrieben werden. Gleiches gilt für Logs mit vielen Pseudonymen oder SREs.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 0, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 10, + "y": 1 + }, + "id": 31, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(logprep_number_of_processed_events_total{component=\"input\"}[$__range])) - sum(increase(logprep_number_of_processed_events_total{component=\"output\"}[$__range]))", + "hide": true, + "instant": false, + "legendFormat": "Absolut", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(logprep_number_of_processed_events_total{component=\"output\"}[$__range])) / sum(increase(logprep_number_of_processed_events_total{component=\"input\"}[$__range])>0)", + "hide": false, + "instant": false, + "legendFormat": "Relative", + "range": true, + "refId": "B" + } + ], + "title": "Input/Output Ratio", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "The increase of number of incoming events", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 12, + "y": 1 + }, + "id": 30, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(logprep_number_of_processed_events_total{component=\"input\"}[$__range]))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Incoming Events", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Gibt an, wieviel logs grad in der Queue sind", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 300000 + }, + { + "color": "red", + "value": 1000000 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 16, + "y": 1 + }, + "id": 157, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "logprep_message_backlog_size{component=\"input\"}", + "format": "table", + "hide": false, + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A" + } + ], + "title": "Queue Size", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "The increase of number of warnings in the selected time range", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 20, + "y": 1 + }, + "id": 150, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "round(sum(increase(logprep_number_of_warnings_total[$__range])))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Warnings", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Shows the currently active config version of logprep, if specified.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 22, + "y": 1 + }, + "id": 90, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "/^config$/", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "max(logprep_version_info) by (logprep, config)", + "format": "table", + "instant": true, + "legendFormat": "{{label_name}}", + "range": false, + "refId": "A" + } + ], + "title": "Config Version", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "filterFieldsByName", + "options": { + "include": { + "names": [ + "config" + ] + } + } + } + ], + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Anzahl der Events, die Logprep schafft aus kafka sich abzuholen. Ereigniss pro Sekunde.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 2, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 1000 + }, + { + "color": "green", + "value": 5000 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 3, + "x": 0, + "y": 4 + }, + "id": 116, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(rate(logprep_number_of_processed_events_total{component=\"input\"}[1m]))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Durchsatz Gesamt", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Average processing times of all output connectors", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 3, + "x": 3, + "y": 4 + }, + "id": 37, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "avg(increase(logprep_processing_time_per_event_sum{component=\"output\"}[$__range])/increase(logprep_processing_time_per_event_count{component=\"output\"}[$__range])) > 0", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Output Processing Times", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Is the sum of the whole pipeline: parsing event, passing it through the pipeline and sending it to the output connector.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 6, + "y": 4 + }, + "id": 61, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "avg(increase(logprep_processing_time_per_event_sum{component=\"pipeline\"}[$__range])/increase(logprep_processing_time_per_event_count{component=\"pipeline\"}[$__range]) > 0)", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Pipeline Processing Times", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Anzahl der laufenden Pipeline Prozesse", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1000 + }, + { + "color": "yellow", + "value": 1000 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 10, + "y": 4 + }, + "id": 145, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "expr": "sum(count by(pod) ((logprep_config_refresh_interval)) -1 )", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Process Count", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "The increase of number of outgoing events", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 12, + "y": 4 + }, + "id": 149, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(logprep_number_of_processed_events_total{component=\"output\"}[$__range])) ", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Outgoing Events", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Anzahl der Requests auf dem Konnektor", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 30000 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 16, + "y": 4 + }, + "id": 158, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(logprep_number_of_http_requests_total{component=\"input\"}[$__range])) ", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Incoming Requests", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "The increase of number of errors in the selected time range", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 20, + "y": 4 + }, + "id": 156, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "round(sum(increase(logprep_number_of_errors_total[$__range])))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Errors", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Shows the currently active logprep version.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 2, + "x": 22, + "y": 4 + }, + "id": 93, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "/^logprep$/", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "max(logprep_version_info) by (logprep, config)", + "format": "table", + "instant": true, + "legendFormat": "{{label_name}}", + "range": false, + "refId": "A" + } + ], + "title": "Logprep Version", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "filterFieldsByName", + "options": { + "include": { + "names": [ + "logprep" + ] + } + } + } + ], + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 13, + "panels": [], + "title": "Connectors", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Shows the per-second average of processed events per connector.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "confluentkafka_output" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 8 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": true, + "width": 200 + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "expr": "sum by(type)(rate(logprep_number_of_processed_events_total{component=\"input\"}[1m]))", + "hide": false, + "instant": false, + "legendFormat": "{{type}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "expr": "sum by(type)(rate(logprep_number_of_processed_events_total{component=\"output\"}[1m]))", + "hide": false, + "instant": false, + "legendFormat": "{{type}}", + "range": true, + "refId": "B" + } + ], + "title": "Processed Event Rate per Connector per Second", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Shows the difference between processed and failed events as a per-second average. Failed events are events that couldn't be parsed or where processors/rules canceled the processing due to an error. ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 15 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": true, + "width": 200 + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "sum(rate(logprep_number_of_processed_events_total{component=\"output\"}[1m]))", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Processed Events", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "sum(rate(logprep_number_of_failed_events_total{component=\"output\"}[1m]))", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Failed Events", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Success/Failed Event Rate per Output per Second", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "Shows the processing time per connector ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 22 + }, + "id": 59, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": true, + "width": 200 + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "expr": "sum by (type)(increase(logprep_processing_time_per_event_sum{component=\"input\"}[1m])/increase(logprep_processing_time_per_event_count{component=\"input\"}[1m]))", + "instant": false, + "legendFormat": "{{type}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "expr": "sum by (type)(increase(logprep_processing_time_per_event_sum{component=\"output\"}[1m])/increase(logprep_processing_time_per_event_count{component=\"output\"}[1m]))", + "hide": false, + "instant": false, + "legendFormat": "{{type}}", + "range": true, + "refId": "B" + } + ], + "title": "Processing Time Per Connector", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 30 + }, + "id": 62, + "panels": [], + "title": "Kafka", + "type": "row" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 31 + }, + "id": 74, + "panels": [], + "title": "Errors / Warnings", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "in the selected time range", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 32 + }, + "id": 72, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": true, + "width": 200 + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.4", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum by (component) (increase(logprep_number_of_errors_total[1m]))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Number of Errors", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "description": "in the selected time range", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [ + { + "options": { + "match": "nan", + "result": { + "index": 0, + "text": "0" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 37 + }, + "id": 73, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": true, + "width": 200 + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.4", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "promz2394" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum by (component) (increase(logprep_number_of_warnings_total[1m]))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Number of Warnings", + "type": "timeseries" + } + ], + "refresh": "5s", + "schemaVersion": 39, + "tags": [ + "Logprep" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "HTTP Input v1.0.0", + "uid": "deb1220a-968c-44b9-b36a-c22f1606b950", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/examples/exampledata/config/http_pipeline.yml b/examples/exampledata/config/http_pipeline.yml index b07fab85a..5a63faca4 100644 --- a/examples/exampledata/config/http_pipeline.yml +++ b/examples/exampledata/config/http_pipeline.yml @@ -1,7 +1,7 @@ version: 2 -process_count: 2 -config_refresh_interval: 10 - +process_count: 8 +config_refresh_interval: 300 +profile_pipelines: false logger: level: INFO loggers: @@ -21,19 +21,27 @@ metrics: server_header: false date_header: false workers: 1 + ws: none + interface: asgi3 + backlog: 16384 + timeout_keep_alive: 65 input: httpinput: type: http_input - message_backlog_size: 150 - collect_meta: True + message_backlog_size: 1500000 + collect_meta: true metafield_name: "@metadata" uvicorn_config: host: 0.0.0.0 port: 9000 - workers: 2 + workers: 1 access_log: true server_header: false date_header: false + ws: none + interface: asgi3 + backlog: 16384 + timeout_keep_alive: 65 endpoints: /auth-json: json /json: json @@ -48,5 +56,10 @@ output: send_timeout: 0 kafka_config: bootstrap.servers: 127.0.0.1:9092 - compression.type: gzip + compression.type: none statistics.interval.ms: "60000" + queue.buffering.max.messages: "100000000" + queue.buffering.max.kbytes: "1048576" + queue.buffering.max.ms: "5000" + batch.size: "1000000" + request.required.acks: "-1" diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index 55690affd..c60b527c6 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -256,11 +256,9 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff """json endpoint method""" self.collect_metrics() data = await self.get_data(req) - data = data.decode("utf8") - metadata = kwargs.get("metadata", {}) if data: event = self._decoder.decode(data) - self.messages.put({**event, **metadata}, block=False) + self.messages.put(event | kwargs["metadata"], block=False) class JSONLHttpEndpoint(HttpEndpoint): @@ -274,14 +272,9 @@ class JSONLHttpEndpoint(HttpEndpoint): async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """jsonl endpoint method""" self.collect_metrics() - data = await self.get_data(req) - data = data.decode("utf8") - event = kwargs.get("metadata", {}) - metadata = kwargs.get("metadata", {}) - stripped_lines = map(str.strip, data.splitlines()) - events = (self._decoder.decode(line) for line in stripped_lines if line) + events = self._decoder.decode_lines(await self.get_data(req)) for event in events: - self.messages.put({**event, **metadata}, block=False) + self.messages.put(event | kwargs["metadata"], block=False, batch_size=len(events)) class PlaintextHttpEndpoint(HttpEndpoint): @@ -295,9 +288,8 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff """plaintext endpoint method""" self.collect_metrics() data = await self.get_data(req) - metadata = kwargs.get("metadata", {}) event = {"message": data.decode("utf8")} - self.messages.put({**event, **metadata}, block=False) + self.messages.put(event | kwargs["metadata"], block=False) class HttpInput(Input): @@ -435,12 +427,6 @@ def setup(self): raise FatalInputError( self, "Necessary instance attribute `pipeline_index` could not be found." ) - logger.debug( - "HttpInput Connector started on target %s and queue %s with queue_size: %s", - self.target, - id(self.messages), - self.messages._maxsize, # pylint: disable=protected-access - ) # Start HTTP Input only when in first process if self.pipeline_index != 1: return diff --git a/logprep/connector/http/output.py b/logprep/connector/http/output.py index a85786e60..7c3d393c2 100644 --- a/logprep/connector/http/output.py +++ b/logprep/connector/http/output.py @@ -113,6 +113,8 @@ class Config(Output.Config): """Password that is used for the basic auth http request""" target_url: str """URL of the endpoint that receives the events""" + timeout: int = field(validator=validators.instance_of(int), default=2) + """Timeout in seconds for the http request""" @property def user(self): @@ -124,6 +126,11 @@ def password(self): """Return the password that is used for the http request""" return self._config.password + @property + def timeout(self): + """Return the timeout in seconds for the http request""" + return self._config.timeout + @cached_property def _headers(self): return {"Content-Type": "application/x-ndjson; charset=utf-8"} @@ -181,7 +188,7 @@ def store_custom(self, document: dict | tuple | list, target: str) -> None: headers=self._headers, verify=False, auth=(self.user, self.password), - timeout=2, + timeout=self.timeout, data=request_data, ) logger.debug("Servers response code is: %i", response.status_code) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index bba5ac540..1d085af9f 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -5,6 +5,7 @@ import logging import logging.handlers import multiprocessing +import multiprocessing.managers import multiprocessing.queues import random import time @@ -22,6 +23,32 @@ logger = logging.getLogger("Manager") +class ThrottlingQueue(multiprocessing.queues.Queue): + """A queue that throttles the number of items that can be put into it.""" + + wait_time = 0.0000000000000001 + + @property + def consumed_percent(self): + """Return the percentage of items consumed.""" + return self.qsize() / self.capacity + + def __init__(self, ctx, maxsize): + super().__init__(ctx=ctx, maxsize=maxsize) + self.capacity = maxsize + self.call_time = None + + def throttle(self, batch_size=1): + """Throttle put by sleeping.""" + time.sleep((self.wait_time**self.consumed_percent) / batch_size) + + def put(self, obj, block=True, timeout=None, batch_size=1): + """Put an obj into the queue.""" + if self.consumed_percent >= 0.9: + self.throttle(batch_size) + super().put(obj, block=block, timeout=timeout) + + class PipelineManager: """Manage pipelines via multi-processing.""" @@ -87,7 +114,7 @@ def _set_http_input_queue(self, configuration): if not is_http_input and HttpInput.messages is not None: return message_backlog_size = input_config.get("message_backlog_size", 15000) - HttpInput.messages = multiprocessing.Queue(maxsize=message_backlog_size) + HttpInput.messages = ThrottlingQueue(multiprocessing.get_context(), message_backlog_size) def set_count(self, count: int): """Set the pipeline count. @@ -165,7 +192,9 @@ def restart(self): def _create_pipeline(self, index) -> multiprocessing.Process: pipeline = Pipeline(pipeline_index=index, config=self._configuration) logger.info("Created new pipeline") - process = multiprocessing.Process(target=pipeline.run, daemon=True) + process = multiprocessing.Process( + target=pipeline.run, daemon=True, name=f"Pipeline-{index}" + ) process.stop = pipeline.stop process.start() return process diff --git a/logprep/generator/http/controller.py b/logprep/generator/http/controller.py index d784edd28..664f5218c 100644 --- a/logprep/generator/http/controller.py +++ b/logprep/generator/http/controller.py @@ -34,6 +34,7 @@ def __init__(self, **kwargs): "user": kwargs.get("user"), "password": kwargs.get("password"), "target_url": kwargs.get("target_url"), + "timeout": kwargs.get("timeout", 2), } } self.output: HttpOutput = Factory.create(output_config) diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py index e0f537df8..e4255d5b5 100644 --- a/logprep/run_logprep.py +++ b/logprep/run_logprep.py @@ -266,6 +266,12 @@ def generate_kafka(config, file): required=False, default="INFO", ) +@click.option( + "--timeout", + help="Timeout in seconds for the http requests", + required=False, + default=2, +) def generate_http(**kwargs): """ Generates events based on templated sample files stored inside a dataset directory. diff --git a/logprep/util/http.py b/logprep/util/http.py index 8c75450b8..a96a3bbc6 100644 --- a/logprep/util/http.py +++ b/logprep/util/http.py @@ -59,8 +59,9 @@ def __init__( self.shut_down() internal_uvicorn_config = { "lifespan": "off", - "loop": "asyncio", - "timeout_graceful_shutdown": 5, + "timeout_graceful_shutdown": 10, + "loop": "uvloop", + "http": "httptools", } uvicorn_config = {**internal_uvicorn_config, **uvicorn_config} self._logger_name = logger_name diff --git a/pyproject.toml b/pyproject.toml index 3ea2abb8a..bf280885e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,8 @@ dependencies = [ "pandas", "tabulate", "falcon==3.1.3", + "uvloop", + "httptools", ] [project.optional-dependencies] diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index b20bf26a7..09379815c 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -17,6 +17,7 @@ from logprep.abc.input import FatalInputError from logprep.connector.http.input import HttpInput from logprep.factory import Factory +from logprep.framework.pipeline_manager import ThrottlingQueue from logprep.util.defaults import ENV_NAME_LOGPREP_CREDENTIALS_FILE from tests.unit.connector.base import BaseInputTestCase @@ -48,7 +49,9 @@ def create_credentials(tmp_path): class TestHttpConnector(BaseInputTestCase): def setup_method(self): - HttpInput.messages = multiprocessing.Queue(maxsize=self.CONFIG.get("message_backlog_size")) + HttpInput.messages = ThrottlingQueue( + ctx=multiprocessing.get_context(), maxsize=self.CONFIG.get("message_backlog_size") + ) super().setup_method() self.object.pipeline_index = 1 self.object.setup() diff --git a/tests/unit/generator/http/test_controller.py b/tests/unit/generator/http/test_controller.py index f5ef85f31..ee04694df 100644 --- a/tests/unit/generator/http/test_controller.py +++ b/tests/unit/generator/http/test_controller.py @@ -1,7 +1,6 @@ # pylint: disable=missing-docstring # pylint: disable=attribute-defined-outside-init # pylint: disable=protected-access -import logging import os from unittest import mock @@ -15,7 +14,7 @@ class TestController: def setup_method(self): self.target_url = "http://testendpoint" self.batch_size = 10 - self.contoller = Controller( + self.controller = Controller( input_dir="", batch_size=self.batch_size, replace_timestamp=True, @@ -54,13 +53,13 @@ def test_run(self, tmp_path): class_name="class_two", config=class_two_config, ) - self.contoller.input.input_root_path = dataset_path - self.contoller.input.temp_dir = tmp_path / "tmp_input_file" # Mock temp dir for test - os.makedirs(self.contoller.input._temp_dir, exist_ok=True) + self.controller.input.input_root_path = dataset_path + self.controller.input.temp_dir = tmp_path / "tmp_input_file" # Mock temp dir for test + os.makedirs(self.controller.input._temp_dir, exist_ok=True) expected_status_code = 200 responses.add(responses.POST, f"{self.target_url}/target-one", status=expected_status_code) responses.add(responses.POST, f"{self.target_url}/target-two", status=expected_status_code) - self.contoller.run() + self.controller.run() for call_id, call in enumerate(responses.calls): if call_id < (class_one_number_events / self.batch_size): @@ -77,7 +76,7 @@ def test_run(self, tmp_path): @mock.patch("logprep.generator.http.controller.ThreadPoolExecutor") def test_run_with_multiple_threads(self, mock_executor_class, tmp_path): - self.contoller = Controller( + self.controller = Controller( input_dir="", batch_size=self.batch_size, replace_timestamp=True, @@ -101,9 +100,9 @@ def test_run_with_multiple_threads(self, mock_executor_class, tmp_path): class_name="class_one", config=class_one_config, ) - self.contoller.input.input_root_path = dataset_path + self.controller.input.input_root_path = dataset_path mock_executor_instance = mock.MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor_instance - self.contoller.run() + self.controller.run() mock_executor_class.assert_called_with(max_workers=2) mock_executor_instance.map.assert_called() diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py index cf4e4870a..41c6eae28 100644 --- a/tests/unit/test_run_logprep.py +++ b/tests/unit/test_run_logprep.py @@ -330,6 +330,7 @@ def test_generator_cli_runs_generator_with_default_values(self, mock_controller_ replace_timestamp=True, tag="loadtest", loglevel="INFO", + timeout=2, ) mock_controller_instance.run.assert_called() @@ -379,6 +380,7 @@ def test_generator_cli_overwrites_default_values(self, mock_generator): replace_timestamp=False, tag="test-tag", loglevel="DEBUG", + timeout=2, ) mock_controller.run.assert_called()