
feat(s3stream): introduce tracing to s3stream (#866)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Dec 28, 2023
1 parent 99ba37c commit 6264c96
Showing 30 changed files with 876 additions and 103 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.9.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.10.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
@@ -134,7 +134,7 @@
<version>${s3stream.version}</version>
<exclusions>
<exclusion>
<groupId>io.opentelemetry.instrumentation</groupId>
<groupId>io.opentelemetry.*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
52 changes: 48 additions & 4 deletions s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.9.0-SNAPSHOT</version>
<version>0.10.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -34,7 +34,8 @@
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<alapha.opentelemetry.version>1.32.0-alpha</alapha.opentelemetry.version>
<opentelemetry.version>1.32.0</opentelemetry.version>
<aspectj.version>1.9.20.1</aspectj.version>
</properties>
<dependencies>
<dependency>
@@ -113,10 +114,25 @@
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-runtime-telemetry-java8</artifactId>
<version>${alapha.opentelemetry.version}</version>
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>${aspectj.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>

@@ -239,6 +255,34 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>dev.aspectj</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.13.1</version>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjtools</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>
<configuration>
<complianceLevel>${maven.compiler.target}</complianceLevel>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<showWeaveInfo>true</showWeaveInfo>
<verbose>true</verbose>
<Xlint>ignore</Xlint>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
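The dependency changes above replace the agent-oriented `opentelemetry-runtime-telemetry-java8` module with `opentelemetry-instrumentation-annotations`, and add the AspectJ runtime plus the `aspectj-maven-plugin` for compile-time weaving, so methods annotated with `@WithSpan` can produce spans without attaching the OpenTelemetry Java agent. The aspect that reacts to the annotation is not part of this excerpt; a minimal sketch of such an aspect, with purely illustrative names, might look like:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

// Illustrative aspect only; names and the instrumentation scope are not taken from this commit.
@Aspect
public class WithSpanAspectSketch {
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("s3stream"); // scope name assumed

    // Wrap every @WithSpan-annotated method execution in a span.
    @Around("execution(@io.opentelemetry.instrumentation.annotations.WithSpan * *(..))")
    public Object traceAnnotatedMethod(ProceedingJoinPoint pjp) throws Throwable {
        Span span = tracer.spanBuilder(pjp.getSignature().toShortString()).startSpan();
        try (Scope ignored = span.makeCurrent()) {
            return pjp.proceed();        // run the annotated method with the span current
        } catch (Throwable t) {
            span.recordException(t);     // mark failures on the span
            throw t;
        } finally {
            span.end();                  // ends on method return, even for async methods
        }
    }
}
```

Compile-time weaving bakes the advice into the class files during the build, which keeps tracing self-contained for host applications that do not run a Java agent.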
12 changes: 9 additions & 3 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
Expand Up @@ -18,6 +18,8 @@
package com.automq.stream.api;

import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;

import java.util.concurrent.CompletableFuture;

@@ -54,7 +56,11 @@ public interface Stream {
* @return - complete success with async {@link AppendResult}, when append success.
* - complete exception with {@link StreamClientException}, when append fail. TODO: specify the exception.
*/
CompletableFuture<AppendResult> append(RecordBatch recordBatch);
CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch);

default CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
return append(AppendContext.DEFAULT, recordBatch);
}

/**
* Fetch recordBatch list from stream. Note the startOffset may be in the middle in the first recordBatch.
@@ -67,10 +73,10 @@
* @return - complete success with {@link FetchResult}, when fetch success.
* - complete exception with {@link StreamClientException}, when startOffset is bigger than stream end offset.
*/
CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions);
CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint);

default CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
return fetch(startOffset, endOffset, maxBytesHint, ReadOptions.DEFAULT);
return fetch(FetchContext.DEFAULT, startOffset, endOffset, maxBytesHint);
}

/**
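`Stream#append` and `Stream#fetch` now take an `AppendContext`/`FetchContext` as their first parameter, and the old signatures are kept as default methods that forward `AppendContext.DEFAULT`/`FetchContext.DEFAULT`, so existing callers keep compiling. A caller-side sketch (the wrapper class and method parameters below are illustrative; only signatures visible in this diff are relied on):

```java
import com.automq.stream.api.AppendResult;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.Stream;
import com.automq.stream.s3.context.FetchContext;

import java.util.concurrent.CompletableFuture;

class StreamCallerSketch {
    void example(Stream stream, RecordBatch recordBatch, FetchContext tracedContext) {
        // Existing call sites keep compiling: the default methods forward to the new overloads.
        CompletableFuture<AppendResult> appended = stream.append(recordBatch);    // == append(AppendContext.DEFAULT, recordBatch)
        CompletableFuture<FetchResult> fetched = stream.fetch(0L, 100L, 1 << 20); // == fetch(FetchContext.DEFAULT, ...)

        // A tracing-aware caller passes an explicit FetchContext (assumed to carry a
        // TraceContext) so spans opened further down attach to the caller's trace.
        CompletableFuture<FetchResult> traced = stream.fetch(tracedContext, 0L, 100L, 1 << 20);
    }
}
```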
41 changes: 28 additions & 13 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -17,12 +17,13 @@

package com.automq.stream.s3;

import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.cache.LogCache;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
@@ -32,6 +33,7 @@
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.utils.FutureTicker;
import com.automq.stream.utils.FutureUtil;
@@ -40,6 +42,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -259,14 +263,15 @@ public void shutdown() {


@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
@WithSpan
public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
// encoded before append to free heap ByteBuf.
streamRecord.encoded();
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf);
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf, context);
handleAppendRequest(writeRequest);
append0(writeRequest, false);
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
@@ -279,7 +284,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
*
* @return backoff status.
*/
public boolean append0(WalWriteRequest request, boolean fromBackoff) {
public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) {
// TODO: storage status check, fast fail the request when storage closed.
if (!fromBackoff && !backoffRecords.isEmpty()) {
backoffRecords.offer(request);
@@ -304,7 +309,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
Lock lock = confirmOffsetCalculator.addLock();
lock.lock();
try {
appendResult = deltaWAL.append(streamRecord.encoded());
appendResult = deltaWAL.append(new TraceContext(context), streamRecord.encoded());
} finally {
lock.unlock();
}
@@ -344,7 +349,7 @@ private void tryDrainBackoffRecords() {
if (request == null) {
break;
}
if (append0(request, true)) {
if (append0(request.context, request, true)) {
LOGGER.warn("try drain backoff record fail, still backoff");
break;
}
@@ -356,20 +361,30 @@
}

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
public CompletableFuture<ReadDataBlock> read(FetchContext context,
@SpanAttribute long streamId,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, readOptions), cf);
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
return cf;
}

private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(streamId, startOffset, endOffset, maxBytes);
@WithSpan
private CompletableFuture<ReadDataBlock> read0(FetchContext context,
@SpanAttribute long streamId,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(context, streamId, startOffset, endOffset, maxBytes);
if (!logCacheRecords.isEmpty() && logCacheRecords.get(0).getBaseOffset() <= startOffset) {
return CompletableFuture.completedFuture(new ReadDataBlock(logCacheRecords, CacheAccessType.DELTA_WAL_CACHE_HIT));
}
if (readOptions.fastRead()) {
if (context.readOptions().fastRead()) {
// fast read fail fast when need read from block cache.
logCacheRecords.forEach(StreamRecordBatch::release);
logCacheRecords.clear();
@@ -380,7 +395,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
}
Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
long finalEndOffset = endOffset;
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
return blockCache.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
int readIndex = -1;
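In `S3Storage`, tracing is expressed declaratively: `@WithSpan` opens a span around `append`/`read`, and `@SpanAttribute` records the stream id, offsets, and byte limit on it. Conceptually, the woven advice around `read` is roughly equivalent to the hand-written instrumentation below (a sketch using the plain OpenTelemetry API; the wrapper class and the instrumentation scope name are assumptions, not part of this commit):

```java
import com.automq.stream.s3.Storage;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.context.FetchContext;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

import java.util.concurrent.CompletableFuture;

class TracedReadSketch {
    CompletableFuture<ReadDataBlock> tracedRead(Storage storage, FetchContext context, long streamId,
                                                long startOffset, long endOffset, int maxBytes) {
        Tracer tracer = GlobalOpenTelemetry.getTracer("s3stream"); // scope name assumed
        Span span = tracer.spanBuilder("S3Storage.read")
                .setAttribute("streamId", streamId)       // @SpanAttribute long streamId
                .setAttribute("startOffset", startOffset) // @SpanAttribute long startOffset
                .setAttribute("endOffset", endOffset)     // @SpanAttribute long endOffset
                .setAttribute("maxBytes", maxBytes)       // @SpanAttribute int maxBytes
                .startSpan();
        try (Scope ignored = span.makeCurrent()) {
            return storage.read(context, streamId, startOffset, endOffset, maxBytes)
                    .whenComplete((rst, ex) -> span.end()); // end the span when the async read settles
        }
    }
}
```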
30 changes: 20 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -21,14 +21,15 @@
import com.automq.stream.RecordBatchWithContextWrapper;
import com.automq.stream.api.AppendResult;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;
import com.automq.stream.api.Stream;
import com.automq.stream.api.exceptions.ErrorCode;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -38,6 +39,8 @@
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.GlobalSwitch;
import io.netty.buffer.Unpooled;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -139,7 +142,8 @@ public long nextOffset() {
}

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
@WithSpan
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
@@ -148,7 +152,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
if (networkInboundLimiter != null) {
networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
}
return append0(recordBatch);
return append0(context, recordBatch);
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
@@ -161,13 +165,14 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}
}

private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
@WithSpan
private CompletableFuture<AppendResult> append0(AppendContext context, RecordBatch recordBatch) {
if (!status.isWritable()) {
return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
long offset = nextOffset.getAndAdd(recordBatch.count());
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
CompletableFuture<AppendResult> cf = storage.append(streamRecordBatch).thenApply(nil -> {
CompletableFuture<AppendResult> cf = storage.append(context, streamRecordBatch).thenApply(nil -> {
updateConfirmOffset(offset + recordBatch.count());
return new DefaultAppendResult(offset);
});
@@ -186,12 +191,16 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
public CompletableFuture<FetchResult> fetch(FetchContext context,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes, readOptions), LOGGER, "fetch");
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
@@ -216,7 +225,8 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
}
}

private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
private CompletableFuture<FetchResult> fetch0(FetchContext context, long startOffset, long endOffset, int maxBytes) {
if (!status.isReadable()) {
return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
}
@@ -237,12 +247,12 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
if (startOffset == endOffset) {
return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT, false));
}
return storage.read(streamId, startOffset, endOffset, maxBytes, readOptions).thenApply(dataBlock -> {
return storage.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
List<StreamRecordBatch> records = dataBlock.getRecords();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size());
}
return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), readOptions.pooledBuf());
return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), context.readOptions().pooledBuf());
});
}

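Note that every traced method in `S3Stream` returns a `CompletableFuture`, so advice that ends its span as soon as the method returns would only measure the synchronous prefix (lock acquisition and request submission), not the asynchronous completion. How this commit's aspect handles that is not visible in this excerpt; a completion-aware variant of the illustrative advice body from the s3stream/pom.xml sketch above could look like:

```java
// Completion-aware variant of the illustrative advice body (continuing the aspect sketch
// shown after s3stream/pom.xml); whether this commit's aspect does this is not shown here.
Object result = pjp.proceed();
if (result instanceof CompletableFuture<?> future) {
    return future.whenComplete((r, ex) -> {
        if (ex != null) {
            span.recordException(ex); // record async failures on the span
        }
        span.end();                   // span now covers the full asynchronous operation
    });
}
span.end();                           // synchronous result: end immediately
return result;
```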
15 changes: 12 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/Storage.java
@@ -17,8 +17,9 @@

package com.automq.stream.s3;

import com.automq.stream.api.ReadOptions;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.concurrent.CompletableFuture;
@@ -37,9 +38,17 @@ public interface Storage {
*
* @param streamRecord {@link StreamRecordBatch}
*/
CompletableFuture<Void> append(StreamRecordBatch streamRecord);
CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord);

CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions);
default CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
return append(AppendContext.DEFAULT, streamRecord);
}

CompletableFuture<ReadDataBlock> read(FetchContext context, long streamId, long startOffset, long endOffset, int maxBytes);

default CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
return read(FetchContext.DEFAULT, streamId, startOffset, endOffset, maxBytes);
}

/**
* Force stream record in WAL upload to s3

