Merge branch 'master' of github.com:airbytehq/airbyte into ykurochkin/cdk/enable-debug-logging-when-running-availability-check
yevhenii-ldv committed Oct 12, 2023
2 parents 18b7a1f + 2f6dcbc commit 1f30f87
Showing 405 changed files with 9,500 additions and 4,550 deletions.
62 changes: 62 additions & 0 deletions .github/ISSUE_TEMPLATE/2-issue-platform.yaml
@@ -0,0 +1,62 @@
name: 🐛 Report a platform, infra or deployment bug
description: Use this template when you have a problem operating Airbyte platform
labels: [type/bug, area/platform, needs-triage]
body:
- type: markdown
attributes:
value: >
<p align="center">
<a target="_blank" href="https://airbyte.com">
<image>
<source srcset="https://raw.githubusercontent.com/airbytehq/airbyte/master/.github/octavia-issue-template.svg">
<img alt="octavia-welcome" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/.github/octavia-issue-template.svg" width="auto" height="120">
</image>
</a>
</p>
- type: markdown
attributes:
value: |
Thanks for taking the time to fill out this bug report...
Make sure to update this issue with a concise title and provide all information you have to
help us debug the problem together. Issues not following the template will be closed.
- type: dropdown
id: deploy
validations:
required: true
attributes:
label: What method are you using to run Airbyte?
multiple: false
options:
- Docker
- Kubernetes
- type: input
id: platform-version
attributes:
label: Platform Version or Helm Chart Version
description: "Examples: 0.44.1, 0.30.0. You can find the version at the bottom left of the Airbyte UI, or in the .env / value.yaml file."
validations:
required: true
- type: dropdown
id: step
attributes:
label: At what step did the error happen?
multiple: false
options:
- On deploy
- During the Sync
- Upgrading the Platform or Helm Chart
- Other
- type: textarea
id: description
attributes:
label: Relevant information
description: Please give any additional information you have and steps to reproduce the problem.
- type: textarea
id: logs
attributes:
label: Relevant log output
description: |
Please copy and paste any relevant log output.
This will be automatically formatted into code, so no need for backticks.
We strongly recommend uploading the log file for further debugging.
render: shell
14 changes: 7 additions & 7 deletions .github/pull_request_template.md
Expand Up @@ -91,19 +91,19 @@ If this is a community PR, the Airbyte engineer reviewing this PR is responsible
### Airbyter

Before merging:
- [ ] Pull Request description explains what problem it is solving
- [ ] Code change is unit tested
- [ ] Build and my-py check pass
- [ ] Smoke test the change on at least one affected connector
- Pull Request description explains what problem it is solving
- Code change is unit tested
- Build and my-py check pass
- Smoke test the change on at least one affected connector
- On Github: Run [this workflow](https://github.com/airbytehq/airbyte/actions/workflows/connectors_tests.yml), passing `--use-local-cdk --name=source-<connector>` as options
- Locally: `airbyte-ci connectors --use-local-cdk --name=source-<connector> test`
- [ ] PR is reviewed and approved
- PR is reviewed and approved

After merging:
- [ ] [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml)
- [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml)
- The CDK does not follow proper semantic versioning. Choose minor if the change has significant user impact or is a breaking change. Choose patch otherwise.
- Write a thoughtful changelog message so we know what was updated.
- [ ] Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
- Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
- This step is optional if the change does not affect the connector builder or declarative connectors.

</details>
2 changes: 1 addition & 1 deletion .github/workflows/publish-command.yml
Expand Up @@ -44,5 +44,5 @@ jobs:
> :warning: The publish slash command is now deprecated.<br>
The connector publication happens on merge to the master branch.<br>
Please use /legacy-publish if you need to publish normalization images.<br>
Please join the #publish-on-merge-updates slack channel to track ongoing publish pipelines.<br>
Please join the #connector-publish-updates slack channel to track ongoing publish pipelines.<br>
Please reach out to the @dev-connector-ops team if you need support in publishing a connector.
15 changes: 10 additions & 5 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -154,9 +154,14 @@ MavenLocal debugging steps:

### Java CDK

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------ |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.8 | 2023-10-11 | [\#31322](https://github.com/airbytehq/airbyte/pull/31322) | Cap log line length to 32KB to prevent loss of records |
| 0.1.7 | 2023-10-10 | [\#31194](https://github.com/airbytehq/airbyte/pull/31194) | Deallocate unused per stream buffer memory when empty |
| 0.1.6 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations |
| 0.1.5 | 2023-10-09 | [\#31196](https://github.com/airbytehq/airbyte/pull/31196) | Update typo in CDK (CDN_LSN -> CDC_LSN) |
| 0.1.4 | 2023-10-06 | [\#31139](https://github.com/airbytehq/airbyte/pull/31139) | Reduce async buffer |
| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) |
| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. |
| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). |
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |
| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. |
| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). |
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
public static PartialAirbyteMessage deserializeAirbyteMessage(final String messageString) {
// TODO: (ryankfu) plumb in the serialized AirbyteStateMessage to match AirbyteRecordMessage code
// parity. https://github.com/airbytehq/airbyte/issues/27530 for additional context
final var partial = Jsons.tryDeserialize(messageString, PartialAirbyteMessage.class)
final var partial = Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage.class)
.orElseThrow(() -> new RuntimeException("Unable to deserialize PartialAirbyteMessage."));

final var msgType = partial.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class GlobalMemoryManager {
// In cases where a queue is rapidly expanding, a larger block size allows fewer allocation calls. On
// the flip side, a smaller block size allows more granular memory management. Since this overhead
// is minimal for now, err on the side of smaller block sizes.
public static final long BLOCK_SIZE_BYTES = 30 * 1024 * 1024; // 30MB
public static final long BLOCK_SIZE_BYTES = 10 * 1024 * 1024; // 10MB
private final long maxMemoryBytes;

private final AtomicLong currentMemoryBytes = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,19 @@ public MemoryAwareMessageBatch take(final StreamDescriptor streamDescriptor, fin
}
}

queue.addMaxMemory(-bytesRead.get());
if (queue.isEmpty()) {
final var batchSizeBytes = bytesRead.get();
final var allocatedBytes = queue.getMaxMemoryUsage();

// Free unused allocation for the queue.
// When the batch flushes it will flush its allocation.
memoryManager.free(allocatedBytes - batchSizeBytes);

// Shrink queue to 0 — any new messages will reallocate.
queue.addMaxMemory(-allocatedBytes);
} else {
queue.addMaxMemory(-bytesRead.get());
}

return new MemoryAwareMessageBatch(
output,
Expand Down
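The branch added above can be read as two pieces of arithmetic: how many bytes go back to the global pool right after a `take`, and what the queue's ceiling becomes. The following is a minimal sketch of that arithmetic only. `DrainAccountingSketch`, `bytesToFree`, and `newQueueMax` are hypothetical names introduced for illustration and are not part of the CDK.

```java
/** Hypothetical helper mirroring the arithmetic of the branch above; not part of the CDK. */
final class DrainAccountingSketch {

  /** Bytes handed back to the global pool immediately after a take(). */
  static long bytesToFree(final long allocatedBytes, final long batchSizeBytes, final boolean queueIsEmpty) {
    // Empty queue: everything the queue reserved beyond the batch is slack and can be released now.
    // Non-empty queue: nothing is released here; the batch releases its own bytes when it is closed.
    return queueIsEmpty ? allocatedBytes - batchSizeBytes : 0L;
  }

  /** The queue's memory ceiling after the take(). */
  static long newQueueMax(final long allocatedBytes, final long batchSizeBytes, final boolean queueIsEmpty) {
    // Empty queue: shrink to 0 so that new messages trigger a fresh allocation.
    // Non-empty queue: subtract only the bytes that moved into the batch.
    return queueIsEmpty ? 0L : allocatedBytes - batchSizeBytes;
  }

  public static void main(final String[] args) {
    final long block = 10L * 1024 * 1024; // one 10MB block, matching BLOCK_SIZE_BYTES in the diff
    System.out.println("freed now: " + bytesToFree(block, 80, true));
    System.out.println("new queue max: " + newQueueMax(block, 80, true));
  }
}
```

With one 10MB block allocated and an 80-byte batch drained from a now-empty queue, all but 80 bytes return to the pool at once, and the remaining 80 bytes are released when the batch is closed.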
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination_async.buffers;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.integrations.destination_async.AirbyteFileUtils;
import io.airbyte.cdk.integrations.destination_async.FlushWorkers;
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager;
Expand Down Expand Up @@ -61,6 +62,16 @@ public GlobalAsyncStateManager getStateManager() {
return stateManager;
}

@VisibleForTesting
protected GlobalMemoryManager getMemoryManager() {
return memoryManager;
}

@VisibleForTesting
protected ConcurrentMap<StreamDescriptor, StreamAwareQueue> getBuffers() {
return buffers;
}

public BufferEnqueue getBufferEnqueue() {
return bufferEnqueue;
}
Expand Down Expand Up @@ -95,8 +106,9 @@ private void printQueueInfo() {
for (final var entry : buffers.entrySet()) {
final var queue = entry.getValue();
queueInfo.append(
String.format(" Queue name: %s, num records: %d, num bytes: %s",
entry.getKey().getName(), queue.size(), AirbyteFileUtils.byteCountToDisplaySize(queue.getCurrentMemoryUsage())))
String.format(" Queue name: %s, num records: %d, num bytes: %s, allocated bytes: %s",
entry.getKey().getName(), queue.size(), AirbyteFileUtils.byteCountToDisplaySize(queue.getCurrentMemoryUsage()),
AirbyteFileUtils.byteCountToDisplaySize(queue.getMaxMemoryUsage())))
.append(System.lineSeparator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, f
return hiddenQueue.poll(timeout, unit);
}

public long getMaxMemoryUsage() {
return hiddenQueue.getMaxMemoryUsage();
}

/**
* Extends LinkedBlockingQueue so that we can get a LinkedBlockingQueue bounded by memory. Hidden as
* an inner class, so it doesn't get misused, see top-level javadoc comment.
Expand All @@ -82,6 +86,10 @@ public HiddenQueue(final long maxMemoryUsage) {
this.maxMemoryUsage = new AtomicLong(maxMemoryUsage);
}

public long getMaxMemoryUsage() {
return maxMemoryUsage.get();
}

public boolean offer(final E e, final long itemSizeInBytes) {
final long newMemoryUsage = currentMemoryUsage.addAndGet(itemSizeInBytes);
if (newMemoryUsage <= maxMemoryUsage.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@ public long getCurrentMemoryUsage() {
return memoryAwareQueue.getCurrentMemoryUsage();
}

public long getMaxMemoryUsage() {
return memoryAwareQueue.getMaxMemoryUsage();
}

public void addMaxMemory(final long maxMemoryUsage) {
memoryAwareQueue.addMaxMemory(maxMemoryUsage);
}

public boolean isEmpty() {
return memoryAwareQueue.size() == 0;
}

public Optional<Instant> getTimeOfLastMessage() {
// if the queue is empty, the time of last message is irrelevant
if (size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

<Appenders>
<Console name="ConsoleJSONAppender" target="SYSTEM_OUT">
<JsonTemplateLayout eventTemplateUri="classpath:AirbyteLogMessageTemplate.json"/>
<!--#30781 - we set log4j's ConsoleAppender to use a buffer of 32KB. To ensure that each log write is atomic,-->
<!-- we cap both the log message and the printed stacktrace at 16,000 characters each. Combined with the other JSON characters,-->
<!-- this ensures that we stay below 32768 characters-->
<JsonTemplateLayout eventTemplateUri="classpath:AirbyteLogMessageTemplate.json" maxStringLength="16000"/>
</Console>
</Appenders>
<Loggers>
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.1.3
version=0.1.8
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,4 +109,25 @@ private AirbyteLogMessage validateAirbyteMessageIsLog(final AirbyteMessage airby
return airbyteMessage.getLog();
}

@ParameterizedTest
@ValueSource(ints = {2, 100, 9000})
public void testAirbyteLogMessageLength(int stringRepeatitions) throws java.io.IOException {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < stringRepeatitions; i++) {
sb.append("abcd");
}
LOGGER.info(sb.toString(), new RuntimeException("aaaaa bbbbbb ccccccc dddddd"));
outputContent.flush();
final String logMessage = outputContent.toString(StandardCharsets.UTF_8);

final AirbyteMessage airbyteMessage = validateLogIsAirbyteMessage(logMessage);
final AirbyteLogMessage airbyteLogMessage = validateAirbyteMessageIsLog(airbyteMessage);
final String connectorLogMessage = airbyteLogMessage.getMessage();

// #30781 - message length is capped at 16,000 characters.
int j = connectorLogMessage.length();
assertFalse(connectorLogMessage.length() > 16_001);
assertTrue(logMessage.length() < 32768);
}

}
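The bounds asserted in the test above follow from simple arithmetic: two capped strings (message and stack trace) plus the surrounding JSON must fit in 32768 characters. Below is a rough sketch of that budget. `LogCapSketch` and its members are hypothetical names, and the single-character truncation suffix is an assumption chosen to match the test's 16_001 bound rather than something this diff confirms.

```java
/** Hypothetical illustration of the log-line budget; not the log4j implementation. */
final class LogCapSketch {

  static final int MAX_STRING_LENGTH = 16_000;   // value of maxStringLength in the log4j2 config
  static final int CONSOLE_BUFFER_CHARS = 32_768; // buffer size named in the config comment
  static final String SUFFIX = "…";               // assumption: one-character truncation marker

  /** Truncates a string to the cap and marks it as truncated. */
  static String cap(final String s) {
    return s.length() <= MAX_STRING_LENGTH ? s : s.substring(0, MAX_STRING_LENGTH) + SUFFIX;
  }

  /** Characters left for JSON keys, quotes, and other fields once both strings hit the cap. */
  static int overheadBudget() {
    return CONSOLE_BUFFER_CHARS - 2 * (MAX_STRING_LENGTH + SUFFIX.length());
  }

  public static void main(final String[] args) {
    final String longMessage = "abcd".repeat(9000); // 36,000 characters, as in the largest test case
    System.out.println("capped length: " + cap(longMessage).length());
    System.out.println("overhead budget: " + overheadBudget());
  }
}
```

Under these assumptions only a few hundred characters remain for everything else in the JSON line, which is why the test checks the total line length as well as the message length.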
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -239,6 +240,22 @@ void deserializeAirbyteMessageWithAirbyteRecord() {
assertEquals(airbyteRecordString, partial.getSerialized());
}

@Test
void deserializeAirbyteMessageWithBigDecimalAirbyteRecord() {
final JsonNode payload = Jsons.jsonNode(Map.of(
"foo", new BigDecimal("1234567890.1234567890")));
final AirbyteMessage airbyteMessage = new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(STREAM_NAME)
.withNamespace(SCHEMA_NAME)
.withData(payload));
final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage);
final String airbyteRecordString = Jsons.serialize(payload);
final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage);
assertEquals(airbyteRecordString, partial.getSerialized());
}

@Test
void deserializeAirbyteMessageWithEmptyAirbyteRecord() {
final Map emptyMap = Map.of();
Expand Down
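The BigDecimal test above guards against numeric precision loss during deserialization. As a standalone illustration of the kind of loss involved, the sketch below compares parsing the same literal exactly and through a `double`. It uses only `java.math.BigDecimal`; `NumericPrecisionSketch` is a hypothetical name, and the sketch makes no claim about how `Jsons.tryDeserializeExact` is implemented.

```java
import java.math.BigDecimal;

/** Hypothetical illustration of precision loss through double; unrelated to the Jsons helper's internals. */
final class NumericPrecisionSketch {

  /** Parses the literal exactly, keeping every digit and the original scale. */
  static BigDecimal exact(final String literal) {
    return new BigDecimal(literal);
  }

  /** Parses the literal through a double first, which keeps at most about 17 significant digits. */
  static BigDecimal viaDouble(final String literal) {
    return BigDecimal.valueOf(Double.parseDouble(literal));
  }

  public static void main(final String[] args) {
    final String literal = "1234567890.1234567890"; // same value as in the test above
    System.out.println("exact:      " + exact(literal).toPlainString());
    System.out.println("via double: " + viaDouble(literal).toPlainString());
  }
}
```

The exact path round-trips the literal unchanged, while the double path drops trailing digits, so a serialized record would no longer match its input.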
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ public class GlobalMemoryManagerTest {

@Test
void test() {
final GlobalMemoryManager mgr = new GlobalMemoryManager(35 * BYTES_MB);
final GlobalMemoryManager mgr = new GlobalMemoryManager(15 * BYTES_MB);

assertEquals(30 * BYTES_MB, mgr.requestMemory());
assertEquals(10 * BYTES_MB, mgr.requestMemory());
assertEquals(5 * BYTES_MB, mgr.requestMemory());
assertEquals(0, mgr.requestMemory());

mgr.free(10 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
mgr.free(31 * BYTES_MB);
assertEquals(30 * BYTES_MB, mgr.requestMemory());
mgr.free(16 * BYTES_MB);
assertEquals(10 * BYTES_MB, mgr.requestMemory());
}

}
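The sequence of expected values in the updated test can be reproduced with a very small block allocator. The following is a simplified sketch, assuming a grant of `min(block size, remaining capacity)` per request. `BlockAllocatorSketch` is a hypothetical class, and clamping at zero when more is freed than was allocated is an assumption made here, not confirmed behavior of `GlobalMemoryManager`.

```java
/** Hypothetical, simplified block allocator; not the CDK's GlobalMemoryManager. */
final class BlockAllocatorSketch {

  static final long BLOCK_SIZE_BYTES = 10L * 1024 * 1024; // 10MB, as in the diff

  private final long maxBytes;
  private long currentBytes = 0;

  BlockAllocatorSketch(final long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Grants up to one block, limited by what is still available; returns 0 when exhausted. */
  synchronized long requestMemory() {
    final long grant = Math.min(BLOCK_SIZE_BYTES, maxBytes - currentBytes);
    if (grant <= 0) {
      return 0;
    }
    currentBytes += grant;
    return grant;
  }

  /** Returns bytes to the pool. Assumption: over-freeing is clamped at zero. */
  synchronized void free(final long bytes) {
    currentBytes = Math.max(0, currentBytes - bytes);
  }

  public static void main(final String[] args) {
    final long mb = 1024L * 1024;
    final BlockAllocatorSketch mgr = new BlockAllocatorSketch(15 * mb);
    System.out.println(mgr.requestMemory() / mb); // full block
    System.out.println(mgr.requestMemory() / mb); // partial block: only 5MB remain
    System.out.println(mgr.requestMemory() / mb); // nothing left
  }
}
```

A smaller block size means a newly created stream reserves less memory up front, at the cost of more frequent allocation calls for fast-growing queues.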
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination_async.buffers;

import static io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager.BLOCK_SIZE_BYTES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -132,4 +133,38 @@ void testMetadataOperationsError() {
assertTrue(dequeue.getTimeOfLastRecord(ghostStream).isEmpty());
}

@Test
void cleansUpMemoryForEmptyQueues() throws Exception {
final var bufferManager = new BufferManager();
final var enqueue = bufferManager.getBufferEnqueue();
final var dequeue = bufferManager.getBufferDequeue();
final var memoryManager = bufferManager.getMemoryManager();

// we initialize with a block for state
assertEquals(BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes());

// allocate a block for new stream
enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES);
assertEquals(2 * BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes());

enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES);
enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES);
enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES);

// no re-allocates as we haven't breached block size
assertEquals(2 * BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes());

final var totalBatchSize = RECORD_SIZE_20_BYTES * 4;

// read the whole queue
try (final var batch = dequeue.take(STREAM_DESC, totalBatchSize)) {
// slop allocation gets cleaned up
assertEquals(BLOCK_SIZE_BYTES + totalBatchSize, memoryManager.getCurrentMemoryBytes());
batch.close();
// back to initial state after flush clears the batch
assertEquals(BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes());
assertEquals(0, bufferManager.getBuffers().get(STREAM_DESC).getMaxMemoryUsage());
}
}

}