Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Google Drive] Add Incremental Sync #2679

Merged
merged 13 commits into from
Jul 29, 2024
199 changes: 183 additions & 16 deletions connectors/sources/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
es_access_control_query,
prefix_identity,
)
from connectors.source import BaseDataSource, ConfigurableFieldValueError
from connectors.es.sink import OP_DELETE, OP_INDEX
from connectors.source import (
CURSOR_SYNC_TIMESTAMP,
BaseDataSource,
ConfigurableFieldValueError,
)
from connectors.sources.google import (
GoogleServiceAccountClient,
UserFields,
Expand All @@ -23,11 +28,13 @@
)
from connectors.utils import (
EMAIL_REGEX_PATTERN,
iso_zulu,
validate_email_address,
)

GOOGLE_DRIVE_SERVICE_NAME = "Google Drive"
GOOGLE_ADMIN_DIRECTORY_SERVICE_NAME = "Google Admin Directory"
CURSOR_GOOGLE_DRIVE_KEY = "google_drives"

RETRIES = 3
RETRY_INTERVAL = 2
Expand All @@ -38,7 +45,7 @@

FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"

DRIVE_ITEMS_FIELDS = "id,createdTime,driveId,modifiedTime,name,size,mimeType,fileExtension,webViewLink,owners,parents"
DRIVE_ITEMS_FIELDS = "id,createdTime,driveId,modifiedTime,name,size,mimeType,fileExtension,webViewLink,owners,parents,trashed,trashedTime"
DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS = f"{DRIVE_ITEMS_FIELDS},permissions"

# Export Google Workspace documents to TIKA compatible format, prefer 'text/plain' where possible to be
Expand All @@ -50,6 +57,12 @@
}


class SyncCursorEmpty(Exception):
"""Exception class to notify that incremental sync can't run because sync_cursor is empty."""

pass


class GoogleDriveClient(GoogleServiceAccountClient):
"""A google drive client to handle api calls made to Google Drive API."""

Expand Down Expand Up @@ -142,11 +155,12 @@ async def get_all_folders(self):

return folders

async def list_files(self, fetch_permissions=False):
async def list_files(self, fetch_permissions=False, last_sync_time=None):
"""Get files from Google Drive. Files can have any type.

Args:
include_permissions (bool): flag to select permissions in the request query
last_sync_time (str): time when last sync happened

Yields:
dict: Documents from Google Drive.
Expand All @@ -157,12 +171,15 @@ async def list_files(self, fetch_permissions=False):
if fetch_permissions
else DRIVE_ITEMS_FIELDS
)

if last_sync_time is None:
list_query = "trashed=false"
else:
list_query = f"trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}'"
async for file in self.api_call_paged(
resource="files",
method="list",
corpora="allDrives",
q="trashed=false",
q=list_query,
orderBy="modifiedTime desc",
fields=f"files({files_fields}),incompleteSearch,nextPageToken",
includeItemsFromAllDrives=True,
Expand All @@ -171,7 +188,9 @@ async def list_files(self, fetch_permissions=False):
):
yield file

async def list_files_from_my_drive(self, fetch_permissions=False):
async def list_files_from_my_drive(
self, fetch_permissions=False, last_sync_time=None
):
"""Retrieves files from Google Drive, with an option to fetch permissions (DLS).

This function optimizes the retrieval process based on the 'fetch_permissions' flag.
Expand All @@ -182,15 +201,22 @@ async def list_files_from_my_drive(self, fetch_permissions=False):

Args:
include_permissions (bool): flag to select permissions in the request query
last_sync_time (str): time when last sync happened

