Skip to content

Commit

Permalink
[+] consistency argument
Browse files Browse the repository at this point in the history
  • Loading branch information
otokarev committed Jul 14, 2015
1 parent 4ca85de commit 5f3ad98
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ output {
hosts => ["127.0.0.1"]
keyspace => "logs"
table => "query_log"
# Cassandra consistency level.
# Options: "any", "one", "two", "three", "quorum", "all",
# "local_quorum", "each_quorum", "serial", "local_serial",
# "local_one"
# Default: "one"
consistency => "all"

# Where from the event hash to take a message
source => "payload"
Expand Down Expand Up @@ -77,8 +83,6 @@ git clone https://github.com/otokarev/logstash-output-cassandra.git \
</code></pre>

## TODO
1. Testing Testing Testing


## Contributing

Expand All @@ -88,4 +92,4 @@ Programming is not a required skill. Whatever you've seen about open source and

It is more important to the community that you are able to contribute.

For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file.
For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file.
14 changes: 10 additions & 4 deletions lib/logstash/outputs/cassandra.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ class LogStash::Outputs::Cassandra < LogStash::Outputs::Base

config_name "cassandra"

# Cassandra server hostname or IP-address
# List of Cassandra hostname(s) or IP-address(es)
config :hosts, :validate => :array, :required => true

# Cassandra consistency level.
# Options: "any", "one", "two", "three", "quorum", "all", "local_quorum", "each_quorum", "serial", "local_serial", "local_one"
# Default: "one"
config :consistency, :validate => ["any", "one", "two", "three", "quorum", "all", "local_quorum", "each_quorum", "serial", "local_serial", "local_one"], :default => "one"

# The keyspace to use
config :keyspace, :validate => :string, :required => true
Expand Down Expand Up @@ -71,7 +76,8 @@ def register
cluster = Cassandra.cluster(
username: @username,
password: @password,
hosts: @hosts
hosts: @hosts,
consistency: @consistency.to_sym
)

@session = cluster.connect(@keyspace)
Expand Down Expand Up @@ -130,7 +136,7 @@ def send_batch2cassandra stop_it = false
begin
batch = prepare_batch
break if batch.nil?
@session.execute(batch, consistency: :all)
@session.execute(batch)
@logger.info "Batch sent successfully"
rescue Exception => e
@logger.warn "Fail to send batch (error: #{e.to_s}). Schedule it to send later."
Expand Down Expand Up @@ -167,7 +173,7 @@ def resend_batch2cassandra
batch = batch_container[:batch]
count = batch_container[:try_count]
begin
@session.execute(batch, consistency: :all)
@session.execute(batch)
@logger.info "Batch sent"
rescue Exception => e
if count > @max_retries
Expand Down

0 comments on commit 5f3ad98

Please sign in to comment.