Skip to content

Commit

Permalink
✨ Introduce StateIteratorProcessor in CDK (#33312)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Jan 3, 2024
1 parent fa2a2cf commit 18e0e77
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 115 deletions.
5 changes: 3 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ MavenLocal debugging steps:

### Java CDK

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------|
| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage |
| 0.10.1 | 2023-12-21 | [\#33723](https://github.com/airbytehq/airbyte/pull/33723) | Make memory-manager log message less scary |
| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test |
| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.10.2
version=0.10.3
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.state;

import com.google.common.collect.AbstractIterator;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import java.time.Instant;
import java.util.Iterator;
import javax.annotation.CheckForNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterator that wraps a stream of source messages and interleaves {@code STATE} messages into it.
 *
 * <p>
 * While the wrapped iterator has more elements, each call either emits a checkpoint state (when the
 * {@link SourceStateIteratorManager} says one is due) or reads the next element and hands it to the
 * manager for processing. Once the wrapped iterator is exhausted (or its {@code hasNext()} throws),
 * exactly one final state message is emitted, followed by end-of-data.
 *
 * <p>
 * Every state message emitted here carries the number of records that were returned since the
 * previous state message in its source stats.
 *
 * @param <T> type of the elements produced by the wrapped iterator
 */
public class SourceStateIterator<T> extends AbstractIterator<AirbyteMessage> implements Iterator<AirbyteMessage> {

  private static final Logger LOGGER = LoggerFactory.getLogger(SourceStateIterator.class);

  private final Iterator<T> messageIterator;
  private final SourceStateIteratorManager<T> sourceStateIteratorManager;

  private boolean hasEmittedFinalState = false;
  // Number of records returned since the last emitted state message.
  private long recordCount = 0L;
  private Instant lastCheckpoint = Instant.now();

  /**
   * @param messageIterator upstream iterator whose elements are turned into record messages
   * @param sourceStateIteratorManager strategy deciding when to checkpoint and how to build state and
   *        record messages
   */
  public SourceStateIterator(final Iterator<T> messageIterator,
                             final SourceStateIteratorManager<T> sourceStateIteratorManager) {
    this.messageIterator = messageIterator;
    this.sourceStateIteratorManager = sourceStateIteratorManager;
  }

  /**
   * Produces the next message: a checkpoint state, a processed record, the final state, or
   * end-of-data, in that order of precedence.
   *
   * @return the next message, or the {@code endOfData()} marker once the final state has been emitted
   * @throws RuntimeException wrapping any exception raised while reading or processing a record
   */
  @CheckForNull
  @Override
  protected AirbyteMessage computeNext() {
    boolean iteratorHasNextValue = false;
    try {
      iteratorHasNextValue = messageIterator.hasNext();
    } catch (Exception ex) {
      // Deliberate best-effort: a failing hasNext() is treated as "no more data" so that the final
      // state message can still be emitted below.
      LOGGER.info("Caught exception while trying to get the next from message iterator. Treating hasNext to false. ", ex);
    }

    if (iteratorHasNextValue) {
      if (sourceStateIteratorManager.shouldEmitStateMessage(recordCount, lastCheckpoint)) {
        final AirbyteStateMessage stateMessage = sourceStateIteratorManager.generateStateMessageAtCheckpoint();
        lastCheckpoint = Instant.now();
        return toStateMessageWithRecordCount(stateMessage);
      }
      // Use try-catch to catch Exception that could occur when connection to the database fails
      try {
        final T message = messageIterator.next();
        final AirbyteMessage processedMessage = sourceStateIteratorManager.processRecordMessage(message);
        recordCount++;
        return processedMessage;
      } catch (final Exception e) {
        throw new RuntimeException(e);
      }
    } else if (!hasEmittedFinalState) {
      hasEmittedFinalState = true;
      final AirbyteStateMessage finalStateMessage = sourceStateIteratorManager.createFinalStateMessage();
      // The final state must also report the records emitted since the last checkpoint; otherwise
      // those records are missing from the per-state counts.
      return toStateMessageWithRecordCount(finalStateMessage);
    } else {
      return endOfData();
    }
  }

  /**
   * Attaches the current record count to the given state, resets the counter, and wraps the state in
   * a {@code STATE}-typed message.
   */
  private AirbyteMessage toStateMessageWithRecordCount(final AirbyteStateMessage stateMessage) {
    stateMessage.withSourceStats(new AirbyteStateStats().withRecordCount((double) recordCount));
    recordCount = 0L;
    return new AirbyteMessage()
        .withType(Type.STATE)
        .withState(stateMessage);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.state;

import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.time.Instant;

/**
 * Strategy used by {@link SourceStateIterator} to decide when a checkpoint is due and to build the
 * state and record messages it emits. Methods are listed in the order the iterator consults them.
 *
 * @param <T> type of the raw elements handed to {@link #processRecordMessage(Object)}
 */
public interface SourceStateIteratorManager<T> {

  /**
   * Decides whether a checkpoint state message should be emitted now.
   *
   * @param recordCount number of record messages processed since the last checkpoint
   * @param lastCheckpoint instant at which the last checkpoint was taken
   * @return {@code true} if a checkpoint has been reached and a state message should be emitted
   */
  boolean shouldEmitStateMessage(final long recordCount, final Instant lastCheckpoint);

  /**
   * Builds the state message to emit at a checkpoint.
   *
   * @return the checkpoint state message
   */
  AirbyteStateMessage generateStateMessageAtCheckpoint();

  /**
   * Defines how the connector consumes an incoming record message.
   *
   * @param message the incoming element
   * @return the message to emit for that element
   */
  AirbyteMessage processRecordMessage(final T message);

  /**
   * Builds the final state message; called once the iteration has ended.
   *
   * @return the final state message
   */
  AirbyteStateMessage createFinalStateMessage();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.state;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import java.util.Iterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SourceStateIteratorTest {

SourceStateIteratorManager mockProcessor;
Iterator<AirbyteMessage> messageIterator;

SourceStateIterator sourceStateIterator;

@BeforeEach
void setup() {
mockProcessor = mock(SourceStateIteratorManager.class);
messageIterator = mock(Iterator.class);
sourceStateIterator = new SourceStateIterator(messageIterator, mockProcessor);
}

// Provides a way to generate a record message and will verify corresponding spied functions have
// been called.
void processRecordMessage() {
doReturn(true).when(messageIterator).hasNext();
doReturn(false).when(mockProcessor).shouldEmitStateMessage(anyLong(), any());
AirbyteMessage message = new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage());
doReturn(message).when(mockProcessor).processRecordMessage(any());
doReturn(message).when(messageIterator).next();

assertEquals(message, sourceStateIterator.computeNext());
verify(mockProcessor, atLeastOnce()).processRecordMessage(message);
verify(mockProcessor, atLeastOnce()).shouldEmitStateMessage(eq(0L), any());
}

@Test
void testShouldProcessRecordMessage() {
processRecordMessage();
}

@Test
void testShouldEmitStateMessage() {
processRecordMessage();
doReturn(true).when(mockProcessor).shouldEmitStateMessage(anyLong(), any());
final AirbyteStateMessage stateMessage = new AirbyteStateMessage();
doReturn(stateMessage).when(mockProcessor).generateStateMessageAtCheckpoint();
AirbyteMessage expectedMessage = new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
expectedMessage.getState().withSourceStats(new AirbyteStateStats().withRecordCount(1.0));
assertEquals(expectedMessage, sourceStateIterator.computeNext());
}

@Test
void testShouldEmitFinalStateMessage() {
processRecordMessage();
processRecordMessage();
doReturn(false).when(messageIterator).hasNext();
final AirbyteStateMessage stateMessage = new AirbyteStateMessage();
doReturn(stateMessage).when(mockProcessor).createFinalStateMessage();
AirbyteMessage expectedMessage = new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
expectedMessage.getState().withSourceStats(new AirbyteStateStats().withRecordCount(2.0));
assertEquals(expectedMessage, sourceStateIterator.computeNext());
}

@Test
void testShouldSendEndOfData() {
processRecordMessage();
doReturn(false).when(messageIterator).hasNext();
doReturn(new AirbyteStateMessage()).when(mockProcessor).createFinalStateMessage();
sourceStateIterator.computeNext();

// After sending the final state, if iterator was called again, we will return null.
assertEquals(null, sourceStateIterator.computeNext());
}

}
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.7'
cdkVersionRequired = '0.10.3'
features = ['db-sources']
useLocalCdk = false
}
Expand All @@ -30,6 +30,7 @@ dependencies {

implementation 'mysql:mysql-connector-java:8.0.30'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation project(path: ':airbyte-cdk:java:airbyte-cdk:db-sources')

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.0
dockerImageTag: 3.3.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIteratorManager;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -180,13 +183,16 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseab

final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_DURATION;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: MySqlInitialSyncStateIterator.SYNC_CHECKPOINT_RECORDS;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;

final SourceStateIteratorManager<AirbyteMessage> processor =
new MySqlInitialSyncStateIteratorManager(pair, initialLoadStateManager, incrementalState,
syncCheckpointDuration, syncCheckpointRecords);

return AutoCloseableIterators.transformIterator(
r -> new MySqlInitialSyncStateIterator(r, pair, initialLoadStateManager, incrementalState,
syncCheckpointDuration, syncCheckpointRecords),
r -> new SourceStateIterator<>(r, processor),
recordIterator, pair);
}

Expand Down

This file was deleted.

Loading

0 comments on commit 18e0e77

Please sign in to comment.