Skip to content

Commit

Permalink
Merge pull request #24 from sendbird/bugfix/recreate-cursor-after-lon…
Browse files Browse the repository at this point in the history
…g-running-query

Recreate cursor after long running query
  • Loading branch information
jjh-kim authored Nov 16, 2024
2 parents 3e84f96 + 18e9855 commit 0640280
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/sbosc/controller/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,11 @@ def __validate_unmatched_pks(self):

def validate_apply_dml_events(self, start_timestamp, end_timestamp):
unmatched_pks = []
with self.db.cursor() as cursor:
cursor: Cursor

if start_timestamp <= end_timestamp:
self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
if start_timestamp <= end_timestamp:
self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(f'''
ANALYZE TABLE {config.SBOSC_DB}.{table}_{self.migration_id}
''')
Expand Down Expand Up @@ -301,14 +300,20 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
for thread in threads:
thread.result()

cursor.executemany(f'''
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)

self.__validate_unmatched_pks()
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]

with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]

# Even though validation logic is based on data in tables following valid condition can be achieved.
# All events are being pushed to redis in validation stage.
Expand Down

0 comments on commit 0640280

Please sign in to comment.