Merge pull request #23 from villagelabsco/feat/vd-6250
[VD-6250] Implement a degraded mode to single-insert records if bulk insertion fails
martin-village authored Feb 17, 2025
2 parents f79b515 + 927722d commit 6d368bf
Showing 1 changed file with 43 additions and 5 deletions.
target_elasticsearch/sinks.py: 43 additions & 5 deletions
@@ -431,19 +431,33 @@ def write_output(self, records):
         # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
         @param records:
         """
-        records = self.build_body(records)
-        self.logger.debug(records)
+        records_body = self.build_body(records)
+        self.logger.debug(records_body)
 
         MAX_RETRIES = 5
         RETRY_DELAY = 20
 
+        def insert_individual_records(recs):
+            """Degraded mode: helper to insert records one by one."""
+            failed_records = []
+            for rec in recs:
+                single_rec_body = self.build_body([rec])
+                try:
+                    bulk(self.client, single_rec_body)
+                    self.logger.warning("Successfully inserted individual record")
+                except Exception as e:
+                    self.logger.error(f"Failed to insert individual record: {str(e)}")
+                    failed_records.append(rec)
+            return failed_records
+
+
         for attempt in range(MAX_RETRIES):
             try:
                 # Some documents may be very heavy, e.g. full Drive documents or diffs
                 # -> apply a 25 MB chunk limit instead of the default 100 MB:
                 # parallel_bulk(self.client, records, max_chunk_bytes=25*1024*1024)
                 # -- The above caused some dropped records; revert to the previous simple bulk insert
-                bulk(self.client, records)
+                bulk(self.client, records_body)
                 # Successful -> exit the loop
                 break
             except elasticsearch.helpers.BulkIndexError as e:
@@ -457,9 +471,33 @@ def write_output(self, records):
self.logger.error(f"ConnectionTimeout on final attempt {MAX_RETRIES}: {str(e)}")
raise
except Exception as e:
self.logger.error(f"Unexpected error on attempt {attempt + 1}: {str(e)}")
raise
if attempt < MAX_RETRIES - 1:
# TransportError or elastic_transport.TlsError may happen if the payload is too big
self.logger.error(f"Unexpected error on attempt {attempt + 1}: {str(e)}. Retrying in {RETRY_DELAY} seconds with a smaller payload (switching to individual record insertion)...")
time.sleep(RETRY_DELAY)

failed_records = insert_individual_records(records)

if failed_records:
self.logger.error(
f"Failed to insert {len(failed_records)} records even in individual mode on attempt {attempt + 1}: "
)
self.logger.error("------")
self.logger.error(failed_records)
self.logger.error("------")
self.logger.error(f"Will retry in {RETRY_DELAY} seconds")
time.sleep(RETRY_DELAY)

records = failed_records
records_body = self.build_body(records)
self.logger.debug(records_body)
else:
self.logger.info("Successfully inserted all records in individual mode")
return

else:
self.logger.error(f"Unexpected error on final attempt {MAX_RETRIES}: {str(e)}")
raise

     def process_batch(self, context: Dict[str, Any]) -> None:
         """
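
For context, here is a minimal standalone sketch of the degraded-mode pattern this commit introduces: attempt one bulk request, and on failure fall back to inserting records one at a time, collecting any that still fail so the caller can retry them. The client URL, index name, and record shape below are illustrative assumptions, not part of the commit.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

client = Elasticsearch("http://localhost:9200")  # assumption: a local cluster

def write_with_fallback(actions):
    """Try a single bulk request; on failure, degrade to per-record inserts."""
    try:
        bulk(client, actions)
        return []
    except BulkIndexError:
        failed = []
        for action in actions:
            try:
                bulk(client, [action])  # a one-record "bulk" call, as in the commit
            except Exception:
                failed.append(action)
        return failed  # the caller can retry or report these

# Hypothetical usage:
docs = [{"_index": "records", "_source": {"n": i}} for i in range(3)]
leftover = write_with_fallback(docs)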

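The reverted parallel_bulk line in the comment above deserves a note: parallel_bulk returns a lazy generator, so calling it without consuming the result sends nothing, which is one plausible explanation for the dropped records (the commit itself does not say why). A sketch of consuming it correctly with the same 25 MB chunk limit follows; raise_on_error=False is an added assumption so failures are yielded rather than raised.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch("http://localhost:9200")  # assumption: a local cluster

def write_parallel(actions):
    failed = []
    # parallel_bulk is lazy: iterate it, or nothing is sent.
    for ok, item in parallel_bulk(
        client,
        actions,
        max_chunk_bytes=25 * 1024 * 1024,  # the 25 MB limit from the reverted comment
        raise_on_error=False,              # yield failures instead of raising
    ):
        if not ok:
            failed.append(item)  # per-action error details
    return failed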