diff --git a/app/core/consts.py b/app/core/consts.py index af18312..41e713e 100644 --- a/app/core/consts.py +++ b/app/core/consts.py @@ -3,8 +3,10 @@ class Consts: - KAFKA_INVOCATIONS = {"WEBHOOK": KafkaToWebhookProcessor, - "GITLAB": KafkaToGitLabProcessor} + KAFKA_INVOCATIONS = { + "WEBHOOK": KafkaToWebhookProcessor, + "GITLAB": KafkaToGitLabProcessor, + } KAFKA_CONSUMER_CLIENT_ID = "port-agent" diff --git a/app/invokers/gitlab_pipeline_invoker.py b/app/invokers/gitlab_pipeline_invoker.py index dc46a87..0e979fd 100644 --- a/app/invokers/gitlab_pipeline_invoker.py +++ b/app/invokers/gitlab_pipeline_invoker.py @@ -1,4 +1,5 @@ import logging +import urllib.parse import requests from core.config import settings @@ -9,15 +10,14 @@ class GitLabPipelineInvoker(BaseInvoker): - def invoke(self, body: dict, project_path: str) -> None: - logger.info("GitLabPipelineInvoker - start - project: %s", - project_path) + logger.info("GitLabPipelineInvoker - start - project: %s", project_path) res = requests.post( - f'{settings.GITLAB_URL}/api/v4/projects/{project_path}/trigger/pipeline', + f"{settings.GITLAB_URL}/api/v4/projects/" + f"{urllib.parse.quote(project_path, safe='')}/trigger/pipeline", json=body, - timeout=settings.GITLAB_PIPELINE_INVOKER_TIMEOUT + timeout=settings.GITLAB_PIPELINE_INVOKER_TIMEOUT, ) logger.info( diff --git a/app/processors/kafka/kafka_to_gitlab_processor.py b/app/processors/kafka/kafka_to_gitlab_processor.py index 289c000..0d86f34 100644 --- a/app/processors/kafka/kafka_to_gitlab_processor.py +++ b/app/processors/kafka/kafka_to_gitlab_processor.py @@ -21,18 +21,19 @@ def msg_process(msg: Message, invocation_method: dict, topic: str) -> None: if not gitlab_project or not gitlab_group: logger.info( - "Skip process message" - " from topic %s, partition %d, offset %d: %s", + "Skip process message" " from topic %s, partition %d, offset %d: %s", topic, msg.partition(), msg.offset(), - "GitLab project path is missing" + "GitLab project path is missing", ) return ref = user_inputs.get("ref", invocation_method.get("defaultRef", "main")) - trigger_token = os.environ.get(f'{gitlab_group}_{gitlab_project}', "") + trigger_token = os.environ.get( + f'{gitlab_group}_{gitlab_project.replace("/", "_")}', "" + ) if not trigger_token: logger.info( @@ -43,24 +44,25 @@ def msg_process(msg: Message, invocation_method: dict, topic: str) -> None: msg.partition(), msg.offset(), gitlab_group, - gitlab_project + gitlab_project, ) return body = { - 'token': trigger_token, - 'ref': ref, + "token": trigger_token, + "ref": ref, } if not invocation_method.get("omitUserInputs"): # GitLab variables must be strings, to be sent to a GitLab pipeline - body.update({'variables': {key: str(value) for key, - value in user_inputs.items()}}) + body.update( + {"variables": {key: str(value) for key, value in user_inputs.items()}} + ) if not invocation_method.get("omitPayload"): body["port_payload"] = msg_value.copy() - gitlab_pipeline_invoker.invoke(body, f'{gitlab_group}%2F{gitlab_project}') + gitlab_pipeline_invoker.invoke(body, f"{gitlab_group}/{gitlab_project}") logger.info( "Successfully processed message from topic %s, partition %d, offset %d", diff --git a/app/streamers/kafka/kafka_streamer.py b/app/streamers/kafka/kafka_streamer.py index e868fbb..af57bcc 100644 --- a/app/streamers/kafka/kafka_streamer.py +++ b/app/streamers/kafka/kafka_streamer.py @@ -24,12 +24,11 @@ def msg_process(self, msg: Message) -> None: invocation_method_error = self.validate_invocation_method(invocation_method) if invocation_method_error != "": logger.info( - "Skip process message" - " from topic %s, partition %d, offset %d: %s", + "Skip process message" " from topic %s, partition %d, offset %d: %s", topic, msg.partition(), msg.offset(), - invocation_method_error + invocation_method_error, ) return diff --git a/tests/unit/processors/kafka/conftest.py b/tests/unit/processors/kafka/conftest.py index 3faa289..cfeb2d8 100644 --- a/tests/unit/processors/kafka/conftest.py +++ b/tests/unit/processors/kafka/conftest.py @@ -17,9 +17,7 @@ class MockResponse: def raise_for_status(self) -> None: if 400 <= self.status_code <= 599: - raise Exception( - self.text - ) + raise Exception(self.text) def mock_post(*args: Any, **kwargs: Any) -> MockResponse: return MockResponse() @@ -234,7 +232,7 @@ def mock_gitlab_run_message() -> Callable[[dict], bytes]: "agent": True, "defaultRef": "main", "projectName": "project", - "groupName": "group" + "groupName": "group", }, "trigger": "CREATE", "description": "", @@ -259,3 +257,8 @@ def get_gitlab_run_message(invocation_method: dict) -> bytes: @pytest.fixture def mock_gitlab_token(monkeypatch: MonkeyPatch) -> None: monkeypatch.setenv("group_project", "token") + + +@pytest.fixture +def mock_gitlab_token_subgroup(monkeypatch: MonkeyPatch) -> None: + monkeypatch.setenv("group_subgroup_sub2_project", "token") diff --git a/tests/unit/processors/kafka/test_kafka_to_gitlab_processor.py b/tests/unit/processors/kafka/test_kafka_to_gitlab_processor.py index 0419ed7..0eeed2b 100644 --- a/tests/unit/processors/kafka/test_kafka_to_gitlab_processor.py +++ b/tests/unit/processors/kafka/test_kafka_to_gitlab_processor.py @@ -19,7 +19,9 @@ ], indirect=True, ) -def test_single_stream_success(mock_requests: None, mock_kafka: None) -> None: +def test_single_stream_success( + mock_requests: None, mock_kafka: None, mock_gitlab_token: None +) -> None: Timer(0.01, terminate_consumer).start() with mock.patch.object(consumer_logger, "error") as mock_error: @@ -37,9 +39,9 @@ def test_single_stream_success(mock_requests: None, mock_kafka: None) -> None: ], indirect=True, ) -def test_single_stream_failed(mock_requests: None, - mock_kafka: None, - mock_gitlab_token: None) -> None: +def test_single_stream_failed( + mock_requests: None, mock_kafka: None, mock_gitlab_token: None +) -> None: Timer(0.01, terminate_consumer).start() with mock.patch.object(consumer_logger, "error") as mock_error: @@ -60,17 +62,20 @@ def test_single_stream_failed(mock_requests: None, [ ( "mock_gitlab_run_message", - {"type": "GITLAB", - "agent": True, - "projectName": "project", - "groupName": ""}, + { + "type": "GITLAB", + "agent": True, + "projectName": "project", + "groupName": "", + }, settings.KAFKA_RUNS_TOPIC, ), ], indirect=True, ) def test_single_stream_skipped_due_to_missing_group_name( - mock_kafka: None, mock_gitlab_token: None) -> None: + mock_kafka: None, mock_gitlab_token: None +) -> None: Timer(0.01, terminate_consumer).start() with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object( @@ -89,7 +94,7 @@ def test_single_stream_skipped_due_to_missing_group_name( ANY, 0, 0, - "GitLab project path is missing" + "GitLab project path is missing", ), ] ) @@ -100,17 +105,15 @@ def test_single_stream_skipped_due_to_missing_group_name( [ ( "mock_gitlab_run_message", - {"type": "GITLAB", - "agent": True, - "groupName": "group", - "projectName": ""}, + {"type": "GITLAB", "agent": True, "groupName": "group", "projectName": ""}, settings.KAFKA_RUNS_TOPIC, ), ], indirect=True, ) def test_single_stream_skipped_due_to_missing_project_name( - mock_kafka: None, mock_gitlab_token: None) -> None: + mock_kafka: None, mock_gitlab_token: None +) -> None: Timer(0.01, terminate_consumer).start() with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object( @@ -129,7 +132,7 @@ def test_single_stream_skipped_due_to_missing_project_name( ANY, 0, 0, - "GitLab project path is missing" + "GitLab project path is missing", ), ] ) @@ -140,17 +143,20 @@ def test_single_stream_skipped_due_to_missing_project_name( [ ( "mock_gitlab_run_message", - {"type": "GITLAB", - "agent": True, - "groupName": "notgroup", - "projectName": "notproject"}, + { + "type": "GITLAB", + "agent": True, + "groupName": "notgroup", + "projectName": "notproject", + }, settings.KAFKA_RUNS_TOPIC, ), ], indirect=True, ) def test_single_stream_skipped_due_to_wrong_token( - mock_kafka: None, mock_gitlab_token: None) -> None: + mock_kafka: None, mock_gitlab_token: None +) -> None: Timer(0.01, terminate_consumer).start() with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object( @@ -171,7 +177,93 @@ def test_single_stream_skipped_due_to_wrong_token( 0, 0, ANY, - ANY + ANY, + ), + ] + ) + + +@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True) +@pytest.mark.parametrize( + "mock_kafka", + [ + ( + "mock_gitlab_run_message", + { + "type": "GITLAB", + "agent": True, + "groupName": "group", + "projectName": "subgroup/sub2/project", + }, + settings.KAFKA_RUNS_TOPIC, + ), + ], + indirect=True, +) +def test_single_stream_with_subgroup_in_project_name( + mock_requests: None, mock_kafka: None, mock_gitlab_token_subgroup: None +) -> None: + Timer(0.01, terminate_consumer).start() + + with mock.patch.object(gitlab_processor_logger, "info") as mock_info: + + streamer = KafkaStreamer(Consumer()) + streamer.stream() + + call_of_missing_token = call( + "Skip process message" + " from topic %s, partition %d, offset %d:" + " no token env variable found for project %s/%s", + ANY, + 0, + 0, + ANY, + ANY, + ) + + # Check if the expected calls were not made + assert call_of_missing_token not in mock_info.call_args_list + + +@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True) +@pytest.mark.parametrize( + "mock_kafka", + [ + ( + "mock_gitlab_run_message", + { + "type": "GITLAB", + "agent": True, + "groupName": "group", + "projectName": "wrong/sub2/project", + }, + settings.KAFKA_RUNS_TOPIC, + ), + ], + indirect=True, +) +def test_single_stream_with_subgroup_in_project_name_failure( + mock_requests: None, mock_kafka: None, mock_gitlab_token_subgroup: None +) -> None: + Timer(0.01, terminate_consumer).start() + + with mock.patch.object(gitlab_processor_logger, "info") as mock_info: + + streamer = KafkaStreamer(Consumer()) + streamer.stream() + + mock_info.assert_has_calls( + [ + call(ANY, ANY), + call( + "Skip process message" + " from topic %s, partition %d, offset %d:" + " no token env variable found for project %s/%s", + ANY, + 0, + 0, + ANY, + ANY, ), ] ) diff --git a/tests/unit/streamers/kafka/conftest.py b/tests/unit/streamers/kafka/conftest.py index 38dfacb..c7b00dd 100644 --- a/tests/unit/streamers/kafka/conftest.py +++ b/tests/unit/streamers/kafka/conftest.py @@ -17,9 +17,7 @@ class MockResponse: def raise_for_status(self) -> None: if 400 <= self.status_code <= 599: - raise Exception( - self.text - ) + raise Exception(self.text) def mock_post(*args: Any, **kwargs: Any) -> MockResponse: return MockResponse() diff --git a/tests/unit/streamers/kafka/test_kafka_streamer.py b/tests/unit/streamers/kafka/test_kafka_streamer.py index f14be34..7ae4363 100644 --- a/tests/unit/streamers/kafka/test_kafka_streamer.py +++ b/tests/unit/streamers/kafka/test_kafka_streamer.py @@ -80,7 +80,7 @@ def test_single_stream_skipped_due_to_agentless(mock_kafka: None) -> None: ANY, 0, 0, - "not for agent" + "not for agent", ), ] ) @@ -116,7 +116,7 @@ def test_single_stream_skipped_due_to_unsupported_invoker(mock_kafka: None) -> N ANY, 0, 0, - "Invocation type not found / not supported" + "Invocation type not found / not supported", ), ] )