This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

[AP-813] & [AP-789] make some Log based params configurable (#10)
* Make await time and buffer size configurable in Log based

* use pytest-cov plugin

* fix makefile

* fix datetime conversion

* timezone fixes
Samira-El authored Jul 23, 2020
1 parent 65ec7cf commit c24bac4
Showing 11 changed files with 148 additions and 46 deletions.
8 changes: 3 additions & 5 deletions Makefile
@@ -27,12 +27,10 @@ setup: create_venv setup_local_db upgrade_pip install_dep check_dep
echo "Setup is finished"

pylint:
pylint tap_mongodb tap_mongodb/sync_strategies --rcfile=pylintrc; \
pylint tap_mongodb tap_mongodb/sync_strategies --rcfile=pylintrc

test:
pytest tests -v; \
pytest tests -v

test_cov:
coverage run -m pytest tests -v; \
coverage report --include="tap_mongodb/*";

pytest --cov=tap_mongodb tests -v
14 changes: 8 additions & 6 deletions README.md
@@ -29,12 +29,14 @@ Create json file called `config.json`, with the following contents:
```
The following parameters are optional for your config file:

| Name | Type | Description |
| -----|------|------------ |
| `replica_set` | string | name of replica set |
| `ssl` | Boolean | can be set to true to connect using ssl, default false |
| `verify_mode` | Boolean | Default SSL verify mode, default true |
| `include_schemas_in_destination_stream_name` | Boolean | forces the stream names to take the form `<database_name>-<collection_name>` instead of `<collection_name>`, default false|
| Name | Type | Default value | Description |
| -----|------|---------------|------------ |
| `replica_set` | string | null | name of replica set |
| `ssl` | Boolean | false | can be set to true to connect using SSL |
| `verify_mode` | Boolean | true | default SSL verify mode |
| `include_schemas_in_destination_stream_name` | Boolean | false | forces the stream names to take the form `<database_name>-<collection_name>` instead of `<collection_name>` |
| `update_buffer_size` | int | 1 | [LOG_BASED] The size of the buffer that holds detected update operations in memory; the buffer is flushed once this size is reached |
| `await_time_ms` | int | 1000 | [LOG_BASED] The maximum amount of time in milliseconds the log_based method waits for new data changes before exiting |

All of the above attributes are required by the tap to connect to your MongoDB instance.
Here is a [sample configuration file](./sample_config.json).
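
For example (illustrative values, not a recommendation), a log-based sync that batches up to 50 updates and waits up to five seconds for new change events would add the following to `config.json`:

```
"update_buffer_size": 50,
"await_time_ms": 5000
```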
4 changes: 3 additions & 1 deletion sample_config.json
@@ -7,5 +7,7 @@
"database": "<collection database name>",
"replica_set": "<replicaSet name if exist, null otherwise>",
"verify_mode": "<SSL verify mode | optional, default 'true'>",
"ssl": "<enable SSL | optional>"
"ssl": "<enable SSL | optional>",
"update_buffer_size": "<int>",
"await_time_ms": "<int>"
}
5 changes: 2 additions & 3 deletions setup.py
@@ -22,12 +22,11 @@
extras_require={
'dev': [
'pylint',
'nose',
'ipdb'
],
'test': [
'pytest',
'coverage'
'pytest==5.4',
'pytest-cov==2.10'
]
},
entry_points='''
33 changes: 26 additions & 7 deletions tap_mongodb/__init__.py
@@ -3,7 +3,7 @@
import json
import ssl
import sys
from typing import List, Dict
from typing import List, Dict, Optional

import singer
from pymongo import MongoClient
@@ -13,6 +13,7 @@
import tap_mongodb.sync_strategies.common as common
import tap_mongodb.sync_strategies.full_table as full_table
import tap_mongodb.sync_strategies.incremental as incremental
from tap_mongodb.config_utils import validate_config
from tap_mongodb.db_utils import get_databases, produce_collection_schema
from tap_mongodb.errors import InvalidReplicationMethodException, NoReadPrivilegeException
from tap_mongodb.stream_utils import is_log_based_stream, is_stream_selected, write_schema_message, \
@@ -165,14 +166,22 @@ def sync_traditional_streams(client: MongoClient, traditional_streams: List[Dict
sync_traditional_stream(client, stream, state)


def sync_log_based_streams(client: MongoClient, log_based_streams: List[Dict], database_name: str, state: Dict):
def sync_log_based_streams(client: MongoClient,
log_based_streams: List[Dict],
database_name: str,
state: Dict,
update_buffer_size: Optional[int],
await_time_ms: Optional[int]
):
"""
Sync log_based streams all at once by listening on the database-level change streams events.
Args:
client: MongoDB client instance
log_based_streams: list of streams to sync
database_name: name of the database to sync from
state: state dictionary
update_buffer_size: the size of the buffer used to hold detected updates
await_time_ms: the maximum time in milliseconds the log-based sync waits for new changes before exiting
"""
if not log_based_streams:
return
@@ -193,22 +202,26 @@

with metrics.job_timer('sync_table') as timer:
timer.tags['database'] = database_name
update_buffer_size = update_buffer_size or change_streams.MIN_UPDATE_BUFFER_LENGTH
await_time_ms = await_time_ms or change_streams.DEFAULT_AWAIT_TIME_MS

change_streams.sync_database(client[database_name], streams, state)
change_streams.sync_database(client[database_name], streams, state, update_buffer_size, await_time_ms)

state = singer.set_currently_syncing(state, None)
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))


def do_sync(client: MongoClient, catalog: Dict, database_name: str, state: Dict):
def do_sync(client: MongoClient, catalog: Dict, config: Dict, state: Dict):
"""
Syncs all the selected streams in the catalog
Args:
client: MongoDb client instance
catalog: dictionary with all the streams details
database_name: name of the database to sync from
config: config dictionary
state: state dictionary
"""
validate_config(config)

all_streams = catalog['streams']
streams_to_sync = get_streams_to_sync(all_streams, state)

@@ -219,7 +232,13 @@ def do_sync(client: MongoClient, catalog: Dict, database_name: str, state: Dict)
LOGGER.debug('Sync of traditional streams done')

LOGGER.debug('Starting sync of log based streams ...')
sync_log_based_streams(client, log_based_streams, database_name, state)
sync_log_based_streams(client,
log_based_streams,
config['database'],
state,
config.get('update_buffer_size'),
config.get('await_time_ms')
)
LOGGER.debug('Sync of log based streams done')

LOGGER.info(common.get_sync_summary(catalog))
@@ -263,7 +282,7 @@ def main_impl():
do_discover(client, config)
elif args.catalog:
state = args.state or {}
do_sync(client, args.catalog.to_dict(), config['database'], state)
do_sync(client, args.catalog.to_dict(), config, state)


def main():
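
With this change, `do_sync` receives the whole config instead of just the database name. A minimal sketch of the new call path (the connection URI, catalog, and state below are illustrative, not from this repository):

```
from pymongo import MongoClient

from tap_mongodb import do_sync

# Hypothetical inputs; only the do_sync signature comes from this commit.
config = {'database': 'mydb', 'update_buffer_size': 10, 'await_time_ms': 2000}
client = MongoClient('mongodb://localhost:27017')

do_sync(client, catalog={'streams': []}, config=config, state={})
```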
40 changes: 40 additions & 0 deletions tap_mongodb/config_utils.py
@@ -0,0 +1,40 @@
from typing import Dict

from tap_mongodb.errors import InvalidAwaitTimeError, InvalidUpdateBufferSizeError
from tap_mongodb.sync_strategies import change_streams


def validate_config(config: Dict) -> None:
"""
Goes through the config and validates it.
Currently, only a few parameters are validated.
Args:
config: Dictionary of config to validate
Returns: None
Raises: InvalidUpdateBufferSizeError or InvalidAwaitTimeError
"""
if 'update_buffer_size' in config:
update_buffer_size = config['update_buffer_size']

if not isinstance(update_buffer_size, int):
raise InvalidUpdateBufferSizeError(update_buffer_size, 'Not integer')

if not (change_streams.MIN_UPDATE_BUFFER_LENGTH <=
update_buffer_size <= change_streams.MAX_UPDATE_BUFFER_LENGTH):

raise InvalidUpdateBufferSizeError(
update_buffer_size,
f'Not in the range [{change_streams.MIN_UPDATE_BUFFER_LENGTH}..'
f'{change_streams.MAX_UPDATE_BUFFER_LENGTH}]')


if 'await_time_ms' in config:
await_time_ms = config['await_time_ms']

if not isinstance(await_time_ms, int):
raise InvalidAwaitTimeError(await_time_ms, 'Not integer')

if await_time_ms <= 0:
raise InvalidAwaitTimeError(
await_time_ms, 'time must be > 0')
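
A short sketch of how the new validation surfaces bad values (the config below is deliberately invalid):

```
from tap_mongodb.config_utils import validate_config
from tap_mongodb.errors import InvalidUpdateBufferSizeError

try:
    # a string is rejected before any sync work starts
    validate_config({'update_buffer_size': '100', 'await_time_ms': 1000})
except InvalidUpdateBufferSizeError as exc:
    print(exc)  # Invalid update buffer size 100! Not integer
```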
12 changes: 12 additions & 0 deletions tap_mongodb/errors.py
@@ -27,3 +27,15 @@ class NoReadPrivilegeException(Exception):
def __init__(self, user, db_name):
msg = f"The user '{user}' has no read privilege on the database '{db_name}'!"
super(NoReadPrivilegeException, self).__init__(msg)

class InvalidUpdateBufferSizeError(Exception):
"""Raised if the given update buffer size used in log_based is invalid"""
def __init__(self, size, reason):
msg = f"Invalid update buffer size {size}! {reason}"
super(InvalidUpdateBufferSizeError, self).__init__(msg)

class InvalidAwaitTimeError(Exception):
"""Raised if the given await time used in log_based is invalid"""
def __init__(self, time_ms, reason):
msg = f"Invalid await time {time_ms}! {reason}"
super(InvalidAwaitTimeError, self).__init__(msg)
22 changes: 14 additions & 8 deletions tap_mongodb/sync_strategies/change_streams.py
@@ -12,8 +12,10 @@
LOGGER = singer.get_logger('tap_mongodb')

RESUME_TOKEN_KEY = 'token'
MAX_AWAIT_TIME_MS = 300000 # 5 minutes
MAX_UPDATE_BUFFER_LENGTH = 500
DEFAULT_AWAIT_TIME_MS = 1000 # the server default https://docs.mongodb.com/manual/reference/method/db.watch/#db.watch
MIN_UPDATE_BUFFER_LENGTH = 1 # default
MAX_UPDATE_BUFFER_LENGTH = common.UPDATE_BOOKMARK_PERIOD # set max to the same value as the bookmark period since we
# flush the buffer anyway after every UPDATE_BOOKMARK_PERIOD


def update_bookmarks(state: Dict, tap_stream_ids: Set[str], token: Dict) -> Dict:
@@ -70,14 +72,18 @@ def get_token_from_state(streams_to_sync: Set[str], state: Dict) -> Optional[Dic

def sync_database(database: Database,
streams_to_sync: Dict[str, Dict],
state: Dict
state: Dict,
update_buffer_size: int,
await_time_ms: int
) -> None:
"""
Syncs the records from the given collection using ChangeStreams
Args:
database: MongoDB Database instance to sync
streams_to_sync: Dict of stream dictionary with all the stream details
state: state dictionary
update_buffer_size: the size of the buffer used to hold detected updates
await_time_ms: the maximum time in milliseconds the log-based sync waits for new changes before exiting
"""
LOGGER.info('Starting LogBased sync for streams "%s" in database "%s"', list(streams_to_sync.keys()), database.name)

@@ -103,11 +109,13 @@ def sync_database(database: Database,
{'ns.coll': {'$in': [val['table_name'] for val in streams_to_sync.values()]}}
]
}}],
max_await_time_ms=MAX_AWAIT_TIME_MS,
max_await_time_ms=await_time_ms,
start_after=get_token_from_state(stream_ids, state)
) as cursor:
while cursor.alive:

change = cursor.try_next()

# Note that the ChangeStream's resume token may be updated
# even when no changes are returned.

@@ -123,12 +131,10 @@ def sync_database(database: Database,
'_data': cursor.resume_token['_data']
}

change = cursor.try_next()

# After await_time_ms has elapsed, the cursor will return None.
# write state and exit
if change is None:
LOGGER.info('No change streams after %s, updating bookmark and exiting...', MAX_AWAIT_TIME_MS)
LOGGER.info('No change stream events after %s ms, updating bookmark and exiting...', await_time_ms)

state = update_bookmarks(state, stream_ids, resume_token)
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
@@ -170,7 +176,7 @@ def sync_database(database: Database,
state = update_bookmarks(state, stream_ids, resume_token)

# flush buffer if it has filled up or flush and write state every UPDATE_BOOKMARK_PERIOD messages
if sum(len(stream_buffer) for stream_buffer in update_buffer.values()) >= MAX_UPDATE_BUFFER_LENGTH or \
if sum(len(stream_buffer) for stream_buffer in update_buffer.values()) >= update_buffer_size or \
sum(rows_saved.values()) % common.UPDATE_BOOKMARK_PERIOD == 0:

LOGGER.debug('Flushing update buffer ...')
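
For reference, the watch/try_next pattern used above in its simplest form; this is a standalone PyMongo sketch against a hypothetical local replica set (change streams require one), not this tap's code:

```
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
with client['mydb'].watch(max_await_time_ms=1000) as cursor:
    while cursor.alive:
        change = cursor.try_next()
        if change is None:
            # no event arrived within max_await_time_ms;
            # the resume token may still have advanced
            break
        print(change['operationType'], cursor.resume_token)
```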
10 changes: 7 additions & 3 deletions tap_mongodb/sync_strategies/common.py
@@ -69,9 +69,13 @@ def class_to_string(key_value: Any, key_type: str) -> str:
Raises: UnsupportedKeyTypeException if key_type is not supported
"""
if key_type == 'datetime':
timezone = tzlocal.get_localzone()
local_datetime = timezone.localize(key_value)
utc_datetime = local_datetime.astimezone(pytz.UTC)
if key_value.tzinfo is None:
timezone = tzlocal.get_localzone()
local_datetime = timezone.localize(key_value)
utc_datetime = local_datetime.astimezone(pytz.UTC)
else:
utc_datetime = key_value.astimezone(pytz.UTC)

return utils.strftime(utc_datetime)

if key_type == 'Timestamp':
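
The timezone fix above localizes only naive datetimes and passes tz-aware ones straight through. A small sketch of the two branches (assuming tzlocal < 3, where `get_localzone()` returns a pytz timezone exposing `localize`):

```
from datetime import datetime

import pytz
import tzlocal

naive = datetime(2020, 4, 10, 14, 50, 55)                   # no tzinfo: treated as local time
aware = datetime(2020, 4, 10, 14, 50, 55, tzinfo=pytz.utc)  # tz-aware: passed through

print(tzlocal.get_localzone().localize(naive).astimezone(pytz.UTC))
print(aware.astimezone(pytz.UTC))  # unchanged: 2020-04-10 14:50:55+00:00
```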
23 changes: 12 additions & 11 deletions tests/sync_strategies/test_change_streams.py
@@ -1,8 +1,8 @@
import unittest
from datetime import datetime

import bson
import pytz

from datetime import datetime
from unittest.mock import patch, Mock, PropertyMock, MagicMock
from pymongo.change_stream import CollectionChangeStream, ChangeStream
from pymongo.collection import Collection
@@ -15,6 +15,8 @@

class TestChangeStreams(unittest.TestCase):

maxDiff = None

def tearDown(self) -> None:
common.SCHEMA_COUNT.clear()
common.SCHEMA_TIMES.clear()
@@ -227,7 +229,7 @@ def test_sync_database(self, get_buffer_rows_from_db_mock, write_message_mock):
'_id': 'id11',
'key1': 1,
'key2': 'abc',
'key3': {'a': 1, 'b': datetime(2020, 4, 10, 14, 50, 55, 0)}
'key3': {'a': 1, 'b': datetime(2020, 4, 10, 14, 50, 55, 0, tzinfo=pytz.utc)}
}
}).return_value,
Mock(spec_set=ChangeStream, return_value={
@@ -311,15 +313,14 @@ def test_sync_database(self, get_buffer_rows_from_db_mock, write_message_mock):
mock_db.watch.return_value = mock_watch
type(mock_db).name = PropertyMock(return_value='mydb')

change_streams.sync_database(mock_db, streams, state)
change_streams.sync_database(mock_db, streams, state, 1, 1)

self.assertEqual({
'bookmarks': {
'mydb-stream1': {
'token':
{
'_data': 'token6'
},
'token': {
'_data': 'token6'
},
},
'mydb-stream2': {
'token': {
@@ -343,19 +344,19 @@ def test_sync_database(self, get_buffer_rows_from_db_mock, write_message_mock):

self.assertListEqual([
'RecordMessage', # insert
'RecordMessage', # update
'RecordMessage', # insert
'RecordMessage', # delete
'RecordMessage', # insert
'StateMessage',
'RecordMessage', # update
], [msg.__class__.__name__ for msg in messages])

self.assertListEqual([
{'_id': 'id11', 'document': {'_id': 'id11', 'key1': 1, 'key2': 'abc','key3': {'a': 1, 'b': '2020-04-10T11:50:55.000000Z'}}, common.SDC_DELETED_AT: None},
{'_id': 'id11', 'document': {'_id': 'id11', 'key1': 1, 'key2': 'abc','key3': {'a': 1, 'b': '2020-04-10T14:50:55.000000Z'}}, common.SDC_DELETED_AT: None},
{'_id': 'id13', 'document': {'_id': 'id13', 'key2': 'eeeeef'}, common.SDC_DELETED_AT: None},
{'_id': 'id21', 'document':{'_id': 'id21', 'key6': 12, 'key10': 'abc','key11': [1,2,3, '10']}, common.SDC_DELETED_AT: None},
{'_id': 'id22', 'document': {'_id': 'id22'}, common.SDC_DELETED_AT: '2020-05-05T00:00:00.000000Z'},
{'_id': 'id13', 'document': {'_id': 'id13', 'key3': '2020-05-05T00:00:00.000000Z'}, common.SDC_DELETED_AT: None},
{'_id': 'id13', 'document': {'_id': 'id13', 'key2': 'eeeeef' }, common.SDC_DELETED_AT: None},
], [msg.record for msg in messages if isinstance(msg, RecordMessage)])

self.assertEqual(common.COUNTS['mydb-stream1'], 3)
