Skip to content

Commit

Permalink
updated webhook logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mk-armah committed Nov 8, 2024
1 parent 99e3724 commit 82bfc94
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 181 deletions.
46 changes: 38 additions & 8 deletions integrations/gitlab/gitlab_integration/events/hooks/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from abc import ABC, abstractmethod
from typing import List, Any, Optional, Dict
from typing import List, Any, Dict
import typing
from loguru import logger
from gitlab.v4.objects import Project, Group
from gitlab_integration.gitlab_service import GitlabService
from gitlab_integration.utils import ObjectKind
from port_ocean.context.ocean import ocean
from port_ocean.context.event import event
from gitlab_integration.git_integration import (
GitlabPortAppConfig,
GroupWithMembersSelector,
)


class HookHandler(ABC):
Expand Down Expand Up @@ -61,17 +66,42 @@ async def on_hook(self, event: str, body: dict[str, Any]) -> None:
logger.info(f"Finished handling {event} for group {group_path}")

@abstractmethod
async def _on_hook(
self, body: dict[str, Any], gitlab_group: Optional[Group]
) -> None:
async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
pass

async def _register_group(self, kind: str, gitlab_group: Dict[str, Any]) -> None:
if self.gitlab_service.should_run_for_group(gitlab_group):
if self.gitlab_service.should_run_for_path(gitlab_group["full_path"]):
await ocean.register_raw(kind, [gitlab_group])

async def _register_group_with_members(
self, kind: str, gitlab_group: Group
) -> None:
gitlab_group = await self.gitlab_service.enrich_group_with_members(gitlab_group)
await self._register_group(kind, gitlab_group)

resource_configs = typing.cast(
GitlabPortAppConfig, event.port_app_config
).resources

matching_resource_configs = [
resource_config
for resource_config in resource_configs
if (
resource_config.kind == kind
and isinstance(resource_config.selector, GroupWithMembersSelector)
)
]

if not matching_resource_configs:
logger.info(
"Group With Member resource not found in port app config, update port app config to include the resource type"
)
return
for resource_config in matching_resource_configs:
enrich_with_public_email = resource_config.selector.enrich_with_public_email
gitlab_group_result: Dict[str, Any] = (
await self.gitlab_service.enrich_group_with_members(
gitlab_group, enrich_with_public_email
)
if enrich_with_public_email
else gitlab_group.asdict()
)
await self._register_group(kind, gitlab_group_result)
30 changes: 15 additions & 15 deletions integrations/gitlab/gitlab_integration/events/hooks/group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any

from loguru import logger

Expand All @@ -12,20 +12,20 @@ class Groups(GroupHandler):
events = ["Subgroup Hook"]
system_events = ["group_destroy", "group_create", "group_rename"]

async def _on_hook(
self, body: dict[str, Any], gitlab_group: Optional[Group]
) -> None:
async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
logger.info(f"Handling {body['event_name']} for group {body['group_id']}")
if gitlab_group:
await self._register_group(
ObjectKind.GROUP,
gitlab_group.asdict(),
)
await self._register_group_with_members(
gitlab_group, ObjectKind.GROUPWITHMEMBERS
)
elif body["event_name"] in ("subgroup_destroy", "group_destroy"):

if body["event_name"] in ("subgroup_destroy", "group_destroy"):
await ocean.unregister_raw(ObjectKind.GROUP, [body])
await ocean.unregister_raw(ObjectKind.GROUPWITHMEMBERS, [body])
else:
logger.warning(f"Group {body['group_id']} was filtered. Skipping ...")
logger.info(f"Unregistered group {body['group_id']}")
return

await self._register_group(
ObjectKind.GROUP,
gitlab_group.asdict(),
)
await self._register_group_with_members(
ObjectKind.GROUPWITHMEMBERS, gitlab_group
)
logger.info(f"Registered group {body['group_id']}")
91 changes: 6 additions & 85 deletions integrations/gitlab/gitlab_integration/events/hooks/members.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
import typing
from typing import Any, List, Optional
from typing import Any
from loguru import logger
import asyncio

from gitlab_integration.utils import ObjectKind
from port_ocean.context.ocean import ocean
from gitlab_integration.events.hooks.base import GroupHandler
from gitlab_integration.git_integration import MembersSelector
from port_ocean.context.event import event
from gitlab.v4.objects import Group, GroupMember
from gitlab_integration.git_integration import GitlabPortAppConfig
from gitlab_integration.events.utils import remove_prefix_from_keys

