# Review notes (free text before the first "diff --git" header).
#
# This patch was rebuilt from a copy whose newlines and indentation had been
# collapsed. Indentation and blank context lines are inferred from Python
# syntax and from each hunk's line counts; check them against the target files
# before applying. In the first members.py hunk the counts fit a blank line
# either after "from typing import Any" or after "from loguru import logger";
# the latter was assumed.
#
# Added (+) lines that differ from the collapsed copy:
#   * base.py: _register_group_with_members always calls
#     enrich_group_with_members, so "__members" is present whether or not
#     enrich_with_public_email is set (the resync handler already does this).
#   * gitlab_service.py: the get_group_member error log says "group member";
#     docstrings added to get_group and enrich_group_with_members.
#   * ocean.py: commented-out memory_profiler lines dropped; docstring added.
# Hunk counts and new-side start lines were adjusted for these changes. The
# post-image blob hashes on the "index" lines of those three files were not
# recomputed.
diff --git a/integrations/gitlab/gitlab_integration/events/hooks/base.py b/integrations/gitlab/gitlab_integration/events/hooks/base.py
index 7cf5943ff8..66a2c1d9a0 100644
--- a/integrations/gitlab/gitlab_integration/events/hooks/base.py
+++ b/integrations/gitlab/gitlab_integration/events/hooks/base.py
@@ -1,10 +1,15 @@
 from abc import ABC, abstractmethod
-from typing import List, Any, Optional, Dict
+from typing import List, Any, Dict
+import typing
 from loguru import logger
 from gitlab.v4.objects import Project, Group
 from gitlab_integration.gitlab_service import GitlabService
-from gitlab_integration.utils import ObjectKind
 from port_ocean.context.ocean import ocean
+from port_ocean.context.event import event
+from gitlab_integration.git_integration import (
+    GitlabPortAppConfig,
+    GroupWithMembersSelector,
+)
 
 
 class HookHandler(ABC):
@@ -61,17 +66,46 @@ async def on_hook(self, event: str, body: dict[str, Any]) -> None:
         logger.info(f"Finished handling {event} for group {group_path}")
 
     @abstractmethod
-    async def _on_hook(
-        self, body: dict[str, Any], gitlab_group: Optional[Group]
-    ) -> None:
+    async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
         pass
 
     async def _register_group(self, kind: str, gitlab_group: Dict[str, Any]) -> None:
-        if self.gitlab_service.should_run_for_group(gitlab_group):
+        if self.gitlab_service.should_run_for_path(gitlab_group["full_path"]):
             await ocean.register_raw(kind, [gitlab_group])
 
     async def _register_group_with_members(
         self, kind: str, gitlab_group: Group
     ) -> None:
-        gitlab_group = await self.gitlab_service.enrich_group_with_members(gitlab_group)
-        await self._register_group(kind, gitlab_group)
+        """Enrich ``gitlab_group`` with its members and hand it to ``_register_group``.
+
+        Runs once per resource config of ``kind`` that uses a
+        ``GroupWithMembersSelector``; logs and returns when none is configured.
+        """
+        resource_configs = typing.cast(
+            GitlabPortAppConfig, event.port_app_config
+        ).resources
+
+        matching_resource_configs = [
+            resource_config
+            for resource_config in resource_configs
+            if (
+                resource_config.kind == kind
+                and isinstance(resource_config.selector, GroupWithMembersSelector)
+            )
+        ]
+
+        if not matching_resource_configs:
+            logger.info(
+                "Group With Member resource not found in port app config, update port app config to include the resource type"
+            )
+            return
+        for resource_config in matching_resource_configs:
+            enrich_with_public_email = resource_config.selector.enrich_with_public_email
+            # Always attach members: the flag only selects whether each member
+            # is additionally enriched with a public email.
+            gitlab_group_result: Dict[str, Any] = (
+                await self.gitlab_service.enrich_group_with_members(
+                    gitlab_group, enrich_with_public_email
+                )
+            )
+            await self._register_group(kind, gitlab_group_result)
diff --git a/integrations/gitlab/gitlab_integration/events/hooks/group.py b/integrations/gitlab/gitlab_integration/events/hooks/group.py
index 8aa0dd49fe..086759b529 100644
--- a/integrations/gitlab/gitlab_integration/events/hooks/group.py
+++ b/integrations/gitlab/gitlab_integration/events/hooks/group.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Any
 
 from loguru import logger
 
@@ -12,20 +12,20 @@ class Groups(GroupHandler):
     events = ["Subgroup Hook"]
     system_events = ["group_destroy", "group_create", "group_rename"]
 
