🐛 Update initial load query for old postgres to return a defined order … #31328

Merged (5 commits, Oct 12, 2023)

Changes from 2 commits
```diff
@@ -303,13 +303,12 @@ public PreparedStatement createCtidLegacyQueryStatement(final Connection connect
     Preconditions.checkArgument(lowerBound != null, "Lower bound ctid expected");
     Preconditions.checkArgument(upperBound != null, "Upper bound ctid expected");
     try {
-      LOGGER.info("*** one more {}", lowerBound);
       LOGGER.info("Preparing query for table: {}", tableName);
       final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
           quoteString);
       final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
       final String sql =
-          "SELECT ctid::text, %s FROM %s WHERE ctid = ANY (ARRAY (SELECT FORMAT('(%%s,%%s)', page, tuple)::tid FROM generate_series(?, ?) as page, generate_series(?,?) as tuple))"
+          "SELECT ctid::text, %s FROM %s WHERE ctid = ANY (ARRAY (SELECT FORMAT('(%%s,%%s)', page, tuple)::tid tid_addr FROM generate_series(?, ?) as page, generate_series(?,?) as tuple ORDER BY tid_addr))"
```
Contributor:

Do we need to check for any other queries that bring in values we use in checkpointing and would also need an ORDER BY?

Contributor (Author):

The other ctid query is WHERE ctid > '0,0' AND ctid <= '(131000,0)'. That range form returns rows in sequential order (it was created as an optimization for old postgres), so I don't think we need to sort there ourselves.

With the postgres 12 query, there is some logic in postgres that makes it iterate over the bigger of the two ranges first (pages). I couldn't find a way to control that other than sorting the array.

The other incremental queries are all or nothing: if an error happens in the middle of an incremental xmin sync, for example, it will not checkpoint at all. So even if there is some case where records come back out of order, we will be fine (I don't think this can happen anywhere).
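For reference, here is a hedged, runnable rendering of the pre-14 chunk query this diff changes, with the bind parameters replaced by example literals; the `id_and_name` table and its columns are borrowed from the test added below, and the page/tuple bounds are illustrative only.

```sql
-- Legacy (old postgres) chunk query: build an array of candidate ctids for a
-- page/tuple window, then fetch exactly those tuples. Tuples come back in
-- array order (the behavior this PR's new test pins down), so sorting the
-- array yields rows in ctid order; without the sort, postgres may enumerate
-- the two generate_series in an order that does not match ctid order.
SELECT ctid::text, id, name, power
FROM public.id_and_name
WHERE ctid = ANY (ARRAY (
  SELECT FORMAT('(%s,%s)', page, tuple)::tid AS tid_addr
  FROM generate_series(0, 1000) AS page,   -- example page range for one chunk
       generate_series(1, 50) AS tuple     -- example tuples-per-page bound
  ORDER BY tid_addr));
```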

Contributor:

My general recommendation is to always add a sort when you want data sorted.

SQL query optimizers can easily remove the sorting step when it's not needed (or choose a query plan that's cheaper with an implicit sort over one that would be cheaper without the sort but for which the sort itself is expensive). Query plans can also change from release to release, which could change the order of the results absent an explicit sort. The real signal to look for is a change in query cost and query performance.
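A hedged way to look for that cost signal on the query in this diff (same example table and literal bounds as in the sketch above): plan both variants and compare the estimates. EXPLAIN only plans the statement without executing it, so this is cheap to try against a real table.

```sql
-- Without the explicit sort of the ctid array.
EXPLAIN
SELECT ctid::text, id, name, power
FROM public.id_and_name
WHERE ctid = ANY (ARRAY (
  SELECT FORMAT('(%s,%s)', page, tuple)::tid
  FROM generate_series(0, 1000) AS page, generate_series(1, 50) AS tuple));

-- With the explicit sort; if the planner can satisfy the order cheaply,
-- the two cost estimates should stay close.
EXPLAIN
SELECT ctid::text, id, name, power
FROM public.id_and_name
WHERE ctid = ANY (ARRAY (
  SELECT FORMAT('(%s,%s)', page, tuple)::tid AS tid_addr
  FROM generate_series(0, 1000) AS page, generate_series(1, 50) AS tuple
  ORDER BY tid_addr));
```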

Contributor (Author):

I agree, Stephane. I added an explicit test that reads a large number of records (more than a single page) and makes sure they are received in order.

Because the TID Range Scan for newer postgres versions was created specifically to make the scan optimized, I'd prefer not to add a potentially expensive sort there; each chunk we read can contain millions and millions of records.

Contributor:

@rodireich but if pg 14+ is already sorting when issuing a WHERE ctid > '0,0' AND ctid <= '(131000,0)' scan, I agree with @stephane-airbyte that we should add an explicit ORDER BY here (a sketch of the proposed ordering follows below).

There are 2 scenarios:

  1. It's already ordering by ctid, in which case the query optimizer should ignore the explicit ORDER BY.
  2. It isn't already doing this, and the ORDER BY adds significant latency.

If the ORDER BY for PG14+ adds significant latency, we can always:

  1. Tune the chunk size so that the ORDER BY occurs in memory.
  2. Checkpoint at the end of every chunk and keep track of the largest CTID entry (and avoid the ORDER BY query entirely).

I'm worried about a similar case in PG14+ where there are some cases in which the TID Range Scan does not return records in order.
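A hedged sketch of the explicit ORDER BY proposed above, applied to the PG14+ range form quoted in this thread (bounds taken from the comment; the table is again the example one from the test below). If scenario 1 holds, the optimizer should be able to satisfy the sort cheaply; if scenario 2 holds, the added latency should show up in EXPLAIN costs, as in the previous sketch.

```sql
-- On PostgreSQL 14+ this predicate can be executed as a TID Range Scan;
-- older versions do not have that scan type, which is why the ARRAY-based
-- query above is used for old postgres.
SELECT ctid::text, id, name, power
FROM public.id_and_name
WHERE ctid > '(0,0)' AND ctid <= '(131000,0)'
ORDER BY ctid;  -- the explicit ordering under discussion
```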

```diff
               .formatted(
                   wrappedColumnNames, fullTableName);
       final PreparedStatement preparedStatement = connection.prepareStatement(sql);
```
```diff
@@ -62,6 +62,7 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.testcontainers.containers.PostgreSQLContainer;
```
```diff
@@ -852,4 +853,42 @@ public void testJdbcOptionsParameter() throws Exception {
     }
   }
 
+  @Test
+  @DisplayName("Make sure initial incremental load is reading records in a certain order")
+  void testReadIncrementalRecordOrder() throws Exception {
+    final JsonNode config = getConfig(PSQL_DB, dbName);
+    // We want to test ordering, so we can delete the NaN entry
+    try (final DSLContext dslContext = getDslContext(config)) {
+      final Database database = getDatabase(dslContext);
+      database.query(ctx -> {
+        ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';");
+        for (int i = 3; i < 1000; i++) {
+          ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (%d, 'gohan%d', 222.1);".formatted(i, i));
+        }
+        return null;
+      });
+
+      final ConfiguredAirbyteCatalog configuredCatalog =
+          CONFIGURED_INCR_CATALOG
+              .withStreams(CONFIGURED_INCR_CATALOG.getStreams().stream().filter(s -> s.getStream().getName().equals(STREAM_NAME)).collect(
+                  Collectors.toList()));
+      final PostgresSource source = new PostgresSource();
+      source.setStateEmissionFrequencyForDebug(1);
+      final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(config, configuredCatalog, null));
+      setEmittedAtToNull(actualMessages);
+
+      final Set<AirbyteMessage> expectedOutput = Sets.newHashSet(
+          createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)),
+          createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)));
+      for (int i = 3; i < 1000; i++) {
+        expectedOutput.add(
+            createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("%d.0".formatted(i)), "name", "gohan%d".formatted(i), "power", 222.1)));
+      }
+      assertThat(actualMessages).containsAll(expectedOutput);
+      // Assert that the Postgres source is emitting records & state messages in the correct order.
+      assertCorrectRecordOrderForIncrementalSync(actualMessages, "id", JsonSchemaPrimitive.NUMBER, configuredCatalog,
+          new AirbyteStreamNameNamespacePair("id_and_name", "public"));
+    }
+  }
+
 }
```