Skip to content

Commit

Permalink
Optimized message ids serialization
Browse files Browse the repository at this point in the history
* Refactored test event builders
  • Loading branch information
Nikita-Smirnov-Exactpro committed Mar 14, 2024
1 parent 40c0478 commit 6e703b9
Show file tree
Hide file tree
Showing 38 changed files with 1,348 additions and 575 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -155,7 +155,7 @@ public static SerializedEntity<SerializedEntityMetadata, TestEventEntity> toSeri
builder.setContentSize(content.length);
}

byte[] messages = TestEventUtils.serializeLinkedMessageIds(event);
ByteBuffer messages = TestEventUtils.serializeLinkedMessageIds(event);

StoredTestEventId parentId = event.getParentId();
LocalDateTime start = TimeUtils.toLocalTimestamp(event.getStartTimestamp());
Expand All @@ -176,8 +176,9 @@ public static SerializedEntity<SerializedEntityMetadata, TestEventEntity> toSeri
builder.setEventCount(event.asBatch().getTestEventsCount());
builder.setEndTimestamp(event.getEndTimestamp());

if (messages != null)
builder.setMessages(ByteBuffer.wrap(messages));
if (messages != null) {
builder.setMessages(messages);
}

builder.setCompressed(compressed);
//TODO: this.setLabels(event.getLabels());
Expand Down
15 changes: 15 additions & 0 deletions cradle-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
plugins {
id "me.champeau.jmh" version "0.7.2"
}

dependencies {
api platform('com.exactpro.th2:bom:4.5.0')

Expand All @@ -10,6 +14,9 @@ dependencies {

implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'

jmh 'org.openjdk.jmh:jmh-core:1.37'
jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37'

testImplementation 'org.apache.logging.log4j:log4j-slf4j2-impl'
testImplementation 'org.apache.logging.log4j:log4j-core'
testImplementation 'org.testng:testng:7.9.0'
Expand All @@ -26,6 +33,14 @@ jar {
}
}

jmh {
jmhTimeout = "1m"
iterations = 3
fork = 2
warmupIterations = 3
warmupForks = 2
}

dependencyCheck {
suppressionFile = "${rootDir}/suppressions.xml"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.cradle.serialization;

import com.exactpro.cradle.BookId;
import com.exactpro.cradle.Direction;
import com.exactpro.cradle.messages.StoredMessageId;
import com.exactpro.cradle.testevents.StoredTestEventId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.openjdk.jmh.annotations.Mode.Throughput;

@State(Scope.Benchmark)
public class EventMessageIdSerializerBenchmark {
private static final BookId BOOK_ID = new BookId("benchmark-book");
private static final String SCOPE = "benchmark-scope";
private static final String SESSION_ALIAS_PREFIX = "benchmark-alias-";
private static final String EVENT_ID_PREFIX = "benchmark-event-";
private static final int EVENT_NUMBER = 100;
private static final int SESSION_ALIAS_NUMBER = 0;
private static final int MESSAGES_PER_DIRECTION = 25;
@State(Scope.Thread)
public static class EventBatchState {
private final Map<StoredTestEventId, Set<StoredMessageId>> eventIdToMessageIds = new HashMap<>();
@Setup
public void init() {
int seqCounter = 0;
for (int eventIndex = 0; eventIndex < EVENT_NUMBER; eventIndex++) {
Set<StoredMessageId> msgIds = new HashSet<>();
for (int aliasIndex = 0; aliasIndex < SESSION_ALIAS_NUMBER; aliasIndex++) {
for (Direction direction : Direction.values()) {
for (int msgIndex = 0; msgIndex < MESSAGES_PER_DIRECTION; msgIndex++) {
msgIds.add(new StoredMessageId(BOOK_ID, SESSION_ALIAS_PREFIX + aliasIndex, direction, Instant.now(), ++seqCounter));
}
}
}
eventIdToMessageIds.put(new StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), EVENT_ID_PREFIX + eventIndex), msgIds);
}
}
}

@State(Scope.Thread)
public static class MessageIdsState {
private final Set<StoredMessageId> messageIds = new HashSet<>();
@Setup
public void init() {
int seqCounter = 0;
for (int aliasIndex = 0; aliasIndex < SESSION_ALIAS_NUMBER; aliasIndex++) {
for (Direction direction : Direction.values()) {
for (int msgIndex = 0; msgIndex < MESSAGES_PER_DIRECTION; msgIndex++) {
messageIds.add(new StoredMessageId(BOOK_ID, SESSION_ALIAS_PREFIX + aliasIndex, direction, Instant.now(), ++seqCounter));
}
}
}
}
}

@Benchmark
@BenchmarkMode({Throughput})
public void benchmarkSerializeBatchLinkedMessageIds(EventBatchState state) throws IOException {
EventMessageIdSerializer.serializeBatchLinkedMessageIds(state.eventIdToMessageIds);
}

@Benchmark
@BenchmarkMode({Throughput})
public void benchmarkSerializeLinkedMessageIds(MessageIdsState state) throws IOException {
EventMessageIdSerializer.serializeLinkedMessageIds(state.messageIds);
}

@Benchmark
@BenchmarkMode({Throughput})
public void benchmarkSerializeBatchLinkedMessageIds2(EventBatchState state) throws IOException {
EventMessageIdSerializer2.serializeBatchLinkedMessageIds(state.eventIdToMessageIds);
}

@Benchmark
@BenchmarkMode({Throughput})
public void benchmarkSerializeLinkedMessageIds2(MessageIdsState state) throws IOException {
EventMessageIdSerializer2.serializeLinkedMessageIds(state.messageIds);
}
}
8 changes: 5 additions & 3 deletions cradle-core/src/main/java/com/exactpro/cradle/BookId.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,8 +29,10 @@ public class BookId implements Serializable
private static final long serialVersionUID = -8051161407486679704L;
private final String name;

