Skip to content

Commit

Permalink
TheHortonMachine#36: SMP support and framework (http://github.com/moo…
Browse files Browse the repository at this point in the history
…vida/jgrasstools/issues/36)

NEW: BlockingExecutorService

  - ommit polling after task rejection
  - allow any (unbound) ExecutorService to be used as delegate
  • Loading branch information
fb71 committed Mar 13, 2017
1 parent 1d317d2 commit f8ea4e4
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.awt.image.WritableRaster;

import javax.media.jai.iterator.RandomIter;
import javax.media.jai.iterator.RandomIterFactory;
import javax.media.jai.iterator.WritableRandomIter;

Expand All @@ -52,19 +53,16 @@
import org.jgrasstools.gears.libs.modules.FixedChunkSizePlanner;
import org.jgrasstools.gears.libs.modules.GridNode;
import org.jgrasstools.gears.libs.modules.GridNodeMultiProcessing;
import org.jgrasstools.gears.libs.monitor.DummyProgressMonitor;
import org.jgrasstools.gears.libs.monitor.PrintStreamProgressMonitor;
import org.jgrasstools.gears.utils.coverage.CoverageUtilities;
import org.jgrasstools.gears.utils.math.NumericsUtilities;
import org.jgrasstools.hortonmachine.i18n.HortonMessageHandler;

import oms3.ComponentAccess;
import oms3.annotations.Author;
import oms3.annotations.Description;
import oms3.annotations.Documentation;
import oms3.annotations.Execute;
import oms3.annotations.Finalize;
import oms3.annotations.In;
import oms3.annotations.Initialize;
import oms3.annotations.Keywords;
import oms3.annotations.Label;
import oms3.annotations.License;
Expand Down Expand Up @@ -126,6 +124,47 @@ public void process() throws Exception {
}


public void processSerial() throws Exception {
double radtodeg = doRadiants ? 1.0 : NumericsUtilities.RADTODEG;

int rows = regionMap( inElev ).getRows();
int cols = regionMap( inElev ).getCols();
double xRes = regionMap( inElev ).getXres();
double yRes = regionMap( inElev ).getYres();
WritableRaster aspectWR = CoverageUtilities.createDoubleWritableRaster( cols, rows, null, null, null );
WritableRandomIter aspectIter = RandomIterFactory.createWritable( aspectWR, null );

pm.beginTask( msg.message( "aspect.calculating" ), rows*cols );
RandomIter elevationIter = CoverageUtilities.getRandomIterator( inElev );

// Cycling into the valid region.
for (int r = 1; r < rows - 1; r++) {
for (int c = 1; c < cols - 1; c++) {
GridNode node = new GridNode( elevationIter, cols, rows, xRes, yRes, c, r );
double aspect = calculate( node, radtodeg );
aspectIter.setSample( node.col, node.row, 0, aspect );
pm.worked( 1 );
}
}
}


public void processPlanner() throws Exception {
double radtodeg = doRadiants ? 1.0 : NumericsUtilities.RADTODEG;

int rows = regionMap( inElev ).getRows();
int cols = regionMap( inElev ).getCols();
WritableRaster aspectWR = CoverageUtilities.createDoubleWritableRaster( cols, rows, null, null, null );
WritableRandomIter aspectIter = RandomIterFactory.createWritable( aspectWR, null );

pm.beginTask( msg.message( "aspect.calculating" ), rows*cols );
processGridNodes( inElev, gridNode -> {
double aspect = calculate( gridNode, radtodeg );
aspectIter.setSample( gridNode.col, gridNode.row, 0, aspect );
});
}


