Storing data in Riak (v2.0)
Note: This document is for the 2.x Java Client series.
Unless you're already familiar with CAP Theorem and eventual consistency, taking the time to read through at least Why Riak would be well worth your while.
It's ok, we'll wait.
Ok! Now that you've read through that and understand that Riak is a system that favors AP with eventual C, this might make some sense to you.
In Riak data is stored in buckets, and buckets can be grouped further by an optional bucket type. Buckets and bucket types have a number of options and tunable parameters, one of which is whether or not to allow sibling records. If you are using a bucket in the default bucket type, that bucket won't allow sibling creation by default. If you are using a bucket in a specific bucket type, that bucket will allow sibling creation by default. The Riak Java client is somewhat built around this in that at the most basic level, you can simply say "store this data using this key" and anything that is currently in Riak referenced by that key will be overwritten. There are, however, some issues with attempting to do that.
If you have any type of contention/concurrency where multiple threads or processes are doing read/modify/write operations on those key/values, you are likely to lose writes if the operations interleave. One will overwrite the other. At that point you need to enable siblings and deal with conflict resolution.
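The lost write described above can be reproduced without a cluster. The following is a minimal sketch that uses only the Java standard library: a plain `Map` stands in for a bucket that overwrites on store, and the two "clients" are simply interleaved statements. `LostUpdateDemo` and its variables are hypothetical names for illustration, not part of the Riak Java client.

```java
import java.util.HashMap;
import java.util.Map;

// Deterministic simulation of two clients interleaving read/modify/write
// against a store that simply overwrites (no siblings). The Map stands in
// for a bucket; it is NOT the Riak client API.
class LostUpdateDemo {
    static int run() {
        Map<String, Integer> store = new HashMap<>();
        store.put("counter", 0);

        // Both clients read before either one writes.
        int readByA = store.get("counter");
        int readByB = store.get("counter");

        // Each client increments its own local copy and writes it back.
        store.put("counter", readByA + 1); // A writes 1
        store.put("counter", readByB + 1); // B also writes 1, overwriting A's write

        return store.get("counter");
    }

    public static void main(String[] args) {
        // Two increments were attempted, but only one survives.
        System.out.println("Final value: " + run()); // prints "Final value: 1"
    }
}
```

With siblings enabled, both writes would instead be kept side by side, and it would be up to your code to reconcile them.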
With that in mind, the following basic examples illustrate using Riak with the default bucket options and just storing some data.
- Basic Example #1: Store a String
- Basic Example #2: Store a POJO
- Basic Example #3: Changing query parameters
For a more detailed example of how you would store data in Riak in an environment with concurrency, jump down to the Advanced Examples section.
## Basic Store, data is a String

Using a `StoreValue` operation, your String is stored in Riak as bytes representing UTF-8 text.

```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class App {
    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
        // Connect to the Riak node at 127.0.0.1, port 10017
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");

        // A Location is a bucket (in the default bucket type here) plus a key
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        String myData = "This is my data";

        // Build and execute the store operation
        StoreValue sv = new StoreValue.Builder(myData).withLocation(location).build();
        StoreValue.Response svResponse = client.execute(sv);

        client.shutdown();
    }
}
```
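
To make "bytes representing UTF-8 text" concrete, here is a small standard-library sketch of the encode/decode round trip. It only illustrates the encoding itself; `Utf8Demo` is a hypothetical name and nothing here claims to show how the client performs the conversion internally.

```java
import java.nio.charset.StandardCharsets;

class Utf8Demo {
    // Turn a String into the UTF-8 bytes that would be stored
    static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Turn stored UTF-8 bytes back into a String
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("This is my data");
        System.out.println(stored.length);           // 15: one byte per ASCII character
        System.out.println(decode(stored));          // This is my data

        // Non-ASCII characters take more than one byte in UTF-8
        System.out.println(encode("\u00e9").length); // 2
    }
}
```

If another application reads the value back, it needs to decode the bytes as UTF-8 as well.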
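
## Basic Store, data is a POJO

When you pass a POJO to `StoreValue.Builder`, it is serialized to JSON by the default JSONConverter before being stored.

```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class App {
    // { "foo":"Some String", "bar":"some other string","foobar":5 }
    static class Pojo {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        Pojo obj = new Pojo();
        obj.bar = "My bar data";
        obj.foo = "My foo data";
        obj.foobar = 42;

        // The POJO is converted to JSON when the operation is executed
        StoreValue sv = new StoreValue.Builder(obj).withLocation(location).build();
        StoreValue.Response svResponse = client.execute(sv);

        client.shutdown();
    }
}
```
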
<a name="basicexample3"/>
## Store data, changing query parameters for just this request
To override the bucket's default parameters for a single request, set them with `withOption()` on the builder before calling the `execute()` method.
```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class App {
    // { "foo":"Some String", "bar":"some other string","foobar":5 }
    static class Pojo {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        Pojo obj = new Pojo();
        obj.bar = "My bar data";
        obj.foo = "My foo data";
        obj.foobar = 42;

        // Override W and PW for this request only
        StoreValue storeWithProps = new StoreValue.Builder(obj)
                .withLocation(location)
                .withOption(StoreValue.Option.W, Quorum.oneQuorum())
                .withOption(StoreValue.Option.PW, Quorum.oneQuorum()).build();
        StoreValue.Response storeWithPropsResp = client.execute(storeWithProps);

        client.shutdown();
    }
}
```

# The Hard Way

## Eventual Consistency; Resolvers, Converters, and Updates

In most environments, you're going to configure your buckets to allow siblings and write the code that deals with them. There are three Interfaces/Classes you're going to be using:

- `ConflictResolver<T>`
  This interface is used to resolve sibling records returned by Riak. (It is identical to the one used in the 1.x Java Client series.)
- `Converter<T>`
  This interface is used to serialize/deserialize data to/from Riak.
- `UpdateValue.Update<T>`
  This abstract class is similar to Java Client 1.x's `Mutation<T>` interface; it provides an `apply(T)` method to mutate or change the object to update and store.

One thing worth noting is that the current RiakClient and its various interfaces are not like what you may be used to with other datastores' APIs. The Riak client provides different levels of automation, to match what you're comfortable with and what is appropriate for your program. These range from automatic sibling resolution and object hydration with the `ConflictResolver<T>` and `Converter<T>` interfaces to automatic updates with the `UpdateValue.Update<T>` class, which encapsulates your entire read/modify/write cycle within the UpdateValue operation.
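
The shape of that encapsulated cycle can be sketched with the standard library alone. In the sketch below, `Resolver`, `Update`, and `CycleSketch` are hypothetical stand-ins invented for illustration (they are not the client's `ConflictResolver<T>` or `UpdateValue.Update<T>` types), and an in-memory `Map` of sibling lists plays the part of a bucket. It is only meant to show the order of the steps: fetch, resolve, apply, store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CycleSketch {
    // Hypothetical stand-ins for illustration only; NOT the Riak client's types.
    interface Resolver<T> { T resolve(List<T> siblings); }
    interface Update<T> { T apply(T original); }

    // One read/modify/write cycle against an in-memory "bucket" of sibling lists.
    static <T> T update(Map<String, List<T>> bucket, String key,
                        Resolver<T> resolver, Update<T> update) {
        List<T> siblings = bucket.containsKey(key) ? bucket.get(key) : new ArrayList<T>();
        T resolved = resolver.resolve(siblings);  // 1. reduce the siblings to one value
        T modified = update.apply(resolved);      // 2. apply the caller's modification
        List<T> stored = new ArrayList<>();
        stored.add(modified);
        bucket.put(key, stored);                  // 3. store the single resolved result
        return modified;
    }

    static int run() {
        Map<String, List<Integer>> bucket = new HashMap<>();
        bucket.put("score", new ArrayList<>(Arrays.asList(3, 7))); // two siblings

        // Resolve by keeping the highest value (0 if nothing is stored yet)
        Resolver<Integer> keepHighest =
                siblings -> siblings.isEmpty() ? 0 : Collections.max(siblings);
        Update<Integer> addOne = original -> original + 1;

        return update(bucket, "score", keepHighest, addOne);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 8: siblings 3 and 7 resolve to 7, then 7 + 1
    }
}
```

The point of the design is that the caller supplies only the two pieces of application logic (how to resolve and what to change) while the operation owns the sequencing.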
If you do not wish to completely encapsulate the read/modify/write inside the store operation, see Storing previously fetched and modified data below.
The following diagram outlines the anatomy of a read/modify/write cycle using the Bucket interface, your own domain object (T), and the StoreObject it returns:
### Figure 1

![StoreObject anatomy](http://dl.dropbox.com/u/74693818/RJC-store-v4.png)

There are two different classes available to store data with in the Java client: `StoreValue` and `UpdateValue`.
The first two are only useful if you want to overwrite anything currently in Riak associated with the key you're passing in. Be aware, however, that there is a caveat. An anonymous Mutation instance is created and used. It replaces the data portion of whatever is currently stored in Riak, but not links, secondary indexes, or user metadata. If you want to overwrite everything you will need to supply your own Mutation or Converter that does so.
The second two are actually what you will most likely use if you are performing a read/modify/write cycle. As noted in figure 1 above, the interface is slightly clunky in that the object being passed in is going to be discarded when you supply the Mutation; it's only used to infer the type. The fourth version will extract the key from the object being passed in before doing so by referencing a String field annotated with @RiakKey.
The following example is a "leader board" for various games. Imagine you were providing a service where game developers would have their games send you a score every time a player completed a game. You are required to store the top 5 scores for each game. We're going to rely on the default JSONConverter to serialize/deserialize our POJO to/from Riak. If you're interested in seeing how you would implement a converter to use a different serialization library, check out Using a custom converter for an example.

```java
package Leaderboard;

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.ConflictResolverFactory;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.kv.UpdateValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ExecutionException;

public class StoreApp {
    public static void main( String[] args ) throws UnknownHostException, ExecutionException, InterruptedException {
        // We need some data, of course
        String playerNames[] = {"Steve","Brian","Bob" };
        Random generator = new Random();
        GameLeaderboard gl = new GameLeaderboard("SuperCoolGame");
        for (int i = 0; i < 5; i++)
        {
            NameScorePair nsp = new NameScorePair(playerNames[(i+3)%3], generator.nextInt(100));
            gl.addScore(nsp);
        }

        // Store our initial leaderboard in Riak
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Namespace ns = new Namespace("demo_bucket");
        setAllowMultForBucket(client, ns);

        // Register our custom conflict resolver,
        // This tells the Java client to resolve siblings of GameLeaderboard with GameLeaderboardResolver
        ConflictResolverFactory.INSTANCE.registerConflictResolver(GameLeaderboard.class, new GameLeaderboardResolver());

        // If you don't specify a location to the builder, it will look on the object for fields noted with
        // @RiakBucketType, @RiakBucketName, and @RiakKey annotations to build the location automatically
        StoreValue storeBoard = new StoreValue.Builder(gl).build();
        client.execute(storeBoard);

        FetchValue leaderboardFetch =
            new FetchValue.Builder(
                new Location(new Namespace("demo_bucket"), "SuperCoolGame")).build();
        FetchValue.Response response = client.execute(leaderboardFetch);

        // If there are any siblings they will be resolved on calling `getValue()`
        GameLeaderboard fetchedGL = response.getValue(GameLeaderboard.class);

        // Output the results!
        for ( NameScorePair n : fetchedGL.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }
        System.out.println();

        /*
         * Now that we have a leaderboard in Riak, let's modify it!
         * This simulates a new name/score pair coming in, and we're going
         * to modify the leaderboard in Riak using the GameLeaderboardUpdate.
         * We know our sample data only has scores to 100, so using 1000 ensures
         * we'll modify the object
         */
        NameScorePair nsp = new NameScorePair("John", 1000);
        GameLeaderboardUpdate glbu = new GameLeaderboardUpdate(nsp);

        /* All modification is done by your UpdateValue.Update implementation
         * (GameLeaderboardUpdate here), which is applied to the fetched and
         * resolved object.
         *
         * Note also that we're setting the RETURN_BODY store option in order
         * to get the current data back
         */
        UpdateValue updateValue = new UpdateValue.Builder(new Location(ns, fetchedGL.getGameName()))
            .withUpdate(glbu)
            .withStoreOption(StoreValue.Option.RETURN_BODY, true)
            .build();
        UpdateValue.Response updateResponse = client.execute(updateValue);
        GameLeaderboard updatedLeaderboard = updateResponse.getValue(GameLeaderboard.class);

        // Output the results!
        for ( NameScorePair n : updatedLeaderboard.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }

        client.shutdown();
        System.exit(0);
    }

    // Enable sibling creation (allow_mult) on the given bucket
    public static void setAllowMultForBucket(RiakClient c, Namespace ns)
        throws ExecutionException, InterruptedException {
        StoreBucketProperties storeBucketProperties =
            new StoreBucketProperties.Builder(ns)
                .withAllowMulti(true).build();
        c.execute(storeBucketProperties);
    }
}
```

```java
package Leaderboard;

import com.basho.riak.client.api.cap.ConflictResolver;
import com.basho.riak.client.api.cap.UnresolvedConflictException;

import java.util.Iterator;
import java.util.List;

public class GameLeaderboardResolver implements ConflictResolver<GameLeaderboard>
{
    /*
     * Riak hands us a list of GameLeaderboard objects. Our job is to reconcile
     * those objects and return a single, resolved GameLeaderboard
     *
     * In this example, the logic is pretty straightforward. In our GameLeaderboard
     * class we created an addScores(Collection<NameScorePair>) method that will do the
     * heavy lifting for us. By adding all the lists into one GameLeaderboard
     * via that method, we end up with the top 5 scores from all the siblings
     *
     * Worth noting is that your ConflictResolver is *always* called, even if
     * there are no siblings, or even if there is no object in Riak
     */
    public GameLeaderboard resolve(List<GameLeaderboard> siblings) throws UnresolvedConflictException
    {
        if (siblings.size() > 1)
        {
            // We have siblings, need to resolve them
            Iterator<GameLeaderboard> i = siblings.iterator();
            GameLeaderboard resolvedLeaderboard = new GameLeaderboard(i.next());
            while (i.hasNext())
            {
                resolvedLeaderboard.addScores(i.next().getScoreList());
            }
            return resolvedLeaderboard;
        }
        else if (siblings.size() == 1)
        {
            // Only one object - just return it
            return siblings.iterator().next();
        }
        else
        {
            // No object returned - return null
            return null;
        }
    }
}
```
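
The merge strategy used by the resolver (take the union of all siblings, then keep only the best entries) can be tried in isolation. The sketch below is a simplification under stated assumptions: it uses plain integers instead of `NameScorePair`, so equal scores collapse into one entry, and `SiblingMergeDemo` and `mergeTop` are hypothetical names that do not exist in the example project or the client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class SiblingMergeDemo {
    // Merge any number of sibling score lists and keep only the highest `limit` scores.
    static List<Integer> mergeTop(int limit, List<List<Integer>> siblings) {
        TreeSet<Integer> merged = new TreeSet<>();
        for (List<Integer> sibling : siblings) {
            merged.addAll(sibling);
        }
        while (merged.size() > limit) {
            merged.pollFirst(); // drop the lowest remaining score
        }
        return new ArrayList<>(merged.descendingSet()); // highest first
    }

    public static void main(String[] args) {
        List<Integer> siblingA = Arrays.asList(90, 75, 60, 40, 10);
        List<Integer> siblingB = Arrays.asList(95, 75, 50, 30, 20);

        System.out.println(mergeTop(5, Arrays.asList(siblingA, siblingB)));
        // prints [95, 90, 75, 60, 50]
    }
}
```

Because the result depends only on the union of the inputs, the siblings can be merged in any order and the resolved value is the same, which is a useful property for a conflict resolver.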

```java
package Leaderboard;

import com.basho.riak.client.api.commands.kv.UpdateValue;

public class GameLeaderboardUpdate extends UpdateValue.Update<GameLeaderboard> {
    private final NameScorePair nsp;

    public GameLeaderboardUpdate(NameScorePair nsp) {
        this.nsp = nsp;
    }

    /*
     * And at the heart of things is this method. After the data in Riak has
     * been converted to GameLeaderboard Objects and any siblings resolved,
     * UpdateValue.Update<T>.apply() is called and it is where you will
     * do any and all modifications.
     *
     * Here we add the NameScorePair we passed to the constructor to the
     * GameLeaderboard object. After this our modified data will be stored back
     * to Riak
     */
    @Override
    public GameLeaderboard apply(GameLeaderboard original) {
        original.addScore(nsp);
        return original;
    }
}
```

```java
package Leaderboard;

import com.basho.riak.client.api.annotations.RiakBucketName;
import com.basho.riak.client.api.annotations.RiakKey;
import com.basho.riak.client.api.annotations.RiakVClock;

import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;

public final class GameLeaderboard
{
    @RiakBucketName private String gameBucket = "demo_bucket";
    @RiakKey private String gameName;
    @RiakVClock private byte[] vClock = new byte[0]; // Used for Riak version tracking
    private TreeSet<NameScorePair> scoreList = new TreeSet<NameScorePair>();

    // required by Jackson for JSON serialization
    public GameLeaderboard() {}

    public GameLeaderboard(String gameName)
    {
        this.gameName = gameName;
    }

    public GameLeaderboard(GameLeaderboard other)
    {
        this.gameName = other.getGameName();
        this.addScores(other.getScoreList());
    }

    // Add one score, then drop the lowest if we now hold more than 5
    public void addScore(NameScorePair s)
    {
        scoreList.add(s);
        if (scoreList.size() > 5)
            scoreList.pollFirst();
    }

    // Add many scores, then drop the lowest until only the top 5 remain
    public void addScores(Collection<NameScorePair> scores)
    {
        scoreList.addAll(scores);
        while (scoreList.size() > 5)
            scoreList.pollFirst();
    }

    public String getGameName()
    {
        return gameName;
    }

    // Returns the scores from highest to lowest
    public ArrayList<NameScorePair> getScoreList()
    {
        return new ArrayList<NameScorePair>(scoreList.descendingSet());
    }

    public String getGameBucket() {
        return gameBucket;
    }
}
```

```java
package Leaderboard;

public class NameScorePair implements Comparable<NameScorePair>
{
    private String name;
    private int score;

    // Required by Jackson for JSON serialization
    public NameScorePair() {}

    public NameScorePair(String name, int score)
    {
        this.name = name;
        this.score = score;
    }

    // Order by score first, then by name (ignoring case), so that two
    // different players with the same score are both kept in the TreeSet
    public int compareTo(NameScorePair t)
    {
        if (this.getScore() < t.getScore())
            return -1;
        else if (this.getScore() > t.getScore())
            return 1;
        else
            return this.getName().compareToIgnoreCase(t.getName());
    }

    @Override
    public int hashCode()
    {
        int hash = 3;
        hash = 47 * hash + (this.name != null ? this.name.hashCode() : 0);
        hash = 47 * hash + this.score;
        return hash;
    }

    @Override
    public boolean equals(Object o)
    {
        if (o == null)
        {
            return false;
        }
        else if (o instanceof NameScorePair)
        {
            return ((name.equalsIgnoreCase(((NameScorePair)o).getName())) &&
                (score == ((NameScorePair)o).getScore()));
        }
        else
            return false;
    }

    public int getScore()
    {
        return score;
    }

    public String getName()
    {
        return name;
    }
}
```

## Storing previously fetched and modified data

To avoid a full Read-Update-Write cycle, you can use the `StoreValue` class directly. This eliminates the fetch (and associated conflict resolution), which may create siblings, but may be appropriate for your application.

Regardless of the method, if you're using your own POJO you must include a `byte[]` or `VClock` field annotated with the `@RiakVClock` annotation. This preserves the vector clock in your POJO, and it is used during the subsequent store operation. If you don't have such a field on your POJO/object, you may instead add the vector clock to the StoreValue operation manually with the `StoreValue.Builder.withVectorClock()` helper method.

The code below shows how the previous example would be written to fit this pattern.

```java
package Leaderboard;

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.ConflictResolverFactory;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ExecutionException;

public class StoreApp {
    public static void main( String[] args ) throws UnknownHostException, ExecutionException, InterruptedException {
        // We need some data, of course
        String playerNames[] = {"Steve","Brian","Bob" };
        Random generator = new Random();
        GameLeaderboard gl = new GameLeaderboard("SuperCoolGame");
        for (int i = 0; i < 5; i++)
        {
            NameScorePair nsp = new NameScorePair(playerNames[(i+3)%3], generator.nextInt(100));
            gl.addScore(nsp);
        }

        // Store our initial leaderboard in Riak
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Namespace ns = new Namespace("demo_bucket");
        setAllowMultForBucket(client, ns);

        // Register our custom conflict resolver,
        // This tells the Java client to resolve siblings of GameLeaderboard with GameLeaderboardResolver
        ConflictResolverFactory.INSTANCE.registerConflictResolver(GameLeaderboard.class, new GameLeaderboardResolver());

        // If you don't specify a location to the builder, it will look on the object for fields noted with
        // @RiakBucketType, @RiakBucketName, and @RiakKey annotations to build the location automatically
        StoreValue storeBoard = new StoreValue.Builder(gl).build();
        client.execute(storeBoard);

        FetchValue leaderboardFetch =
            new FetchValue.Builder(
                new Location(new Namespace("demo_bucket"), "SuperCoolGame")).build();
        FetchValue.Response response = client.execute(leaderboardFetch);

        // If there are any siblings they will be resolved on calling `getValue()`
        GameLeaderboard fetchedGL = response.getValue(GameLeaderboard.class);

        // Output the results!
        for ( NameScorePair n : fetchedGL.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }
        System.out.println();

        /*
         * Now that we have a leaderboard in Riak, let's modify it!
         * This simulates a new name/score pair coming in, and we're going
         * to modify the leaderboard in Riak by editing our local copy,
         * and then saving it back with our local version's vclock.
         * We know our sample data only has scores to 100, so using 1000 ensures
         * we'll modify the object
         */
        NameScorePair nsp = new NameScorePair("John", 1000);
        fetchedGL.addScore(nsp);

        // Create our update with just a StoreValue.
        // If you don't have the Bucket Type/Bucket/Key annotated on your POJO, you will have to
        // manually add them with the StoreValue.Builder.withLocation() option.
        // Likewise, if you don't have a VectorClock field annotated on your POJO, you will have to
        // manually add it with the StoreValue.Builder.withVectorClock() option.
        StoreValue storeValue = new StoreValue.Builder(fetchedGL)
            // This option is required if you don't have the bucket type, bucket, and key annotated on your POJO
            .withLocation(new Location(ns, fetchedGL.getGameName()))
            // This option is required if you don't have a vector clock annotated on your POJO
            .withVectorClock(response.getVectorClock())
            .build();

        // Store the update. Since this store does not fetch and resolve first,
        // we may have siblings to resolve on the next fetch.
        StoreValue.Response storeResponse = client.execute(storeValue);

        client.shutdown();
        System.exit(0);
    }

    // Enable sibling creation (allow_mult) on the given bucket
    public static void setAllowMultForBucket(RiakClient c, Namespace ns)
        throws ExecutionException, InterruptedException {
        StoreBucketProperties storeBucketProperties =
            new StoreBucketProperties.Builder(ns)
                .withAllowMulti(true).build();
        c.execute(storeBucketProperties);
    }
}
```