The flume-cassandra-plugin allows you to use Cassandra as a Flume sink.
- Copy the flume-cassandra-plugin directory into flume_dir/plugins/. There should also be a helloworld directory there.
- cd into flume-cassandra-plugin.
- Build by running 'ant'. A cassandra_plugin.jar file should be created.
- Modify flume-site.xml (you may start out by copying flume-site.xml.template and removing the body of the file) to include:

    <property>
      <name>flume.plugin.classes</name>
      <value>org.apache.cassandra.plugins.SimpleCassandraSink,org.apache.cassandra.plugins.LogsandraSyslogSink</value>
      <description>Comma separated list of plugin classes</description>
    </property>
- cd into the top-level flume directory (above plugins).
- Set FLUME_CLASSPATH in every terminal that will run a Flume master or node:

    export FLUME_CLASSPATH=`pwd`/plugins/flume-cassandra-plugin/cassandra_plugin.jar:`pwd`/plugins/flume-cassandra-plugin/lib/jug-asl-2.0.0.jar
You may want to put this in your ~/.bashrc file. If you do, make sure to start a new terminal or run:

    source ~/.bashrc

in any terminal you will use.
This plugin primarily targets log storage right now.
There are two sinks available for use: the SimpleCassandraSink and the LogsandraSyslogSink.
The Simple Cassandra Sink requires four arguments for its constructor:
- A keyspace (String). For example, 'Keyspace1'.
- A column family name (String) for storing data in.
- A column family name (String) for storing indexes in.
- A list of Cassandra server hostname:port combinations (Strings).
Cassandra must already be configured so that the keyspace and both column families exist. The index column family should use a TimeUUIDType comparator. For example, in cassandra.yaml you would have:
    - name: FlumeIndexes
      compare_with: TimeUUIDType
      comment: 'Stores the v1 uuids for log events'
The data storage column family can use BytesType.
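Putting the pieces together, a matching keyspace definition in cassandra.yaml might look like the sketch below. The keyspace name and data column family name ('Keyspace1' and 'FlumeData') are only placeholders; use whatever names you pass to the sink's constructor:

    keyspaces:
        - name: Keyspace1
          replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy
          replication_factor: 1
          column_families:
            - name: FlumeData
              compare_with: BytesType
            - name: FlumeIndexes
              compare_with: TimeUUIDType
              comment: 'Stores the v1 uuids for log events'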
When the Cassandra sink receives an event, it does the following:
- In the index column family:
  a. Creates a column whose name is a type 1 (timestamp-based) UUID and whose value is empty.
  b. Inserts it into the row "YYYYMMDDHH" (the current date and hour).
- In the data column family:
  a. Creates a column whose name is 'data' and whose value is the Flume event body.
  b. Inserts it into a row whose key is the same UUID used in the index column family.
This allows you to easily fetch all logs for a slice of time. Simply use something like get_slice() on the index column family to get the uuids you want for a particular slice of time, and then multiget the data column family using those uuids as the keys.
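As a rough sketch of that retrieval pattern using the raw Cassandra Thrift client (0.7-era API), the following fetches one hour's worth of events. The keyspace, column family names, host, port, and date are assumptions; substitute whatever the sink was configured with:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.thrift.*;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;

    public class TimeSliceFetch {
        public static void main(String[] args) throws Exception {
            // Placeholder connection settings; adjust to your cluster.
            TFramedTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            transport.open();
            client.set_keyspace("Keyspace1");

            // Grab every column in the index row for the hour we care about.
            SlicePredicate all = new SlicePredicate().setSlice_range(
                    new SliceRange(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, 10000));
            ByteBuffer indexRow = ByteBuffer.wrap("2011010112".getBytes("UTF-8")); // YYYYMMDDHH
            List<ColumnOrSuperColumn> indexCols = client.get_slice(
                    indexRow, new ColumnParent("FlumeIndexes"), all, ConsistencyLevel.ONE);

            // The column names are the v1 uuids; use them as row keys in the data column family.
            List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
            for (ColumnOrSuperColumn cosc : indexCols) {
                keys.add(cosc.column.name);
            }
            Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(
                    keys, new ColumnParent("FlumeData"), all, ConsistencyLevel.ONE);

            // Each data row holds a single 'data' column containing the raw event body.
            for (List<ColumnOrSuperColumn> row : rows.values()) {
                for (ColumnOrSuperColumn cosc : row) {
                    ByteBuffer value = cosc.column.value.duplicate();
                    byte[] body = new byte[value.remaining()];
                    value.get(body);
                    System.out.println(new String(body, "UTF-8"));
                }
            }
            transport.close();
        }
    }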
The constructor string for this sink is "simpleCassandraSink".
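For example, assuming a keyspace named 'Keyspace1', a data column family named 'FlumeData', an index column family named 'FlumeIndexes', and a Cassandra node listening on the default Thrift port, a node configuration entered in the Flume shell might look something like this (the tail source and node name are just placeholders):

    exec config cassandra_node 'tail("/var/log/syslog")' 'simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")'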
The Logsandra Syslog Sink allows syslog messages to be stored in Cassandra in a way that Logsandra can make use of them. You can find Logsandra here:
The Logsandra Syslog Sink accepts a list of "host:port" strings for its constructor.
Cassandra must already be configured with a 'logsandra' keyspace containing two column families named 'entries' and 'by_date'. They should look similar to this in cassandra.yaml:
    keyspaces:
        - name: logsandra
          replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy
          replication_factor: 1
          column_families:
            - name: entries
              compare_with: BytesType
            - name: by_date
              compare_with: LongType
This sink happily accepts input from a syslog source, such as syslogTcp or syslogUdp.
The constructor string for this sink is "logsandraSyslogSink".
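For example, a collector that accepts syslog messages over TCP and writes them to a local Cassandra node might be configured roughly like this in the Flume shell (the node name and port numbers are placeholders):

    exec config syslog_collector 'syslogTcp(5140)' 'logsandraSyslogSink("localhost:9160")'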
In Logsandra, you may query by the following fields:
- The source, which is a hostname or IP. Example: "127.0.0.1"
- The syslog facility. Can be: "kernel", "user", "mail", "system", "sec/auth", "syslog", "lpr", "news", "uucp", "clock", "sec/auth", "ftp", "ntp", "log audit", "log alert", "clock", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"
- The syslog severity. Can be:
- DEBUG
- INFO
- WARN
- ERROR
- FATAL