-    async def _on_hook(
-        self, body: dict[str, Any], gitlab_group: Optional[Group]
-    ) -> None:
+    async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
         logger.info(f"Handling {body['event_name']} for group {body['group_id']}")
-        if gitlab_group:
-            await self._register_group(
-                ObjectKind.GROUP,
-                gitlab_group.asdict(),
-            )
-            await self._register_group_with_members(
-                gitlab_group, ObjectKind.GROUPWITHMEMBERS
-            )
-        elif body["event_name"] in ("subgroup_destroy", "group_destroy"):
+
+        if body["event_name"] in ("subgroup_destroy", "group_destroy"):
             await ocean.unregister_raw(ObjectKind.GROUP, [body])
             await ocean.unregister_raw(ObjectKind.GROUPWITHMEMBERS, [body])
-        else:
-            logger.warning(f"Group {body['group_id']} was filtered. Skipping ...")
+            logger.info(f"Unregistered group {body['group_id']}")
+            return
+
+        await self._register_group(
+            ObjectKind.GROUP,
+            gitlab_group.asdict(),
+        )
+        await self._register_group_with_members(
+            ObjectKind.GROUPWITHMEMBERS, gitlab_group
+        )
+        logger.info(f"Registered group {body['group_id']}")
diff --git a/integrations/gitlab/gitlab_integration/events/hooks/members.py b/integrations/gitlab/gitlab_integration/events/hooks/members.py
index 4facb1b9f7..f7ab1b3e8b 100644
--- a/integrations/gitlab/gitlab_integration/events/hooks/members.py
+++ b/integrations/gitlab/gitlab_integration/events/hooks/members.py
@@ -1,18 +1,9 @@
-import typing
-from typing import Any, List, Optional
+from typing import Any
 from loguru import logger
-import asyncio
 
 from gitlab_integration.utils import ObjectKind
-from port_ocean.context.ocean import ocean
 from gitlab_integration.events.hooks.base import GroupHandler
-from gitlab_integration.git_integration import MembersSelector
-from port_ocean.context.event import event
-from gitlab.v4.objects import Group, GroupMember
-from gitlab_integration.git_integration import GitlabPortAppConfig
-from gitlab_integration.events.utils import remove_prefix_from_keys
-
-CONCURENT_TASKS_LIMIT = 10
+from gitlab.v4.objects import Group
 
 
 class Members(GroupHandler):
@@ -23,79 +14,9 @@ class Members(GroupHandler):
         "user_add_to_group",
     ]
 
-    async def _on_hook(
-        self, body: dict[str, Any], gitlab_group: Optional[Group]
-    ) -> None:
+    async def _on_hook(self, body: dict[str, Any], gitlab_group: Group) -> None:
         event_name, user_username = (body["event_name"], body["user_username"])
         logger.info(f"Handling {event_name} for group member {user_username}")
-
-        if event_name == "user_remove_from_group":
-            if not (await self._is_root_group_member(body["user_id"])):
-                body = remove_prefix_from_keys("user_", body)
-                await ocean.unregister_raw(ObjectKind.MEMBER, [body])
-            else:
-                logger.warning(
-                    f"Group member {user_username} belongs to other groups. Skipping ..."
-                )
-
-        elif gitlab_group:
-            if group_member := await self.gitlab_service.get_group_member(
-                gitlab_group, body["user_id"]
-            ):
-                await self._register_group_member(group_member)
-            if body["event_name"] == "user_add_to_group":
-                await self._register_group(gitlab_group)
-
-        else:
-            logger.warning(f"Group Member {user_username} was filtered. Skipping ...")
-
-    async def _register_group_member(self, group_member: GroupMember) -> None:
-
-        resource_configs = typing.cast(
-            GitlabPortAppConfig, event.port_app_config
-        ).resources
-
-        matching_resource_configs = [
-            resource_config
-            for resource_config in resource_configs
-            if (
-                resource_config.kind == ObjectKind.MEMBER
-                and isinstance(resource_config.selector, MembersSelector)
-            )
-        ]
-        if not matching_resource_configs:
-            logger.info(
-                "Member resource not found in port app config, update port app config to include the resource type"
-            )
-            return
-        for resource_config in matching_resource_configs:
-            enrich_with_public_email = resource_config.selector.enrich_with_public_email
-            member = (
-                await self.gitlab_service.enrich_member_with_public_email(group_member)
-                if enrich_with_public_email
-                else group_member.asdict()
-            )
-
-            await ocean.register_raw(ObjectKind.MEMBER, [member])
-
-    async def _is_root_group_member(self, member_id: int) -> bool:
-        root_groups: List[Group] = self.gitlab_service.get_root_groups()
-        semaphore = asyncio.Semaphore(CONCURENT_TASKS_LIMIT)
-
-        async def check_group(group: Group) -> bool:
-            async with semaphore:
-                return any(
-                    [await self.gitlab_service.get_group_member(group, member_id)]
-                )
-
-        tasks = [asyncio.create_task(check_group(group)) for group in root_groups]
-        for completed_task in asyncio.as_completed(tasks):
-            try:
-                result = await completed_task
-                if result:
-                    return True
-            except Exception as e:
-                logger.error(
-                    f"Error checking group membership for member {member_id}: {e}"
-                )
-        return False
+        await self._register_group_with_members(
+            ObjectKind.GROUPWITHMEMBERS, gitlab_group
+        )
diff --git a/integrations/gitlab/gitlab_integration/git_integration.py b/integrations/gitlab/gitlab_integration/git_integration.py
index b2aa259226..b3d97970a7 100644
--- a/integrations/gitlab/gitlab_integration/git_integration.py
+++ b/integrations/gitlab/gitlab_integration/git_integration.py
@@ -121,18 +121,18 @@ class GitlabResourceConfig(ResourceConfig):
     selector: GitlabSelector
 
 
