Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Google Drive] Add Incremental Sync #2679

Merged
merged 13 commits into from
Jul 29, 2024
165 changes: 156 additions & 9 deletions connectors/sources/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
es_access_control_query,
prefix_identity,
)
from connectors.source import BaseDataSource, ConfigurableFieldValueError
from connectors.es.sink import OP_DELETE, OP_INDEX
from connectors.source import (
CURSOR_SYNC_TIMESTAMP,
BaseDataSource,
ConfigurableFieldValueError,
)
from connectors.sources.google import (
GoogleServiceAccountClient,
UserFields,
Expand All @@ -23,11 +28,13 @@
)
from connectors.utils import (
EMAIL_REGEX_PATTERN,
iso_zulu,
validate_email_address,
)

GOOGLE_DRIVE_SERVICE_NAME = "Google Drive"
GOOGLE_ADMIN_DIRECTORY_SERVICE_NAME = "Google Admin Directory"
CURSOR_GOOGLE_DRIVE_KEY = "google_drives"

RETRIES = 3
RETRY_INTERVAL = 2
Expand All @@ -38,7 +45,7 @@

FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"

DRIVE_ITEMS_FIELDS = "id,createdTime,driveId,modifiedTime,name,size,mimeType,fileExtension,webViewLink,owners,parents"
DRIVE_ITEMS_FIELDS = "id,createdTime,driveId,modifiedTime,name,size,mimeType,fileExtension,webViewLink,owners,parents,trashed"
DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS = f"{DRIVE_ITEMS_FIELDS},permissions"

# Export Google Workspace documents to TIKA compatible format, prefer 'text/plain' where possible to be
Expand All @@ -50,6 +57,12 @@
}


class SyncCursorEmpty(Exception):
    """Raised when an incremental sync cannot run because the sync cursor is empty."""


class GoogleDriveClient(GoogleServiceAccountClient):
"""A google drive client to handle api calls made to Google Drive API."""

Expand Down Expand Up @@ -142,11 +155,12 @@ async def get_all_folders(self):

return folders

async def list_files(self, fetch_permissions=False):
async def list_files(self, fetch_permissions=False, last_sync_time=None):
"""Get files from Google Drive. Files can have any type.

Args:
fetch_permissions (bool): flag to select permissions in the request query
last_sync_time (str): time when last sync happened

Yields:
dict: Documents from Google Drive.
Expand All @@ -157,12 +171,15 @@ async def list_files(self, fetch_permissions=False):
if fetch_permissions
else DRIVE_ITEMS_FIELDS
)

if last_sync_time is None:
list_query = "trashed=false"
else:
list_query = f"trashed=true or modifiedTime > '{last_sync_time}'"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we specify explicitly trashed=true or ...

This would fetch all trashed files in each incremental sync (potentially large number of trashed files).

Could we instead omit trashed from query q and just use modifiedTime > '{last_sync_time}' in query.

And then check if a file was recently trashed (and if so, mark it as deleted) while iterating over response from that query?

async for file in self.api_call_paged(
resource="files",
method="list",
corpora="allDrives",
q="trashed=false",
q=list_query,
orderBy="modifiedTime desc",
fields=f"files({files_fields}),incompleteSearch,nextPageToken",
includeItemsFromAllDrives=True,
Expand All @@ -171,7 +188,9 @@ async def list_files(self, fetch_permissions=False):
):
yield file

async def list_files_from_my_drive(self, fetch_permissions=False):
async def list_files_from_my_drive(
self, fetch_permissions=False, last_sync_time=None
):
"""Retrieves files from Google Drive, with an option to fetch permissions (DLS).

This function optimizes the retrieval process based on the 'fetch_permissions' flag.
Expand All @@ -182,15 +201,22 @@ async def list_files_from_my_drive(self, fetch_permissions=False):

Args:
fetch_permissions (bool): flag to select permissions in the request query
last_sync_time (str): time when last sync happened

Yields:
dict: Documents from Google Drive.
"""

if fetch_permissions:
if fetch_permissions and last_sync_time:
jedrazb marked this conversation as resolved.
Show resolved Hide resolved
files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
list_query = f"(trashed=true or modifiedTime > '{last_sync_time}') and 'me' in writers"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above about trashed=true here

elif fetch_permissions and not last_sync_time:
files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
# Google Drive API requires write access to fetch a file's permissions
list_query = "trashed=false and 'me' in writers"
elif not fetch_permissions and last_sync_time:
files_fields = DRIVE_ITEMS_FIELDS
list_query = f"trashed=true or modifiedTime > '{last_sync_time}'"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above about trashed=true here

else:
files_fields = DRIVE_ITEMS_FIELDS
list_query = "trashed=false"
Expand Down Expand Up @@ -841,7 +867,7 @@ async def get_google_workspace_content(self, client, file, timestamp=None):
# We need to do sanity size after downloading the file because:
# 1. We use files/export endpoint which converts large media-rich google slides/docs
# into text/plain format. We usually end up with tiny .txt files.
# 2. Google will ofter report the Google Workspace shared documents to have size 0
# 2. Google will often report the Google Workspace shared documents to have size 0
# as they don't count against user's storage quota.
if not self.is_file_size_within_limit(file_size, file_name):
return
Expand Down Expand Up @@ -1003,6 +1029,7 @@ async def prepare_file(self, client, file, paths):
"mime_type": file.get("mimeType"),
"file_extension": file.get("fileExtension"),
"url": file.get("webViewLink"),
"trashed": file.get("trashed"),
}

# record "file" or "folder" type
Expand Down Expand Up @@ -1089,7 +1116,7 @@ async def get_docs(self, filtering=None):
"""Executes the logic to fetch Google Drive objects in an async manner.

