@@ -182,19 +182,25 @@ def register
182
182
183
183
public
184
184
def run ( logstash_queue )
185
- $exit = false
185
+ $exit = false ;
186
+ $logstash_queue = logstash_queue
186
187
run_with_catch ( logstash_queue )
187
188
end
188
189
189
190
public
190
191
def stop
191
192
$exit = true
192
- exit_threads
193
+
194
+ until @scan_queue . empty?
195
+ end
196
+
193
197
until @queue . empty?
194
198
@logger . info ( "Flushing rest of events in logstash queue" )
195
199
event = @queue . pop ( )
196
- queue_event ( @parser . parse_stream ( event ) , logstash_queue , @host )
200
+ queue_event ( @parser . parse_stream ( event ) , $ logstash_queue, @host )
197
201
202
+
203
+ exit_threads
198
204
end
199
205
200
206
# Starts KCL app in a background thread
@@ -261,7 +267,7 @@ def setup_stream
261
267
262
268
kcl_config = KCL ::KinesisClientLibConfiguration . new ( @checkpointer , stream_arn , @credentials , worker_id ) \
263
269
. withInitialPositionInStream ( KCL ::InitialPositionInStream ::TRIM_HORIZON )
264
- cloudwatch_client = nil
270
+ cloudwatch_client = nil
265
271
if @publish_metrics
266
272
cloudwatch_client = CloudWatch ::AmazonCloudWatchClient . new ( @credentials )
267
273
else
@@ -280,10 +286,10 @@ def scan(logstash_queue)
280
286
@connector = DynamoDBBootstrap ::DynamoDBBootstrapWorker . new ( @dynamodb_client , @read_ops , @table_name , @number_of_scan_threads )
281
287
start_table_copy_thread
282
288
283
- scan_queue = @logstash_writer . getQueue ( )
284
- while !$exit
285
- if !scan_queue . empty?
286
- event = scan_queue . take ( )
289
+ @ scan_queue = @logstash_writer . getQueue ( )
290
+ while true
291
+ if !@ scan_queue. empty?
292
+ event = @ scan_queue. take ( )
287
293
if event . getEntry ( ) . nil? and event . getSize ( ) == -1
288
294
break
289
295
end # if event.isEmpty()
@@ -311,7 +317,9 @@ def stream(logstash_queue)
311
317
312
318
private
313
319
def exit_threads
314
- @worker . shutdown ( )
320
+ unless @worker . nil?
321
+ @worker . shutdown ( )
322
+ end # unless @worker.nil?
315
323
316
324
unless @dynamodb_scan_thread . nil?
317
325
@dynamodb_scan_thread . exit
0 commit comments