Skip to content

Commit

Permalink
Merge pull request #6 from port-labs/support_subproject_pipelines
Browse files Browse the repository at this point in the history
added support for subprojects
  • Loading branch information
matarpeles authored Oct 29, 2023
2 parents 371c34b + f9b45dc commit 4d07e0f
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 51 deletions.
6 changes: 4 additions & 2 deletions app/core/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@


class Consts:
KAFKA_INVOCATIONS = {"WEBHOOK": KafkaToWebhookProcessor,
"GITLAB": KafkaToGitLabProcessor}
KAFKA_INVOCATIONS = {
"WEBHOOK": KafkaToWebhookProcessor,
"GITLAB": KafkaToGitLabProcessor,
}
KAFKA_CONSUMER_CLIENT_ID = "port-agent"


Expand Down
10 changes: 5 additions & 5 deletions app/invokers/gitlab_pipeline_invoker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import urllib.parse

import requests
from core.config import settings
Expand All @@ -9,15 +10,14 @@


class GitLabPipelineInvoker(BaseInvoker):

def invoke(self, body: dict, project_path: str) -> None:
logger.info("GitLabPipelineInvoker - start - project: %s",
project_path)
logger.info("GitLabPipelineInvoker - start - project: %s", project_path)

res = requests.post(
f'{settings.GITLAB_URL}/api/v4/projects/{project_path}/trigger/pipeline',
f"{settings.GITLAB_URL}/api/v4/projects/"
f"{urllib.parse.quote(project_path, safe='')}/trigger/pipeline",
json=body,
timeout=settings.GITLAB_PIPELINE_INVOKER_TIMEOUT
timeout=settings.GITLAB_PIPELINE_INVOKER_TIMEOUT,
)

logger.info(
Expand Down
22 changes: 12 additions & 10 deletions app/processors/kafka/kafka_to_gitlab_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ def msg_process(msg: Message, invocation_method: dict, topic: str) -> None:

if not gitlab_project or not gitlab_group:
logger.info(
"Skip process message"
" from topic %s, partition %d, offset %d: %s",
"Skip process message" " from topic %s, partition %d, offset %d: %s",
topic,
msg.partition(),
msg.offset(),
"GitLab project path is missing"
"GitLab project path is missing",
)
return

ref = user_inputs.get("ref", invocation_method.get("defaultRef", "main"))

trigger_token = os.environ.get(f'{gitlab_group}_{gitlab_project}', "")
trigger_token = os.environ.get(
f'{gitlab_group}_{gitlab_project.replace("/", "_")}', ""
)

if not trigger_token:
logger.info(
Expand All @@ -43,24 +44,25 @@ def msg_process(msg: Message, invocation_method: dict, topic: str) -> None:
msg.partition(),
msg.offset(),
gitlab_group,
gitlab_project
gitlab_project,
)
return

body = {
'token': trigger_token,
'ref': ref,
"token": trigger_token,
"ref": ref,
}

if not invocation_method.get("omitUserInputs"):
# GitLab variables must be strings, to be sent to a GitLab pipeline
body.update({'variables': {key: str(value) for key,
value in user_inputs.items()}})
body.update(
{"variables": {key: str(value) for key, value in user_inputs.items()}}
)

if not invocation_method.get("omitPayload"):
body["port_payload"] = msg_value.copy()

gitlab_pipeline_invoker.invoke(body, f'{gitlab_group}%2F{gitlab_project}')
gitlab_pipeline_invoker.invoke(body, f"{gitlab_group}/{gitlab_project}")

logger.info(
"Successfully processed message from topic %s, partition %d, offset %d",
Expand Down
5 changes: 2 additions & 3 deletions app/streamers/kafka/kafka_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ def msg_process(self, msg: Message) -> None:
invocation_method_error = self.validate_invocation_method(invocation_method)
if invocation_method_error != "":
logger.info(
"Skip process message"
" from topic %s, partition %d, offset %d: %s",
"Skip process message" " from topic %s, partition %d, offset %d: %s",
topic,
msg.partition(),
msg.offset(),
invocation_method_error
invocation_method_error,
)
return

Expand Down
11 changes: 7 additions & 4 deletions tests/unit/processors/kafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ class MockResponse:

def raise_for_status(self) -> None:
if 400 <= self.status_code <= 599:
raise Exception(
self.text
)
raise Exception(self.text)

def mock_post(*args: Any, **kwargs: Any) -> MockResponse:
return MockResponse()
Expand Down Expand Up @@ -234,7 +232,7 @@ def mock_gitlab_run_message() -> Callable[[dict], bytes]:
"agent": True,
"defaultRef": "main",
"projectName": "project",
"groupName": "group"
"groupName": "group",
},
"trigger": "CREATE",
"description": "",
Expand All @@ -259,3 +257,8 @@ def get_gitlab_run_message(invocation_method: dict) -> bytes:
@pytest.fixture
def mock_gitlab_token(monkeypatch: MonkeyPatch) -> None:
monkeypatch.setenv("group_project", "token")


@pytest.fixture
def mock_gitlab_token_subgroup(monkeypatch: MonkeyPatch) -> None:
monkeypatch.setenv("group_subgroup_sub2_project", "token")
136 changes: 114 additions & 22 deletions tests/unit/processors/kafka/test_kafka_to_gitlab_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
],
indirect=True,
)
def test_single_stream_success(mock_requests: None, mock_kafka: None) -> None:
def test_single_stream_success(
mock_requests: None, mock_kafka: None, mock_gitlab_token: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error:
Expand All @@ -37,9 +39,9 @@ def test_single_stream_success(mock_requests: None, mock_kafka: None) -> None:
],
indirect=True,
)
def test_single_stream_failed(mock_requests: None,
mock_kafka: None,
mock_gitlab_token: None) -> None:
def test_single_stream_failed(
mock_requests: None, mock_kafka: None, mock_gitlab_token: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error:
Expand All @@ -60,17 +62,20 @@ def test_single_stream_failed(mock_requests: None,
[
(
"mock_gitlab_run_message",
{"type": "GITLAB",
"agent": True,
"projectName": "project",
"groupName": ""},
{
"type": "GITLAB",
"agent": True,
"projectName": "project",
"groupName": "",
},
settings.KAFKA_RUNS_TOPIC,
),
],
indirect=True,
)
def test_single_stream_skipped_due_to_missing_group_name(
mock_kafka: None, mock_gitlab_token: None) -> None:
mock_kafka: None, mock_gitlab_token: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object(
Expand All @@ -89,7 +94,7 @@ def test_single_stream_skipped_due_to_missing_group_name(
ANY,
0,
0,
"GitLab project path is missing"
"GitLab project path is missing",
),
]
)
Expand All @@ -100,17 +105,15 @@ def test_single_stream_skipped_due_to_missing_group_name(
[
(
"mock_gitlab_run_message",
{"type": "GITLAB",
"agent": True,
"groupName": "group",
"projectName": ""},
{"type": "GITLAB", "agent": True, "groupName": "group", "projectName": ""},
settings.KAFKA_RUNS_TOPIC,
),
],
indirect=True,
)
def test_single_stream_skipped_due_to_missing_project_name(
mock_kafka: None, mock_gitlab_token: None) -> None:
mock_kafka: None, mock_gitlab_token: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object(
Expand All @@ -129,7 +132,7 @@ def test_single_stream_skipped_due_to_missing_project_name(
ANY,
0,
0,
"GitLab project path is missing"
"GitLab project path is missing",
),
]
)
Expand All @@ -140,17 +143,20 @@ def test_single_stream_skipped_due_to_missing_project_name(
[
(
"mock_gitlab_run_message",
{"type": "GITLAB",
"agent": True,
"groupName": "notgroup",
"projectName": "notproject"},
{
"type": "GITLAB",
"agent": True,
"groupName": "notgroup",
"projectName": "notproject",
},
settings.KAFKA_RUNS_TOPIC,
),
],
indirect=True,
)
def test_single_stream_skipped_due_to_wrong_token(
mock_kafka: None, mock_gitlab_token: None) -> None:
mock_kafka: None, mock_gitlab_token: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(consumer_logger, "error") as mock_error, mock.patch.object(
Expand All @@ -171,7 +177,93 @@ def test_single_stream_skipped_due_to_wrong_token(
0,
0,
ANY,
ANY
ANY,
),
]
)


