Skip to content

Commit

Permalink
ADH-5240
Browse files Browse the repository at this point in the history
- refactored page process provider
  • Loading branch information
VitekArkhipov committed Nov 29, 2024
1 parent d591a02 commit 4281e92
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.adb.connector.protocol.gpfdist.load.process;
package io.trino.plugin.adb.connector.protocol.gpfdist.load;

import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessor;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;

public interface PageProcessorProvider
{
void add(PageProcessor processor);

PageProcessor take();

ConcurrentLinkedQueue<PageProcessor> getAll();
Queue<PageProcessor> getAll();

void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.ContextId;
import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.GpfdistLoadMetadata;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -110,7 +110,7 @@ public GpfdistPageProcessorProvider getPageProcessorProvider()
@Override
public void close()
{
ConcurrentLinkedQueue<PageProcessor> pageProcessors = pageProcessorProvider.getAll();
Queue<PageProcessor> pageProcessors = pageProcessorProvider.getAll();
StringBuilder sb = new StringBuilder();
pageProcessors.forEach(processor -> {
try {
Expand All @@ -120,7 +120,7 @@ public void close()
sb.append(format("Failed to stop page processor %s. Error: %s;", processor, e.getMessage()));
}
});
pageProcessors.clear();
pageProcessorProvider.clear();
if (!sb.isEmpty()) {
throw new RuntimeException(sb.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package io.trino.plugin.adb.connector.protocol.gpfdist.load.process;

import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessor;
import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessorProvider;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -26,7 +28,7 @@ public class GpfdistPageProcessorProvider
implements PageProcessorProvider
{
private static final long ADB_SEGMENT_WAIT_TIMEOUT = 60000L;
private final ConcurrentLinkedQueue<PageProcessor> pageProcessors = new ConcurrentLinkedQueue<>();
private final Queue<PageProcessor> pageProcessors = new LinkedList<>();
private final AtomicBoolean isReadyForProcessing = new AtomicBoolean(false);
private final ReentrantLock lock = new ReentrantLock();
private final Condition isReadyForProcessingCondition = lock.newCondition();
Expand Down Expand Up @@ -80,8 +82,20 @@ public PageProcessor take()
}

@Override
public ConcurrentLinkedQueue<PageProcessor> getAll()
public Queue<PageProcessor> getAll()
{
return pageProcessors;
}

@Override
public void clear()
{
lock.lock();
try {
pageProcessors.clear();
}
finally {
lock.unlock();
}
}
}

0 comments on commit 4281e92

Please sign in to comment.