In this tutorial, you'll learn how to set up a very simple Spark application that reads data from and writes data to Cassandra. Before you start, you need basic knowledge of Apache Cassandra and Apache Spark; refer to the Cassandra documentation and the Spark documentation.
Install and launch a Cassandra cluster and a Spark cluster.
Configure a new Scala project with the following dependencies:
- Apache Spark and its dependencies
- Apache Cassandra thrift and clientutil libraries matching the version of Cassandra
- DataStax Cassandra driver for your Cassandra version
This driver does not depend on the Cassandra server code.
- For a detailed dependency list, see project/CassandraSparkBuild.scala
- For dependency versions, see project/Versions.scala
Add the spark-cassandra-connector jar and its dependency jars to the following classpaths:
- the classpath of your project
- the classpath of every Spark cluster node
Make sure the Connector version you use matches your Spark version (e.g. Spark 1.2.x with Connector 1.2.x):
"com.datastax.spark" %% "spark-cassandra-connector" % Version
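As a rough illustration, the Connector dependency could sit in a build.sbt like the sketch below. The project name, the Scala version, and both version numbers are placeholders chosen for this example (assumptions, not values taken from this guide); substitute the versions that match your cluster.

```scala
// build.sbt (sketch; every version number here is an illustrative assumption)
name := "cassandra-spark-quickstart"   // hypothetical project name

scalaVersion := "2.10.4"               // assumption: a Scala version supported by your Spark build

// Keep the Connector's major.minor version aligned with Spark's.
val sparkVersion     = "1.2.0"         // assumption: replace with your Spark version
val connectorVersion = "1.2.0"         // assumption: replace with the matching Connector version

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % sparkVersion,
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion
)
```

This only covers the project classpath; the Connector jars still have to be made available on every Spark cluster node.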
Create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);
Then insert some example data:
INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);
Now you're ready to write your first Spark program using Cassandra.
Before creating the SparkContext, set the spark.cassandra.connection.host property to the address of one of the Cassandra nodes:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
Create a SparkContext. Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode):
val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
Enable Cassandra-specific functions on the SparkContext, RDD, and DataFrame:
import com.datastax.spark.connector._
Use the sc.cassandraTable method to view this table as a Spark RDD:
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)
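By default, rows come back as generic CassandraRow objects. As a sketch of an alternative, the Connector can also map rows directly to tuples or case classes when a type parameter is passed to cassandraTable. The KV case class and the TypedRead object below are names made up for this example, and local mode is assumed so that no Spark Master is needed; the code still expects Cassandra at 127.0.0.1 with the test.kv table created earlier.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Hypothetical case class; its field names mirror the columns of test.kv.
case class KV(key: String, value: Int)

object TypedRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    // Assumption: local mode instead of a standalone Spark Master.
    val sc = new SparkContext("local", "typed-read", conf)

    // Map each row to a (String, Int) tuple, selecting columns in tuple order.
    val pairs = sc.cassandraTable[(String, Int)]("test", "kv").select("key", "value")
    pairs.collect().foreach(println)

    // Map each row to the KV case class by matching column names to field names.
    val kvs = sc.cassandraTable[KV]("test", "kv")
    println(kvs.map(_.value).collect().sum)

    sc.stop()
  }
}
```

Typed rows avoid repeated getInt and getString calls and let the compiler check field access, at the cost of keeping the case class in sync with the table schema.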
Add two more rows to the table:
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
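To check that a write actually landed, one option is to read the table back afterwards. The sketch below saves case class instances instead of tuples and then prints every row. The KeyValue case class, the WriteAndVerify object, and the key5/key6 rows are invented for this illustration, and local mode is again an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Hypothetical case class; its field names mirror the columns of test.kv.
case class KeyValue(key: String, value: Int)

object WriteAndVerify {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    // Assumption: local mode instead of a standalone Spark Master.
    val sc = new SparkContext("local", "write-and-verify", conf)

    // Illustrative rows (not part of the example data inserted earlier).
    val rows = sc.parallelize(Seq(KeyValue("key5", 5), KeyValue("key6", 6)))
    rows.saveToCassandra("test", "kv", SomeColumns("key", "value"))

    // Read the whole table back and print the rows ordered by key.
    sc.cassandraTable("test", "kv")
      .map(row => (row.getString("key"), row.getInt("value")))
      .collect()
      .sortBy(_._1)
      .foreach(println)

    sc.stop()
  }
}
```

Because key is the primary key, saving a row whose key already exists overwrites its value rather than adding a duplicate.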