Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-5431]: Implement processing decimal types with exceeding precision #8

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.InputDataProcessor;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.InputDataProcessorFactory;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.context.ReadContext;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistBufferedRowProcessingService;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistSegmentRequestProcessor;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
Expand Down Expand Up @@ -184,7 +185,7 @@ else if (!isLast(request)) {
}
}

private static void processNotFoundQueryRequest(String tableName, AsyncResponse asyncResponse,
private void processNotFoundQueryRequest(String tableName, AsyncResponse asyncResponse,
GpfdistWritableRequest request)
{
String errorMessage = "No active query for writeable table: " + tableName;
Expand All @@ -197,9 +198,11 @@ private static void processNotFoundQueryRequest(String tableName, AsyncResponse
private void processInitialRequest(AsyncResponse asyncResponse, ReadContext readContext,
GpfdistWritableRequest request)
{
GpfdistBufferedRowProcessingService rowProcessingService = readContext.getRowProcessingService();
InputDataProcessor dataProcessor = inputDataProcessorFactory.create(readContext.getRowDecoder(),
readContext.getRowProcessingService());
readContext.getSegmentDataProcessors().putIfAbsent(request.getSegmentId(),
rowProcessingService);
rowProcessingService.setTotalSegmentCount(request.getSegmentsCount());
rowProcessingService.getSegmentDataProcessors().putIfAbsent(request.getSegmentId(),
new GpfdistSegmentRequestProcessor(request.getSegmentId(), dataProcessor));
asyncResponse.resume(Response.ok()
.header(X_GP_PROTO, request.getGpProtocol())
Expand Down Expand Up @@ -238,7 +241,7 @@ private void processTearDownRequest(AsyncResponse asyncResponse, ReadContext rea

private GpfdistSegmentRequestProcessor getSegmentProcessor(ReadContext readContext, Integer segmentId)
{
return Optional.ofNullable(readContext.getSegmentDataProcessors().get(segmentId))
return Optional.ofNullable(readContext.getRowProcessingService().getSegmentDataProcessors().get(segmentId))
.orElseThrow(() -> new IllegalStateException(
"Failed to get segment request processor by segmentId: " + segmentId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@

public class GpfdistWritableRequest
{
private static final String REQUEST_HEADER_NOT_FOUND_ERROR_MSG = "Request header not found: ";
private String requestId;
private String transactionId;
private String commandId;
private String scanId;
private Integer segmentId;
private Optional<Integer> segmentsCount;
private int segmentsCount;
private Optional<Integer> lineDelimiterLength;
private short gpProtocol;
private Optional<String> gpProtocolVersion;
Expand All @@ -51,7 +52,7 @@ public GpfdistWritableRequest(
String commandId,
String scanId,
Integer segmentId,
Optional<Integer> segmentsCount,
int segmentsCount,
Optional<Integer> lineDelimiterLength,
short gpProtocol,
Optional<String> gpProtocolVersion,
Expand Down Expand Up @@ -83,9 +84,12 @@ public static GpfdistWritableRequest create(String tableName, MultivaluedMap<Str
Optional.ofNullable(values.get(X_GP_SEGMENT_ID))
.map(v -> Integer.parseInt(v.getFirst()))
.orElseThrow(
() -> new IllegalArgumentException("Request header not found: " + X_GP_SEGMENT_ID)),
() -> new IllegalArgumentException(
REQUEST_HEADER_NOT_FOUND_ERROR_MSG + X_GP_SEGMENT_ID)),
Optional.ofNullable(values.get(X_GP_SEGMENT_COUNT))
.map(v -> Integer.parseInt(v.getFirst())),
.map(v -> Integer.parseInt(v.getFirst()))
.orElseThrow(() -> new IllegalArgumentException(
REQUEST_HEADER_NOT_FOUND_ERROR_MSG + X_GP_SEGMENT_COUNT)),
Optional.ofNullable(values.get(X_GP_LINE_DELIM_LENGTH))
.map(v -> Integer.parseInt(v.getFirst())),
Short.parseShort(values.get(X_GP_PROTO).getFirst()),
Expand Down Expand Up @@ -121,6 +125,11 @@ public Optional<Boolean> isLastChunk()
return isLastChunk;
}

public int getSegmentsCount()
{
return segmentsCount;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.ContextId;
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.GpfdistUnloadMetadata;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistBufferedRowProcessingService;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistSegmentRequestProcessor;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ReadContext
Expand Down Expand Up @@ -63,11 +61,6 @@ public GpfdistUnloadMetadata getMetadata()
return metadata;
}

public Map<Integer, GpfdistSegmentRequestProcessor> getSegmentDataProcessors()
{
return rowProcessingService.getSegmentDataProcessors();
}

public RowDecoder<String> getRowDecoder()
{
return rowDecoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand All @@ -34,6 +35,7 @@ public class GpfdistBufferedRowProcessingService
private final Queue<ConnectorRow> rowsQueue = new LinkedList<>();
private final AtomicLong usedBufferedRowsMemory = new AtomicLong(0);
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final AtomicInteger segmentCount = new AtomicInteger(0);
private final Map<Integer, GpfdistSegmentRequestProcessor> segmentDataProcessors = new ConcurrentHashMap<>();
private final Condition isFullCondition;
private final Condition isReadyForTransferCondition;
Expand Down Expand Up @@ -132,21 +134,22 @@ public void clear()

private boolean isNotReady()
{
return isDataTransferNotInitialized() || isNotAllDataProcessed();
return !isStopped.get() && (isDataTransferNotInitialized() || isNotAllDataProcessed());
}

private boolean isDataTransferNotInitialized()
{
//checked that insert adb query is not finished and there are no segment processors yet
return !isStopped.get() && segmentDataProcessors.isEmpty();
return segmentDataProcessors.isEmpty();
}

private boolean isNotAllDataProcessed()
{
//check that there are no rows in queue and segment processors are not finished yet
//check that there are no rows in queue and segment processors are not finished yet for all segments
return isEmpty()
&& segmentDataProcessors.values().stream()
.anyMatch(req -> req.getStatus() != SegmentRequestStatus.FINISHED);
.filter(processor -> processor.getStatus() == SegmentRequestStatus.FINISHED)
.count() != segmentCount.get();
}

private boolean isFull()
Expand All @@ -168,4 +171,9 @@ public Map<Integer, GpfdistSegmentRequestProcessor> getSegmentDataProcessors()
{
return segmentDataProcessors;
}

public void setTotalSegmentCount(int segmentCount)
{
this.segmentCount.compareAndSet(0, segmentCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public boolean advanceNextPosition()
else {
log.info("Data processing is finished. Unloaded rows %s. Processing result: %s",
unloadedRows,
readContext.getSegmentDataProcessors().values());
closeCtx();
readContext.getRowProcessingService().getSegmentDataProcessors().values());
return false;
}
}
Expand All @@ -99,7 +98,7 @@ public boolean advanceNextPosition()
}
log.warn("Processed rows to target: %d. Unloading from adb result: %s",
unloadedRows,
readContext.getSegmentDataProcessors().values());
readContext.getRowProcessingService().getSegmentDataProcessors().values());
closeCtx();
throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR,
"Failed to unload data from adb: " + e.getMessage(), e);
Expand Down Expand Up @@ -192,6 +191,7 @@ private <T> T get(int field)
public void close()
{
if (!this.isClosed) {
log.debug("Closing cursor for contextId: %s", readContext.getId());
this.isClosed = true;
closeCtx();
}
Expand Down
Loading
Loading