Skip to content

Commit

Permalink
ADH-5242
Browse files Browse the repository at this point in the history
- fixed tests for decimal type with exceeding precision
- fixed unloading data bug
  • Loading branch information
VitekArkhipov committed Dec 12, 2024
1 parent 5e90ba3 commit 3470969
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private void processInitialRequest(AsyncResponse asyncResponse, ReadContext read
{
InputDataProcessor dataProcessor = inputDataProcessorFactory.create(readContext.getRowDecoder(),
readContext.getRowProcessingService());
readContext.getSegmentDataProcessors().putIfAbsent(request.getSegmentId(),
readContext.getRowProcessingService().getSegmentDataProcessors().putIfAbsent(request.getSegmentId(),
new GpfdistSegmentRequestProcessor(request.getSegmentId(), dataProcessor));
asyncResponse.resume(Response.ok()
.header(X_GP_PROTO, request.getGpProtocol())
Expand Down Expand Up @@ -238,7 +238,7 @@ private void processTearDownRequest(AsyncResponse asyncResponse, ReadContext rea

private GpfdistSegmentRequestProcessor getSegmentProcessor(ReadContext readContext, Integer segmentId)
{
return Optional.ofNullable(readContext.getSegmentDataProcessors().get(segmentId))
return Optional.ofNullable(readContext.getRowProcessingService().getSegmentDataProcessors().get(segmentId))
.orElseThrow(() -> new IllegalStateException(
"Failed to get segment request processor by segmentId: " + segmentId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ public interface RowProcessingService

void stop();

void stopExceptionally(Throwable error);

void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.ContextId;
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.GpfdistUnloadMetadata;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistBufferedRowProcessingService;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.process.GpfdistSegmentRequestProcessor;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ReadContext
Expand Down Expand Up @@ -63,11 +61,6 @@ public GpfdistUnloadMetadata getMetadata()
return metadata;
}

public Map<Integer, GpfdistSegmentRequestProcessor> getSegmentDataProcessors()
{
return rowProcessingService.getSegmentDataProcessors();
}

public RowDecoder<String> getRowDecoder()
{
return rowDecoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import io.trino.plugin.adb.AdbPluginConfig;
import io.trino.plugin.adb.connector.protocol.gpfdist.ConnectorRow;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.RowProcessingService;
import io.trino.plugin.adb.connector.protocol.gpfdist.unload.SegmentRequestStatus;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -34,6 +34,7 @@ public class GpfdistBufferedRowProcessingService
private final Queue<ConnectorRow> rowsQueue = new LinkedList<>();
private final AtomicLong usedBufferedRowsMemory = new AtomicLong(0);
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final AtomicReference<Throwable> processingError = new AtomicReference<>();
private final Map<Integer, GpfdistSegmentRequestProcessor> segmentDataProcessors = new ConcurrentHashMap<>();
private final Condition isFullCondition;
private final Condition isReadyForTransferCondition;
Expand Down Expand Up @@ -109,44 +110,48 @@ public void stop()
{
lock.lock();
try {
isStopped.set(true);
isReadyForTransferCondition.signalAll();
stopInternal();
}
finally {
lock.unlock();
}
}

@Override
public void clear()
public void stopExceptionally(Throwable error)
{
lock.lock();
try {
segmentDataProcessors.clear();
rowsQueue.clear();
stopInternal();
processingError.set(error);
}
finally {
lock.unlock();
}
}

private boolean isNotReady()
private void stopInternal()
{
return isDataTransferNotInitialized() || isNotAllDataProcessed();
isStopped.set(true);
isReadyForTransferCondition.signalAll();
}

private boolean isDataTransferNotInitialized()
@Override
public void clear()
{
//checked that insert adb query is not finished and there are no segment processors yet
return !isStopped.get() && segmentDataProcessors.isEmpty();
lock.lock();
try {
segmentDataProcessors.clear();
rowsQueue.clear();
}
finally {
lock.unlock();
}
}

private boolean isNotAllDataProcessed()
private boolean isNotReady()
{
//check that there are no rows in queue and segment processors are not finished yet
return isEmpty()
&& segmentDataProcessors.values().stream()
.anyMatch(req -> req.getStatus() != SegmentRequestStatus.FINISHED);
return !isStopped.get() && (segmentDataProcessors.isEmpty() || isEmpty());
}

private boolean isFull()
Expand All @@ -168,4 +173,9 @@ public Map<Integer, GpfdistSegmentRequestProcessor> getSegmentDataProcessors()
{
return segmentDataProcessors;
}

public AtomicReference<Throwable> getProcessingError()
{
return processingError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public boolean advanceNextPosition()
}
currentRow = rowProcessingService.take();

if (queryExecutionException != null) {
throw queryExecutionException;
if (rowProcessingService.getProcessingError().get() != null) {
throw rowProcessingService.getProcessingError().get();
}

if (currentRow != null) {
Expand All @@ -88,8 +88,7 @@ public boolean advanceNextPosition()
else {
log.info("Data processing is finished. Unloaded rows %s. Processing result: %s",
unloadedRows,
readContext.getSegmentDataProcessors().values());
closeCtx();
readContext.getRowProcessingService().getSegmentDataProcessors().values());
return false;
}
}
Expand All @@ -99,7 +98,7 @@ public boolean advanceNextPosition()
}
log.warn("Processed rows to target: %d. Unloading from adb result: %s",
unloadedRows,
readContext.getSegmentDataProcessors().values());
readContext.getRowProcessingService().getSegmentDataProcessors().values());
closeCtx();
throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR,
"Failed to unload data from adb: " + e.getMessage(), e);
Expand All @@ -113,11 +112,14 @@ public boolean advanceNextPosition()
private CompletableFuture<Void> executeTransferDataQuery()
{
return dataTransferQueryExecutor.execute().whenComplete((_, error) -> {
readContext.getRowProcessingService().stop();
log.debug("Data transfer query finished for contextId: %s", readContext.getId());
if (error != null) {
queryExecutionException = error;
readContext.getRowProcessingService().stopExceptionally(error);
throw new CompletionException(error);
}
else {
readContext.getRowProcessingService().stop();
}
});
}

Expand Down
Loading

0 comments on commit 3470969

Please sign in to comment.