-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination BigQuery: Adapt to newer interface for Sync operations #38132
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
This stack of pull requests is managed by Graphite. Learn more about stacking. |
dc2606b
to
f6903b7
Compare
cf2bbfc
to
1a9f866
Compare
f6903b7
to
cf090a7
Compare
1a9f866
to
3a389ad
Compare
0460083
to
27931dc
Compare
27931dc
to
96ad3be
Compare
96ad3be
to
b64a0d5
Compare
b64a0d5
to
58a377a
Compare
58a377a
to
0ee722d
Compare
0ee722d
to
0d6dcd2
Compare
16b6c02
to
3488a4c
Compare
3488a4c
to
025c536
Compare
final Job completedJob = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30))); | ||
if (completedJob == null) { | ||
// job no longer exists | ||
LOGGER.warn("Job {} No longer exists", job.getJobId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does that mean when a job doesn't exist? Should we throw an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure when this case occurs, (followed javadoc implementation suggestion). Earlier we aren't even using the returned completedJob
value and their status.
@@ -98,7 +100,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) | |||
FROM ${raw_table} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be worth explaining why we do a substraction. I'm not sure myself, TBH
See this comment inline on Graphite.
@@ -112,7 +114,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) | |||
SELECT MAX(_airbyte_extracted_at) | |||
FROM ${raw_table} | |||
""")) | |||
.build()).iterateAll().iterator().next().get(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is ugly as sin. Are you sure we didn't forget to iterate on the iterateAll iterator?
@@ -192,8 +194,8 @@ public void execute(final Sql sql) throws InterruptedException { | |||
} | |||
|
|||
@Override | |||
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for removing this Impl class...
|
||
override fun check(config: JsonNode): AirbyteConnectionStatus? { | ||
try { | ||
val datasetId = getDatasetId(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we should explicitely state the types of those objects.
* check. | ||
*/ | ||
private fun checkGcsPermission(config: JsonNode): AirbyteConnectionStatus? { | ||
val loadingMethod = config[bqConstants.LOADING_METHOD] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh this is ugly. BigQueryUtils.getLoadingMethod
returns an enum, but here we get a jsonNode from the same field that's used by getLoadingMethod
... Something to clean up and figure out I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Didn't take a pass at cleaning the configs in this PR because BigQuery is one hairy config extraction in BigQueryUtils.
val missingPermissions: MutableList<String> = ArrayList() | ||
|
||
try { | ||
val credentials = getServiceAccountCredentials(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this all really should be in the CDK...
package io.airbyte.integrations.destination.bigquery | ||
|
||
import com.codepoetics.protonpack.Indexed | ||
import com.codepoetics.protonpack.StreamUtils |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want a dependency on some obscure package that hasn't been maintained in 3 years?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tried removing but it has some zip the stream method which we have to implement or use alternative (not difficult just ignored in this PR). Will take a pass later in the refreshes PR.
storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS) | ||
|
||
missingPermissions.addAll( | ||
StreamUtils.zipWithIndex(permissionsCheckStatusList.stream()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does permissionsCheckStatuslist.withIndex
work here? This function is an example of a place where types are far from obvious and should be set explicitely.
I'd rather convert the returned list into a Map<Permission, IsEnabled>
than going index-based: REQUIRED_PERMISSIONS.zip(permissionsCheckStatusList).toMap()
. Then, missingPermissions becomes map.filterValues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, see above comment about StreamUtils
) | ||
|
||
log.error(e) { message.toString() } | ||
throw ConfigErrorException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why aren't we reusing the message here?
* @param catalog | ||
* - schema of the incoming messages. | ||
*/ | ||
override fun getConsumer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stale comment?
// deserialize it. Otherwise, we will let the Google library find it in | ||
// the environment during the client initialization. | ||
if (serviceAccountKey.isTextual) { | ||
// There are cases where we fail to deserialize the service account key. In these |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we ever try to deserialize the string ourselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the most ugly one where the serviceAccountKey was at one point text vs json. so trying to deduce which one it is.
...bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt
Show resolved
Hide resolved
it looks like we both converted from java to kotlin and added some logic in the same PR. Am I correct? Any chance we could split those 2? |
aa20464
to
c59d16c
Compare
No conversion from Kotlin, complete rewrite and delete of the old classes. Originally was in 2 stacks then folder because Ed was obsolete classes which are deleted down the stack anyway |
c59d16c
to
a5ec528
Compare
a5ec528
to
30e1e6a
Compare
What
Adapting BigQuery to use #38107
Review guide
Deleted old code which is no longer activated or used under
src/main/java
.Minor changes based on CDK signature changes in existing code which isn't deleted.
New code in
src/main/kotlin
User Impact
Can this PR be safely reverted and rolled back?