/**
* Calculates the aspect in a given {@link GridNode}.
*
Expand Down Expand Up @@ -223,22 +262,31 @@ public double calculate( GridNode node, double radtodeg ) {

public static void main( String[] args ) throws Exception {
OmsAspectMP aspect = new OmsAspectMP();
aspect.pm = new DummyProgressMonitor();
//aspect.pm = new PrintStreamProgressMonitor();
//aspect.pm = new DummyProgressMonitor();
aspect.pm = new PrintStreamProgressMonitor();

ExecutionPlanner.defaultPlannerFactory = () -> new FixedChunkSizePlanner();
//ExecutionPlanner.defaultPlannerFactory = () -> new InThreadExecutionPlanner();

long start = System.currentTimeMillis();
aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/elevation_3857.tif" );
// aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/elevation.tif" );
aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/DTM_calvello/dtm_all.asc" );
System.out.println( "inElev: " + aspect.inElev );
long start = System.currentTimeMillis();

// // serial
// start = System.currentTimeMillis();
// aspect.processSerial();
// System.out.println( "Serial: " + (System.currentTimeMillis()-start) + "ms" );

// FixedChunkSizePlanner
ExecutionPlanner.defaultPlannerFactory = () -> new FixedChunkSizePlanner();
start = System.currentTimeMillis();
aspect.processPlanner();
System.out.println( "FixedChunkSizePlanner: " + (System.currentTimeMillis()-start) + "ms" );

ComponentAccess.callAnnotated( aspect, Initialize.class, true );
ComponentAccess.callAnnotated( aspect, Execute.class, false );
ComponentAccess.callAnnotated( aspect, Finalize.class, true );
// // InThreadExecutionPlanner
// ExecutionPlanner.defaultPlannerFactory = () -> new InThreadExecutionPlanner();
// start = System.currentTimeMillis();
// aspect.processPlanner();
// System.out.println( "" + (System.currentTimeMillis()-start) + "ms" );

System.out.println( "" + (System.currentTimeMillis()-start) + "ms" );
System.out.println( "outAspect: " + aspect.outAspect );
System.exit( 0 );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* polymap.org
* Copyright (C) 2017, the @authors. All rights reserved.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 3.0 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/
package org.jgrasstools.gears.libs.modules;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Submits up to a maximum count tasks to a delegate {@link ExecutorService}. Blocks
* submitting thread if it thries to submit more than the maximum number of tasks.
*
* @author Falko Bräutigam
*/
public class BlockingExecutorService
implements ExecutorService {

private ExecutorService delegate;

private Semaphore taskCount;

/**
*
*
* @param delegate
* @param maxTaskCount The maximum number of tasks to submit to the delegate.
*/
public BlockingExecutorService( ExecutorService delegate, int maxTaskCount ) {
this.delegate = delegate;
this.taskCount = new Semaphore( maxTaskCount );
}

protected void beforeSubmit() {
try {
taskCount.acquire();
}
catch (InterruptedException e) {
throw new RuntimeException( e );
}
}

@Override
public void execute( Runnable command ) {
beforeSubmit();
delegate.execute( () -> {
try {
command.run();
}
finally {
taskCount.release();
}
});
}

@Override
public <T> Future<T> submit( Callable<T> task ) {
beforeSubmit();
return delegate.submit( () -> {
try {
return task.call();
}
finally {
taskCount.release();
}
});
}

@Override
public <T> Future<T> submit( Runnable task, T result ) {
beforeSubmit();
return delegate.submit( () -> {
try {
task.run();
}
finally {
taskCount.release();
}
}, result );
}

@Override
public Future<?> submit( Runnable task ) {
beforeSubmit();
return delegate.submit( () -> {
try {
task.run();
}
finally {
taskCount.release();
}
});
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
return delegate.awaitTermination( timeout, unit );
}

@Override
public <T> List<Future<T>> invokeAll( Collection<? extends Callable<T>> tasks ) throws InterruptedException {
// XXX Auto-generated method stub
throw new RuntimeException( "not yet implemented." );
}

@Override
public <T> List<Future<T>> invokeAll( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit )
throws InterruptedException {
// XXX Auto-generated method stub
throw new RuntimeException( "not yet implemented." );
}

@Override
public <T> T invokeAny( Collection<? extends Callable<T>> tasks ) throws InterruptedException, ExecutionException {
// XXX Auto-generated method stub
throw new RuntimeException( "not yet implemented." );
}

@Override
public <T> T invokeAny( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit )
throws InterruptedException, ExecutionException, TimeoutException {
// XXX Auto-generated method stub
throw new RuntimeException( "not yet implemented." );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.jgrasstools.gears.libs.modules;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -48,23 +48,26 @@ public Thread newThread( Runnable r ) {
};

int procNum = Runtime.getRuntime().availableProcessors();
defaultExecutor = new ThreadPoolExecutor( procNum, procNum, 60L, TimeUnit.SECONDS,
// keep a small number of chunks queued so that Thread.sleep() after
// refused submit has something to do
new LinkedBlockingDeque( procNum ),
ThreadPoolExecutor threadPool = new ThreadPoolExecutor( procNum, procNum, 60L, TimeUnit.SECONDS,
// with BlockingExecutorService on top we can have unbound queue
new LinkedTransferQueue(),
// new LinkedBlockingDeque( procNum ),
// new SynchronousQueue(),
// new ArrayBlockingQueue( procNum ) );
threadFactory );

defaultExecutor = new BlockingExecutorService( threadPool, procNum );
}


/**
* The default {@link ExecutorService} to be used by all planners.
* <p/>
* Set this to change the default. The {@link ExecutorService} must not have an
* unbound queue or an unlimited number of threads. In other words, the executor
* has to refuse submits when system resources are running out.
*/
public static ExecutorService defaultExecutor;
public static final BlockingExecutorService defaultExecutor;

/**
* Set this to change the default planner for all modules.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/**
*
Expand All @@ -31,7 +30,7 @@ public class FixedChunkSizePlanner
/**
* The absolut upper limit of the chunk size.
*/
public static final int MAX_CHUNK_SIZE = 100000;
public static final int MAX_CHUNK_SIZE = 10000;

private int targetChunkSize = -1;

Expand Down Expand Up @@ -83,13 +82,8 @@ protected void submitChunk( List<MultiProcessingTask> chunk ) {
// submit
boolean success = false;
for (int waitMillis=10; !success; waitMillis=Math.min( 100, waitMillis*2 ) ) {
try {
success = submitted.add( defaultExecutor.submit( work ) );
}
catch (RejectedExecutionException e) {
// System.out.println( "waiting " + waitMillis + "ms ..." );
try { Thread.sleep( waitMillis ); } catch (InterruptedException e1) {}
}
// System.out.println( Thread.currentThread().getName() + ": " + taskCount.availablePermits() );
success = submitted.add( defaultExecutor.submit( work ) );
}
}

Expand Down

0 comments on commit f8ea4e4

Please sign in to comment.