The flume-cassandra-plugin allows you to use Apache Cassandra as a Flume sink.
-
Download flume-cassandra-plugin-X.Y.tar.gz from the Downloads section.
-
Extract the tarball with
tar -xzf
. -
Set $FLUME_CLASSPATH for all terminals which will run Flume master or node:
export FLUME_CLASSPATH=`pwd`/flume-cassandra-plugin-1.0.0.jar:`pwd`/uuid-3.2.0.jar:`pwd`/cassandra-thrift-1.0.6.jar
- Modify flume-site.xml (you may start out by copying flume-site.xml.template and removing the body of the file) to include:
<configuration>
<property>
<name>flume.plugin.classes</name>
<value>org.apache.cassandra.plugins.flume.sink.SimpleCassandraSink,org.apache.cassandra.plugins.flume.sink.LogsandraSyslogSink</value>
<description>Comma separated list of plugin classes</description>
</property>
</configuration>
- Start the flume master and a node. The node should log something like:
2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder simpleCassandraSink in org.apache.cassandra.plugins.flume.sink.SimpleCassandraSink
...
2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder logsandraSyslogSink in org.apache.cassandra.plugins.flume.sink.LogsandraSyslogSink
This plugin primarily targets log storage right now.
There are two sinks available for use: the SimpleCassandraSink and the LogsandraSyslogSink.
The Simple Cassandra Sink requires four arguments for its constructor:
- A keyspace (String). For example, 'Keyspace1'.
- A column family name (String) for storing data in.
- A column family name (String) for storing indexes in.
- A list Cassandra server hostname:port combinations (Strings)
Cassandra must already be configured so that the keyspace and both of the column families must already exist. The index column family should use a TimeUUIDType comparator. The data storage column family can use BytesType.
For example, in cassandra-cli you might create them like:
[default@unknown] connect localhost/9160;
Connected to: "Test Cluster" on localhost/9160
[default@unknown] create keyspace Keyspace1;
23d30bd0-c16b-11e0-0000-242d50cf1ffd
Waiting for schema agreement...
... schemas agree across the cluster
[default@unknown] use Keyspace1;
Authenticated to keyspace: Keyspace1
[default@Keyspace1] create column family FlumeData;
2fcefa70-c16b-11e0-0000-242d50cf1ffd
Waiting for schema agreement...
... schemas agree across the cluster
[default@Keyspace1] create column family FlumeIndexes with comparator = 'TimeUUIDType';
4f1d8130-c16b-11e0-0000-242d50cf1ffd
Waiting for schema agreement...
... schemas agree across the cluster
[default@Keyspace1]
When creating this sink with web UI (which you can access by default at http://localhost:35871/flumeconfig.jsp), you will use a sink like:
simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")
For multiple servers use:
simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160", "hostname2:9160", "hostname3:9160")
If you're new to flume and you want to test that the plugin works, I recommend
using a Source like asciisynth(20, 100)
. You should see 20 corresponding entries
in each of the column families if you use list
in cassandra-cli.
When the Cassandra sink receives an event, it does the following:
- In the index column family: a. Creates a column where the name is a type 1 UUID (timestamp based) and the value is empty. b. Inserts it into row "YYYYMMDDHH" (the current date and hour) in the given column family.
- In the data column family: a. Creates a column where the name is 'data' and the value is the flume event body. b. Inserts it into a row with a key that is the same uuid from step 1.
This allows you to easily fetch all logs for a slice of time. Simply use something like get_slice() on the index column family to get the uuids you want for a particular slice of time, and then multiget the data column family using those uuids as the keys.
Your keyspace may of defaulted to the org.apache.cassandra.locator.NetworkTopologyStrategy
replication strategy resulting in Cassandra being unable to successfully commit. You can check
using describe Keyspace1;
in the cassandra-cli. Try using the following create keyspace
statement instead:
create keyspace Keyspace1 with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};