get(long streamId, long startOffset, long endOffset, int maxBytes) {
+ return get(TraceContext.DEFAULT, streamId, startOffset, endOffset, maxBytes);
+ }
+
/**
* Get streamId [startOffset, endOffset) range records with maxBytes limit.
*
@@ -124,7 +131,13 @@ public boolean put(StreamRecordBatch recordBatch) {
*
* Note: the records is retained, the caller should release it.
*/
- public List get(long streamId, long startOffset, long endOffset, int maxBytes) {
+ @WithSpan
+ public List get(TraceContext context,
+ @SpanAttribute long streamId,
+ @SpanAttribute long startOffset,
+ @SpanAttribute long endOffset,
+ @SpanAttribute int maxBytes) {
+ context.currentContext();
TimerUtil timerUtil = new TimerUtil();
List records;
readLock.lock();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/S3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/S3BlockCache.java
index 060cc7600..161780a29 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/S3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/S3BlockCache.java
@@ -17,6 +17,8 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.trace.context.TraceContext;
+
import java.util.concurrent.CompletableFuture;
/**
@@ -26,5 +28,9 @@
*/
public interface S3BlockCache {
- CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes);
+ CompletableFuture read(TraceContext context, long streamId, long startOffset, long endOffset, int maxBytes);
+
+ default CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes) {
+ return read(TraceContext.DEFAULT, streamId, startOffset, endOffset, maxBytes);
+ }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 06640b21e..8981fdc83 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -26,7 +26,12 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
+import com.automq.stream.s3.trace.TraceUtils;
+import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.utils.Threads;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.instrumentation.annotations.SpanAttribute;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@@ -105,8 +110,12 @@ public void shutdown() {
errorHandlerExecutor.shutdown();
}
- public CompletableFuture> syncReadAhead(long streamId, long startOffset, long endOffset,
- int maxBytes, ReadAheadAgent agent, UUID uuid) {
+ @WithSpan
+ public CompletableFuture> syncReadAhead(TraceContext traceContext,
+ @SpanAttribute long streamId,
+ @SpanAttribute long startOffset,
+ @SpanAttribute long endOffset,
+ @SpanAttribute int maxBytes, ReadAheadAgent agent, UUID uuid) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] sync read ahead, stream={}, {}-{}, maxBytes={}", streamId, startOffset, endOffset, maxBytes);
}
@@ -119,9 +128,9 @@ public CompletableFuture> syncReadAhead(long streamId, l
if (inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, readAheadTaskContext) == null) {
context.taskKeySet.add(readAheadTaskKey);
}
- return getDataBlockIndices(streamId, endOffset, context)
+ return getDataBlockIndices(traceContext, streamId, endOffset, context)
.thenComposeAsync(v ->
- handleSyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, uuid, timer, context), streamReaderExecutor)
+ handleSyncReadAhead(traceContext, streamId, startOffset, endOffset, maxBytes, agent, uuid, timer, context), streamReaderExecutor)
.whenComplete((nil, ex) -> {
for (DefaultS3BlockCache.ReadAheadTaskKey key : context.taskKeySet) {
completeInflightTask0(key, ex);
@@ -131,7 +140,8 @@ public CompletableFuture> syncReadAhead(long streamId, l
});
}
- CompletableFuture> handleSyncReadAhead(long streamId, long startOffset, long endOffset,
+ @WithSpan
+ CompletableFuture> handleSyncReadAhead(TraceContext traceContext, long streamId, long startOffset, long endOffset,
int maxBytes, ReadAheadAgent agent, UUID uuid,
TimerUtil timer, ReadContext context) {
if (context.streamDataBlocksPair.isEmpty()) {
@@ -158,7 +168,7 @@ CompletableFuture> handleSyncReadAhead(long streamId, lo
totalReserveSize, uuid, streamId, startOffset, endOffset, maxBytes);
}
- CompletableFuture throttleCf = inflightReadThrottle.acquire(uuid, totalReserveSize);
+ CompletableFuture throttleCf = inflightReadThrottle.acquire(traceContext, uuid, totalReserveSize);
return throttleCf.thenComposeAsync(nil -> {
// concurrently read all data blocks
for (int i = 0; i < streamDataBlocksToRead.size(); i++) {
@@ -182,31 +192,37 @@ CompletableFuture> handleSyncReadAhead(long streamId, lo
setInflightReadAheadStatus(new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset),
DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
}
- cfList.add(reserveResult.cf().thenApplyAsync(dataBlock -> {
- if (dataBlock.records().isEmpty()) {
- return new ArrayList();
- }
- // retain records to be returned
- dataBlock.records().forEach(StreamRecordBatch::retain);
- recordsMap.put(dataBlockKey, dataBlock.records());
-
- // retain records to be put into block cache
- dataBlock.records().forEach(StreamRecordBatch::retain);
- blockCache.put(streamId, dataBlock.records());
- dataBlock.release();
-
- return dataBlock.records();
- }, backgroundExecutor).whenComplete((ret, ex) -> {
- if (ex != null) {
- LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}",
- streamId, startOffset, endOffset, streamDataBlock, ex);
- }
- completeInflightTask(context, taskKey, ex);
- if (isNotAlignedFirstBlock) {
- // in case of first data block and startOffset is not aligned with start of data block
- completeInflightTask(context, new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
- }
- }));
+ try {
+ CompletableFuture> cf = TraceUtils.runWithSpanAsync(new TraceContext(traceContext), Attributes.empty(), "StreamReader::readDataBlock",
+ () -> reserveResult.cf().thenApplyAsync(dataBlock -> {
+ if (dataBlock.records().isEmpty()) {
+ return new ArrayList();
+ }
+ // retain records to be returned
+ dataBlock.records().forEach(StreamRecordBatch::retain);
+ recordsMap.put(dataBlockKey, dataBlock.records());
+
+ // retain records to be put into block cache
+ dataBlock.records().forEach(StreamRecordBatch::retain);
+ blockCache.put(streamId, dataBlock.records());
+ dataBlock.release();
+
+ return dataBlock.records();
+ }, backgroundExecutor).whenComplete((ret, ex) -> {
+ if (ex != null) {
+ LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}",
+ streamId, startOffset, endOffset, streamDataBlock, ex);
+ }
+ completeInflightTask(context, taskKey, ex);
+ if (isNotAlignedFirstBlock) {
+ // in case of first data block and startOffset is not aligned with start of data block
+ completeInflightTask(context, new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
+ }
+ }));
+ cfList.add(cf);
+ } catch (Throwable e) {
+ throw new IllegalArgumentException(e);
+ }
if (reserveResult.reserveSize() > 0) {
dataBlockReadAccumulator.readDataBlock(objectReader, streamDataBlock.dataBlockIndex());
}
@@ -274,7 +290,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_DATA_INDEX);
inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, readAheadTaskContext);
context.taskKeySet.add(readAheadTaskKey);
- getDataBlockIndices(streamId, endOffset, context)
+ getDataBlockIndices(TraceContext.DEFAULT, streamId, endOffset, context)
.thenAcceptAsync(v ->
handleAsyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, timer, context), streamReaderExecutor)
.whenComplete((nil, ex) -> {
@@ -347,7 +363,7 @@ CompletableFuture handleAsyncReadAhead(long streamId, long startOffset, lo
reserveResult.reserveSize(), uuid, streamId, startOffset, endOffset, maxBytes);
}
if (reserveResult.reserveSize() > 0) {
- inflightReadThrottle.acquire(uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
+ inflightReadThrottle.acquire(TraceContext.DEFAULT, uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
// read data block
if (context.taskKeySet.contains(taskKey)) {
setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
@@ -380,7 +396,9 @@ CompletableFuture handleAsyncReadAhead(long streamId, long startOffset, lo
});
}
- CompletableFuture getDataBlockIndices(long streamId, long endOffset, ReadContext context) {
+ @WithSpan
+ CompletableFuture getDataBlockIndices(TraceContext traceContext, long streamId, long endOffset, ReadContext context) {
+ traceContext.currentContext();
CompletableFuture getObjectsCf = CompletableFuture.completedFuture(false);
if (context.objectIndex >= context.objects.size()) {
getObjectsCf = objectManager
@@ -450,7 +468,7 @@ CompletableFuture getDataBlockIndices(long streamId, long endOffset, ReadC
if (findIndexResult.isFulfilled()) {
return CompletableFuture.completedFuture(null);
}
- return getDataBlockIndices(streamId, endOffset, context);
+ return getDataBlockIndices(traceContext, streamId, endOffset, context);
}, streamReaderExecutor);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/context/AppendContext.java b/s3stream/src/main/java/com/automq/stream/s3/context/AppendContext.java
new file mode 100644
index 000000000..de8ef8626
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/context/AppendContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.context;
+
+import com.automq.stream.s3.trace.context.TraceContext;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+
+public class AppendContext extends TraceContext {
+ public static final AppendContext DEFAULT = new AppendContext();
+
+ public AppendContext() {
+ super(false, null, null);
+ }
+
+ public AppendContext(TraceContext context) {
+ super(context);
+ }
+
+ public AppendContext(boolean isTraceEnabled, Tracer tracer, Context currentContext) {
+ super(isTraceEnabled, tracer, currentContext);
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/context/FetchContext.java b/s3stream/src/main/java/com/automq/stream/s3/context/FetchContext.java
new file mode 100644
index 000000000..3218e864b
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/context/FetchContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.context;
+
+import com.automq.stream.api.ReadOptions;
+import com.automq.stream.s3.trace.context.TraceContext;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+
+public class FetchContext extends TraceContext {
+ public static final FetchContext DEFAULT = new FetchContext();
+ private ReadOptions readOptions = ReadOptions.DEFAULT;
+
+ public FetchContext() {
+ super(false, null, null);
+ }
+
+ public FetchContext(TraceContext context) {
+ super(context);
+ }
+
+ public FetchContext(boolean isTraceEnabled, Tracer tracer, Context currentContext) {
+ super(isTraceEnabled, tracer, currentContext);
+ }
+
+ public ReadOptions readOptions() {
+ return readOptions;
+ }
+
+ public void setReadOptions(ReadOptions readOptions) {
+ this.readOptions = readOptions;
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/trace/AttributeBindings.java b/s3stream/src/main/java/com/automq/stream/s3/trace/AttributeBindings.java
new file mode 100644
index 000000000..c60247aeb
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/trace/AttributeBindings.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.trace;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.instrumentation.annotations.SpanAttribute;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Parameter;
+import java.lang.reflect.Type;
+import java.util.function.BiFunction;
+
+public class AttributeBindings {
+ private final BiFunction[] bindings;
+
+ private AttributeBindings(BiFunction[] bindings) {
+ this.bindings = bindings;
+ }
+
+ public static AttributeBindings bind(Method method, String[] parametersNames) {
+ Parameter[] parameters = method.getParameters();
+ if (parameters.length != parametersNames.length) {
+ return new AttributeBindings(null);
+ }
+
+ BiFunction[] bindings = new BiFunction[parametersNames.length];
+ for (int i = 0; i < parametersNames.length; i++) {
+ Parameter parameter = parameters[i];
+
+ SpanAttribute spanAttribute = parameter.getAnnotation(SpanAttribute.class);
+ if (spanAttribute == null) {
+ bindings[i] = emptyBinding();
+ } else {
+ String attributeName = spanAttribute.value().isEmpty() ? parametersNames[i] : spanAttribute.value();
+ bindings[i] = createBinding(attributeName, parameter.getParameterizedType());
+ }
+ }
+ return new AttributeBindings(bindings);
+ }
+
+ public boolean isEmpty() {
+ return bindings == null || bindings.length == 0;
+ }
+
+ public void apply(AttributesBuilder target, Object[] args) {
+ if (args.length != bindings.length) {
+ return;
+ }
+
+ for (int i = 0; i < args.length; i++) {
+ bindings[i].apply(target, args[i]);
+ }
+ }
+
+ static BiFunction emptyBinding() {
+ return (builder, arg) -> builder;
+ }
+
+ static BiFunction createBinding(String name, Type type) {
+ // Simple scalar parameter types
+ if (type == String.class) {
+ AttributeKey key = AttributeKey.stringKey(name);
+ return (builder, arg) -> builder.put(key, (String) arg);
+ }
+ if (type == long.class || type == Long.class) {
+ AttributeKey key = AttributeKey.longKey(name);
+ return (builder, arg) -> builder.put(key, (Long) arg);
+ }
+ if (type == double.class || type == Double.class) {
+ AttributeKey key = AttributeKey.doubleKey(name);
+ return (builder, arg) -> builder.put(key, (Double) arg);
+ }
+ if (type == boolean.class || type == Boolean.class) {
+ AttributeKey key = AttributeKey.booleanKey(name);
+ return (builder, arg) -> builder.put(key, (Boolean) arg);
+ }
+ if (type == int.class || type == Integer.class) {
+ AttributeKey key = AttributeKey.longKey(name);
+ return (builder, arg) -> builder.put(key, ((Integer) arg).longValue());
+ }
+ if (type == float.class || type == Float.class) {
+ AttributeKey key = AttributeKey.doubleKey(name);
+ return (builder, arg) -> builder.put(key, ((Float) arg).doubleValue());
+ }
+
+ // Default parameter types
+ AttributeKey key = AttributeKey.stringKey(name);
+ return (builder, arg) -> builder.put(key, arg.toString());
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/trace/MethodCache.java b/s3stream/src/main/java/com/automq/stream/s3/trace/MethodCache.java
new file mode 100644
index 000000000..c0151593c
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/trace/MethodCache.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.trace;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+final class MethodCache extends ClassValue