-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination BigQuery: Consolidation of objects to StreamConfig, cleanup #38131
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
This stack of pull requests is managed by Graphite. Learn more about stacking. |
b43ce70
to
dc2606b
Compare
dc2606b
to
f6903b7
Compare
f6903b7
to
cf090a7
Compare
cf090a7
to
9c0dfc5
Compare
0724d77
to
0ded7a7
Compare
9c0dfc5
to
4328f0a
Compare
0ded7a7
to
23a237f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple minor comments, nothing blocking (and some of them are about the CDK PR this is stacked on :P )
@@ -234,7 +236,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN | |||
final String datasetLocation = BigQueryUtils.getDatasetLocation(config); | |||
final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); | |||
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET); | |||
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride); | |||
final ParsedCatalog parsedCatalog = parseCatalog(sqlGenerator, defaultNamespace, | |||
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
random thought: now that we're in kotlin, do you want to make catalogparser accept rawNamespace: String?
and do rawNamespace ?: DEFAULT_AIRBYTE_INTERNAL
? then we don't need to copy this logic into every connector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah makes sense to me.
final ParsedCatalog parsedCatalog, | ||
final Function<JsonNode, BigQueryRecordFormatter> recordFormatterCreator, | ||
final Function<String, String> tmpTableNameTransformer) { | ||
private Map<StreamDescriptor, StreamConfig> createWriteConfigs(final ConfiguredAirbyteCatalog catalog, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this just be parsedCatalog.getStreams().stream()
? afaict we don't actually need the raw protocol models for anything
(and then we don't need to plumb the raw configured catalog into this method)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, but the called method iterated over the Configuredcatalog and populates the map. Didn't want to change any functional logic for the fear of missing some defaultNamespace plumbing. up the stack the whole method will be removed.
// In Destinations V2, we will always use the 'airbyte' schema/namespace for raw tables | ||
BigQueryRecordFormatter.SCHEMA_V2, streamConfig.getId().getOriginalName(), | ||
tableId, streamConfig.getId().getOriginalName()); | ||
// In Destinations V2, we will always use the 'airbyte' schema/originalNamespace for raw tables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// In Destinations V2, we will always use the 'airbyte' schema/originalNamespace for raw tables | |
// In Destinations V2, we will always use the 'airbyte_internal' schema/originalNamespace for raw tables |
😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
DestinationSyncMode.APPEND_DEDUP, | ||
List.of(new ColumnId("foo", "bar", "fizz")), | ||
Optional.empty(), | ||
new LinkedHashMap<>()); | ||
new LinkedHashMap<>(), 0, 0, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.... do you want me to set default values for the generation/sync ID args? These diffs seem kind of dumb
4328f0a
to
3de0d51
Compare
3de0d51
to
6bf8684
Compare
What
Removing redundant references and duplicate information passed around using
WriteConfig
objects. No functional changes and resurrected all the information needed throughStreamConfig
and adapted changes accordingly.This PR should be in a mergeable state with no functional changes after the ones down the stack are published.
Review guide
BigQueryWriteConfig
and reused already builtStreamConfig
StagingOperations
interface and made concrete class, this will help for later adding a shim on this and refactoring without large changesWriteDispostion
etc. Probably remnant of bigquery-denormalized bespoke connector.User Impact
Can this PR be safely reverted and rolled back?