Skip to content

Commit

Permalink
[Source-postgres] : Better error messages on switching between sync m… (
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Apr 24, 2024
1 parent b5fef4e commit d790b7d
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ object AirbyteTraceMessageUtility {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
}

@JvmStatic
fun emitTransientErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
}

fun emitCustomErrorTrace(displayMessage: String?, internalMessage: String?) {
emitMessage(
makeAirbyteMessageFromTraceMessage(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
testExecutionConcurrency=-1

JunitMethodExecutionTimeout=2 m
JunitMethodExecutionTimeout=5 m
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.27
dockerImageTag: 3.3.28
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.CtidStreams;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.StreamsCategorised;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
Expand Down Expand Up @@ -67,7 +68,7 @@ public static StreamsCategorised<CursorBasedStreams> categoriseStreams(final Sta
cursorBasedSyncStreamPairs.add(pair);
statesFromCursorBasedSync.add(stateMessage);
} else {
throw new RuntimeException("Unknown state type: " + streamState.get(STATE_TYPE_KEY).asText());
throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector");
}
} else {
LOGGER.info("State type not present, syncing stream {} via cursor", streamDescriptor.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.CtidStreams;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.StreamsCategorised;
Expand Down Expand Up @@ -72,7 +73,7 @@ public static StreamsCategorised<XminStreams> categoriseStreams(final StateManag
statesFromXminSync.add(stateMessage);
}
} else {
throw new RuntimeException("Unknown state type: " + streamState.get(STATE_TYPE_KEY).asText());
throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector");
}
} else {
throw new RuntimeException("State type not present");
Expand Down
Loading

0 comments on commit d790b7d

Please sign in to comment.