-class MembersSelector(Selector):
+class GroupWithMembersSelector(Selector):
 
-    enrich_with_public_email: bool | None = Field(
+    enrich_with_public_email: bool = Field(
         alias="enrichWithPublicEmail",
         default=False,
-        description="If set to true, the integration will enrich members with public email field. Default value is false",
+        description="If set to true, the integration will enrich group members with public email field. Default value is false",
     )
 
 
-class GitlabMembersResourceConfig(ResourceConfig):
-    kind: Literal["member"]
-    selector: MembersSelector
+class GitlabGroupWithMembersResourceConfig(ResourceConfig):
+    kind: Literal["group-with-members"]
+    selector: GroupWithMembersSelector
 
 
 class FilesSelector(BaseModel):
@@ -169,7 +169,7 @@ class GitlabPortAppConfig(PortAppConfig):
         default=True,
         description="If set to false, bots will be filtered out from the members list. Default value is true",
     )
-    resources: list[GitlabMembersResourceConfig | GitLabFilesResourceConfig | GitlabResourceConfig] = Field(default_factory=list)  # type: ignore
+    resources: list[GitlabGroupWithMembersResourceConfig | GitLabFilesResourceConfig | GitlabResourceConfig] = Field(default_factory=list)  # type: ignore
 
 
 def _get_project_from_cache(project_id: int) -> Project | None:
diff --git a/integrations/gitlab/gitlab_integration/gitlab_service.py b/integrations/gitlab/gitlab_integration/gitlab_service.py
index 76c406b5c4..44a6237406 100644
--- a/integrations/gitlab/gitlab_integration/gitlab_service.py
+++ b/integrations/gitlab/gitlab_integration/gitlab_service.py
@@ -455,21 +455,17 @@ async def get_project(self, project_id: int) -> Project | None:
         else:
             return None
 
