Skip to content

Commit

Permalink
Merge pull request #20 from sendbird/release-1.1.0
Browse files Browse the repository at this point in the history
Release v1.1.0
  • Loading branch information
jjh-kim authored Jul 26, 2024
2 parents 6bef254 + 791f347 commit e6742fe
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 11 deletions.
10 changes: 10 additions & 0 deletions doc/add-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,13 @@ Before `ALTER TABLE ... ADD INDEX` command finishes, index is temporarily create
### Free Memory (Enhanced Monitoring)
Upon creating an index, the Free Memory as reported by Enhanced Monitoring will decrease. This decrease continues rapidly until it reaches a certain value. However, Aurora has the capability to immediately reclaim memory from FreeableMemory (as observed in CloudWatch), so this should not pose a significant issue. Nonetheless, it is important to monitor and ensure that neither Free Memory nor Freeable Memory reaches zero.

### Innodb Parameters (MySQL 8.0.27 and above)
In MySQL 8.0.27, new InnoDB parameters `innodb_ddl_buffer_size`, `innodb_ddl_threads`, and `innodb_parallel_read_threads` were added to improve secondary index creation.
SB-OSC supports options to set these parameters in the migration configuration before creating indexes.
```yaml
innodb_ddl_buffer_size: 1048576
innodb_ddl_threads: 4
innodb_parallel_read_threads: 4
```
Please refer to the [MySQL documentation](https://dev.mysql.com/doc/refman/8.0/en/innodb-parameters.html) for more information on these parameters.
10 changes: 10 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Config

## Flow Control
SB-OSC allows users to control the flow of the migration process by setting various parameters. You can set these parameters to skip or pause specific stages based on your environment and requirements.

### skip_bulk_import
If you set this parameter to `True`, SB-OSC will skip the bulk import stage and start from the apply DML events stage. This is useful when you have already copied the data to the destination table and only need to apply DML events. For example, when you create a clone cluster to make an initial copy and replicate changes using SB-OSC, this parameter can be set to `True`. `init_binlog_file` and `init_binlog_position` should also be set when `skip_bulk_import` is `True`; otherwise it will raise an error.

### disable_apply_dml_events
If you set this parameter to `True`, SB-OSC will pause before `apply_dml_events` stage. This is useful when you have additional steps to perform manually before applying DML events.


## Chunk
### max_chunk_count & min_chunk_size
SB-OSC calculates the number of chunks to create based on following formula
Expand Down
35 changes: 35 additions & 0 deletions doc/operation-class.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ SB-OSC provides two default operation classes. `BaseOperation` is the default op

You can create your own operation class by inheriting `BaseOperation` and overriding its methods. If you pass the operation class name to the `operation_class` parameter in the migration configuration, SB-OSC detects any operation class defined below the `src/sbosc/operations` directory and uses it for the migration process.

You can also add additional configs dedicated to the operation class. These configs will be passed to the operation class as `operation_config`, wrapped in the dataclass you defined.

```yaml
operation_class_config:
retention_days: 30
```
## Example
### BaseOperation
Expand Down Expand Up @@ -35,6 +42,8 @@ class MessageRetentionOperation(BaseOperation):
### CrossClusterOperation
```python
from sbosc.operations.base import CrossClusterBaseOperation
class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
def _select_batch_query(self, start_pk, end_pk):
return f'''
Expand All @@ -58,3 +67,29 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
dest_pks = [row[0] for row in dest_cursor.fetchall()]
return list(set(source_pks) - set(dest_pks))
```
### Operation Config
```python
from dataclasses import dataclass
from sbosc.operations.base import BaseOperation
from sbosc.operations.operation import MigrationOperationConfig
@dataclass
class MessageRetentionConfig(MigrationOperationConfig):
retention_days: int
class MessageRetentionOperation(BaseOperation):
operation_config_class = MessageRetentionConfig
operation_config: MessageRetentionConfig
def _insert_batch_query(self, start_pk, end_pk):
return f"""
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source.id BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
"""
```
7 changes: 7 additions & 0 deletions src/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ class Config:
WAIT_INTERVAL_UNTIL_AUTO_SWAP_IN_SECONDS = 60
PREFERRED_WINDOW = '00:00-23:59'
SKIP_BULK_IMPORT = False
DISABLE_APPLY_DML_EVENTS = False
OPERATION_CLASS = 'BaseOperation'
INDEXES = []
INDEX_CREATED_PER_QUERY = 4
INNODB_DDL_BUFFER_SIZE = None # optional
INNODB_DDL_THREADS = None # optional
INNODB_PARALLEL_READ_THREADS = None # optional

# Worker config
MIN_BATCH_SIZE = 100
Expand Down Expand Up @@ -102,6 +106,9 @@ class Config:
PK_SET_MAX_SIZE = 1000000
EVENT_BATCH_DURATION_IN_SECONDS = 3600

# Operation Class specific config
OPERATION_CLASS_CONFIG = {}

@property
def operation_class(self):
if self._operation_class is not None:
Expand Down
13 changes: 13 additions & 0 deletions src/sbosc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,19 @@ def add_index(self):
# add index
with self.db.cursor(host='dest') as cursor:
cursor: Cursor

# set session variables
if config.INNODB_DDL_BUFFER_SIZE is not None:
cursor.execute(f"SET SESSION innodb_ddl_buffer_size = {config.INNODB_DDL_BUFFER_SIZE}")
self.logger.info(f"Set innodb_ddl_buffer_size to {config.INNODB_DDL_BUFFER_SIZE}")
if config.INNODB_DDL_THREADS is not None:
cursor.execute(f"SET SESSION innodb_ddl_threads = {config.INNODB_DDL_THREADS}")
self.logger.info(f"Set innodb_ddl_threads to {config.INNODB_DDL_THREADS}")
if config.INNODB_PARALLEL_READ_THREADS is not None:
cursor.execute(
f"SET SESSION innodb_parallel_read_threads = {config.INNODB_PARALLEL_READ_THREADS}")
self.logger.info(f"Set innodb_parallel_read_threads to {config.INNODB_PARALLEL_READ_THREADS}")

cursor.execute(f'''
ALTER TABLE {metadata.destination_db}.{metadata.destination_table}
{', '.join([
Expand Down
2 changes: 1 addition & 1 deletion src/sbosc/eventhandler/eventhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def start(self):
self.logger.info('Initializing event handler')
self.init_event_handler()
else:
if current_stage == Stage.APPLY_DML_EVENTS:
if current_stage == Stage.APPLY_DML_EVENTS and not config.DISABLE_APPLY_DML_EVENTS:
self.apply_dml_events()
elif current_stage == Stage.APPLY_DML_EVENTS_PRE_VALIDATION:
self.apply_dml_events_pre_validation()
Expand Down
10 changes: 10 additions & 0 deletions src/sbosc/operations/operation.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
from abc import abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from typing import List

from MySQLdb.cursors import Cursor

from config import config
from modules.db import Database
from modules.redis import RedisData


@dataclass
class MigrationOperationConfig:
pass


class MigrationOperation:
"""Abstract class for migration operations."""
operation_config_class = MigrationOperationConfig

def __init__(self, migration_id):
self.migration_id = migration_id
Expand All @@ -24,6 +32,8 @@ def __init__(self, migration_id):
self.source_column_list: list = metadata.source_columns.split(',')
self.start_datetime = metadata.start_datetime

self.operation_config = self.operation_config_class(**config.OPERATION_CLASS_CONFIG)

@abstractmethod
def insert_batch(self, db: Database, start_pk: int, end_pk: int, upsert=False, limit=None) -> Cursor:
"""
Expand Down
3 changes: 3 additions & 0 deletions tests/configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ skip_bulk_import: False
operation_class: BaseOperation
indexes: []
index_created_per_query: 4
innodb_ddl_buffer_size: 1048576
innodb_ddl_threads: 4
innodb_parallel_read_threads: 4

# Worker config
min_batch_size: 100
Expand Down
26 changes: 16 additions & 10 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
get_logger()


@pytest.fixture(scope='session')
@pytest.fixture(scope='package')
def secret():
from config import secret
return secret


@pytest.fixture(scope='session')
@pytest.fixture(scope='package')
def config():
from config import config
return config
Expand All @@ -40,7 +40,7 @@ def request_id(request):
return request.node.callspec.id


@pytest.fixture(scope='session')
@pytest.fixture(scope='package')
def cursor(config, secret):
connection = MySQLdb.connect(
host=config.SOURCE_WRITER_ENDPOINT,
Expand All @@ -55,7 +55,8 @@ def cursor(config, secret):

@pytest.fixture
def sqlalchemy_engine(config, secret):
return create_engine(f'mysql+mysqldb://{secret.USERNAME}:@{config.SOURCE_WRITER_ENDPOINT}:{secret.PORT}/{config.SOURCE_DB}')
return create_engine(
f'mysql+mysqldb://{secret.USERNAME}:@{config.SOURCE_WRITER_ENDPOINT}:{secret.PORT}/{config.SOURCE_DB}')


@pytest.fixture(autouse=True)
Expand All @@ -69,13 +70,18 @@ def time_sleep_mock():


@pytest.fixture(scope='session')
def redis_data():
def migration_id():
return 1


@pytest.fixture(scope='session')
def redis_data(migration_id):
from modules.redis import RedisData
return RedisData(1, False)
return RedisData(migration_id, False)


@pytest.fixture(autouse=True, scope='session')
def init_migration(config, cursor, redis_data):
@pytest.fixture(autouse=True, scope='package')
def init_migration(config, cursor, redis_data, migration_id):
from sbosc.const import Stage
from sbosc.controller.initializer import Initializer

Expand All @@ -86,10 +92,10 @@ def init_migration(config, cursor, redis_data):
cursor.execute(f'CREATE TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} (id int)')
cursor.execute(f'CREATE TABLE {config.DESTINATION_DB}.{config.DESTINATION_TABLE} (id int)')

migration_id = Initializer().init_migration()
retrieved_migration_id = Initializer().init_migration()

# Validate Initializer.init_migration
assert migration_id == 1
assert retrieved_migration_id == migration_id
assert redis_data.current_stage == Stage.START_EVENT_HANDLER
assert redis_data.metadata.source_db == config.SOURCE_DB
assert redis_data.metadata.source_table == config.SOURCE_TABLE
Expand Down

0 comments on commit e6742fe

Please sign in to comment.