Skip to content

Commit

Permalink
Merge pull request #27 from sendbird/release-1.1.2
Browse files (browse the repository at this point in the history)
Release v1.1.2
  • Loading branch information
jjh-kim authored Nov 20, 2024
2 parents 3e84f96 + 38d9d9a commit 5c37f79
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/sbosc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def is_preferred_window():
start_time_str, end_time_str = config.PREFERRED_WINDOW.split('-')
start_time = datetime.strptime(start_time_str, '%H:%M').time()
end_time = datetime.strptime(end_time_str, '%H:%M').time()
if start_time >= end_time:
return start_time <= current_time or current_time <= end_time
return start_time <= current_time <= end_time

def get_migration_id(self):
Expand Down
31 changes: 18 additions & 13 deletions src/sbosc/controller/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,11 @@ def __validate_unmatched_pks(self):

def validate_apply_dml_events(self, start_timestamp, end_timestamp):
unmatched_pks = []
with self.db.cursor() as cursor:
cursor: Cursor

if start_timestamp <= end_timestamp:
self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
if start_timestamp <= end_timestamp:
self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(f'''
ANALYZE TABLE {config.SBOSC_DB}.{table}_{self.migration_id}
''')
Expand Down Expand Up @@ -301,14 +300,20 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
for thread in threads:
thread.result()

cursor.executemany(f'''
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)

self.__validate_unmatched_pks()
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]

with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]

# Even though the validation logic is based on data in the tables, the following valid condition can be achieved.
# All events are being pushed to Redis in the validation stage.
Expand Down
4 changes: 4 additions & 0 deletions src/sbosc/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ def bulk_import(self):
start_pk = self.get_start_pk(chunk_info)
if start_pk is None:
return
# If start_pk is greater than end_pk, set last_pk_inserted to end_pk.
# This can happen when a chunk ended with a duplicate key error.
elif start_pk > chunk_info.end_pk:
chunk_info.last_pk_inserted = chunk_info.end_pk

end_pk = chunk_info.end_pk
chunk_info.status = ChunkStatus.IN_PROGRESS
Expand Down

0 comments on commit 5c37f79

Please sign in to comment.