-    async def get_group(self, group_id: int) -> Optional[Group]:
-        try:
-            logger.info(f"Fetching group with ID: {group_id}")
-            group_response = await AsyncFetcher.fetch_single(
-                self.gitlab_client.groups.get, group_id
-            )
-            group: Group = typing.cast(Group, group_response)
-            return group
-        except gitlab.exceptions.GitlabGetError as err:
-            if err.response_code == 404:
-                logger.warning(f"Group with ID {group_id} not found (404).")
-                return None
-            else:
-                logger.error(f"Failed to fetch group with ID {group_id}: {err}")
-                raise
+    async def get_group(self, group_id: int) -> Group:
+        """Fetch the group with ``group_id``.
+
+        Errors raised by the fetch (a 404 included) are not handled here.
+        """
+        logger.info(f"Fetching group with ID: {group_id}")
+        group_response = await AsyncFetcher.fetch_single(
+            self.gitlab_client.groups.get, group_id
+        )
+        group: Group = typing.cast(Group, group_response)
+        return group
 
     @cache_iterator_result()
     async def get_all_groups(
@@ -772,21 +768,27 @@ async def get_unsynced_group_members(
             )
             yield unsynced_members
 
-    async def enrich_group_with_members(self, group: Group) -> dict[str, Any]:
-        group_members = [
-            member
-            async for members in self.get_all_group_members(group)
-            for member in members
-        ]
-        group_dict: dict[str, Any] = group.asdict()
-        group_dict.update(
-            {
-                "__members": [
-                    {"id": group_member.id, "username": group_member.username}
-                    for group_member in group_members
+    async def enrich_group_with_members(
+        self, group: Group, include_public_email: bool = False
+    ) -> dict[str, Any]:
+        """Return ``group`` as a dict with its members under ``__members``.
+
+        Members come from ``get_all_group_members``. When
+        ``include_public_email`` is true each one is passed through
+        ``enrich_member_with_public_email``; otherwise ``asdict()`` is used.
+        """
+        group_members = []
+        async for members in self.get_all_group_members(group):
+            if include_public_email:
+                tasks = [
+                    self.enrich_member_with_public_email(member) for member in members
                 ]
-            }
-        )
+                group_members.extend(await asyncio.gather(*tasks))
+            else:
+                group_members.extend(member.asdict() for member in members)
+
+        group_dict: dict[str, Any] = group.asdict()
+        group_dict["__members"] = group_members
         return group_dict
 
     async def enrich_member_with_public_email(
@@ -830,9 +832,8 @@ async def get_group_member(
             if err.response_code == 404:
                 logger.warning(f"Group Member with ID {member_id} not found (404).")
                 return None
-            else:
-                logger.error(f"Failed to fetch group with ID {member_id}: {err}")
-                raise
+            logger.error(f"Failed to fetch group member with ID {member_id}: {err}")
+            raise
 
     async def get_entities_diff(
         self,
diff --git a/integrations/gitlab/gitlab_integration/ocean.py b/integrations/gitlab/gitlab_integration/ocean.py
index d5d630be64..4daca5769c 100644
--- a/integrations/gitlab/gitlab_integration/ocean.py
+++ b/integrations/gitlab/gitlab_integration/ocean.py
@@ -14,7 +14,7 @@
 from gitlab_integration.events.setup import setup_application
 from gitlab_integration.git_integration import (
     GitlabResourceConfig,
-    GitlabMembersResourceConfig,
+    GitlabGroupWithMembersResourceConfig,
     GitLabFilesResourceConfig,
 )
 from gitlab_integration.utils import ObjectKind, get_cached_all_services
@@ -130,11 +130,20 @@ async def resync_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
             yield [group.asdict() for group in groups_batch]
 
 
 @ocean.on_resync(ObjectKind.GROUPWITHMEMBERS)
 async def resync_groups_with_members(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
+    """Yield batches of groups, each enriched with its members."""
+    gitlab_resource_config: GitlabGroupWithMembersResourceConfig = typing.cast(
+        GitlabGroupWithMembersResourceConfig, event.resource_config
+    )
+    enrich_with_public_email = gitlab_resource_config.selector.enrich_with_public_email
+
     for service in get_cached_all_services():
         async for groups_batch in service.get_all_groups():
-            tasks = [service.enrich_group_with_members(group) for group in groups_batch]
+            tasks = [
+                service.enrich_group_with_members(group, enrich_with_public_email)
+                for group in groups_batch
+            ]
             enriched_groups = await asyncio.gather(*tasks)
             yield enriched_groups
 
@@ -299,35 +308,3 @@ async def resync_pipelines(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
                         {**pipeline.asdict(), "__project": project.asdict()}
                         for pipeline in pipelines_batch
                     ]
-
-
-@ocean.on_resync(ObjectKind.MEMBER)
-async def resync_members(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
-    gitlab_resource_config: GitlabMembersResourceConfig = typing.cast(
-        GitlabMembersResourceConfig, event.resource_config
-    )
-    selector = gitlab_resource_config.selector
-
-    async def process_group_members(service, group):
-        members = [
-            member
-            async for members_batch in service.get_unsynced_group_members(group)
-            for member in members_batch
-        ]
-
-        if selector.enrich_with_public_email:
-            enriched_member_tasks = [
-                service.enrich_member_with_public_email(member) for member in members
-            ]
-            enriched_members = await asyncio.gather(*enriched_member_tasks)
-            return enriched_members
-
-        return [member.asdict() for member in members]
-
-    for service in get_cached_all_services():
-        async for groups in service.get_all_groups(skip_validation=True):
-            group_tasks = [process_group_members(service, group) for group in groups]
-            for group_task in asyncio.as_completed(group_tasks):
-                group_members = await group_task
-                logger.warning(f"Enriched Members {group_members}")
-                yield group_members