diff --git a/lib/logstash-input-dynamodb_jars.rb b/lib/logstash-input-dynamodb_jars.rb deleted file mode 100644 index 88e19c3..0000000 --- a/lib/logstash-input-dynamodb_jars.rb +++ /dev/null @@ -1,73 +0,0 @@ -# this is a generated file, to avoid over-writing it just delete this comment -require 'jar_dependencies' - -require_jar( 'com.amazonaws', 'aws-java-sdk-elasticbeanstalk', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-ses', '1.10.11' ) -require_jar( 'log4j', 'log4j', '1.2.17' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-opsworks', '1.10.11' ) -require_jar( 'com.amazonaws', 'dynamodb-streams-kinesis-adapter', '1.0.0' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-sqs', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-emr', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudformation', '1.10.11' ) -require_jar( 'com.beust', 'jcommander', '1.48' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-redshift', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-iam', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-codedeploy', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-directconnect', '1.10.11' ) -require_jar( 'org.apache.httpcomponents', 'httpclient', '4.3.6' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-sns', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-directory', '1.10.11' ) -require_jar( 'com.google.protobuf', 'protobuf-java', '2.6.1' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudfront', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-kinesis', '1.10.8' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-workspaces', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-swf-libraries', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudhsm', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-simpledb', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-codepipeline', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-s3', '1.10.10' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cognitoidentity', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-machinelearning', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-logs', '1.10.11' ) -require_jar( 'org.apache.commons', 'commons-lang3', '3.3.2' ) -require_jar( 'commons-codec', 'commons-codec', '1.6' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.5.0' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-sts', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-route53', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-elasticloadbalancing', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-storagegateway', '1.10.11' ) -require_jar( 'org.apache.httpcomponents', 'httpcore', '4.3.3' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-efs', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-ec2', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-ssm', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-core', '1.10.10' ) -require_jar( 'com.amazonaws', 'dynamodb-import-export-tool', '1.0.0' ) -require_jar( 'commons-lang', 'commons-lang', '2.6' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-config', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudtrail', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-elastictranscoder', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-codecommit', '1.10.11' ) -require_jar( 'joda-time', 'joda-time', '2.5' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-importexport', '1.10.11' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.5.3' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudsearch', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk', '1.10.11' ) -require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' ) -require_jar( 'com.google.guava', 'guava', '15.0' ) -require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.5.3' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-rds', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cognitosync', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-datapipeline', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-support', '1.10.11' ) -require_jar( 'commons-logging', 'commons-logging', '1.1.3' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudwatchmetrics', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-glacier', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-elasticache', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-simpleworkflow', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-lambda', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-autoscaling', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-ecs', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-devicefarm', '1.10.11' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-kms', '1.10.10' ) -require_jar( 'com.amazonaws', 'aws-java-sdk-cloudwatch', '1.10.8' ) diff --git a/lib/logstash/inputs/DynamoDBLogParser.rb b/lib/logstash/inputs/DynamoDBLogParser.rb index 48da24a..f77d345 100644 --- a/lib/logstash/inputs/DynamoDBLogParser.rb +++ b/lib/logstash/inputs/DynamoDBLogParser.rb @@ -65,13 +65,15 @@ def parse_scan(log, new_image_size) @hash_template["dynamodb"]["sequenceNumber"] = "0" @hash_template["dynamodb"]["sizeBytes"] = size_bytes @hash_template["dynamodb"]["streamViewType"] = @view_type.upcase - + @hash_template["eventParser"] = "scan" return parse_view_type(@hash_template) end public def parse_stream(log) - return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"]) + data_hash = JSON.parse(@mapper.writeValueAsString(log))["internalObject"] + data_hash["eventParser"] = "stream" + return parse_view_type(data_hash) end private diff --git a/lib/logstash/inputs/dynamodb.rb b/lib/logstash/inputs/dynamodb.rb index 97efa15..14376f5 100644 --- a/lib/logstash/inputs/dynamodb.rb +++ b/lib/logstash/inputs/dynamodb.rb @@ -182,17 +182,26 @@ def register public def run(logstash_queue) - begin - run_with_catch(logstash_queue) - rescue LogStash::ShutdownSignal - exit_threads - until @queue.empty? - @logger.info("Flushing rest of events in logstash queue") - event = @queue.pop() - queue_event(@parser.parse_stream(event), logstash_queue, @host) - end # until !@queue.empty? - end # begin - end # def run(logstash_queue) + $exit = false; + $logstash_queue = logstash_queue + run_with_catch(logstash_queue) + end + + public + def stop + $exit = true + + until @scan_queue.empty? + end + + until @queue.empty? + @logger.info("Flushing rest of events in logstash queue") + event = @queue.pop() + queue_event(@parser.parse_stream(event), $logstash_queue, @host) + end # until !@queue.empty? + + exit_threads + end # Starts KCL app in a background thread # Starts parallel scan if need be in a background thread @@ -258,7 +267,7 @@ def setup_stream kcl_config = KCL::KinesisClientLibConfiguration.new(@checkpointer, stream_arn, @credentials, worker_id) \ .withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON) - cloudwatch_client = nil + cloudwatch_client = nil if @publish_metrics cloudwatch_client = CloudWatch::AmazonCloudWatchClient.new(@credentials) else @@ -277,13 +286,17 @@ def scan(logstash_queue) @connector = DynamoDBBootstrap::DynamoDBBootstrapWorker.new(@dynamodb_client, @read_ops, @table_name, @number_of_scan_threads) start_table_copy_thread - scan_queue = @logstash_writer.getQueue() + @scan_queue = @logstash_writer.getQueue() while true - event = scan_queue.take() - if event.getEntry().nil? and event.getSize() == -1 - break - end # if event.isEmpty() - queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host) + if !@scan_queue.empty? + event = @scan_queue.take() + if event.getEntry().nil? and event.getSize() == -1 + break + end # if event.isEmpty() + queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host) + else + sleep(0.01) + end end # while true end @@ -292,14 +305,22 @@ def stream(logstash_queue) @logger.info("Starting stream...") start_kcl_thread - while true - event = @queue.pop() - queue_event(@parser.parse_stream(event), logstash_queue, @host) + while !$exit + if !@queue.empty? + event = @queue.pop() + queue_event(@parser.parse_stream(event), logstash_queue, @host) + else + sleep(0.01) + end end # while true end private def exit_threads + unless @worker.nil? + @worker.shutdown() + end # unless @worker.nil? + unless @dynamodb_scan_thread.nil? @dynamodb_scan_thread.exit end # unless @dynamodb_scan_thread.nil? diff --git a/logstash-input-dynamodb.gemspec b/logstash-input-dynamodb.gemspec index 49ac9e9..918507e 100644 --- a/logstash-input-dynamodb.gemspec +++ b/logstash-input-dynamodb.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-dynamodb' - s.version = '1.0.0' + s.version = '1.0.1' s.licenses = ['Apache License (2.0)'] s.summary = "This input plugin scans a specified DynamoDB table and then reads changes to a DynamoDB table from the associated DynamoDB Stream." s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" @@ -19,82 +19,21 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" } # Gem dependencies - s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0' - s.add_runtime_dependency "logstash-codec-json" + s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0" + s.add_runtime_dependency 'logstash-codec-json' + s.add_runtime_dependency 'stud', '>= 0.0.22' s.add_runtime_dependency "activesupport-json_encoder" + s.add_development_dependency 'logstash-devutils', '>= 0.0.16' # Jar dependencies - s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticbeanstalk', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-ses', '1.10.11' " s.requirements << "jar 'log4j:log4j', '1.2.17'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-opsworks', '1.10.11'" - s.requirements << "jar 'com.amazonaws:dynamodb-streams-kinesis-adapter', '1.0.0'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-sqs', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-emr', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudformation', '1.10.11'" - s.requirements << "jar 'com.beust:jcommander', '1.48'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-redshift', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-iam', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-codedeploy', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-dynamodb', '1.10.10'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-directconnect', '1.10.11'" - s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.3.6'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-sns', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-directory', '1.10.11'" - s.requirements << "jar 'com.google.protobuf:protobuf-java', '2.6.1'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudfront', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-kinesis', '1.10.8'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-workspaces', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-swf-libraries', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudhsm', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-simpledb', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-codepipeline', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-s3', '1.10.10'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cognitoidentity', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-machinelearning', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-logs', '1.10.11'" - s.requirements << "jar 'org.apache.commons:commons-lang3', '3.3.2'" - s.requirements << "jar 'commons-codec:commons-codec', '1.6'" - s.requirements << "jar 'com.fasterxml.jackson.core:jackson-annotations', '2.5.0'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-sts', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-route53', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticloadbalancing', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-storagegateway', '1.10.11'" - s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.3.3'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-efs', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-ec2', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-ssm', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.10.10'" + s.requirements << "jar 'com.amazonaws:aws-java-sdk', '1.10.67'" s.requirements << "jar 'com.amazonaws:dynamodb-import-export-tool', '1.0.0'" - s.requirements << "jar 'commons-lang:commons-lang', '2.6'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-config', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudtrail', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-elastictranscoder', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-codecommit', '1.10.11'" - s.requirements << "jar 'joda-time:joda-time', '2.5'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-importexport', '1.10.11'" - s.requirements << "jar 'com.fasterxml.jackson.core:jackson-databind', '2.5.3'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudsearch', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk', '1.10.11'" - s.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.6.0'" + s.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.6.2'" + s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.4.1'" + s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.4.1'" + s.requirements << "jar 'com.amazonaws:dynamodb-streams-kinesis-adapter', '1.0.2'" s.requirements << "jar 'com.google.guava:guava', '15.0'" - s.requirements << "jar 'com.fasterxml.jackson.core:jackson-core', '2.5.3'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-rds', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cognitosync', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-datapipeline', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-support', '1.10.11'" - s.requirements << "jar 'commons-logging:commons-logging', '1.1.3'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatchmetrics', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-glacier', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticache', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-simpleworkflow', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-lambda', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-autoscaling', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-ecs', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-devicefarm', '1.10.11'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-kms', '1.10.10'" - s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatch', '1.10.8'" s.add_runtime_dependency 'jar-dependencies' # Development dependencies - s.add_development_dependency "logstash-devutils" s.add_development_dependency "mocha" end