-
Notifications
You must be signed in to change notification settings - Fork 0
/
BasicWorkflowWorker.php
324 lines (282 loc) · 13.5 KB
/
BasicWorkflowWorker.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
<?php
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
require_once dirname(dirname(dirname(dirname(__FILE__)))) . DIRECTORY_SEPARATOR . 'sdk.class.php';
require_once 'HistoryEventIterator.php';
/*
* A decider can be written by modeling the workflow as a state machine.
* For complex workflows, this is the easiest model to use.
*
* The decider reads the history to figure out which state the workflow is currently in,
* and makes a decision based on the current state.
*
* This implementation of the decider ignores activity failures.
* You can handle them by adding more states.
* This decider also only supports having a single activity open at a time.
*/
abstract class BasicWorkflowWorkerStates {
// A new workflow is in this state
const START = 0;
// If a timer is open, and not an activity.
const TIMER_OPEN = 1;
// If an activity is open, and not a timer.
const ACTIVITY_OPEN = 2;
// If both a timer and an activity are open.
const TIMER_AND_ACTIVITY_OPEN = 3;
// Nothing is open.
const NOTHING_OPEN = 4;
}
/*
* At some point it makes sense to separate polling logic and worker logic, but we've left
* them together here for simplicity.
*/
class BasicWorkflowWorker {
const DEBUG = false;
const WORKFLOW_NAME = "myWorkflowName";
const WORKFLOW_VERSION = "myWorkflowVersion";
const ACTIVITY_NAME_KEY = 'activityName';
const ACTIVITY_VERSION_KEY = 'activityVersion';
const ACTIVITY_TASK_LIST_KEY = 'activityTaskList';
const ACTIVITY_INPUT_KEY = 'activityInput';
const TIMER_DURATION_KEY = 'timerDuration';
// If you increase this value, you should also
// increase your workflow execution timeout accordingly so that a
// new generation is started before the workflow times out.
const EVENT_THRESHOLD_BEFORE_NEW_GENERATION = 150;
protected $swf;
protected $domain;
protected $task_list;
public function __construct(AmazonSWF $swf_service, $domain, $task_list) {
$this->domain = $domain;
$this->task_list = $task_list;
$this->swf = $swf_service;
}
public function start() {
$this->_poll();
}
protected function _poll() {
while (true) {
$opts = array(
'domain' => $this->domain,
'taskList' => array(
'name' => $this->task_list
)
);
$response = $this->swf->poll_for_decision_task($opts);
if ($response->isOK()) {
$task_token = (string) $response->body->taskToken;
if (!empty($task_token)) {
if (self::DEBUG) {
echo "Got history; handing to decider\n";
}
$history = $response->body->events();
try {
$decision_list = self::_decide(new HistoryEventIterator($this->swf, $opts, $response));
} catch (Exception $e) {
// If failed decisions are recoverable, one could drop the task and allow it to be redriven by the task timeout.
echo 'Failing workflow; exception in decider: ', $e->getMessage(), "\n", $e->getTraceAsString(), "\n";
$decision_list = array(
wrap_decision_opts_as_decision('FailWorkflowExecution', array(
'reason' => substr('Exception in decider: ' . $e->getMessage(), 0, 256),
'details' => substr($e->getTraceAsString(), 0, 32768)
))
);
}
if (self::DEBUG) {
echo 'Responding with decisions: ';
print_r($decision_list);
}
$complete_opt = array(
'taskToken' => $task_token,
'decisions'=> $decision_list
);
$complete_response = $this->swf->respond_decision_task_completed($complete_opt);
if ($complete_response->isOK()) {
echo "RespondDecisionTaskCompleted SUCCESS\n";
} else {
// a real application may want to report this failure and retry
echo "RespondDecisionTaskCompleted FAIL\n";
echo "Response body: \n";
print_r($complete_response->body);
echo "Request JSON: \n";
echo json_encode($complete_opt) . "\n";
}
} else {
echo "PollForDecisionTask received empty response\n";
}
} else {
echo 'ERROR: ';
print_r($response->body);
sleep(2);
}
}
}
/**
* A decider inspects the history of a workflow and then schedules more tasks based on the current state of
* the workflow.
*/
protected static function _decide($history) {
$workflow_state = BasicWorkflowWorkerStates::START;
$timer_opts = null;
$activity_opts = null;
$continue_as_new_opts = null;
$max_event_id = 0;
foreach ($history as $event) {
self::_process_event($event, $workflow_state, $timer_opts, $activity_opts, $continue_as_new_opts, $max_event_id);
}
$timer_decision = wrap_decision_opts_as_decision('StartTimer', $timer_opts);
$activity_decision = wrap_decision_opts_as_decision('ScheduleActivityTask', $activity_opts);
$continue_as_new_decision = wrap_decision_opts_as_decision('ContinueAsNewWorkflowExecution', $continue_as_new_opts);
if ($workflow_state === BasicWorkflowWorkerStates::START) {
return array(
$timer_decision
);
} else if ($workflow_state === BasicWorkflowWorkerStates::NOTHING_OPEN) {
if ($max_event_id >= BasicWorkflowWorker::EVENT_THRESHOLD_BEFORE_NEW_GENERATION) {
return array(
$continue_as_new_decision
);
} else {
return array(
$timer_decision,
$activity_decision
);
}
} else {
return array();
}
}
/*
* By reading events in the history, we can determine which state the workflow is in.
* And then, based on the current state of the workflow, the decider knows what should happen next.
*/
protected static function _process_event($event, &$workflow_state, &$timer_opts, &$activity_opts, &$continue_as_new_opts, &$max_event_id) {
$event_type = (string) $event->eventType;
$max_event_id = max($max_event_id, intval($event->eventId));
if (BasicWorkflowWorker::DEBUG) {
echo "event type: $event_type\n";
print_r($event);
}
switch ($event_type) {
case 'TimerStarted':
if ($workflow_state === BasicWorkflowWorkerStates::NOTHING_OPEN ||
$workflow_state === BasicWorkflowWorkerStates::START) {
$workflow_state = BasicWorkflowWorkerStates::TIMER_OPEN;
} else if ($workflow_state === BasicWorkflowWorkerStates::ACTIVITY_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::TIMER_AND_ACTIVITY_OPEN;
}
break;
case 'TimerFired':
if ($workflow_state === BasicWorkflowWorkerStates::TIMER_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::NOTHING_OPEN;
} else if ($workflow_state === BasicWorkflowWorkerStates::TIMER_AND_ACTIVITY_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::ACTIVITY_OPEN;
}
break;
case 'ActivityTaskScheduled':
if ($workflow_state === BasicWorkflowWorkerStates::NOTHING_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::ACTIVITY_OPEN;
} else if ($workflow_state === BasicWorkflowWorkerStates::TIMER_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::TIMER_AND_ACTIVITY_OPEN;
}
break;
case 'ActivityTaskCanceled':
// add cancellation handling here
case 'ActivityTaskFailed':
// add failure handling here
// when an activity fails, a real application may want to retry it or report the incident
case 'ActivityTaskTimedOut':
// add timeout handling here
// when an activity times out, a real application may want to retry it or report the incident
case 'ActivityTaskCompleted':
if ($workflow_state === BasicWorkflowWorkerStates::ACTIVITY_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::NOTHING_OPEN;
} else if ($workflow_state === BasicWorkflowWorkerStates::TIMER_AND_ACTIVITY_OPEN) {
$workflow_state = BasicWorkflowWorkerStates::TIMER_OPEN;
}
break;
// This is the only case which doesn't only transition state;
// it also gathers the user's workflow input.
case 'WorkflowExecutionStarted':
$workflow_state = BasicWorkflowWorkerStates::START;
// gather gather gather
$event_attributes = $event->workflowExecutionStartedEventAttributes;
$workflow_input = json_decode($event_attributes->input, true);
if (BasicWorkflowWorker::DEBUG) {
echo 'Workflow input: ';
print_r($workflow_input);
}
$activity_opts = BasicWorkflowWorker::create_activity_opts_from_workflow_input($workflow_input);
$timer_opts = BasicWorkflowWorker::create_timer_opts_from_workflow_input($workflow_input);
$continue_as_new_opts = BasicWorkflowWorker::create_continue_as_new_opts_from_workflow_start($event_attributes);
break;
}
}
public static function create_activity_opts_from_workflow_input($input) {
$activity_name = $input[BasicWorkflowWorker::ACTIVITY_NAME_KEY];
$activity_version = $input[BasicWorkflowWorker::ACTIVITY_VERSION_KEY];
$activity_task_list = $input[BasicWorkflowWorker::ACTIVITY_TASK_LIST_KEY];
$activity_input = $input[BasicWorkflowWorker::ACTIVITY_INPUT_KEY];
$activity_opts = array(
'activityType' => array(
'name' => $activity_name,
'version' => $activity_version
),
'activityId' => 'myActivityId-' . time(),
'input' => $activity_input,
// This is what specifying a task list at scheduling time looks like.
// You can also register a type with a default task list and not specify one at scheduling time.
// The value provided at scheduling time always takes precedence.
'taskList' => array('name' => $activity_task_list),
// This is what specifying timeouts at scheduling time looks like.
// You can also register types with default timeouts and not specify them at scheduling time.
// The value provided at scheduling time always takes precedence.
'scheduleToCloseTimeout' => '30',
'scheduleToStartTimeout' => '10',
'startToCloseTimeout' => '60',
'heartbeatTimeout' => 'NONE'
);
return $activity_opts;
}
public static function create_timer_opts_from_workflow_input($input) {
$timer_duration = (string) $input[BasicWorkflowWorker::TIMER_DURATION_KEY];
$timer_opts = array(
'startToFireTimeout' => $timer_duration,
'timerId' => '0'
);
return $timer_opts;
}
/*
* When you continue a workflow execution as a new workflow execution,
* the start options don't carry over, so you need to specify them again.
*/
public static function create_continue_as_new_opts_from_workflow_start($start_attributes) {
$continue_as_new_opts = array(
'childPolicy' => (string) $start_attributes->childPolicy,
'input' => (string) $start_attributes->input,
'workflowTypeVersion' => (string) $start_attributes->workflowType->version,
// This is what specifying a task list at scheduling time looks like.
// You can also register a type with a default task list and not specify one at scheduling time.
// The value provided at scheduling time always takes precedence.
'taskList' => array('name' => (string) $start_attributes->taskList->name),
// This is what specifying timeouts at scheduling time looks like.
// You can also register types with default timeouts and not specify them at scheduling time.
// The value provided at scheduling time always takes precedence.
'executionStartToCloseTimeout' => (string) $start_attributes->executionStartToCloseTimeout,
'taskStartToCloseTimeout' => (string) $start_attributes->taskStartToCloseTimeout
);
return $continue_as_new_opts;
}
}