Skip to content

Commit

Permalink
refactor(stream): format and cleanup code (#874)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Jan 2, 2024
1 parent c5670e2 commit 4bacd09
Show file tree
Hide file tree
Showing 146 changed files with 3,148 additions and 3,196 deletions.
4 changes: 2 additions & 2 deletions s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.automq.stream;

import com.automq.stream.api.RecordBatch;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
Expand All @@ -33,6 +32,12 @@ public RecordBatchWithContextWrapper(RecordBatch recordBatch, long baseOffset) {
this.baseOffset = baseOffset;
}

public static RecordBatchWithContextWrapper decode(ByteBuffer buffer) {
long baseOffset = buffer.getLong();
int count = buffer.getInt();
return new RecordBatchWithContextWrapper(new DefaultRecordBatch(count, 0, Collections.emptyMap(), buffer), baseOffset);
}

@Override
public long baseOffset() {
return baseOffset;
Expand Down Expand Up @@ -65,16 +70,10 @@ public ByteBuffer rawPayload() {

public byte[] encode() {
ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + recordBatch.rawPayload().remaining())
.putLong(baseOffset)
.putInt(recordBatch.count())
.put(recordBatch.rawPayload().duplicate())
.flip();
.putLong(baseOffset)
.putInt(recordBatch.count())
.put(recordBatch.rawPayload().duplicate())
.flip();
return buffer.array();
}

public static RecordBatchWithContextWrapper decode(ByteBuffer buffer) {
long baseOffset = buffer.getLong();
int count = buffer.getInt();
return new RecordBatchWithContextWrapper(new DefaultRecordBatch(count, 0, Collections.emptyMap(), buffer), baseOffset);
}
}
1 change: 1 addition & 0 deletions s3stream/src/main/java/com/automq/stream/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface Client {
void start();

void shutdown();

/**
* Get stream client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.automq.stream.api;

import com.automq.stream.s3.cache.CacheAccessType;

import java.util.List;

public interface FetchResult {
Expand Down
1 change: 1 addition & 0 deletions s3stream/src/main/java/com/automq/stream/api/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface KVClient {

/**
* Put key value, overwrite if key exist, return current key value after putting.
*
* @param keyValue {@link KeyValue} k-v pair
* @return async put result. {@link KeyValue} current value after putting.
*/
Expand Down
13 changes: 8 additions & 5 deletions s3stream/src/main/java/com/automq/stream/api/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.stream.api;


import java.nio.ByteBuffer;
import java.util.Objects;

Expand All @@ -44,8 +43,10 @@ public Value value() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
KeyValue keyValue = (KeyValue) o;
return Objects.equals(key, keyValue.key) && Objects.equals(value, keyValue.value);
}
Expand Down Expand Up @@ -131,8 +132,10 @@ public boolean isNull() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Value)) return false;
if (this == o)
return true;
if (!(o instanceof Value))
return false;
Value value1 = (Value) o;
return Objects.equals(value, value1.value);
}
Expand Down
8 changes: 4 additions & 4 deletions s3stream/src/main/java/com/automq/stream/api/ReadOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class ReadOptions {
private boolean fastRead;
private boolean pooledBuf;

public static Builder builder() {
return new Builder();
}

public boolean fastRead() {
return fastRead;
}
Expand All @@ -33,10 +37,6 @@ public boolean pooledBuf() {
return pooledBuf;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private final ReadOptions options = new ReadOptions();

Expand Down
2 changes: 0 additions & 2 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;

import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -48,7 +47,6 @@ public interface Stream {
*/
long nextOffset();


/**
* Append recordBatch to stream.
*
Expand Down
40 changes: 20 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/DeltaWALUploadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.utils.AsyncRateLimiter;
import com.automq.stream.utils.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -37,30 +34,33 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;

public class DeltaWALUploadTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DeltaWALUploadTask.class);
private long startTimestamp;
final boolean forceSplit;
private final Logger s3ObjectLogger;
private final Map<Long, List<StreamRecordBatch>> streamRecordsMap;
private final int objectBlockSize;
private final int objectPartSize;
private final int streamSplitSizeThreshold;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
final boolean forceSplit;
private final boolean s3ObjectLogEnable;
private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
private volatile CommitStreamSetObjectRequest commitStreamSetObjectRequest;
private final CompletableFuture<CommitStreamSetObjectRequest> uploadCf = new CompletableFuture<>();
private final ExecutorService executor;
private final double rate;
private final AsyncRateLimiter limiter;
private long startTimestamp;
private volatile CommitStreamSetObjectRequest commitStreamSetObjectRequest;

public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator,
ExecutorService executor, boolean forceSplit, double rate) {
public DeltaWALUploadTask(Config config, Map<Long, List<StreamRecordBatch>> streamRecordsMap,
ObjectManager objectManager, S3Operator s3Operator,
ExecutorService executor, boolean forceSplit, double rate) {
this.s3ObjectLogger = S3ObjectLogger.logger(String.format("[DeltaWALUploadTask id=%d] ", config.nodeId()));
this.streamRecordsMap = streamRecordsMap;
this.objectBlockSize = config.objectBlockSize();
Expand All @@ -85,12 +85,12 @@ public CompletableFuture<Long> prepare() {
prepareCf.complete(NOOP_OBJECT_ID);
} else {
objectManager
.prepareObject(1, TimeUnit.MINUTES.toMillis(60))
.thenAcceptAsync(prepareCf::complete, executor)
.exceptionally(ex -> {
prepareCf.completeExceptionally(ex);
return null;
});
.prepareObject(1, TimeUnit.MINUTES.toMillis(60))
.thenAcceptAsync(prepareCf::complete, executor)
.exceptionally(ex -> {
prepareCf.completeExceptionally(ex);
return null;
});
}
return prepareCf;
}
Expand Down Expand Up @@ -135,7 +135,7 @@ private void upload0(long objectId) {
request.setObjectId(objectId);
request.setOrderId(objectId);
CompletableFuture<Void> streamSetObjectCf = CompletableFuture.allOf(streamSetWriteCfList.toArray(new CompletableFuture[0]))
.thenCompose(nil -> streamSetObject.close().thenAccept(nil2 -> request.setObjectSize(streamSetObject.size())));
.thenCompose(nil -> streamSetObject.close().thenAccept(nil2 -> request.setObjectSize(streamSetObject.size())));
List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
allCf.add(streamSetObjectCf);
CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
Expand All @@ -150,7 +150,7 @@ private void upload0(long objectId) {
public CompletableFuture<Void> commit() {
return uploadCf.thenCompose(request -> objectManager.commitStreamSetObject(request).thenAccept(resp -> {
LOGGER.info("Upload delta WAL {}, cost {}ms, rate limiter {}bytes/s", commitStreamSetObjectRequest,
System.currentTimeMillis() - startTimestamp, rate);
System.currentTimeMillis() - startTimestamp, rate);
if (s3ObjectLogEnable) {
s3ObjectLogger.trace("{}", commitStreamSetObjectRequest);
}
Expand Down Expand Up @@ -227,10 +227,10 @@ public DeltaWALUploadTask build() {
boolean forceSplit = streamRecordsMap.size() == 1;
if (!forceSplit) {
Optional<Boolean> hasStreamSetData = streamRecordsMap.values()
.stream()
.map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.streamSplitSize())
.filter(split -> !split)
.findAny();
.stream()
.map(records -> records.stream().mapToLong(StreamRecordBatch::size).sum() >= config.streamSplitSize())
.filter(split -> !split)
.findAny();
if (hasStreamSetData.isEmpty()) {
forceSplit = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
Expand Down
24 changes: 12 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
Expand Down Expand Up @@ -128,7 +127,8 @@ public void close0() {
*/
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {

public static BasicObjectInfo parse(ByteBuf objectTailBuf, S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
if (indexBlockPosition + objectTailBuf.readableBytes() < s3ObjectMetadata.objectSize()) {
Expand Down Expand Up @@ -217,14 +217,14 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
rst.add(new StreamDataBlock(streamId, rangeStartOffset, rangeEndOffset, s3ObjectMetadata.objectId(),
new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));
new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));

// we consider first block as not matched because we do not know exactly how many bytes are within
// the range in first block, as a result we may read one more block than expected.
if (matched) {
int recordPayloadSize = blockSize
- recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
- recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
}
if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
Expand Down Expand Up @@ -272,11 +272,11 @@ public long endPosition() {
@Override
public String toString() {
return "DataBlockIndex{" +
"blockId=" + blockId +
", startPosition=" + startPosition +
", size=" + size +
", recordCount=" + recordCount +
'}';
"blockId=" + blockId +
", startPosition=" + startPosition +
", size=" + size +
", recordCount=" + recordCount +
'}';
}
}

Expand Down
Loading

0 comments on commit 4bacd09

Please sign in to comment.