From 2dbfaf48e648916397279d554ff5ea9e9388cadf Mon Sep 17 00:00:00 2001 From: MatanGevaPort Date: Tue, 27 Aug 2024 11:51:28 +0300 Subject: [PATCH] Streamed projects per files + added and improved logs + added hotfix to md parsing --- .../gitlab_integration/core/async_fetcher.py | 2 +- .../gitlab/gitlab_integration/gitlab_service.py | 11 ++++++++--- integrations/gitlab/gitlab_integration/ocean.py | 14 ++++++++------ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/integrations/gitlab/gitlab_integration/core/async_fetcher.py b/integrations/gitlab/gitlab_integration/core/async_fetcher.py index 10deaf398f..b0af322850 100644 --- a/integrations/gitlab/gitlab_integration/core/async_fetcher.py +++ b/integrations/gitlab/gitlab_integration/core/async_fetcher.py @@ -175,7 +175,7 @@ async def paginate_repository_tree( ) if not page_files: logger.info( - f"Done iterating file pages for project {project.path_with_namespace}" + f"Done iterating file pages for project {project.path_with_namespace} after {current_page_id} pages" ) break if filtering_paths and filtering_callable: diff --git a/integrations/gitlab/gitlab_integration/gitlab_service.py b/integrations/gitlab/gitlab_integration/gitlab_service.py index 6113150dd8..f043ca3200 100644 --- a/integrations/gitlab/gitlab_integration/gitlab_service.py +++ b/integrations/gitlab/gitlab_integration/gitlab_service.py @@ -648,6 +648,8 @@ def _parse_file_content( except json.JSONDecodeError: try: documents = list(yaml.load_all(file.decode(), Loader=yaml.SafeLoader)) + if not documents: + raise yaml.YAMLError() return documents if len(documents) > 1 else documents[0] except yaml.YAMLError: return file.decode().decode("utf-8") @@ -689,13 +691,16 @@ async def get_paginated_files_in_project( branch = project.default_branch try: tasks: List[Any] = [] + logger.info( + f"Getting files in project {project.path_with_namespace} based on pattern {path}" + ) async for file_paths_page in self.get_paginated_file_paths( project, path, branch, True ): + logger.info( + f"Found {len(file_paths_page)} files in project {project.path_with_namespace} files: {file_paths_page}" + ) if file_paths_page: - logger.info( - f"Found {len(file_paths_page)} files in project {project.path_with_namespace} files: {file_paths_page}" - ) files = [] tasks = [] for file_path in file_paths_page: diff --git a/integrations/gitlab/gitlab_integration/ocean.py b/integrations/gitlab/gitlab_integration/ocean.py index b201fc9fdc..d3c520aaa1 100644 --- a/integrations/gitlab/gitlab_integration/ocean.py +++ b/integrations/gitlab/gitlab_integration/ocean.py @@ -21,6 +21,7 @@ from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from port_ocean.log.sensetive import sensitive_log_filter +from port_ocean.utils.async_iterators import stream_async_iterators_tasks NO_WEBHOOK_WARNING = "Without setting up the webhook, the integration will not export live changes from the gitlab" PROJECT_RESYNC_BATCH_SIZE = 10 @@ -179,12 +180,13 @@ async def resync_files(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: return async for projects_batch in service.get_all_projects(): - for project in projects_batch: - if service.should_process_project(project, selector.files.repos): - async for files_batch in service.get_paginated_files_in_project( - project, selector.files.path - ): - yield files_batch + tasks = [ + service.get_paginated_files_in_project(project, selector.files.path) + for project in projects_batch + if service.should_process_project(project, selector.files.repos) + ] + async for batch in stream_async_iterators_tasks(*tasks): + yield batch @ocean.on_resync(ObjectKind.MERGE_REQUEST)