From 5f3ad980e0b5ae3583b04970e2446aef412a9c74 Mon Sep 17 00:00:00 2001 From: Oleg Tokarev Date: Tue, 14 Jul 2015 12:12:19 +0200 Subject: [PATCH] [+] consistency argument --- README.md | 10 +++++++--- lib/logstash/outputs/cassandra.rb | 14 ++++++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 3baea67..4fd9dcb 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,12 @@ output { hosts => ["127.0.0.1"] keyspace => "logs" table => "query_log" + # Cassandra consistency level. + # Options: "any", "one", "two", "three", "quorum", "all", + # "local_quorum", "each_quorum", "serial", "local_serial", + # "local_one" + # Default: "one" + consistency => "all" # Where from the event hash to take a message source => "payload" @@ -77,8 +83,6 @@ git clone https://github.com/otokarev/logstash-output-cassandra.git \ ## TODO -1. Testing Testing Testing - ## Contributing @@ -88,4 +92,4 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. -For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file. \ No newline at end of file +For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file. diff --git a/lib/logstash/outputs/cassandra.rb b/lib/logstash/outputs/cassandra.rb index f6b7f49..b036cd7 100644 --- a/lib/logstash/outputs/cassandra.rb +++ b/lib/logstash/outputs/cassandra.rb @@ -10,8 +10,13 @@ class LogStash::Outputs::Cassandra < LogStash::Outputs::Base config_name "cassandra" - # Cassandra server hostname or IP-address + # List of Cassandra hostname(s) or IP-address(es) config :hosts, :validate => :array, :required => true + + # Cassandra consistency level. + # Options: "any", "one", "two", "three", "quorum", "all", "local_quorum", "each_quorum", "serial", "local_serial", "local_one" + # Default: "one" + config :consistency, :validate => ["any", "one", "two", "three", "quorum", "all", "local_quorum", "each_quorum", "serial", "local_serial", "local_one"], :default => "one" # The keyspace to use config :keyspace, :validate => :string, :required => true @@ -71,7 +76,8 @@ def register cluster = Cassandra.cluster( username: @username, password: @password, - hosts: @hosts + hosts: @hosts, + consistency: @consistency.to_sym ) @session = cluster.connect(@keyspace) @@ -130,7 +136,7 @@ def send_batch2cassandra stop_it = false begin batch = prepare_batch break if batch.nil? - @session.execute(batch, consistency: :all) + @session.execute(batch) @logger.info "Batch sent successfully" rescue Exception => e @logger.warn "Fail to send batch (error: #{e.to_s}). Schedule it to send later." @@ -167,7 +173,7 @@ def resend_batch2cassandra batch = batch_container[:batch] count = batch_container[:try_count] begin - @session.execute(batch, consistency: :all) + @session.execute(batch) @logger.info "Batch sent" rescue Exception => e if count > @max_retries