@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True)
@pytest.mark.parametrize(
"mock_kafka",
[
(
"mock_gitlab_run_message",
{
"type": "GITLAB",
"agent": True,
"groupName": "group",
"projectName": "subgroup/sub2/project",
},
settings.KAFKA_RUNS_TOPIC,
),
],
indirect=True,
)
def test_single_stream_with_subgroup_in_project_name(
mock_requests: None, mock_kafka: None, mock_gitlab_token_subgroup: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(gitlab_processor_logger, "info") as mock_info:

streamer = KafkaStreamer(Consumer())
streamer.stream()

call_of_missing_token = call(
"Skip process message"
" from topic %s, partition %d, offset %d:"
" no token env variable found for project %s/%s",
ANY,
0,
0,
ANY,
ANY,
)

# Check if the expected calls were not made
assert call_of_missing_token not in mock_info.call_args_list


@pytest.mark.parametrize("mock_requests", [{"status_code": 200}], indirect=True)
@pytest.mark.parametrize(
"mock_kafka",
[
(
"mock_gitlab_run_message",
{
"type": "GITLAB",
"agent": True,
"groupName": "group",
"projectName": "wrong/sub2/project",
},
settings.KAFKA_RUNS_TOPIC,
),
],
indirect=True,
)
def test_single_stream_with_subgroup_in_project_name_failure(
mock_requests: None, mock_kafka: None, mock_gitlab_token_subgroup: None
) -> None:
Timer(0.01, terminate_consumer).start()

with mock.patch.object(gitlab_processor_logger, "info") as mock_info:

streamer = KafkaStreamer(Consumer())
streamer.stream()

mock_info.assert_has_calls(
[
call(ANY, ANY),
call(
"Skip process message"
" from topic %s, partition %d, offset %d:"
" no token env variable found for project %s/%s",
ANY,
0,
0,
ANY,
ANY,
),
]
)
4 changes: 1 addition & 3 deletions tests/unit/streamers/kafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ class MockResponse:

def raise_for_status(self) -> None:
if 400 <= self.status_code <= 599:
raise Exception(
self.text
)
raise Exception(self.text)

def mock_post(*args: Any, **kwargs: Any) -> MockResponse:
return MockResponse()
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/streamers/kafka/test_kafka_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_single_stream_skipped_due_to_agentless(mock_kafka: None) -> None:
ANY,
0,
0,
"not for agent"
"not for agent",
),
]
)
Expand Down Expand Up @@ -116,7 +116,7 @@ def test_single_stream_skipped_due_to_unsupported_invoker(mock_kafka: None) -> N
ANY,
0,
0,
"Invocation type not found / not supported"
"Invocation type not found / not supported",
),
]
)

0 comments on commit 4d07e0f

Please sign in to comment.