Yields:
dict: Documents from Google Drive.
"""

if fetch_permissions:
if fetch_permissions and last_sync_time:
jedrazb marked this conversation as resolved.
Show resolved Hide resolved
files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
list_query = f"(trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}') and 'me' in writers"
elif fetch_permissions and not last_sync_time:
files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
# Google Drive API required write access to fetch file's permissions
list_query = "trashed=false and 'me' in writers"
elif not fetch_permissions and last_sync_time:
files_fields = DRIVE_ITEMS_FIELDS
list_query = f"trashed=true or modifiedTime > '{last_sync_time}' or createdTime > '{last_sync_time}'"
else:
files_fields = DRIVE_ITEMS_FIELDS
list_query = "trashed=false"
Expand Down Expand Up @@ -841,7 +867,7 @@ async def get_google_workspace_content(self, client, file, timestamp=None):
# We need to do sanity size after downloading the file because:
# 1. We use files/export endpoint which converts large media-rich google slides/docs
# into text/plain format. We usually we end up with tiny .txt files.
# 2. Google will ofter report the Google Workspace shared documents to have size 0
# 2. Google will offer report the Google Workspace shared documents to have size 0
# as they don't count against user's storage quota.
if not self.is_file_size_within_limit(file_size, file_name):
return
Expand Down Expand Up @@ -988,7 +1014,7 @@ async def prepare_file(self, client, file, paths):
file (dict): File metadata returned from the Drive.

Returns:
dict: Formatted file metadata.
file_document, trashedTime (tuple): Formatted file metadata along with trashedTime for files deleted from shared drive
"""

file_id, file_name = file.get("id"), file.get("name")
Expand All @@ -1003,6 +1029,7 @@ async def prepare_file(self, client, file, paths):
"mime_type": file.get("mimeType"),
"file_extension": file.get("fileExtension"),
"url": file.get("webViewLink"),
"trashed": file.get("trashed"),
}

# record "file" or "folder" type
Expand Down Expand Up @@ -1061,8 +1088,7 @@ async def prepare_file(self, client, file, paths):
self._logger.error(exception_log_msg)

file_document[ACCESS_CONTROL] = self._process_permissions(permissions)

return file_document
return file_document, file.get("trashedTime")

async def prepare_files(self, client, files_page, paths, seen_ids):
"""Generate file document.
Expand All @@ -1079,7 +1105,12 @@ async def prepare_files(self, client, files_page, paths, seen_ids):
new_files = [file for file in files if file.get("id") not in seen_ids]

prepared_files = await self._process_items_concurrently(
new_files, lambda f: self.prepare_file(client=client, file=f, paths=paths)
new_files,
lambda f: self.prepare_file(
client=client,
file=f,
paths=paths,
),
)

for file in prepared_files:
Expand All @@ -1089,7 +1120,7 @@ async def get_docs(self, filtering=None):
"""Executes the logic to fetch Google Drive objects in an async manner.

Args:
filtering (optional): Advenced filtering rules. Defaults to None.
filtering (optional): Advanced filtering rules. Defaults to None.

Yields:
dict, partial: dict containing meta-data of the Google Drive objects,
Expand All @@ -1101,6 +1132,8 @@ async def get_docs(self, filtering=None):
# This is an optimization to process unique files only once.
seen_ids = set()

self.init_sync_cursor()

