CHESR-187 #16

Open — wants to merge 7 commits into base branch dev
Changes from 3 commits
2 changes: 1 addition & 1 deletion build.gradle
@@ -26,7 +26,7 @@ compileTestKotlin {

ext {
cradleVersion = '2.17.0'
grpcCrawlerVersion = '0.2.0'
grpcCrawlerVersion = '0.2.0-CHESR-187-SNAPSHOT'
sharedDir = file("${project.rootDir}/shared")
}

116 changes: 80 additions & 36 deletions src/main/java/com/exactpro/th2/crawler/Crawler.java
@@ -16,25 +16,6 @@

package com.exactpro.th2.crawler;

import static com.exactpro.th2.common.message.MessageUtils.toTimestamp;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import static java.util.stream.Collectors.toUnmodifiableMap;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.BinaryOperator;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.exactpro.cradle.CradleStorage;
import com.exactpro.cradle.intervals.Interval;
import com.exactpro.cradle.intervals.IntervalsWorker;
@@ -44,6 +25,8 @@
import com.exactpro.th2.crawler.dataprocessor.grpc.DataProcessorInfo;
import com.exactpro.th2.crawler.dataprocessor.grpc.DataProcessorService;
import com.exactpro.th2.crawler.dataprocessor.grpc.IntervalInfo;
import com.exactpro.th2.crawler.dataprocessor.grpc.IntervalStartResponse;
import com.exactpro.th2.crawler.dataprocessor.grpc.Status;
import com.exactpro.th2.crawler.exception.UnexpectedDataProcessorException;
import com.exactpro.th2.crawler.exception.UnsupportedRecoveryStateException;
import com.exactpro.th2.crawler.metrics.CrawlerMetrics;
@@ -55,6 +38,24 @@
import com.exactpro.th2.crawler.util.CrawlerUtils;
import com.exactpro.th2.dataprovider.grpc.DataProviderService;
import com.google.protobuf.Timestamp;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.BinaryOperator;

import static com.exactpro.th2.common.message.MessageUtils.toTimestamp;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import static java.util.stream.Collectors.toUnmodifiableMap;

public class Crawler {
private static final Logger LOGGER = LoggerFactory.getLogger(Crawler.class);
@@ -73,13 +74,13 @@ public class Crawler {
private final boolean workAlone;
private final DataType crawlerType;
private final int batchSize;
private final DataProcessorInfo info;
private final CrawlerId crawlerId;
private final StateService<RecoveryState> stateService;
private final CrawlerMetrics metrics;
private final DataTypeStrategy<Continuation, DataPart> typeStrategy;
private final ProcessorInfo processorInfo;

private final Instant from;
private Instant from;
private Instant to;
private boolean reachedTo;
private Instant lastIntervalCompatibilityChecked;
@@ -109,7 +110,8 @@ public Crawler(
this.crawlerId = CrawlerId.newBuilder().setName(configuration.getName()).build();
this.sessionAliases = configuration.getSessionAliases();
metrics = requireNonNull(crawlerContext.getMetrics(), "'metrics' must not be null");
info = crawlerConnect(dataProcessor, CrawlerInfo.newBuilder().setId(crawlerId).build());
DataProcessorInfo info = crawlerConnect(dataProcessor, CrawlerInfo.newBuilder().setId(crawlerId).build());
this.processorInfo = new ProcessorInfo(info.getName(), info.getVersion());
// TODO: override configuration.maxOutgoingDataSize with info.maxOutgoingDataSize if the info's value is smaller than the configured one
Map<DataType, DataTypeStrategyFactory<Continuation, DataPart>> knownStrategies = loadStrategies();
DataTypeStrategyFactory<Continuation, DataPart> factory = requireNonNull(knownStrategies.get(crawlerType),
@@ -147,11 +149,9 @@ public Duration process() throws IOException, UnexpectedDataProcessorException {
return metrics.measureTimeWithException(crawlerType, Method.HANDLE_INTERVAL, this::internalProcess);
}

private Duration internalProcess() throws IOException, UnexpectedDataProcessorException {
String dataProcessorName = info.getName();
String dataProcessorVersion = info.getVersion();

FetchIntervalReport fetchIntervalReport = getOrCreateInterval(dataProcessorName, dataProcessorVersion, crawlerType);
private Duration internalProcess() throws IOException {
FetchIntervalReport fetchIntervalReport =
getOrCreateInterval(processorInfo.getDataProcessorName(), processorInfo.getDataProcessorVersion(), crawlerType);

Interval interval = fetchIntervalReport.interval;

@@ -167,7 +167,21 @@ private Duration internalProcess() throws IOException, UnexpectedDataProcessorEx

InternalInterval currentInt = new InternalInterval(stateService, interval);
RecoveryState state = currentInt.getState();
intervalStartForProcessor(dataProcessor, interval, state);

Report<IntervalStartReport> report = intervalStartForProcessor(dataProcessor, interval, state);

if (report.getAction() == Action.HANDSHAKE) {
DataProcessorInfo info = crawlerConnect(dataProcessor, CrawlerInfo.newBuilder().setId(crawlerId).build());
String name = info.getName();
String version = info.getVersion();

processorInfo.setDataProcessorName(name);
processorInfo.setDataProcessorVersion(version);
from = getOrCreateInterval(name, version, crawlerType).interval.getStartTime();

LOGGER.info("Crawler received a handshake from {}:{} when it tried to send interval start request. " +
"Further processing starts from {}", name, version, from);
}

Continuation continuation = state == null ? null : typeStrategy.continuationFromState(state);
DataParameters parameters = new DataParameters(crawlerId, sessionAliases);
@@ -187,7 +201,8 @@ private Duration internalProcess() throws IOException, UnexpectedDataProcessorEx
sendingReport = typeStrategy.processData(dataProcessor, currentInt, parameters, nextPart, prevCheckpoint);

if (sendingReport.getAction() == Action.HANDSHAKE) {
LOGGER.info("Handshake from {}:{} received. Stop processing", dataProcessorName, dataProcessorVersion);
LOGGER.info("Handshake from {}:{} received. Stop processing",
processorInfo.getDataProcessorName(), processorInfo.getDataProcessorVersion());
break;
}

@@ -224,12 +239,19 @@ private Duration internalProcess() throws IOException, UnexpectedDataProcessorEx
switch (action) {
case HANDSHAKE:
DataProcessorInfo info = crawlerConnect(dataProcessor, CrawlerInfo.newBuilder().setId(crawlerId).build());
if (!dataProcessorName.equals(info.getName()) || !dataProcessorVersion.equals(info.getVersion())) {
throw new UnexpectedDataProcessorException("Need to restart Crawler because of changed name and/or version of data-service. " +
"Old name: " + dataProcessorName + ", old version: " + dataProcessorVersion + ". " +
"New name: " + info.getName() + ", new version: " + info.getVersion());

String name = info.getName();
String version = info.getVersion();

String oldName = processorInfo.getDataProcessorName();
String oldVersion = processorInfo.getDataProcessorVersion();

if (!oldName.equals(name) || !oldVersion.equals(version)) {
processorInfo.setDataProcessorName(name);
processorInfo.setDataProcessorVersion(version);
from = getOrCreateInterval(name, version, crawlerType).interval.getStartTime();
LOGGER.info("Got a new name and/or version of data-service. " +
"Old name: " + oldName + ", old version: " + oldVersion + ". " +
"New name: " + name + ", new version: " + version + ". Starting processing from " + from);
}
LOGGER.info("Got the same name ({}) and version ({}) from repeated crawlerConnect", dataProcessorName, dataProcessorVersion);
LOGGER.info("Got the same name ({}) and version ({}) from repeated crawlerConnect", name, version);
break;
case CONTINUE:
currentInt.processed(true, intervalsWorker);
Expand All @@ -255,14 +277,24 @@ private CrawlerData<Continuation, DataPart> requestData(Timestamp startTime, Tim
return typeStrategy.requestData(startTime, endTime, parameters, continuation);
}

private void intervalStartForProcessor(DataProcessorService dataProcessor, Interval interval, RecoveryState state) {
private Report<IntervalStartReport> intervalStartForProcessor(DataProcessorService dataProcessor, Interval interval, RecoveryState state) {
LOGGER.trace("Notifying about interval start (from {} to {})", interval.getStartTime(), interval.getEndTime());
var intervalInfoBuilder = IntervalInfo.newBuilder()
.setId(crawlerId)
.setStartTime(toTimestamp(interval.getStartTime()))
.setEndTime(toTimestamp(interval.getEndTime()));
typeStrategy.setupIntervalInfo(intervalInfoBuilder, state);
dataProcessor.intervalStart(intervalInfoBuilder.build());
IntervalStartResponse response = dataProcessor.intervalStart(intervalInfoBuilder.build());

if (response.hasStatus() && response.getStatus().getHandshakeRequired()) {
return Report.handshake();
}

metrics.processorMethodInvoked(ProcessorMethod.INTERVAL_START);

return Report.empty();
}

private GetIntervalReport getInterval(Iterable<Interval> intervals) throws IOException {
@@ -454,6 +486,10 @@ private long getSleepTime(Instant from, Instant to) {
return Duration.between(from, to).abs().toMillis();
}

public ProcessorInfo getProcessorInfo() {
return processorInfo;
}

private static class GetIntervalReport {
private final Interval foundInterval;
private final Interval lastInterval;
@@ -477,4 +513,12 @@ private FetchIntervalReport(Interval interval, long sleepTime, boolean processFr
this.processFromStart = processFromStart;
}
}

private static class IntervalStartReport implements Continuation {
private final Status status;

public IntervalStartReport(Status status) {
this.status = status;
}
}
}
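Note on the change above: intervalStart is now a two-way exchange. If the processor's IntervalStartResponse carries a Status with handshakeRequired set, the crawler repeats crawlerConnect, refreshes its ProcessorInfo, and recomputes from before continuing. Below is a minimal sketch (not part of this PR) of the two response shapes involved; the builder calls mirror the ones used in this changeset's tests, while the wrapping helper class and its method names are illustrative only.

import com.exactpro.th2.crawler.dataprocessor.grpc.IntervalStartResponse;
import com.exactpro.th2.crawler.dataprocessor.grpc.Status;

// Illustrative helper, not part of this PR.
final class IntervalStartResponses {

    // Signals that the crawler should repeat crawlerConnect before processing the interval;
    // the crawler reacts by updating ProcessorInfo and restarting from the new interval's start time.
    static IntervalStartResponse handshakeRequired() {
        return IntervalStartResponse.newBuilder()
                .setStatus(Status.newBuilder().setHandshakeRequired(true).build())
                .build();
    }

    // Plain acknowledgement: the interval is processed as usual.
    static IntervalStartResponse accepted() {
        return IntervalStartResponse.getDefaultInstance();
    }
}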
27 changes: 27 additions & 0 deletions src/main/java/com/exactpro/th2/crawler/ProcessorInfo.java
@@ -0,0 +1,27 @@
package com.exactpro.th2.crawler;

public class ProcessorInfo {
private String dataProcessorName;
private String dataProcessorVersion;

public ProcessorInfo(String dataProcessorName, String dataProcessorVersion) {
this.dataProcessorName = dataProcessorName;
this.dataProcessorVersion = dataProcessorVersion;
}

public String getDataProcessorName() {
return dataProcessorName;
}

public void setDataProcessorName(String dataProcessorName) {
this.dataProcessorName = dataProcessorName;
}

public String getDataProcessorVersion() {
return dataProcessorVersion;
}

public void setDataProcessorVersion(String dataProcessorVersion) {
this.dataProcessorVersion = dataProcessorVersion;
}
}
20 changes: 19 additions & 1 deletion src/test/java/com/exactpro/th2/crawler/CrawlerTest.java
@@ -24,6 +24,8 @@
import com.exactpro.th2.crawler.dataprocessor.grpc.DataProcessorInfo;
import com.exactpro.th2.crawler.dataprocessor.grpc.EventDataRequest;
import com.exactpro.th2.crawler.dataprocessor.grpc.EventResponse;
import com.exactpro.th2.crawler.dataprocessor.grpc.IntervalInfo;
import com.exactpro.th2.crawler.dataprocessor.grpc.IntervalStartResponse;
import com.exactpro.th2.crawler.dataprocessor.grpc.MessageDataRequest;
import com.exactpro.th2.crawler.dataprocessor.grpc.MessageResponse;
import com.exactpro.th2.crawler.dataprocessor.grpc.Status;
@@ -44,6 +46,7 @@

import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@@ -118,6 +121,8 @@ public void processMethodCall() throws IOException, UnexpectedDataProcessorExcep
"2021-06-16T12:00:00.00Z", DataType.EVENTS, Collections.emptySet());
CrawlerManager manager = new CrawlerManager(configuration);

when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(IntervalStartResponse.getDefaultInstance());

Crawler crawler = manager.createCrawler();
crawler.process();

@@ -138,6 +143,7 @@ public void testCrawlerMessages() throws IOException, UnexpectedDataProcessorExc
Iterator<StreamResponse> iterator = new MessageSearchResponse(messages).iterator();
when(manager.getDataProviderMock().searchMessages(any(MessageSearchRequest.class))).thenReturn(iterator);
when(manager.getDataServiceMock().sendMessage(any(MessageDataRequest.class))).thenReturn(MessageResponse.getDefaultInstance());
when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(IntervalStartResponse.getDefaultInstance());

crawler.process();

@@ -175,6 +181,7 @@ public void testCrawlerMessagesMaxOutgoingMessageSize() throws IOException, Unex
MessageResponse.newBuilder()
.addIds(createMessageID("alias1", Direction.SECOND, 3))
.build());
when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(IntervalStartResponse.getDefaultInstance());

crawler.process();

@@ -209,6 +216,7 @@ public void testCrawlerMessagesMaxOutgoingMessageSizeExceeded() throws IOExcepti
Iterator<StreamResponse> iterator = new MessageSearchResponse(messages).iterator();
when(manager.getDataProviderMock().searchMessages(any(MessageSearchRequest.class))).thenReturn(iterator);
when(manager.getDataServiceMock().sendMessage(any(MessageDataRequest.class))).thenReturn(MessageResponse.getDefaultInstance());
when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(IntervalStartResponse.getDefaultInstance());

Assertions.assertThrows(IllegalStateException.class, crawler::process);
}
@@ -236,6 +244,9 @@ public void handshakeNeededAnother() throws IOException, UnexpectedDataProcessor
CrawlerManager manager = new CrawlerManager(configuration);
Crawler crawler = manager.createCrawler();

String dataProcessorName = "another_crawler";
String dataProcessorVersion = CrawlerManager.VERSION;

when(manager.getDataServiceMock().crawlerConnect(any(CrawlerInfo.class)))
.thenReturn(DataProcessorInfo.newBuilder().setName("another_crawler").setVersion(CrawlerManager.VERSION).build());

@@ -249,7 +260,14 @@ public void handshakeNeededAnother() throws IOException, UnexpectedDataProcessor
return EventResponse.newBuilder().setId(eventID).setStatus(Status.newBuilder().setHandshakeRequired(true).build()).build();
});

Assertions.assertThrows(UnexpectedDataProcessorException.class, crawler::process);
when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(
IntervalStartResponse.newBuilder().setStatus(Status.newBuilder().setHandshakeRequired(true).build()).build()
);

crawler.process();

Assertions.assertEquals(dataProcessorName, crawler.getProcessorInfo().getDataProcessorName());
Assertions.assertEquals(dataProcessorVersion, crawler.getProcessorInfo().getDataProcessorVersion());
Review comment (Member, Author):
Please extract to separate test

Reply (Contributor):
There used to be an assertThrows check, but now the Crawler no longer throws an exception in this case, so I just assert the correct name and version. What exactly do you want me to extract to a separate test?

}
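Following the review thread above, here is a minimal sketch of one way the handshake-on-interval-start checks could live in their own test; it is not part of this PR. It only reuses mocks and helpers already visible in this diff (CrawlerManager, createCrawler, the stubbed crawlerConnect and intervalStart); the test name is illustrative, the configuration is assumed to be built exactly as in handshakeNeededAnother, and additional provider/processor stubs from that test may still be required.

@Test
@DisplayName("Crawler adopts the processor's new name and version after a handshake on interval start")
public void handshakeOnIntervalStartUpdatesProcessorInfo() throws IOException, UnexpectedDataProcessorException {
    // Assumption: 'configuration' is created the same way as in handshakeNeededAnother() above.
    CrawlerManager manager = new CrawlerManager(configuration);
    Crawler crawler = manager.createCrawler();

    when(manager.getDataServiceMock().crawlerConnect(any(CrawlerInfo.class)))
            .thenReturn(DataProcessorInfo.newBuilder().setName("another_crawler").setVersion(CrawlerManager.VERSION).build());
    when(manager.getDataServiceMock().intervalStart(any(IntervalInfo.class))).thenReturn(
            IntervalStartResponse.newBuilder().setStatus(Status.newBuilder().setHandshakeRequired(true).build()).build());

    crawler.process();

    Assertions.assertEquals("another_crawler", crawler.getProcessorInfo().getDataProcessorName());
    Assertions.assertEquals(CrawlerManager.VERSION, crawler.getProcessorInfo().getDataProcessorVersion());
}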

@Test