CONCURENT_TASKS_LIMIT = 10
from gitlab.v4.objects import Group


class Members(GroupHandler):
Expand All @@ -23,79 +14,9 @@ class Members(GroupHandler):
"user_add_to_group",
]

async def _on_hook(
self, body: dict[str, Any], gitlab_group: Optional[Group]
) -> None:
async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
event_name, user_username = (body["event_name"], body["user_username"])
logger.info(f"Handling {event_name} for group member {user_username}")

if event_name == "user_remove_from_group":
if not (await self._is_root_group_member(body["user_id"])):
body = remove_prefix_from_keys("user_", body)
await ocean.unregister_raw(ObjectKind.MEMBER, [body])
else:
logger.warning(
f"Group member {user_username} belongs to other groups. Skipping ..."
)

elif gitlab_group:
if group_member := await self.gitlab_service.get_group_member(
gitlab_group, body["user_id"]
):
await self._register_group_member(group_member)
if body["event_name"] == "user_add_to_group":
await self._register_group(gitlab_group)

else:
logger.warning(f"Group Member {user_username} was filtered. Skipping ...")

async def _register_group_member(self, group_member: GroupMember) -> None:

resource_configs = typing.cast(
GitlabPortAppConfig, event.port_app_config
).resources

matching_resource_configs = [
resource_config
for resource_config in resource_configs
if (
resource_config.kind == ObjectKind.MEMBER
and isinstance(resource_config.selector, MembersSelector)
)
]
if not matching_resource_configs:
logger.info(
"Member resource not found in port app config, update port app config to include the resource type"
)
return
for resource_config in matching_resource_configs:
enrich_with_public_email = resource_config.selector.enrich_with_public_email
member = (
await self.gitlab_service.enrich_member_with_public_email(group_member)
if enrich_with_public_email
else group_member.asdict()
)

await ocean.register_raw(ObjectKind.MEMBER, [member])

async def _is_root_group_member(self, member_id: int) -> bool:
root_groups: List[Group] = self.gitlab_service.get_root_groups()
semaphore = asyncio.Semaphore(CONCURENT_TASKS_LIMIT)

async def check_group(group: Group) -> bool:
async with semaphore:
return any(
[await self.gitlab_service.get_group_member(group, member_id)]
)

tasks = [asyncio.create_task(check_group(group)) for group in root_groups]
for completed_task in asyncio.as_completed(tasks):
try:
result = await completed_task
if result:
return True
except Exception as e:
logger.error(
f"Error checking group membership for member {member_id}: {e}"
)
return False
await self._register_group_with_members(
ObjectKind.GROUPWITHMEMBERS, gitlab_group
)
14 changes: 7 additions & 7 deletions integrations/gitlab/gitlab_integration/git_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ class GitlabResourceConfig(ResourceConfig):
selector: GitlabSelector


class MembersSelector(Selector):
class GroupWithMembersSelector(Selector):

enrich_with_public_email: bool | None = Field(
enrich_with_public_email: bool = Field(
alias="enrichWithPublicEmail",
default=False,
description="If set to true, the integration will enrich members with public email field. Default value is false",
description="If set to true, the integration will enrich group members with public email field. Default value is false",
)


class GitlabMembersResourceConfig(ResourceConfig):
kind: Literal["member"]
selector: MembersSelector
class GitlabGroupWithMembersResourceConfig(ResourceConfig):
kind: Literal["group-with-members"]
selector: GroupWithMembersSelector


class FilesSelector(BaseModel):
Expand Down Expand Up @@ -169,7 +169,7 @@ class GitlabPortAppConfig(PortAppConfig):
default=True,
description="If set to false, bots will be filtered out from the members list. Default value is true",
)
resources: list[GitlabMembersResourceConfig | GitLabFilesResourceConfig | GitlabResourceConfig] = Field(default_factory=list) # type: ignore
resources: list[GitlabGroupWithMembersResourceConfig | GitLabFilesResourceConfig | GitlabResourceConfig] = Field(default_factory=list) # type: ignore


def _get_project_from_cache(project_id: int) -> Project | None:
Expand Down
55 changes: 23 additions & 32 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,21 +455,13 @@ async def get_project(self, project_id: int) -> Project | None:
else:
return None