if self._domain_wide_delegation_sync_enabled():
# sync personal drives first
async for user in self.google_admin_directory_client.users():
Expand All @@ -1110,7 +1143,7 @@ async def get_docs(self, filtering=None):
async for files_page in google_drive_client.list_files_from_my_drive(
fetch_permissions=self._dls_enabled()
):
async for file in self.prepare_files(
async for file, _ in self.prepare_files(
client=google_drive_client,
files_page=files_page,
paths={},
Expand Down Expand Up @@ -1138,7 +1171,7 @@ async def get_docs(self, filtering=None):
async for files_page in shared_drives_client.list_files(
fetch_permissions=self._dls_enabled()
):
async for file in self.prepare_files(
async for file, _ in self.prepare_files(
client=shared_drives_client,
files_page=files_page,
paths=resolved_paths,
Expand All @@ -1156,10 +1189,144 @@ async def get_docs(self, filtering=None):
async for files_page in google_drive_client.list_files(
fetch_permissions=self._dls_enabled()
):
async for file in self.prepare_files(
async for file, _ in self.prepare_files(
client=google_drive_client,
files_page=files_page,
paths=resolved_paths,
seen_ids=seen_ids,
):
yield file, partial(self.get_content, google_drive_client, file)

async def get_docs_incrementally(self, sync_cursor, filtering=None):
"""Executes the logic to fetch Google Drive objects incrementally in an async manner.

Args:
sync_cursor (str): Last sync time.
filtering (optional): Advanced filtering rules. Defaults to None.

Yields:
dict, partial: dict containing meta-data of the Google Drive objects,
partial download content function
"""
self._sync_cursor = sync_cursor
timestamp = iso_zulu()
self._logger.debug(f"Current Sync Time {timestamp}")

if not self._sync_cursor:
msg = "Unable to start incremental sync. Please perform a full sync to re-enable incremental syncs."
raise SyncCursorEmpty(msg)

seen_ids = set()

if self._domain_wide_delegation_sync_enabled():
# sync personal drives first
async for user in self.google_admin_directory_client.users():
email = user.get(UserFields.EMAIL.value)
self._logger.debug(f"Syncing personal drive content for: {email}")
google_drive_client = self.google_drive_client(impersonate_email=email)
async for files_page in google_drive_client.list_files_from_my_drive(
fetch_permissions=self._dls_enabled(),
last_sync_time=self.last_sync_time(),
):
# personal drive files have no property called trashedTime(time when file was deleted)
async for file, _ in self.prepare_files(
client=google_drive_client,
files_page=files_page,
paths={},
seen_ids=seen_ids,
):
if file.get("trashed") is True:
yield file, partial(
self.get_content, google_drive_client, file
), OP_DELETE
else:
yield file, partial(
self.get_content, google_drive_client, file
), OP_INDEX

email_for_shared_drives_sync = (
self._google_google_workspace_email_for_shared_drives_sync()
)

shared_drives_client = self.google_drive_client(
impersonate_email=email_for_shared_drives_sync
)

# Build a path lookup, parentId -> parent path
resolved_paths = await self.resolve_paths(
google_drive_client=shared_drives_client
)

# sync shared drives
self._logger.debug(
f"Syncing shared drives using admin account: {email_for_shared_drives_sync}"
)
async for files_page in shared_drives_client.list_files(
fetch_permissions=self._dls_enabled(),
last_sync_time=self.last_sync_time(),
):
# trashedTime(time when file was deleted) is a property exclusive to files present in shared drive
async for file, trashedTime in self.prepare_files(
client=shared_drives_client,
files_page=files_page,
paths=resolved_paths,
seen_ids=seen_ids,
):
if (
trashedTime is None or trashedTime > self.last_sync_time()
) and file.get("trashed") is True:
yield file, partial(
self.get_content, shared_drives_client, file
), OP_DELETE
elif (
trashedTime is not None and trashedTime < self.last_sync_time()
) and file.get("trashed") is True:
continue
else:
yield file, partial(
self.get_content, shared_drives_client, file
), OP_INDEX

else:
# Build a path lookup, parentId -> parent path
resolved_paths = await self.resolve_paths()

google_drive_client = self.google_drive_client()

# sync anything shared with the service account
# shared drives can also be shared with service account
# making it possible to sync shared drives without domain wide delegation
async for files_page in google_drive_client.list_files(
fetch_permissions=self._dls_enabled(),
last_sync_time=self.last_sync_time(),
):
async for file, trashedTime in self.prepare_files(
client=google_drive_client,
files_page=files_page,
paths=resolved_paths,
seen_ids=seen_ids,
):
if (
trashedTime is None or trashedTime > self.last_sync_time()
) and file.get("trashed") is True:
yield file, partial(
self.get_content, google_drive_client, file
), OP_DELETE
elif (
trashedTime is not None and trashedTime < self.last_sync_time()
) and file.get("trashed") is True:
continue
else:
yield file, partial(
self.get_content, google_drive_client, file
), OP_INDEX
self.update_sync_timestamp_cursor(timestamp)

def init_sync_cursor(self):
if not self._sync_cursor:
self._sync_cursor = {
CURSOR_GOOGLE_DRIVE_KEY: {},
CURSOR_SYNC_TIMESTAMP: iso_zulu(),
}

return self._sync_cursor
Loading
Loading