public BookId(String name)
{
public BookId(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Book name can't be empty");
}
this.name = StringUtils.lowerCase(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class BookInfo {
private static final long MAX_EPOCH_DAY = getEpochDay(Instant.MAX);
private static final IPageInterval EMPTY_PAGE_INTERVAL = new EmptyPageInterval();

private static final BookId EMPTY_BOOK_ID = new BookId("");
private static final BookId EMPTY_BOOK_ID = new BookId("th2-internal-empty-book");

static {
METRICS.setPageCacheSize(EMPTY_BOOK_ID, HOT, HOT_CACHE_SIZE);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,9 @@

import com.exactpro.cradle.messages.GroupedMessageBatchToStore;
import com.exactpro.cradle.messages.MessageBatchToStore;
import com.exactpro.cradle.testevents.StoredTestEventId;
import com.exactpro.cradle.testevents.TestEventBatchToStore;
import com.exactpro.cradle.testevents.TestEventBatchToStoreBuilder;
import com.exactpro.cradle.testevents.TestEventSingleToStoreBuilder;
import com.exactpro.cradle.utils.CradleStorageException;

/**
* Factory to create entities to be used with {@link CradleStorage}. Created objects will conform with particular CradleStorage settings.
Expand Down Expand Up @@ -55,10 +53,6 @@ public GroupedMessageBatchToStore groupedMessageBatch(String group) {
return new GroupedMessageBatchToStore(group, maxMessageBatchSize, storeActionRejectionThreshold);
}

public TestEventBatchToStore testEventBatch(StoredTestEventId id, String name, StoredTestEventId parentId) throws CradleStorageException {
return new TestEventBatchToStore(id, name, parentId, maxTestEventBatchSize, storeActionRejectionThreshold);
}

public TestEventBatchToStoreBuilder testEventBatchBuilder() {
return new TestEventBatchToStoreBuilder(maxTestEventBatchSize, storeActionRejectionThreshold);
}
Expand Down
12 changes: 7 additions & 5 deletions cradle-core/src/main/java/com/exactpro/cradle/CradleStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.exactpro.cradle.testevents.StoredTestEvent;
import com.exactpro.cradle.testevents.StoredTestEventId;
import com.exactpro.cradle.testevents.TestEventBatchToStore;
import com.exactpro.cradle.testevents.TestEventBatchToStoreBuilder;
import com.exactpro.cradle.testevents.TestEventFilter;
import com.exactpro.cradle.testevents.TestEventSingleToStore;
import com.exactpro.cradle.testevents.TestEventSingleToStoreBuilder;
Expand Down Expand Up @@ -753,8 +754,9 @@ TestEventToStore alignEventTimestampsToPage(TestEventToStore event, PageInfo pag

logger.warn("Batch contains events from different pages, aligning event timestamps to first event's page's end ({})", event.getId());

TestEventBatchToStore newBatch = entitiesFactory.testEventBatch(event.getId(), event.getName(), event.getParentId());
newBatch.setType(event.getType());
TestEventBatchToStoreBuilder newBatch = entitiesFactory.testEventBatchBuilder()
.id(event.getId())
.parentId(event.getParentId());

for (var e : batch.getTestEvents()) {
TestEventSingleToStore newEvent = new TestEventSingleToStoreBuilder(storeActionRejectionThreshold)
Expand All @@ -770,7 +772,7 @@ TestEventToStore alignEventTimestampsToPage(TestEventToStore event, PageInfo pag
newBatch.addTestEvent(newEvent);
}

return newBatch;
return newBatch.build();
}

/**
Expand All @@ -786,7 +788,7 @@ public final void storeTestEvent(TestEventToStore event) throws IOException, Cra
logger.debug("Storing test event {}", id);
PageInfo page = findPage(id.getBookId(), id.getStartTimestamp());

TestEventUtils.validateTestEvent(event, getBookCache().getBook(id.getBookId()), storeActionRejectionThreshold);
TestEventUtils.validateTestEvent(event, getBookCache().getBook(id.getBookId()));
final TestEventToStore alignedEvent = alignEventTimestampsToPage(event, page);

doStoreTestEvent(alignedEvent, page);
Expand All @@ -812,7 +814,7 @@ public final CompletableFuture<Void> storeTestEventAsync(TestEventToStore event)
logger.debug("Storing test event {} asynchronously", id);
PageInfo page = findPage(id.getBookId(), id.getStartTimestamp());

TestEventUtils.validateTestEvent(event, getBookCache().getBook(id.getBookId()), storeActionRejectionThreshold);
TestEventUtils.validateTestEvent(event, getBookCache().getBook(id.getBookId()));
final TestEventToStore alignedEvent = alignEventTimestampsToPage(event, page);

CompletableFuture<Void> result = doStoreTestEventAsync(alignedEvent, page);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@ public class StoredMessageId implements Serializable {
private final Direction direction;
private final Instant timestamp;
private final long sequence;
private final int hash;

public StoredMessageId(BookId bookId, String sessionAlias, Direction direction, Instant timestamp, long sequence) {
this.bookId = bookId;
Expand All @@ -52,6 +53,7 @@ public StoredMessageId(BookId bookId, String sessionAlias, Direction direction,
sequence, bookId, sessionAlias, direction.getLabel()));
}
this.sequence = sequence;
this.hash = Objects.hash(bookId, sessionAlias, direction, timestamp, sequence);
}


Expand Down Expand Up @@ -97,7 +99,7 @@ public String toString() {

@Override
public int hashCode() {
return Objects.hash(bookId, sessionAlias, direction, timestamp, sequence);
return hash;
}


Expand All @@ -110,8 +112,10 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
StoredMessageId other = (StoredMessageId) obj;
return Objects.equals(bookId, other.bookId) && Objects.equals(sessionAlias, other.sessionAlias)
&& direction == other.direction && Objects.equals(timestamp, other.timestamp)
&& sequence == other.sequence;
return Objects.equals(timestamp, other.timestamp) &&
sequence == other.sequence &&
Objects.equals(sessionAlias, other.sessionAlias) &&
direction == other.direction &&
Objects.equals(bookId, other.bookId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import com.exactpro.cradle.testevents.BatchedStoredTestEvent;
import com.exactpro.cradle.testevents.StoredTestEventId;
import com.exactpro.cradle.testevents.TestEventBatchToStore;
import com.exactpro.cradle.testevents.TestEventSingleToStore;

import java.nio.ByteBuffer;
Expand All @@ -28,7 +29,7 @@
import java.util.List;

import static com.exactpro.cradle.serialization.EventsSizeCalculator.calculateBatchEventSize;
import static com.exactpro.cradle.serialization.EventsSizeCalculator.calculateEventRecordSize;
import static com.exactpro.cradle.serialization.EventsSizeCalculator.getEventRecordSize;
import static com.exactpro.cradle.serialization.Serialization.EventBatchConst.EVENT_BATCH_ENT_MAGIC;
import static com.exactpro.cradle.serialization.Serialization.EventBatchConst.EVENT_BATCH_MAGIC;
import static com.exactpro.cradle.serialization.Serialization.EventBatchConst.EVENT_BATCH_PROTOCOL_VER;
Expand All @@ -41,7 +42,7 @@ public class EventBatchSerializer {


public byte[] serializeEventRecord(BatchedStoredTestEvent event) {
ByteBuffer allocate = ByteBuffer.allocate(calculateEventRecordSize(event));
ByteBuffer allocate = ByteBuffer.allocate(getEventRecordSize(event));
this.serializeEventRecord(event, allocate);
return allocate.array();
}
Expand Down Expand Up @@ -78,6 +79,14 @@ public SerializedEntityData<SerializedEntityMetadata> serializeEventBatch(Collec
return new SerializedEntityData<>(serializedEventMetadata, buffer.array());
}

public SerializedEntityData<SerializedEntityMetadata> serializeEventBatch(TestEventBatchToStore batch) {
SerializationBatchSizes sizes = EventsSizeCalculator.getBatchEventSize(batch);
ByteBuffer buffer = ByteBuffer.allocate(sizes.total);
List<SerializedEntityMetadata> serializedEventMetadata = serializeEventBatch(batch.getTestEvents(), buffer, sizes);

return new SerializedEntityData<>(serializedEventMetadata, buffer.array());
}

public void serializeEventBatch(Collection<BatchedStoredTestEvent> batch, ByteBuffer buffer) {
SerializationBatchSizes eventBatchSizes = calculateBatchEventSize(batch);
serializeEventBatch(batch, buffer, eventBatchSizes);
Expand Down
Loading

0 comments on commit 6e703b9

Please sign in to comment.