
using the cassandra output fails #1

Open
valentin-fischer opened this issue Aug 6, 2015 · 12 comments

Comments

@valentin-fischer

Hi there,

I built the gem on Debian, and when using it in Logstash it fails with the following message:

/usr/bin/java -Djava.io.tmpdir=/var/lib/logstash -Xmx2G -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -jar /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar -I/opt/logstash/lib /opt/logstash/lib/logstash/runner.rb agent -f /etc/logstash/conf.8 -l /var/log/logstash/logstash8.log --debug --verbose
Sending logstash logs to /var/log/logstash/logstash8.log.
Using milestone 2 input plugin 'redis'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see http://logstash.net/docs/1.4.4/plugin-milestones {:level=>:warn}
Using milestone 1 output plugin 'cassandra'. This plugin should work, but would benefit from use by folks like you. Please let us know if you find bugs or have suggestions on how to improve this plugin. For more information on plugin milestones, see http://logstash.net/docs/1.4.4/plugin-milestones {:level=>:warn}
LoadError: no such file to load -- cassandra_murmur3
require at org/jruby/RubyKernel.java:1071
require at /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:55
require at /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:53
require at /opt/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.4/lib/polyglot.rb:65
(root) at /opt/logstash/vendor/bundle/jruby/1.9/gems/cassandra-driver-2.1.4/lib/cassandra.rb:565
require at org/jruby/RubyKernel.java:1071
require at /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:73
require at /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:71
require at /opt/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.4/lib/polyglot.rb:65
(root) at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.0/lib/logstash/outputs/cassandra.rb:1
each at org/jruby/RubyArray.java:1613
register at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.0/lib/logstash/outputs/cassandra.rb:60
outputworker at /opt/logstash/lib/logstash/pipeline.rb:220

Any ideas why? The cassandra-driver is installed.

@valentin-fischer
Author

Ok... found the issue. The driver was missing from the JRuby installation after all.
I ended up doing something like: java -jar /opt/logstash/vendor/jar/jruby-complete-1.7.17.jar -S gem install logstash-output-cassandra-0.1.0.gem

Plus checking out the ruby-driver git repo and building the gem.

Now the output is loaded correctly, but I still don't see any inserts into the db...

@otokarev
Owner

otokarev commented Aug 7, 2015

Hello Valentin,

It's possible that the cassandra output expects the payload in event["message"] (source => "message") and is configured to ignore bad messages (ignore_bad_messages => true), while the payload itself is located somewhere else.
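To illustrate the point above, here is a minimal sketch of that lookup (hypothetical method name and simplified logic, not the plugin's actual code): with source => "message" the output reads event["message"], and with ignore_bad_messages => true it silently drops events where that field is missing.

```ruby
# Hypothetical sketch: how a `source`-based payload lookup combined
# with `ignore_bad_messages` can silently swallow events whose
# payload lives somewhere else in the event.
def extract_payload(event, source, ignore_bad_messages)
  payload = event[source]
  if payload.nil?
    return nil if ignore_bad_messages  # event is silently skipped
    raise "no payload in event['#{source}']"
  end
  payload
end
```

If the payload actually sits at the event top level instead, this lookup returns nil for every event and nothing ever reaches Cassandra.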


BTW, during debugging I ran logstash as:

cat query_log.52.csv.short | /opt/logstash/bin/logstash -f logstash.cfg -v

In most cases this reveals all problems immediately, at least for me.

Also, my test config may give you a clue:

input {
    stdin {
    }
}
filter {
  mutate {
     split => { "message" => "<{TERM}>" }
  }
  ruby {
        code => '
ret = {}
{
        0 => "id",
        1 => "at",
        2 => "duration",
        3 => "resellerId",
        4 => "username",
        5 => "ip",
        6 => "cmd",
        7 => "args",
        8 => "errno",
        9 => "error",
        10 => "warns",
        11 => "input",
        12 => "output",
        13 => "value",
        14 => "md5",
        15 => "eppTrid",
        16 => "src",
}.each {|key, value|
        v = event["message"][key].sub(/^"/, "").sub(/"$/, "")
        if (v != "" && v != "\\N")
                ret[value] = v
        end
}
ret["date"] = DateTime.parse(ret["at"]).strftime("%F")
event["message"] = ret
'
  }
}
output {
#    stdout {
#       codec => rubydebug
#    }
    cassandra {
#       consistency => "one"
        username => "111"
        password => "111"
        hosts => ["host1", "host2"]
        keyspace => "logs"
        table => "query_log"
        source => "message"
        hints => {
            id => "int"
            at => "timestamp"
            resellerId => "int"
            errno => "int"
            duration => "float"
            ip => "inet"}
        ignore_bad_messages => true
        ignore_bad_values => true
        batch_size => 2000
        retry_delay => 3
        max_retries => 1
        #timeout => 20

    }
}

If you provide info on your Cassandra schema and Logstash config, and I have enough time, I will try to reproduce the issue under Ubuntu.

Oleg

@valentin-fischer
Author

Hi Oleg,
Thank you for the fast reply! After turning ignore_bad_messages to false, it started complaining about nil values for the key:

NoMethodError: undefined method `key?' for nil:NilClass
  convert2cassandra_format! at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.0/lib/logstash/outputs/cassandra.rb:203
                       each at org/jruby/RubyHash.java:1341
  convert2cassandra_format! at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.0/lib/logstash/outputs/cassandra.rb:202
                    receive at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.0/lib/logstash/outputs/cassandra.rb:126
                     handle at /opt/logstash/lib/logstash/outputs/base.rb:86
                 initialize at (eval):55
                       call at org/jruby/RubyProc.java:271
                     output at /opt/logstash/lib/logstash/pipeline.rb:266
               outputworker at /opt/logstash/lib/logstash/pipeline.rb:225
              start_outputs at /opt/logstash/lib/logstash/pipeline.rb:152

So you are quite right with the idea that there is nothing there to be taken and inserted into Cassandra.

My configuration is the following:

input {
redis { 
    host => "logstash-queue.local" 
    data_type => "list" 
    key => "soap-logs-cassandra" 
    password => "local"
    port => "6379"
  } 
}

filter {
grok
{
    match => [ "message","%{SPACE:garbage}%{DATA:timestamps} \(%{DATA:transaction_id}\) \[%{DATA:client_ip}\] %{GREEDYDATA:command}"]
    overwrite => [ "message" ]
    tag_on_failure => []
}   
mutate
{
   remove_field => [ "message", "timestamp", "program", "type", "host", "path", "@version", "@timestamp", "garbage" ]
}
}

output {
    cassandra {
        username => "user_local"
        password => "local"
        hosts => ["10.20.34.12"]
        keyspace => "logstash_soap"
        table => "query_log"
        consistency => "all"        
        source => "command"
        hints => {
            logsource => "varchar"
            timestamps => "timestamp"
            transaction_id => "varchar"
            client_ip => "inet"
            command => "text"
             }
        ignore_bad_messages => false
        ignore_bad_values => true
        batch_size => 100
        batch_processor_thread_period => 1
        max_retries => 3
        retry_delay => 3
    }
stdout { codec => rubydebug }
}

The messages that I need to insert into Cassandra look like this (I turn them into fields).
This is stdout with rubydebug:

         "logsource" => "soap3.cc.local", # hostname, using varchar
        "timestamps" => "20150807.074434.78", # timestamp
    "transaction_id" => "00026594", # using text
         "client_ip" => "10.23.43.49", # always inet 4/6
           "command" => "[0] rollback" # text, can be anything

The original message looks like this:

20150807.074438.63 (00019509) [12.4.2.5] response: [get_next_bulk] data: {\"firma1\":\"\",\"vergabedatum\":\"20060328\",\"firma2\":\"\",\"telg2\":\"\",\"anlage_user\":\"nic\",\"state\":0,\"ansprechpartner2\":\"\",\"domain\":\"test.at\",\"fax\":\"\",\"ticketno\":\"201508070730.5645456456\",\"invoicetype\":null,\"bulkbezeichnung\":\"564894564\",\"kunden__nr\":null,\"suchbegriff\":\"Red GmbH\",\"uid\":\"\",\"abteilung\":\"\",\"plz\":\"\",\"e__mail\":\"\",\"ansprechpartner1\":\"\",\"leistungscode\":\"X\",\"mobil\":\"\",\"land\":\"\",\"telg1\":\"\",\"ort\":\"\",\"strasse\":\"\",\"anlage_datum\":\"07.08.2015 07:30:38\"}

My cassandra table that I want to insert in, is the following:

cqlsh:logstash_soap> DESCRIBE TABLE query_log 

CREATE TABLE query_log (
  logsource text,
  timestamps timestamp,
  transaction_id text,
  client_ip inet,
  command text,
  PRIMARY KEY (logsource, timestamps, transaction_id, client_ip, command)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='{"keys":"ALL", "rows_per_partition":"NONE"}' AND
  comment='' AND
  dclocal_read_repair_chance=0.100000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.000000 AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

Sorry for the long comment; could you help me with inserting the data, please? I'm not sure how to structure it so it is sent correctly.

Thank you!

PS: I'm also going to try your configuration to get more info.

@otokarev
Owner

otokarev commented Aug 7, 2015

Hi Valentin,

I have emulated the redis input by echoing the message you provided to "input { stdin {} }".

20150807.074438.63 (00019509) [12.4.2.5] response: [get_next_bulk] data: {\"firma1\":\"\",\"vergabedatum\":\"20060328\",\"firma2\":\"\",\"telg2\":\"\",\"anlage_user\":\"nic\",\"state\":0,\"ansprechpartner2\":\"\",\"domain\":\"test.at\",\"fax\":\"\",\"ticketno\":\"201508070730.5645456456\",\"invoicetype\":null,\"bulkbezeichnung\":\"564894564\",\"kunden__nr\":null,\"suchbegriff\":\"Red GmbH\",\"uid\":\"\",\"abteilung\":\"\",\"plz\":\"\",\"e__mail\":\"\",\"ansprechpartner1\":\"\",\"leistungscode\":\"X\",\"mobil\":\"\",\"land\":\"\",\"telg1\":\"\",\"ort\":\"\",\"strasse\":\"\",\"anlage_datum\":\"07.08.2015 07:30:38\"}

1/ Remove source => "command" from the config, because the payload seems to reside directly in the event, not in event["command"].
2/ After that change Cassandra yielded "Missing mandatory PRIMARY KEY part logsource"; probably logsource should also be added in the input.


About:

NoMethodError: undefined method `key?' for nil:NilClass

I feel awkward showing such things to users, for sure :) and will change it to something more human-readable soon. Thank you!
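A minimal sketch of such a human-readable check (hypothetical names, not the plugin's actual code): verify that the configured source field resolved to a Hash before iterating it, and return a descriptive message instead of letting NoMethodError surface.

```ruby
# Hypothetical guard for the nil case behind
# "undefined method `key?' for nil:NilClass": return a readable
# warning string when the payload is missing, nil when it looks fine.
def validate_payload(payload, source)
  unless payload.is_a?(Hash)
    return "Event has no usable payload in field '#{source}'; " \
           "got #{payload.inspect}. Check your `source` setting."
  end
  nil
end
```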

Oleg

@otokarev
Owner

otokarev commented Aug 7, 2015

master updated with tiny fixes related to this issue. If something goes dramatically weird, just roll back to tag 0.1.0.

@valentin-fischer
Author

Hi Oleg,
Thank you for the support, and sorry for the late response!

I've made the configuration changes that you suggested and I'm now able to insert the values into cassandra! :)

I have only one issue left, that is looking kind of strange.

The timestamp that I'm using is not being inserted correctly into the db for some reason. If I look into the logs, logstash is complaining about it in the following way:

{:timestamp=>"2015-08-27T12:53:37.702000+0200", :message=>"Cannot convert `timestamps` value (`2015-08-27 12:53:35 UTC`) to `timestamp` type, set to `1970-01-01 00:00:00 +0100`", :exception=>#<NoMethodError: undefined method `gsub!' for "2015-08-27T12:53:35.960Z":Time>, :backtrace=>["/opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/1.9/date/format.rb:839:in `_parse'", "/opt/logstash/vendor/jar/jruby-complete-1.7.17.jar!/META-INF/jruby.home/lib/ruby/1.9/time.rb:265:in `parse'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.1/lib/logstash/outputs/cassandra.rb:213:in `convert2cassandra_format!'", "org/jruby/RubyHash.java:1341:in `each'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.1/lib/logstash/outputs/cassandra.rb:206:in `convert2cassandra_format!'", "/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-cassandra-0.1.1/lib/logstash/outputs/cassandra.rb:131:in `receive'", "/opt/logstash/lib/logstash/outputs/base.rb:86:in `handle'", "(eval):72:in `initialize'", "org/jruby/RubyProc.java:271:in `call'", "/opt/logstash/lib/logstash/pipeline.rb:266:in `output'", "/opt/logstash/lib/logstash/pipeline.rb:225:in `outputworker'", "/opt/logstash/lib/logstash/pipeline.rb:152:in `start_outputs'"], :level=>:warn, :file=>"logstash/outputs/cassandra.rb", :line=>"265", :method=>"convert2cassandra_format!"}

My data looks like this:

{
         "logsource" => "panda3.host",
        "timestamps" => "2015-08-27T13:03:50.400Z",
    "transaction_id" => "00006064",
         "client_ip" => "1.1.1.1",
           "command" => "[0] rollback"
}

The timestamp gets converted from "2015-08-27T13:03:50.400Z" to "1970-01-01T00:00:00.000+01:00". I've tried a manual insert via cqlsh and it works without any problem.
I'm generating the timestamp using the date filter.

Any hints on why this is failing?

Thank you!

PS: I've pulled and rebuilt your latest commits, so I'm using the latest code.

@valentin-fischer
Author

Ok, after looking at the code a little, I found the following lines responsible for parsing the timestamp:

            when 'timestamp'
              Cassandra::Types::Timestamp.new(Time::parse("1970-01-01 00:00:00"))

Not sure why it's failing yet, but still investigating...
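The earlier warning (undefined method `gsub!' for a Time) suggests Time::parse was handed a value that was already a Time instance (the date filter produces one), while Time.parse only accepts a String. A minimal sketch of a type guard covering both cases; the helper name is hypothetical, not the plugin's actual code.

```ruby
require 'time'

# Hypothetical helper: accept either a Time (e.g. what the Logstash
# date filter emits) or a String, and always return a Time that
# Cassandra::Types::Timestamp.new will accept.
def coerce_to_time(value)
  return value if value.is_a?(Time)
  Time.parse(value.to_s)
end
```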

@valentin-fischer
Author

I've made some changes to the code, specifically removing the Time::parse call from the timestamp branch and inserting the value as-is, and now my timestamp is being inserted correctly.
This seems dirty and unsafe, and I think there should be a better fix. If you have the time, maybe you can make it nicer, safer, and better.

@otokarev
Owner

Hi Valentin,
I tried to reproduce the issue, but without any success. :(

Cassandra::Types::Timestamp.new() seems to refuse a string as input:

#  jruby -S irb                                                                                                                                     
jruby-1.7.19 :001 > require "cassandra"
...
jruby-1.7.19 :007 > Cassandra::Types::Timestamp.new("2014-09-09 12:12:12")
ArgumentError: cannot convert "2014-09-09 12:12:12" to timestamp
        from /usr/local/rvm/gems/jruby-1.7.19/gems/cassandra-driver-2.1.4-java/lib/cassandra/types.rb:187:in `new_timestamp'
        from /usr/local/rvm/gems/jruby-1.7.19/gems/cassandra-driver-2.1.4-java/lib/cassandra/types.rb:58:in `new'
        from (irb):7:in `evaluate'
        from org/jruby/RubyKernel.java:1107:in `eval'
        from org/jruby/RubyKernel.java:1507:in `loop'
        from org/jruby/RubyKernel.java:1270:in `catch'
        from org/jruby/RubyKernel.java:1270:in `catch'
        from /usr/local/rvm/rubies/jruby-1.7.19/bin/jirb:13:in `(root)'
jruby-1.7.19 :008 > Cassandra::Types::Timestamp.new(Time::parse("2014-09-09 12:12:12"))
 => 2014-09-09 12:12:12 +0200 
jruby-1.7.19 :009 > 

Or did you remove the whole "Cassandra::Types::Timestamp.new(Time::parse(" part and leave only a date string?

Could you please provide a snippet of your changes here?

@valentin-fischer
Author

Hi Oleg,
My LS configuration is the following:

input {
redis { 
    host => "logstash-queue" 
    data_type => "list" 
    key => "soap-logs-cassandra" 
    password => "asdjkh34"
    port => "6379"
  } 
}

filter {
grok
{
    match => [ "message","%{SPACE:garbage}%{DATA:timestamps} \(%{DATA:transaction_id}\) \[%{DATA:client_ip}\] %{GREEDYDATA:command}"]
    overwrite => [ "message" ]
    tag_on_failure => []
}   
date {
match => [ "timestamps", "yyyyMMdd.HHmmss.SS" ]
target => "timestamps"
}
mutate
{
   remove_field => [ "message", "timestamp", "program", "type", "host", "path", "@version", "garbage", "@timestamp" ]
}
}

output {
    cassandra {
        username => "cass"
        password => "39sj34"
        hosts => ["1.23.2.2"]
        keyspace => "logstash_soap"
        table => "query_log"
        consistency => "all"        
        hints => {
            logsource => "varchar"
            timestamps => "timestamp"
            transaction_id => "varchar"
            client_ip => "inet"
            command => "text"
            }
        ignore_bad_messages => true
        ignore_bad_values => true
        batch_size => 100
        batch_processor_thread_period => 1
        max_retries => 3
        retry_delay => 3
    }
}

The issue was related to the timestamp being converted from a form like 20150807.074434.78 to a form accepted by Cassandra, e.g. 2015-01-01T01:01:01.400Z.

So I ended up changing the following lines of code:

Cassandra::Types::Timestamp.new(msg[key])
Cassandra::Types::Timestamp.new("2015-01-01T01:01:01.400Z")

So now my data comes into Cassandra the way I need it, and it looks like this:

cqlsh:logstash_soap> SELECT logsource,transaction_id, blobAsBigint(timestampAsBlob(timestamps)), transaction_id,client_ip, command FROM query_log WHERE timestamps >= '2015-08-31 07:50:09+0200' AND timestamps <= '2015-08-31 08:50:10+0200' LIMIT 1 ALLOW FILTERING;

 logsource              | transaction_id | system.blobasbigint(system.timestampasblob(timestamps)) | transaction_id | client_ip     | command
------------------------+----------------+---------------------------------------------------------+----------------+---------------+------------------------------------------------------------------
 panda1 |       00003387 |                                           1441003286660 |       00003387 | 1.1.1.1 | command:  [get_person_data] args: {"person":"XNSG534630-X"}

Or without milliseconds...

cqlsh:logstash_soap> SELECT * FROM query_log WHERE timestamps >= '2015-08-31 07:50:09+0200' AND timestamps <= '2015-08-31 08:50:10+0200' LIMIT 1 ALLOW FILTERING;

 logsource              | timestamps               | transaction_id | client_ip     | command
------------------------+--------------------------+----------------+---------------+------------------------------------------------------------------
 panda1 | 2015-08-31 08:41:26+0200 |       00003387 | 1.1.1.1 | command:  [get_person_data] args: {"person":"XNSG534630-X"}

@gabrielandradetii

Gentlemen, I have the file below as an example source:

name: Gabriel
name: PEDRO

My configuration file is as follows:

input {
  stdin {
  }
}

filter {
  grok {
    match => [ "message", "name: %{WORD:custom_name}" ]
  }
  mutate {
    lowercase => [ "custom_name" ]
  }
  date {
    match => [ "timestamps", "yyyyMMdd.HHmmss.SS" ]
    target => "timestamps"
  }
  mutate {
    remove_field => [ "message", "timestamp", "program", "type", "host", "path", "@version", "garbage", "@timestamp" ]
  }
}

output {
  cassandra {
    hosts => ["127.0.0.1"]
    username => "cassandra"
    password => "cassandra"
    hints => {
      nome => "text"
    }
    consistency => "one"
    keyspace => "syslog"
    table => "input"
    ignore_bad_messages => false
    ignore_bad_values => true
  }

  stdout { codec => rubydebug }
}

However, the insertion into Cassandra does not succeed; the log follows below:

[root@dbfaz01p bin]# cat /var/log/teste.txt | ./logstash -f cassandra3.conf

plugin is using the 'milestone' method to declare the version of the plugin this method is deprecated in favor of declaring the version inside the gemspec. {:level=>:warn}

Logstash startup completed
{
"custom_name" => "gabriel"
}
{
"custom_name" => "pedro"
}
Failed to send batch (error: Unknown identifier custom_name). Schedule it to send later. {:level=>:warn}
Failed to send batch (error: Unknown identifier custom_name). Schedule it to send later. {:level=>:warn}
Failed to send batch again (error: unsupported statement nil). Reschedule it. {:level=>:warn}
Failed to send batch again (error: unsupported statement nil). Reschedule it. {:level=>:warn}
^CSIGINT received. Shutting down the pipeline. {:level=>:warn}
^CSIGINT received. Terminating immediately.. {:level=>:fatal}

If anyone can help me, I would be very grateful.

@gabrielandradetii

I got it working!
