forked from bcdev/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stack-optimized-operator-executor.patch
255 lines (241 loc) · 11.9 KB
/
stack-optimized-operator-executor.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
Index: beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java
===================================================================
--- beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/main/java/org/esa/beam/gpf/operators/standard/WriteOp.java (revision )
@@ -201,7 +201,7 @@
getLogger().info("Start writing product " + getTargetProduct().getName() + " to " + getFile());
OperatorExecutor operatorExecutor = OperatorExecutor.create(this);
try {
- operatorExecutor.execute(ExecutionOrder.ROW_BAND_COLUMN, pm);
+ operatorExecutor.execute(ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND, pm);
getLogger().info("End writing product " + getTargetProduct().getName() + " to " + getFile());
Index: beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java
===================================================================
--- beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/test/java/org/esa/beam/framework/gpf/internal/OperatorExecutorTest.java (revision )
@@ -234,19 +234,16 @@
sourceProduct.setPreferredTileSize(50, 50);
Operator op = new TestOP(sourceProduct);
OperatorExecutor operatorExecutor = OperatorExecutor.create(op);
- operatorExecutor.execute(ExecutionOrder.ROW_COLUMN_BAND, ProgressMonitor.NULL);
+ System.setProperty("beam.gpf.executionOrder", "SCHEDULE_ROW_COLUMN_BAND");
+ operatorExecutor.execute(ExecutionOrder.PULL_ROW_BAND_COLUMN, ProgressMonitor.NULL);
- assertEquals(17, recordingTileScheduler.recordedCalls.size());
+ assertEquals(13, recordingTileScheduler.recordedCalls.size());
- assertEquals(8, recordingTileScheduler.requestedTileIndices.size());
+ assertEquals(4, recordingTileScheduler.requestedTileIndices.size());
assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(0));
- assertEquals(new Point(0, 0), recordingTileScheduler.requestedTileIndices.get(1));
- assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(2));
- assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(3));
- assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(4));
- assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(5));
- assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(6));
- assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(7));
+ assertEquals(new Point(1, 0), recordingTileScheduler.requestedTileIndices.get(1));
+ assertEquals(new Point(0, 1), recordingTileScheduler.requestedTileIndices.get(2));
+ assertEquals(new Point(1, 1), recordingTileScheduler.requestedTileIndices.get(3));
}
private Product createSourceProduct() {
Index: beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java
===================================================================
--- beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision 53df621b85ed5ed4fa66c6e59d2532e2cf0029fb)
+++ beam-gpf/src/main/java/org/esa/beam/framework/gpf/internal/OperatorExecutor.java (revision )
@@ -44,7 +44,7 @@
* target product of the given operator have. The computation of these tiles is
* parallelized to use all available CPUs (cores) using the JAI
* {@link TileScheduler}.
- *
+ *
* @author Marco Zuehlke
* @since BEAM 4.7
*/
@@ -66,15 +66,19 @@
}
public enum ExecutionOrder {
- ROW_COLUMN_BAND,
- ROW_BAND_COLUMN,
+ SCHEDULE_ROW_COLUMN_BAND,
+ SCHEDULE_ROW_BAND_COLUMN,
/**
* Minimize disk seeks if following conditions are met:<br/>
* 1. Bands can be computed independently of each other<br/>
* 2. I/O-bound processing (time to compute band pixels will less than
* time for I/O).<br/>
*/
- BAND_ROW_COLUMN,
+ SCHEDULE_BAND_ROW_COLUMN,
+ /**
+ * for debugging purpose
+ */
+ PULL_ROW_BAND_COLUMN,
}
private final int tileCountX;
@@ -87,7 +91,7 @@
public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY) {
this(images, tileCountX, tileCountY, JAI.getDefaultInstance().getTileScheduler().getParallelism());
}
-
+
public OperatorExecutor(PlanarImage[] images, int tileCountX, int tileCountY, int parallelism) {
this.images = images;
this.tileCountX = tileCountX;
@@ -97,27 +101,29 @@
}
public void execute(ProgressMonitor pm) {
- execute(ExecutionOrder.ROW_BAND_COLUMN, pm);
+ execute(ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN, pm);
}
public void execute(ExecutionOrder executionOrder, ProgressMonitor pm) {
final Semaphore semaphore = new Semaphore(parallelism, true);
final TileComputationListener tcl = new OperatorTileComputationListener(semaphore);
- final TileComputationListener[] listeners = new TileComputationListener[] { tcl };
+ final TileComputationListener[] listeners = new TileComputationListener[]{tcl};
-
+
ImagingListener imagingListener = JAI.getDefaultInstance().getImagingListener();
JAI.getDefaultInstance().setImagingListener(new GPFImagingListener());
pm.beginTask("Executing operator...", tileCountX * tileCountY * images.length);
-
+
+ ExecutionOrder effectiveExecutionOrder = getEffectiveExecutionOrder(executionOrder);
+
try {
- if (executionOrder == ExecutionOrder.ROW_BAND_COLUMN) {
- // for debugging purpose
- // executeRowBandColumn(pm);
+ if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_BAND_COLUMN) {
scheduleRowBandColumn(semaphore, listeners, pm);
- } else if (executionOrder == ExecutionOrder.ROW_COLUMN_BAND) {
+ } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_ROW_COLUMN_BAND) {
scheduleRowColumnBand(semaphore, listeners, pm);
- } else if (executionOrder == ExecutionOrder.BAND_ROW_COLUMN) {
+ } else if (effectiveExecutionOrder == ExecutionOrder.SCHEDULE_BAND_ROW_COLUMN) {
scheduleBandRowColumn(semaphore, listeners, pm);
+ } else if (effectiveExecutionOrder == ExecutionOrder.PULL_ROW_BAND_COLUMN) {
+ executeRowBandColumn(pm);
} else {
throw new IllegalArgumentException("executionOrder");
}
@@ -132,6 +138,18 @@
}
}
+ private ExecutionOrder getEffectiveExecutionOrder(ExecutionOrder executionOrder) {
+ ExecutionOrder effectiveExecutionOrder = executionOrder;
+ String executionOrderProperty = System.getProperty("beam.gpf.executionOrder");
+ if (executionOrderProperty != null) {
+ effectiveExecutionOrder = ExecutionOrder.valueOf(executionOrderProperty);
+ }
+ if (effectiveExecutionOrder != executionOrder) {
+ BeamLogManager.getSystemLogger().info("Changing execution order from " + executionOrder + " to " + effectiveExecutionOrder);
+ }
+ return effectiveExecutionOrder;
+ }
+
private void scheduleBandRowColumn(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) {
for (final PlanarImage image : images) {
for (int tileY = 0; tileY < tileCountY; tileY++) {
@@ -155,12 +173,28 @@
}
private void scheduleRowColumnBand(Semaphore semaphore, TileComputationListener[] listeners, ProgressMonitor pm) {
+ //better handle stack operators, should equal well work for normal operators
+ final TileComputationListener tcl = new OperatorTileComputationListenerStack(semaphore, images);
+ listeners = new TileComputationListener[]{tcl};
+
for (int tileY = 0; tileY < tileCountY; tileY++) {
for (int tileX = 0; tileX < tileCountX; tileX++) {
- BeamLogManager.getSystemLogger().info("Scheduling tile column " + tileX + ", row " + tileY);
- for (final PlanarImage image : images) {
- scheduleTile(image, tileX, tileY, semaphore, listeners, pm);
+ BeamLogManager.getSystemLogger().info("Scheduling tile x=" + tileX + " y=" + tileY);
+ checkForCancelation(pm);
+ acquirePermits(semaphore, 1);
+ if (error != null) {
+ semaphore.release(parallelism);
+ throw error;
}
+ Point[] points = new Point[]{new Point(tileX, tileY)};
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Note: GPF pull-processing is triggered here!!!
+ //
+ tileScheduler.scheduleTiles(images[0], points, listeners);
+ //
+ /////////////////////////////////////////////////////////////////////
+ pm.worked(1);
}
}
}
@@ -173,7 +207,7 @@
semaphore.release(parallelism);
throw error;
}
- Point[] points = new Point[] { new Point(tileX, tileY) };
+ Point[] points = new Point[]{new Point(tileX, tileY)};
/////////////////////////////////////////////////////////////////////
//
// Note: GPF pull-processing is triggered here!!!
@@ -243,6 +277,45 @@
}
}
+ private class OperatorTileComputationListenerStack implements TileComputationListener {
+
+ private final Semaphore semaphore;
+ private final PlanarImage[] images;
+
+ OperatorTileComputationListenerStack(Semaphore semaphore, PlanarImage[] images) {
+ this.semaphore = semaphore;
+ this.images = images;
+ }
+
+ @Override
+ public void tileComputed(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY,
+ Raster raster) {
+ for (PlanarImage planarImage : images) {
+ if (image != planarImage) {
+ planarImage.getTile(tileX, tileY);
+ }
+ }
+ semaphore.release();
+ }
+
+ @Override
+ public void tileCancelled(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX, int tileY) {
+ if (error == null) {
+ error = new OperatorException("Operation cancelled.");
+ }
+ semaphore.release(parallelism);
+ }
+
+ @Override
+ public void tileComputationFailure(Object eventSource, TileRequest[] requests, PlanarImage image, int tileX,
+ int tileY, Throwable situation) {
+ if (error == null) {
+ error = new OperatorException("Operation failed.", situation);
+ }
+ semaphore.release(parallelism);
+ }
+ }
+
private class OperatorTileComputationListener implements TileComputationListener {
private final Semaphore semaphore;
@@ -274,17 +347,17 @@
semaphore.release(parallelism);
}
}
-
+
private class GPFImagingListener implements ImagingListener {
@Override
public boolean errorOccurred(String message, Throwable thrown, Object where, boolean isRetryable)
- throws RuntimeException {
+ throws RuntimeException {
if (error == null && !thrown.getClass().getSimpleName().equals("MediaLibLoadException")) {
error = new OperatorException(thrown);
}
return false;
}
}
-
+
}