async def get_group(self, group_id: int) -> Optional[Group]:
try:
logger.info(f"Fetching group with ID: {group_id}")
group_response = await AsyncFetcher.fetch_single(
self.gitlab_client.groups.get, group_id
)
group: Group = typing.cast(Group, group_response)
return group
except gitlab.exceptions.GitlabGetError as err:
if err.response_code == 404:
logger.warning(f"Group with ID {group_id} not found (404).")
return None
else:
logger.error(f"Failed to fetch group with ID {group_id}: {err}")
raise
async def get_group(self, group_id: int) -> Group:
logger.info(f"Fetching group with ID: {group_id}")
group_response = await AsyncFetcher.fetch_single(
self.gitlab_client.groups.get, group_id
)
group: Group = typing.cast(Group, group_response)
return group

@cache_iterator_result()
async def get_all_groups(
Expand Down Expand Up @@ -772,21 +764,21 @@ async def get_unsynced_group_members(
)
yield unsynced_members

async def enrich_group_with_members(self, group: Group) -> dict[str, Any]:
group_members = [
member
async for members in self.get_all_group_members(group)
for member in members
]
group_dict: dict[str, Any] = group.asdict()
group_dict.update(
{
"__members": [
{"id": group_member.id, "username": group_member.username}
for group_member in group_members
async def enrich_group_with_members(
self, group: Group, include_public_email: bool = False
) -> dict[str, Any]:
group_members = []
async for members in self.get_all_group_members(group):
if include_public_email:
tasks = [
self.enrich_member_with_public_email(member) for member in members
]
}
)
group_members.extend(await asyncio.gather(*tasks))
else:
group_members.extend(member.asdict() for member in members)

group_dict: dict[str, Any] = group.asdict()
group_dict["__members"] = group_members
return group_dict

async def enrich_member_with_public_email(
Expand Down Expand Up @@ -830,9 +822,8 @@ async def get_group_member(
if err.response_code == 404:
logger.warning(f"Group Member with ID {member_id} not found (404).")
return None
else:
logger.error(f"Failed to fetch group with ID {member_id}: {err}")
raise
logger.error(f"Failed to fetch group with ID {member_id}: {err}")
raise

async def get_entities_diff(
self,
Expand Down
46 changes: 12 additions & 34 deletions integrations/gitlab/gitlab_integration/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from gitlab_integration.events.setup import setup_application
from gitlab_integration.git_integration import (
GitlabResourceConfig,
GitlabMembersResourceConfig,
GitlabGroupWithMembersResourceConfig,
GitLabFilesResourceConfig,
)
from gitlab_integration.utils import ObjectKind, get_cached_all_services
Expand Down Expand Up @@ -130,11 +130,21 @@ async def resync_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
yield [group.asdict() for group in groups_batch]


# from memory_profiler import profile
# @profile
@ocean.on_resync(ObjectKind.GROUPWITHMEMBERS)
async def resync_groups_with_members(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
gitlab_resource_config: GitlabGroupWithMembersResourceConfig = typing.cast(
GitlabGroupWithMembersResourceConfig, event.resource_config
)
enrich_with_public_email = gitlab_resource_config.selector.enrich_with_public_email

for service in get_cached_all_services():
async for groups_batch in service.get_all_groups():
tasks = [service.enrich_group_with_members(group) for group in groups_batch]
tasks = [
service.enrich_group_with_members(group, enrich_with_public_email)
for group in groups_batch
]
enriched_groups = await asyncio.gather(*tasks)
yield enriched_groups

Expand Down Expand Up @@ -299,35 +309,3 @@ async def resync_pipelines(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
{**pipeline.asdict(), "__project": project.asdict()}
for pipeline in pipelines_batch
]


@ocean.on_resync(ObjectKind.MEMBER)
async def resync_members(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
gitlab_resource_config: GitlabMembersResourceConfig = typing.cast(
GitlabMembersResourceConfig, event.resource_config
)
selector = gitlab_resource_config.selector

async def process_group_members(service, group):
members = [
member
async for members_batch in service.get_unsynced_group_members(group)
for member in members_batch
]

if selector.enrich_with_public_email:
enriched_member_tasks = [
service.enrich_member_with_public_email(member) for member in members
]
enriched_members = await asyncio.gather(*enriched_member_tasks)
return enriched_members

return [member.asdict() for member in members]

for service in get_cached_all_services():
async for groups in service.get_all_groups(skip_validation=True):
group_tasks = [process_group_members(service, group) for group in groups]
for group_task in asyncio.as_completed(group_tasks):
group_members = await group_task
logger.warning(f"Enriched Members {group_members}")
yield group_members

0 comments on commit 82bfc94

Please sign in to comment.