Merge branch 'master' into patch-2
marcosmarxm authored Jul 12, 2024
2 parents 85e8b87 + 5725a3b commit 0ef7234
Showing 122 changed files with 1,400 additions and 2,450 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.6
current_version = 0.63.7
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
99 changes: 99 additions & 0 deletions .github/workflows/approve-regression-tests-command.yml
@@ -0,0 +1,99 @@
name: Approve Regression Tests
permissions:
pull-requests: write
statuses: write
on:
workflow_dispatch:
inputs:
pr:
description: "Pull request number. Used to pull the proper branch ref, including on forks."
type: number
required: false
comment-id:
description: "Optional. The comment-id of the slash command. Used to update the comment with the status."
required: false

# These must be declared, but they are unused and ignored.
# TODO: Infer 'repo' and 'gitref' from PR number on other workflows, so we can remove these.
repo:
description: "Repo (Ignored)"
required: false
default: "airbytehq/airbyte"
gitref:
description: "Ref (Ignored)"
required: false

run-name: "Approve Regression Tests #${{ github.event.inputs.pr }}"

jobs:
approve-regression-tests:
name: "Approve Regression Tests"
runs-on: ubuntu-latest
steps:
- name: Get job variables
id: job-vars
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
shell: bash
run: |
PR_JSON=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.inputs.pr }})
echo "PR_JSON: $PR_JSON"
echo "repo=$(echo "$PR_JSON" | jq -r .head.repo.full_name)" >> $GITHUB_OUTPUT
BRANCH=$(echo "$PR_JSON" | jq -r .head.ref)
echo "branch=$BRANCH" >> $GITHUB_OUTPUT
echo "run-url=https://github.com/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
LATEST_COMMIT=$(gh api repos/${{ github.repository }}/commits/$BRANCH | jq -r .sha)
echo "latest_commit=$LATEST_COMMIT" >> $GITHUB_OUTPUT
- name: Append comment with job run link
# If comment-id is not provided, this will create a new
# comment with the job run link.
id: first-comment-action
uses: peter-evans/create-or-update-comment@v4
with:
comment-id: ${{ github.event.inputs.comment-id }}
issue-number: ${{ github.event.inputs.pr }}
body: |
> [Check job output.][1]
[1]: ${{ steps.job-vars.outputs.run-url }}
- name: Approve regression tests
id: approve
run: |
echo "approving ...."
response=$(curl --write-out '%{http_code}' --silent --output /dev/null \
--request POST \
--url ${{ github.api_url }}/repos/${{ github.repository }}/statuses/${{ steps.job-vars.outputs.latest_commit }} \
--header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' \
--header 'content-type: application/json' \
--data '{
"state": "success",
"context": "Regression Test Results Reviewed and Approved",
"target_url": "https://github.com/airbytehq/airbyte/tree/master/airbyte-ci/connectors/live-tests"
}')
if [ $response -ne 201 ]; then
echo "Failed to approve regression tests. HTTP status code: $response"
exit 1
else
echo "Regression tests approved."
fi
- name: Append success comment
uses: peter-evans/create-or-update-comment@v4
if: success()
with:
comment-id: ${{ steps.first-comment-action.outputs.comment-id }}
reactions: "+1"
body: |
> ✅ Approving regression tests
- name: Append failure comment
uses: peter-evans/create-or-update-comment@v4
if: failure()
with:
comment-id: ${{ steps.first-comment-action.outputs.comment-id }}
reactions: confused
body: |
> ❌ Regression test approval failed
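The "Approve regression tests" step above boils down to a single POST against GitHub's commit-status endpoint, with the job treated as failed unless the API answers 201 Created. For readers who prefer to see the call outside of bash, here is a minimal Kotlin sketch of the same request using java.net.http; the token, repository, and commitSha parameters are placeholders for secrets.GITHUB_TOKEN, github.repository, and the latest_commit output computed in the job-vars step, and the function name is purely illustrative.

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Illustrative sketch only: posts a "success" commit status, mirroring the curl call in the
// workflow. `token`, `repository`, and `commitSha` stand in for the workflow's own values.
fun approveRegressionTests(token: String, repository: String, commitSha: String) {
    val body = """
        {
          "state": "success",
          "context": "Regression Test Results Reviewed and Approved",
          "target_url": "https://github.com/airbytehq/airbyte/tree/master/airbyte-ci/connectors/live-tests"
        }
    """.trimIndent()
    val request = HttpRequest.newBuilder()
        .uri(URI.create("https://api.github.com/repos/$repository/statuses/$commitSha"))
        .header("Authorization", "Bearer $token")
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    // The statuses endpoint returns 201 Created on success, which is what the workflow checks for.
    check(response.statusCode() == 201) {
        "Failed to approve regression tests. HTTP status code: ${response.statusCode()}"
    }
}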
2 changes: 1 addition & 1 deletion .github/workflows/connectors_up_to_date.yml
@@ -11,7 +11,7 @@ on:
inputs:
connectors-options:
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--concurrency=10 --language=python --language=low-code"
default: "--concurrency=10 --language=python --language=low-code --language=manifest-only"
auto-merge:
description: "Whether to auto-merge the PRs created by the action."
default: "false"
1 change: 1 addition & 0 deletions .github/workflows/slash-commands.yml
@@ -27,6 +27,7 @@ jobs:
test-performance
publish-java-cdk
connector-performance
approve-regression-tests
static-args: |
repo=${{ steps.getref.outputs.repo }}
gitref=${{ steps.getref.outputs.ref }}
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,6 +174,8 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
| 0.40.10 | 2024-07-05 | [\#40719](https://github.com/airbytehq/airbyte/pull/40719) | Update test to reflect isResumable field in catalog |
| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing |
@@ -1 +1 @@
version=0.40.11
version=0.41.1
@@ -22,6 +22,7 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

private val LOGGER = KotlinLogging.logger {}
/**
@@ -38,29 +39,31 @@ class AirbyteDebeziumHandler<T>(
) {
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) :
LinkedBlockingQueue<E>(capacity) {
private var lastReport: Instant? = null
private var lastReport: Instant = Instant.MIN
private var puts = AtomicLong()
private var polls = AtomicLong()

private fun reportQueueUtilization() {
if (
lastReport == null ||
Duration.between(lastReport, Instant.now())
.compareTo(Companion.REPORT_DURATION) > 0
) {
private fun reportQueueUtilization(put: Long = 0L, poll: Long = 0L) {
if (Duration.between(lastReport, Instant.now()) > REPORT_DURATION) {
LOGGER.info {
"CDC events queue size: ${this.size}. remaining ${this.remainingCapacity()}"
"CDC events queue stats: " +
"size=${this.size}, " +
"cap=${this.remainingCapacity()}, " +
"puts=${puts.addAndGet(put)}, " +
"polls=${polls.addAndGet(poll)}"
}
synchronized(this) { lastReport = Instant.now() }
}
}

@Throws(InterruptedException::class)
override fun put(e: E) {
reportQueueUtilization()
reportQueueUtilization(put = 1L)
super.put(e)
}

override fun poll(): E {
reportQueueUtilization()
reportQueueUtilization(poll = 1L)
return super.poll()
}
}
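The CapacityReportingBlockingQueue change above replaces the nullable lastReport timestamp with Instant.MIN and adds AtomicLong put/poll counters, so the periodic log line now carries throughput information as well as queue depth. Below is a standalone Kotlin sketch of that throttled-reporting pattern; the REPORT_DURATION value is an assumption, since the real constant lives in the companion object, which this diff does not show.

import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
import java.time.Instant
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

private val LOGGER = KotlinLogging.logger {}

// Sketch of the throttled queue-stats reporting pattern: every put()/poll() bumps a counter,
// but a log line is emitted at most once per reporting interval.
class ReportingBlockingQueue<E>(capacity: Int) : LinkedBlockingQueue<E>(capacity) {
    private var lastReport: Instant = Instant.MIN
    private val puts = AtomicLong()
    private val polls = AtomicLong()

    private fun reportQueueUtilization(put: Long = 0L, poll: Long = 0L) {
        val totalPuts = puts.addAndGet(put)
        val totalPolls = polls.addAndGet(poll)
        if (Duration.between(lastReport, Instant.now()) > REPORT_DURATION) {
            LOGGER.info {
                "queue stats: size=${this.size}, cap=${this.remainingCapacity()}, " +
                    "puts=$totalPuts, polls=$totalPolls"
            }
            synchronized(this) { lastReport = Instant.now() }
        }
    }

    @Throws(InterruptedException::class)
    override fun put(e: E) {
        reportQueueUtilization(put = 1L)
        super.put(e)
    }

    override fun poll(): E {
        reportQueueUtilization(poll = 1L)
        return super.poll()
    }

    companion object {
        // Assumed interval; the actual REPORT_DURATION is defined elsewhere in the class.
        private val REPORT_DURATION: Duration = Duration.ofSeconds(10)
    }
}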
@@ -15,6 +15,7 @@ import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import java.lang.reflect.Field
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.util.*
import java.util.concurrent.*
@@ -51,34 +52,60 @@ class DebeziumRecordIterator<T>(
private var lastHeartbeatPosition: T? = null
private var maxInstanceOfNoRecordsFound = 0
private var signalledDebeziumEngineShutdown = false
private var numUnloggedPolls: Int = -1
private var lastLoggedPoll: Instant = Instant.MIN

// The following logic incorporates heartbeat (CDC postgres only for now):
// The following logic incorporates heartbeat:
// 1. Wait on queue either the configured time first or 1 min after a record received
// 2. If nothing came out of queue finish sync
// 3. If received heartbeat: check if heartbeat_lsn reached target or hasn't changed in a while
// finish sync
// 4. If change event lsn reached target finish sync
// 5. Otherwise check message queuen again
// 5. Otherwise check message queue again
override fun computeNext(): ChangeEventWithMetadata? {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all
// messages it
// emitted.
// messages it emitted.
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
val next: ChangeEvent<String?, String?>?

val waitTime =
if (receivedFirstRecord) this.subsequentRecordWaitTime else this.firstRecordWaitTime
val instantBeforePoll: Instant = Instant.now()
try {
next = queue.poll(waitTime.seconds, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
throw RuntimeException(e)
}
val instantAfterPoll: Instant = Instant.now()
val isEventLogged: Boolean =
numUnloggedPolls >= POLL_LOG_MAX_CALLS_INTERVAL - 1 ||
Duration.between(lastLoggedPoll, instantAfterPoll) > pollLogMaxTimeInterval ||
next == null ||
isHeartbeatEvent(next)
if (isEventLogged) {
val pollDuration: Duration = Duration.between(instantBeforePoll, Instant.now())
LOGGER.info {
"CDC events queue poll(): " +
when (numUnloggedPolls) {
-1 -> "blocked for $pollDuration in its first call."
0 ->
"blocked for $pollDuration after " +
"its previous call which was also logged."
else ->
"blocked for $pollDuration after " +
"$numUnloggedPolls previous call(s) which were not logged."
}
}
numUnloggedPolls = 0
lastLoggedPoll = instantAfterPoll
} else {
numUnloggedPolls++
}

// if within the timeout, the consumer could not get a record, it is time to tell the
// producer to
// shutdown.
// producer to shutdown.
if (next == null) {
LOGGER.info { "CDC events queue poll(): returned nothing." }
if (
!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10
) {
@@ -90,17 +117,35 @@
DebeziumCloseReason.TIMEOUT
)
}
LOGGER.info { "no record found. polling again." }

maxInstanceOfNoRecordsFound++
LOGGER.info {
"CDC events queue poll(): " +
"returned nothing, polling again, attempt $maxInstanceOfNoRecordsFound."
}
continue
}

if (isHeartbeatEvent(next)) {
if (!hasSnapshotFinished) {
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event while snapshot is not finished yet."
}
continue
}

val heartbeatPos = getHeartbeatPosition(next)
val isProgressing = heartbeatPos != lastHeartbeatPosition
LOGGER.info {
"CDC events queue poll(): " +
"returned a heartbeat event: " +
if (isProgressing) {
"progressing to $heartbeatPos."
} else {
"no progress since last heartbeat."
}
}
// wrap up sync if heartbeat position crossed the target OR heartbeat position
// hasn't changed for
// too long
@@ -118,7 +163,7 @@
)
}

if (heartbeatPos != lastHeartbeatPosition) {
if (isProgressing) {
this.tsLastHeartbeat = LocalDateTime.now()
this.lastHeartbeatPosition = heartbeatPos
}
@@ -128,6 +173,14 @@
val changeEventWithMetadata = ChangeEventWithMetadata(next)
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
}
}

// if the last record matches the target file position, it is time to tell the producer
// to shutdown.
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
@@ -137,6 +190,9 @@
)
}
this.tsLastHeartbeat = null
if (!receivedFirstRecord) {
LOGGER.info { "Received first record from debezium." }
}
this.receivedFirstRecord = true
this.maxInstanceOfNoRecordsFound = 0
return changeEventWithMetadata
@@ -197,15 +253,21 @@
}

private fun heartbeatPosNotChanging(): Boolean {
// Closing debezium due to heartbeat position not changing only exists as an escape hatch
// for
// testing setups. In production, we rely on the platform heartbeats to kill the sync
if (!isTest() || this.tsLastHeartbeat == null) {
if (this.tsLastHeartbeat == null) {
return false
} else if (!isTest() && receivedFirstRecord) {
// Closing debezium due to heartbeat position not changing only exists as an escape
// hatch
// for testing setups. In production, we rely on the platform heartbeats to kill the
// sync
// ONLY if we haven't received a record from Debezium. If a record has not been received
// from Debezium and the heartbeat isn't changing, the sync should be shut down due to
// heartbeat position not changing.
return false
}
val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime) > 0
}

private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) {
@@ -272,5 +334,8 @@
HEARTBEAT_NOT_PROGRESSING
}

companion object {}
companion object {
val pollLogMaxTimeInterval: Duration = Duration.ofSeconds(5)
const val POLL_LOG_MAX_CALLS_INTERVAL = 1_000
}
}
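The computeNext() changes above throttle how often a poll() of the CDC events queue gets logged: a call is logged when roughly POLL_LOG_MAX_CALLS_INTERVAL unlogged calls have accumulated, when pollLogMaxTimeInterval has elapsed since the last logged call, or when the result is inherently interesting (an empty poll or a heartbeat event). Extracted into a small standalone helper for illustration (the class and method names below are not part of the diff), the decision logic looks roughly like this:

import java.time.Duration
import java.time.Instant

// Illustrative extraction of the poll-logging throttle: returns true when this poll should be
// logged and updates the bookkeeping, mirroring numUnloggedPolls / lastLoggedPoll above.
class PollLogThrottle(
    private val maxCallsBetweenLogs: Int = 1_000,                      // plays the role of POLL_LOG_MAX_CALLS_INTERVAL
    private val maxTimeBetweenLogs: Duration = Duration.ofSeconds(5),  // plays the role of pollLogMaxTimeInterval
) {
    private var numUnloggedPolls: Int = -1 // -1 means no poll has happened yet
    private var lastLoggedPoll: Instant = Instant.MIN

    fun shouldLog(pollFinishedAt: Instant, isInterestingEvent: Boolean): Boolean {
        val logThisPoll =
            numUnloggedPolls >= maxCallsBetweenLogs - 1 ||
                Duration.between(lastLoggedPoll, pollFinishedAt) > maxTimeBetweenLogs ||
                isInterestingEvent
        if (logThisPoll) {
            numUnloggedPolls = 0
            lastLoggedPoll = pollFinishedAt
        } else {
            numUnloggedPolls++
        }
        return logThisPoll
    }
}

The net effect is that steady-state record traffic produces at most one poll log line per thousand calls or per five seconds, while empty polls and heartbeat events are always logged.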
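The heartbeatPosNotChanging() change narrows when a stalled heartbeat position is allowed to shut the sync down: the escape hatch now only applies in test setups or before the first Debezium record has been received, and the timeout was raised from half of firstRecordWaitTime to the full wait time. A hedged sketch of the resulting decision, written as a free function with the relevant state passed in (the signature is illustrative, not the class's API):

import java.time.Duration
import java.time.LocalDateTime

// Illustrative restatement of the revised rule; in the iterator these values are fields.
fun heartbeatPosNotChanging(
    tsLastHeartbeat: LocalDateTime?,
    isTest: Boolean,
    receivedFirstRecord: Boolean,
    firstRecordWaitTime: Duration,
): Boolean {
    if (tsLastHeartbeat == null) {
        // No heartbeat has been tracked yet, so there is nothing to time out on.
        return false
    }
    if (!isTest && receivedFirstRecord) {
        // In production, once a record has arrived, platform heartbeats are responsible for
        // killing the sync; the stalled-heartbeat escape hatch no longer applies.
        return false
    }
    val elapsed = Duration.between(tsLastHeartbeat, LocalDateTime.now())
    return elapsed > firstRecordWaitTime
}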
@@ -158,7 +158,12 @@ abstract class AbstractJdbcSource<Datatype>(
)
return augmentWithStreamStatus(
airbyteStream,
initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
initialLoadHandler.getIteratorForStream(
airbyteStream,
table,
Instant.now(),
Optional.empty()
)
)
}