Args:
filtering (optional): Advenced filtering rules. Defaults to None.
filtering (optional): Advanced filtering rules. Defaults to None.

Yields:
dict, partial: dict containing meta-data of the Google Drive objects,
Expand All @@ -1101,6 +1128,8 @@ async def get_docs(self, filtering=None):
# This is an optimization to process unique files only once.
seen_ids = set()

self.init_sync_cursor()

if self._domain_wide_delegation_sync_enabled():
# sync personal drives first
async for user in self.google_admin_directory_client.users():
Expand Down Expand Up @@ -1163,3 +1192,121 @@ async def get_docs(self, filtering=None):
seen_ids=seen_ids,
):
yield file, partial(self.get_content, google_drive_client, file)

async def get_docs_incrementally(self, sync_cursor, filtering=None):
    """Fetch Google Drive objects for an incremental sync, in an async manner.

    Every file reported by the listing calls is yielded together with the
    operation the sink should apply: OP_DELETE when the file is flagged as
    trashed, OP_INDEX otherwise.

    Args:
        sync_cursor (dict): Cursor stored by a previous sync (built by
            init_sync_cursor). Must be non-empty.
        filtering (optional): Advanced filtering rules. Defaults to None.
            Not used by this method.

    Yields:
        tuple: (dict with the meta-data of the Google Drive object,
            partial download content function, OP_INDEX or OP_DELETE)

    Raises:
        SyncCursorEmpty: If sync_cursor is empty; a full sync has to run first.
    """
    self._sync_cursor = sync_cursor
    # Captured before any listing starts; stored as the new cursor timestamp
    # once the whole sync has been consumed.
    timestamp = iso_zulu()
    self._logger.debug(f"Current Sync Time {timestamp}")

    if not self._sync_cursor:
        msg = "Unable to start incremental sync. Please perform a full sync to re-enable incremental syncs."
        raise SyncCursorEmpty(msg)

    # This is an optimization to process unique files only once.
    seen_ids = set()

    # Loop-invariant settings: resolve them once so every listing call of
    # this run uses the same permission flag and the same cutoff time.
    fetch_permissions = self._dls_enabled()
    last_sync_time = self.last_sync_time()

    def with_operation(client, file):
        # Single place that maps the "trashed" flag to a sink operation.
        operation = OP_DELETE if file.get("trashed") is True else OP_INDEX
        return file, partial(self.get_content, client, file), operation

    if self._domain_wide_delegation_sync_enabled():
        # sync personal drives first
        async for user in self.google_admin_directory_client.users():
            email = user.get(UserFields.EMAIL.value)
            self._logger.debug(f"Syncing personal drive content for: {email}")
            google_drive_client = self.google_drive_client(impersonate_email=email)
            async for files_page in google_drive_client.list_files_from_my_drive(
                fetch_permissions=fetch_permissions,
                last_sync_time=last_sync_time,
            ):
                async for file in self.prepare_files(
                    client=google_drive_client,
                    files_page=files_page,
                    paths={},
                    seen_ids=seen_ids,
                ):
                    yield with_operation(google_drive_client, file)

        email_for_shared_drives_sync = (
            self._google_google_workspace_email_for_shared_drives_sync()
        )

        listing_client = self.google_drive_client(
            impersonate_email=email_for_shared_drives_sync
        )

        # Build a path lookup, parentId -> parent path
        resolved_paths = await self.resolve_paths(google_drive_client=listing_client)

        # sync shared drives
        self._logger.debug(
            f"Syncing shared drives using admin account: {email_for_shared_drives_sync}"
        )
    else:
        # Build a path lookup, parentId -> parent path
        resolved_paths = await self.resolve_paths()

        # sync anything shared with the service account
        listing_client = self.google_drive_client()

    # Both modes finish with the same listing; only the client and the
    # resolved paths differ.
    async for files_page in listing_client.list_files(
        fetch_permissions=fetch_permissions,
        last_sync_time=last_sync_time,
    ):
        async for file in self.prepare_files(
            client=listing_client,
            files_page=files_page,
            paths=resolved_paths,
            seen_ids=seen_ids,
        ):
            yield with_operation(listing_client, file)

    # NOTE(review): placed at function level so both sync modes advance the
    # cursor; the flattened source does not show the original nesting - confirm.
    self.update_sync_timestamp_cursor(timestamp)

def init_sync_cursor(self):
    """Return the sync cursor, seeding a fresh one when none is set.

    A freshly seeded cursor holds an empty mapping under
    CURSOR_GOOGLE_DRIVE_KEY and the current iso_zulu() timestamp under
    CURSOR_SYNC_TIMESTAMP.

    Returns:
        The existing cursor when it is truthy, otherwise the new dict.
    """
    # Guard clause: an existing cursor is handed back untouched.
    if self._sync_cursor:
        return self._sync_cursor

    fresh_cursor = {
        CURSOR_GOOGLE_DRIVE_KEY: {},
        CURSOR_SYNC_TIMESTAMP: iso_zulu(),
    }
    self._sync_cursor = fresh_cursor
    return self._sync_cursor
Loading