[Gitlab] Improvements to file kind (#942)
# Description

What - Improved resync and real-time processing for the file kind.
Why - Better scale, performance, and verbosity for the feature.
How - In resync, each repository being processed is first searched to
confirm the required path exists, and only then does the integration pull
the file. In real-time, file paths are now paginated instead of waiting
for GitLab's API to return the entire repository tree.
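
The search-then-pull flow described above can be sketched as follows; `does_pattern_apply` and the search-hit shape are simplified stand-ins for the integration's actual helpers, not its real API:

```python
import fnmatch
from typing import Any


def does_pattern_apply(pattern: str, path: str) -> bool:
    # Simplified stand-in for the integration's glob-style path matcher.
    return fnmatch.fnmatch(path, pattern)


def paths_to_pull(search_hits: list[dict[str, Any]], path_pattern: str) -> list[str]:
    # The server-side search narrows the candidates; only hits whose path
    # matches the full pattern are actually pulled from the repository.
    return [
        hit["path"]
        for hit in search_hits
        if does_pattern_apply(path_pattern, hit["path"])
    ]


hits = [{"path": "services/api/port.yml"}, {"path": "docs/port.yml.bak"}]
print(paths_to_pull(hits, "services/*/port.yml"))  # → ['services/api/port.yml']
```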

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

matan84 authored Sep 4, 2024
1 parent a0c6360 commit 4919d9b
Showing 11 changed files with 406 additions and 110 deletions.
16 changes: 13 additions & 3 deletions integrations/gitlab/CHANGELOG.md
@@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
0.1.114 (2024-08-29)
====================

### Improvements

- Improved resync performance for the file kind: the integration now searches the project for the file's base name and pulls the file only after its metadata object is filtered as relevant. (#1)
- Improved resync stability by using an aiolimiter to keep GitLab API calls under the rate limit without blocking the event loop (the GitLab client handles a rate limit with time.sleep, which blocks the entire event loop)
- Improved resync verbosity with additional logging and pagination.
- Improved the real-time mechanism: it now paginates through files instead of waiting for GitLab's API to return the entire repository tree.
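
The non-blocking rate limiting described above can be illustrated with a minimal sliding-window limiter built on asyncio — a simplified stand-in for aiolimiter's AsyncLimiter, not its actual implementation:

```python
import asyncio
import time


class AsyncRateLimiter:
    """Minimal aiolimiter-style limiter: allow at most max_rate acquisitions
    per time_period seconds, waiting with asyncio.sleep so other coroutines
    keep running (unlike time.sleep, which blocks the whole event loop)."""

    def __init__(self, max_rate: int, time_period: float = 60.0) -> None:
        self.max_rate = max_rate
        self.time_period = time_period
        self._timestamps: list[float] = []

    async def __aenter__(self) -> "AsyncRateLimiter":
        while True:
            now = time.monotonic()
            # Drop acquisitions that fell out of the sliding window.
            self._timestamps = [t for t in self._timestamps if now - t < self.time_period]
            if len(self._timestamps) < self.max_rate:
                self._timestamps.append(now)
                return self
            # Wait until the oldest acquisition leaves the window.
            await asyncio.sleep(self._timestamps[0] + self.time_period - now)

    async def __aexit__(self, *exc: object) -> None:
        return None


async def search(limiter: AsyncRateLimiter, i: int) -> int:
    async with limiter:
        return i  # a rate-limited GitLab search call would go here


async def main() -> list[int]:
    # 95% of a 100-calls-per-minute limit, mirroring the headroom in the diff.
    limiter = AsyncRateLimiter(max_rate=95, time_period=60)
    return await asyncio.gather(*(search(limiter, i) for i in range(5)))
```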


0.1.115 (2024-09-01)
====================
@@ -74,7 +84,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
0.1.107 (2024-08-19)
====================

### Bug Fixes
### Bug Fixes

- Fixed merge requests and issue resync methods to use an async method of listing root groups to avoid blocking the event loop

@@ -109,7 +119,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Improvements

- Added support for exporting files
- Added support for exporting files


0.1.102 (2024-08-13)
@@ -593,7 +603,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Improvements

- Updated templates to have description in scorecard rules and pie charts (#1)
- Updated templates to have description in scorecard rules and pie charts (#1)


## 0.1.42 (2024-01-01)
14 changes: 9 additions & 5 deletions integrations/gitlab/gitlab_integration/core/async_fetcher.py
@@ -21,6 +21,7 @@
T = TypeVar("T", bound=RESTObject)

DEFAULT_PAGINATION_PAGE_SIZE = 100
FIRST_PAGE = 1


class AsyncFetcher:
@@ -67,10 +68,13 @@ async def fetch_batch(
List[Union[RESTObject, Dict[str, Any]]],
],
],
validation_func: Callable[
[Any],
bool,
],
validation_func: (
Callable[
[Any],
bool,
]
| None
) = None,
page_size: int = DEFAULT_PAGINATION_PAGE_SIZE,
**kwargs,
) -> AsyncIterator[
@@ -158,7 +162,7 @@ async def fetch_repository_tree(
) -> GitlabList | List[Dict[str, Any]]:
with ThreadPoolExecutor() as executor:

def fetch_func():
def fetch_func() -> GitlabList | List[Dict[str, Any]]:
return project.repository_tree(
path=path,
ref=ref,
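
The change above makes `validation_func` optional in `fetch_batch`. A rough sketch of such a paginated fetcher, with a hypothetical `fetch_page` callable rather than the integration's real signature:

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Optional

DEFAULT_PAGINATION_PAGE_SIZE = 100
FIRST_PAGE = 1


async def fetch_batch(
    fetch_page: Callable[[int, int], list[Any]],
    validation_func: Optional[Callable[[Any], bool]] = None,
    page_size: int = DEFAULT_PAGINATION_PAGE_SIZE,
) -> AsyncIterator[list[Any]]:
    """Yield pages from fetch_page; when a validator is given, keep only
    the items it accepts. Stops at the first empty page."""
    page = FIRST_PAGE
    while batch := fetch_page(page, page_size):
        if validation_func is not None:
            batch = [item for item in batch if validation_func(item)]
        if batch:
            yield batch
        page += 1


async def collect() -> list[int]:
    data = list(range(7))

    def fetch_page(page: int, size: int) -> list[int]:
        # Hypothetical page source standing in for a GitLab list endpoint.
        start = (page - FIRST_PAGE) * size
        return data[start:start + size]

    out: list[int] = []
    async for batch in fetch_batch(fetch_page, validation_func=lambda x: x % 2 == 0, page_size=3):
        out.extend(batch)
    return out
```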
@@ -61,7 +61,7 @@ async def _on_hook(self, body: dict[str, Any], gitlab_project: Project) -> None:
if self.gitlab_service.should_process_project(
gitlab_project, selector.files.repos
):
matched_file_paths = await self.gitlab_service._get_file_paths(
matched_file_paths = await self.gitlab_service.get_all_file_paths(
gitlab_project, selector.files.path, gitlab_project.default_branch
)
await self._process_modified_files(
112 changes: 63 additions & 49 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
@@ -1,40 +1,40 @@
import asyncio
import typing
import json
import os
import typing
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Any, Union, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional, Tuple, Union

import aiolimiter
import anyio.to_thread
import yaml
from gitlab import Gitlab, GitlabList, GitlabError
from gitlab import Gitlab, GitlabError, GitlabList
from gitlab.base import RESTObject
from gitlab.v4.objects import (
Project,
MergeRequest,
Issue,
Group,
ProjectPipeline,
GroupMergeRequest,
ProjectPipelineJob,
Issue,
MergeRequest,
Project,
ProjectFile,
ProjectPipeline,
ProjectPipelineJob,
)
from gitlab_integration.core.async_fetcher import AsyncFetcher
from gitlab_integration.core.entities import generate_entity_from_port_yaml
from gitlab_integration.core.utils import does_pattern_apply
from loguru import logger
from yaml.parser import ParserError

from gitlab_integration.core.entities import generate_entity_from_port_yaml
from gitlab_integration.core.async_fetcher import AsyncFetcher
from gitlab_integration.core.utils import does_pattern_apply
from port_ocean.context.event import event
from port_ocean.core.models import Entity

PROJECTS_CACHE_KEY = "__cache_all_projects"
MAX_ALLOWED_FILE_SIZE_IN_BYTES = 1024 * 1024 # 1MB
PROJECT_FILES_BATCH_SIZE = 10
GITLAB_SEARCH_RATE_LIMIT = 100

if TYPE_CHECKING:
from gitlab_integration.git_integration import (
GitlabPortAppConfig,
)
from gitlab_integration.git_integration import GitlabPortAppConfig


class GitlabService:
@@ -59,6 +59,9 @@ def __init__(
self.gitlab_client = gitlab_client
self.app_host = app_host
self.group_mapping = group_mapping
self._search_rate_limiter = aiolimiter.AsyncLimiter(
GITLAB_SEARCH_RATE_LIMIT * 0.95, 60
)

def _does_webhook_exist_for_group(self, group: RESTObject) -> bool:
for hook in group.hooks.list(iterator=True):
@@ -94,13 +97,20 @@ def _get_changed_files_between_commits(
project = self.gitlab_client.projects.get(project_id)
return project.commits.get(head).diff()

async def _get_file_paths(
async def get_all_file_paths(
self,
project: Project,
path: str | List[str],
commit_sha: str,
return_files_only: bool = False,
) -> list[str]:
"""
This function iterates through repository tree pages and returns all files in the repository that match the path pattern.
The search features of gitlab only support searches on the default branch as for writing this code,
So in order to check the existence of a file in a specific branch, we need to fetch the entire repository tree.
https://docs.gitlab.com/ee/user/search/advanced_search.html#known-issues
"""
if not isinstance(path, list):
path = [path]
try:
@@ -122,6 +132,40 @@ async def _get_file_paths(
and does_pattern_apply(path, file["path"] or "")
]

async def search_files_in_project(
self,
project: Project,
path: str | List[str],
) -> AsyncIterator[list[dict[str, Any]]]:
paths = [path] if not isinstance(path, list) else path
for path in paths:
file_pattern = os.path.basename(path)
async with self._search_rate_limiter:
logger.info(
f"Searching project {project.path_with_namespace} for file pattern {file_pattern}"
)
async for files in AsyncFetcher.fetch_batch(
project.search,
scope="blobs",
search=f"filename:{file_pattern}",
retry_transient_errors=True,
):
logger.info(
f"Found {len(files)} files in project {project.path_with_namespace} with file pattern {file_pattern}, filtering all that don't match path pattern {path}"
)
files = typing.cast(Union[GitlabList, List[Dict[str, Any]]], files)
tasks = [
self.get_and_parse_single_file(
project, file["path"], project.default_branch
)
for file in files
if does_pattern_apply(path, file["path"])
]
parsed_files = await asyncio.gather(*tasks)
files_with_content = [file for file in parsed_files if file]
if files_with_content:
yield files_with_content

def _get_entities_from_git(
self, project: Project, file_name: str, sha: str, ref: str
) -> List[Entity]:
@@ -152,7 +196,7 @@ def _get_entities_from_git(
async def _get_entities_by_commit(
self, project: Project, spec: str | List["str"], commit: str, ref: str
) -> List[Entity]:
spec_paths = await self._get_file_paths(project, spec, commit)
spec_paths = await self.get_all_file_paths(project, spec, commit)
return [
entity
for path in spec_paths
@@ -615,6 +659,8 @@ def _parse_file_content(
except json.JSONDecodeError:
try:
documents = list(yaml.load_all(file.decode(), Loader=yaml.SafeLoader))
if not documents:
return file.decode().decode("utf-8")
return documents if len(documents) > 1 else documents[0]
except yaml.YAMLError:
return file.decode().decode("utf-8")
@@ -649,35 +695,3 @@ async def get_and_parse_single_file(
f"Failed to process file {file_path} in project {project.path_with_namespace}. error={e}"
)
return None

async def get_all_files_in_project(
self, project: Project, path: str
) -> typing.AsyncIterator[List[dict[str, Any]]]:
branch = project.default_branch
try:
file_paths = await self._get_file_paths(project, path, branch, True)
logger.info(
f"Found {len(file_paths)} files in project {project.path_with_namespace} files: {file_paths}"
)
files = []
tasks = []
for file_path in file_paths:
tasks.append(self.get_and_parse_single_file(project, file_path, branch))

if len(tasks) == PROJECT_FILES_BATCH_SIZE:
results = await asyncio.gather(*tasks)
files.extend([file_data for file_data in results if file_data])
yield files
files = []
tasks = []

if tasks:
results = await asyncio.gather(*tasks)
files.extend([file_data for file_data in results if file_data])
yield files
except Exception as e:
logger.error(
f"Failed to get files in project={project.path_with_namespace} for path={path} and "
f"branch={branch}. error={e}"
)
return
25 changes: 18 additions & 7 deletions integrations/gitlab/gitlab_integration/ocean.py
@@ -21,6 +21,7 @@
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.log.sensetive import sensitive_log_filter
from port_ocean.utils.async_iterators import stream_async_iterators_tasks

NO_WEBHOOK_WARNING = "Without setting up the webhook, the integration will not export live changes from the gitlab"
PROJECT_RESYNC_BATCH_SIZE = 10
@@ -178,13 +179,23 @@ async def resync_files(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
logger.warning("No path provided in the selector, skipping fetching files")
return

async for projects_batch in service.get_all_projects():
for project in projects_batch:
if service.should_process_project(project, selector.files.repos):
async for files_batch in service.get_all_files_in_project(
project, selector.files.path
):
yield files_batch
async for projects in service.get_all_projects():
projects_batch_iter = iter(projects)
projects_processed_in_full_batch = 0
while projects_batch := tuple(
islice(projects_batch_iter, PROJECT_RESYNC_BATCH_SIZE)
):
projects_processed_in_full_batch += len(projects_batch)
logger.info(
f"Processing projects files for {projects_processed_in_full_batch}/{len(projects)} projects in batch"
)
tasks = [
service.search_files_in_project(project, selector.files.path)
for project in projects_batch
if service.should_process_project(project, selector.files.repos)
]
async for batch in stream_async_iterators_tasks(*tasks):
yield batch


@ocean.on_resync(ObjectKind.MERGE_REQUEST)
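
The batched pattern in the new `resync_files` can be sketched with stand-ins: `stream_iterators` here drains the iterators sequentially, unlike port_ocean's `stream_async_iterators_tasks`, which consumes them concurrently, and `search_files` is a hypothetical per-project search:

```python
import asyncio
from itertools import islice
from typing import AsyncIterator

PROJECT_RESYNC_BATCH_SIZE = 10


async def stream_iterators(*iterators: AsyncIterator[str]) -> AsyncIterator[str]:
    # Simplified, sequential stand-in for stream_async_iterators_tasks.
    for iterator in iterators:
        async for item in iterator:
            yield item


async def search_files(project: str) -> AsyncIterator[str]:
    yield f"{project}/port.yml"  # hypothetical per-project file search


async def resync(projects: list[str]) -> list[str]:
    results: list[str] = []
    project_iter = iter(projects)
    # Slice the project list into fixed-size batches, as resync_files does.
    while batch := tuple(islice(project_iter, PROJECT_RESYNC_BATCH_SIZE)):
        tasks = [search_files(project) for project in batch]
        async for item in stream_iterators(*tasks):
            results.append(item)
    return results
```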
