Riak Java Client 2.0 Upgrade Guide
Version 2.0 of the Riak Java client (RJC) was a large overhaul of the client. The main goal was to remove years of technical debt at the core by retiring the original Protocol Buffers (PB) and HTTP clients, and replacing them with a modern core that uses Netty4. This also allowed us to remove the translation / abstraction layer that sat between IRiakClient at the top and the aforementioned original clients at the bottom in the 1.X series. Doing these things allowed us to address some long-standing design issues and helped reduce future maintenance and feature development costs.
Historically, the client library supported both the Protocol Buffers and HTTP protocols because the Riak Protocol Buffers API did not support all Riak operations. As of Riak 1.4 and RJC 1.4.0, the two APIs have reached feature parity. Protocol Buffers is the faster and more efficient way to talk to Riak. In addition, feature development and maintenance are helped by not having to implement every feature multiple times.
Many Riak 2.0 clients will not support the HTTP interface going forward, but the Riak 2.0 server will still support it. The main drivers for this decision are:
- Performance - Internal tests at Basho have shown performance gains of 25% or more when using Protocol Buffers.
- Feature Support - The HTTP interface does not support certificate-based authentication.
- Leaky Abstractions - Some HTTP abstractions don't mix well with a distributed database like Riak. For example, the `If-None-Match`, `If-Match`, `If-Modified-Since`, and `If-Unmodified-Since` headers can be used to invoke conditional request semantics on writes. With concurrent writes in an eventually-consistent environment, it is possible that multiple requests to a single key could all succeed their condition instead of the first one succeeding and the others failing. This was also one of the drivers for switching Riak default behavior of `last_write_wins=true` to `allow_mult=true` in Riak 2.0.
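The race described in the last bullet can be illustrated with a small in-memory simulation. This is only a sketch and not Riak code: the `Replica` class and its `conditionalPut` method are hypothetical stand-ins for two nodes that each evaluate an `If-None-Match: *` style condition against their own local state before replication has caught up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Riak code: two replicas each evaluate a
// "write only if the key is absent" condition against their own local state.
class ConditionalWriteRace {

    // A replica keeps every value it has seen for a key (siblings).
    static final class Replica {
        private final Map<String, List<String>> data = new HashMap<>();

        // Mimics If-None-Match: * by succeeding only if this replica has no value yet.
        boolean conditionalPut(String key, String value) {
            if (data.containsKey(key)) {
                return false;
            }
            data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            return true;
        }

        // Replication delivers a value that was written on another replica.
        void receive(String key, String value) {
            List<String> siblings = data.computeIfAbsent(key, k -> new ArrayList<>());
            if (!siblings.contains(value)) {
                siblings.add(value);
            }
        }

        List<String> siblings(String key) {
            return data.getOrDefault(key, new ArrayList<>());
        }
    }

    // Runs two concurrent conditional writes and returns how many succeeded.
    static int run(Replica a, Replica b) {
        // Both clients write before any replication has happened.
        boolean okA = a.conditionalPut("k", "from-client-1");
        boolean okB = b.conditionalPut("k", "from-client-2");
        // Replication catches up afterwards.
        if (okA) b.receive("k", "from-client-1");
        if (okB) a.receive("k", "from-client-2");
        return (okA ? 1 : 0) + (okB ? 1 : 0);
    }

    public static void main(String[] args) {
        Replica a = new Replica();
        Replica b = new Replica();
        System.out.println("successful conditional writes: " + run(a, b));
        System.out.println("values on replica A: " + a.siblings("k"));
    }
}
```

In this model both conditional writes pass their check, and once replication completes each replica holds both values. Keeping both as siblings, rather than silently discarding one, is the behavior that `allow_mult=true` provides.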
As mentioned above, HTTP is not supported with the 2.0 Java Client.
The following features were not implemented in 2.x versions of the Riak Java Client prior to version 2.1.0:
- Streaming MapReduce
- Streaming 2i Queries
- Streaming List Keys
- Streaming List Buckets
The following is a guide on how to upgrade your code base from the old Java Client 1.x library to the new 2.0 library.
The Riak Java Client 1.x series was an amalgamation of two previous clients (a PB client, and an HTTP one), with an API to bridge them together. Since the internal APIs for each were quite different, maintenance and new feature development were a sizable cost. Each protocol sub-client implemented all the different Riak API methods, the majority of each sub-client was implemented in two "God" classes, and each protocol managed its own connections. Most of the operations were synchronous as well.
With the 2.0 Client, the Netty library was adopted to help manage network connections, offer asynchronous operations from the core, and simplify the code base. The API was also moved to use Command & Builder patterns to simplify it, increase testability, and slim down the RiakClient class. Below you can see v1.x and v2.0 code examples.
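As a rough illustration of the Command and Builder patterns mentioned above, here is a minimal plain-Java sketch. All names in it (`Command`, `StoreCommand`, `MiniClient`, `withKey`, `withOverwrite`) are hypothetical and are not part of the Riak Java client; an in-memory map stands in for the database, and `CompletableFuture` stands in for the client's own future type.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the Command and Builder patterns; these are
// illustrative names, not classes from the Riak Java client.
class CommandBuilderSketch {

    // A command is an immutable description of one operation.
    interface Command<R> {
        R run(Map<String, String> store);
    }

    static final class StoreCommand implements Command<Boolean> {
        private final String key;
        private final String value;
        private final boolean overwrite;

        private StoreCommand(Builder b) {
            this.key = b.key;
            this.value = b.value;
            this.overwrite = b.overwrite;
        }

        // Returns true if the value was written.
        @Override
        public Boolean run(Map<String, String> store) {
            if (overwrite) {
                store.put(key, value);
                return true;
            }
            return store.putIfAbsent(key, value) == null;
        }

        // The builder collects required and optional settings, then
        // produces the immutable command.
        static final class Builder {
            private final String value;
            private String key;
            private boolean overwrite = true;

            Builder(String value) { this.value = value; }

            Builder withKey(String key) { this.key = key; return this; }

            Builder withOverwrite(boolean overwrite) { this.overwrite = overwrite; return this; }

            StoreCommand build() {
                if (key == null) throw new IllegalStateException("key is required");
                return new StoreCommand(this);
            }
        }
    }

    // The client stays small: it only knows how to execute commands.
    static final class MiniClient {
        private final Map<String, String> store = new ConcurrentHashMap<>();

        <R> R execute(Command<R> command) { return command.run(store); }

        <R> CompletableFuture<R> executeAsync(Command<R> command) {
            return CompletableFuture.supplyAsync(() -> command.run(store));
        }
    }

    public static void main(String[] args) throws Exception {
        MiniClient client = new MiniClient();
        StoreCommand store = new StoreCommand.Builder("fooVal").withKey("TestKey").build();
        System.out.println(client.execute(store));
        StoreCommand noOverwrite = new StoreCommand.Builder("other")
                .withKey("TestKey").withOverwrite(false).build();
        System.out.println(client.executeAsync(noOverwrite).get());
    }
}
```

Because each operation is its own small class, it can be constructed and tested without a live connection, and the client class does not need a new method for every operation or option.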
To replace the old RiakFactory, use the new RiakClient object and its static [newClient](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/api/RiakClient.html#newClient(int, java.util.List)) helper methods to create connections to Riak. If you need to specify min/max connection settings, operation retry settings, and security settings, see the example about configuring everything with RiakNode, RiakCluster, and RiakClient objects.
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.raw.pbc.PBClientConfig;
import com.basho.riak.client.raw.pbc.PBClusterConfig;
import com.basho.riak.client.RiakException;
public class App
{
public static void main(String[] args) throws RiakException
{
int maxConnections = 50;
PBClusterConfig myPbClusterConfig = new PBClusterConfig(maxConnections);
PBClientConfig myPbClientConfig = PBClientConfig.defaults();
myPbClusterConfig.addHosts(myPbClientConfig, "192.168.1.10","192.168.1.11","192.168.1.12");
IRiakClient myPbClient = RiakFactory.newClient(myPbClusterConfig);
myPbClient.shutdown();
}
}
import java.net.UnknownHostException;
import java.util.LinkedList;
import com.basho.riak.client.api.RiakClient;
public class App {
public static void main(String [] args) throws UnknownHostException {
// Riak Client with multiple node connections
LinkedList<String> ipAddresses = new LinkedList<String>();
ipAddresses.add("192.168.1.10");
ipAddresses.add("192.168.1.11");
ipAddresses.add("192.168.1.12");
RiakClient myNodeClient = RiakClient.newClient(8087, ipAddresses);
myNodeClient.shutdown();
}
}
In the 1.4 client, the simple key-value operations were done through the RiakBucket class. In the 2.0 client, these operations became first-class operations, and each has its own class and associated builder classes (see FetchValue, StoreValue, UpdateValue, and DeleteValue).
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.cap.Quora;
public class App
{
// { "foo":"Some String", "bar":"some other string","foobar":5 }
static class Pojo {
public String foo;
public String bar;
public int foobar;
}
public static void main(String[] args) throws RiakException
{
IRiakClient riakClient = RiakFactory.pbcClient();
Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
Pojo obj = new Pojo();
obj.bar = "barVal";
obj.foo = "fooVal";
obj.foobar = 42;
// Store object with default options
myBucket.store("TestKey", obj).execute();
// Store object with custom options (W=ALL)
myBucket.store("TestKey", obj).w(Quora.ALL).execute();
riakClient.shutdown();
}
}
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class StoreObject {
// { "foo":"Some String", "bar":"some other string","foobar":5 }
static class Pojo {
public String foo;
public String bar;
public int foobar;
}
public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
Namespace ns = new Namespace("my_bucket");
Location location = new Location(ns, "TestKey");
Pojo obj = new Pojo();
obj.bar = "barVal";
obj.foo = "fooVal";
obj.foobar = 42;
// Store object with default options
StoreValue sv = new StoreValue.Builder(obj).withLocation(location).build();
StoreValue.Response svResponse = client.execute(sv);
// Store object with custom options (W=ALL)
StoreValue storeWithProps = new StoreValue.Builder(obj)
.withLocation(location)
.withOption(StoreValue.Option.W, Quorum.allQuorum()).build();
StoreValue.Response storeWithPropsResp = client.execute(storeWithProps);
client.shutdown();
}
}
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
public class App
{
// { "foo":"Some String", "bar":"some other string","foobar":5 }
static class Pojo {
public String foo;
public String bar;
public int foobar;
}
public static void main(String[] args) throws RiakException
{
IRiakClient riakClient = RiakFactory.pbcClient();
Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
Pojo obj = new Pojo();
obj.bar = "barVal";
obj.foo = "fooVal";
obj.foobar = 42;
myBucket.store("TestKey", obj).execute();
// Fetch object as String
IRiakObject myObject = myBucket.fetch("TestKey").execute();
// note that getValueAsString() will return null here if there's no value in Riak
System.out.println(myObject.getValueAsString());
// Fetch JSON object, map to class
Pojo myPojo = myBucket.fetch("TestKey", Pojo.class).execute();
System.out.println(myPojo.foo);
riakClient.shutdown();
}
}
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class FetchObject {
// { "foo":"Some String", "bar":"some other string","foobar":5 }
static class Pojo {
public String foo;
public String bar;
public int foobar;
}
public static void main(String [] args) throws UnknownHostException, ExecutionException, InterruptedException {
RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
Namespace ns = new Namespace("my_bucket");
Location location = new Location(ns, "TestKey");
Pojo obj = new Pojo();
obj.bar = "barVal";
obj.foo = "fooVal";
obj.foobar = 42;
StoreValue sv = new StoreValue.Builder(obj).withLocation(location).build();
StoreValue.Response svResponse = client.execute(sv);
FetchValue fv = new FetchValue.Builder(location).build();
FetchValue.Response response = client.execute(fv);
// Fetch object as String
String value = response.getValue(String.class);
System.out.println(value);
// Fetch object as Pojo class (map json to object)
Pojo myObject = response.getValue(Pojo.class);
System.out.println(myObject.foo);
client.shutdown();
}
}
With the 1.4.x client, you had to have a `byte[]` field annotated with the `@RiakVClock` annotation to do a store without a fetch first on a sibling-enabled bucket. This option is still available in the 2.0 client, but you can also use the new `.withVectorClock()` builder function to attach a known vector clock value to a StoreValue operation. You may also omit the vector clock altogether if you would like to create a new sibling, but this is not recommended.
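To see why omitting the vector clock creates a sibling, consider the following simplified model of a single key in a sibling-enabled bucket. It is a sketch under loud assumptions: `SiblingSketch` is a hypothetical class, and a single counter stands in for the opaque vector clock, whereas a real vector clock tracks one counter per actor.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of one key in a sibling-enabled bucket.
// A single counter stands in for the opaque vector clock; real vector
// clocks track one counter per actor.
class SiblingSketch {
    private final List<String> siblings = new ArrayList<>();
    private long clock = 0;

    // Returns the current causal context, as a fetch would.
    long currentClock() { return clock; }

    List<String> values() { return new ArrayList<>(siblings); }

    // knownClock == null models a store with no vector clock attached.
    void store(String value, Long knownClock) {
        if (knownClock != null && knownClock == clock) {
            // The writer has seen everything stored so far: replace it.
            siblings.clear();
        }
        // Otherwise the write is treated as concurrent with existing data,
        // so the new value is kept alongside what is already there.
        siblings.add(value);
        clock++;
    }

    public static void main(String[] args) {
        SiblingSketch key = new SiblingSketch();
        key.store("v1", null);               // first write, nothing to conflict with
        key.store("v2", key.currentClock()); // known clock: supersedes v1
        key.store("v3", null);               // no clock: creates a sibling next to v2
        System.out.println(key.values());
    }
}
```

In this model a write that carries the current clock replaces the stored value, while a write without one is kept alongside it and must be resolved by a later reader.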
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class DeleteObject {
public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
Location loc = new Location(new Namespace("my_bucket"), "TestKey3");
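// Store an object with a single value, attaching a known vector clock.
// knownVClock is assumed to be a VClock obtained earlier (for example, from a previous fetch).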
StoreValue storeObj = new StoreValue.Builder("obj1")
.withVectorClock(knownVClock)
.withLocation(loc).build();
client.execute(storeObj);
// To delete single value objects, just create a DeleteValue object
// with the Riak object's Location and execute it.
DeleteValue delObj = new DeleteValue.Builder(loc).build();
client.execute(delObj);
client.shutdown();
}
}
In the 1.4.x client, you would use a Mutation class to modify an object during a get/modify/store automatic update lifecycle. In the 2.0.x client, you can use an UpdateValue.Update class instead. The interfaces for each are identical; the mutator was just renamed to fit better with the overall API design. You can see the full examples in the 1.x Complex Store Example and 2.x Complex Store Example.
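The get/modify/store lifecycle itself can be sketched in plain Java as follows. This is an in-memory illustration only: `UpdateLifecycleSketch`, `Update`, `Resolver`, and `putSibling` are hypothetical names and not the client's types, and a map of integer lists stands in for Riak objects with siblings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of a fetch / resolve / modify / store cycle.
// None of these types come from the Riak Java client.
class UpdateLifecycleSketch {

    // Counterpart of a mutation/update: receives the resolved value
    // (null if the key is absent) and returns the value to store.
    interface Update<T> {
        T apply(T original);
    }

    // Picks one value out of any siblings found during the fetch.
    interface Resolver<T> {
        T resolve(List<T> siblings);
    }

    private final Map<String, List<Integer>> store = new HashMap<>();

    // Test helper that simulates conflicting writes to the same key.
    void putSibling(String key, int value) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    Integer update(String key, Resolver<Integer> resolver, Update<Integer> update) {
        // 1. Fetch whatever is currently stored, siblings included.
        List<Integer> siblings = store.getOrDefault(key, new ArrayList<>());
        // 2. Resolve siblings down to a single value.
        Integer resolved = siblings.isEmpty() ? null : resolver.resolve(siblings);
        // 3. Apply the caller's modification.
        Integer modified = update.apply(resolved);
        // 4. Store the single modified value back.
        List<Integer> stored = new ArrayList<>();
        stored.add(modified);
        store.put(key, stored);
        return modified;
    }

    public static void main(String[] args) {
        UpdateLifecycleSketch db = new UpdateLifecycleSketch();
        db.putSibling("score", 900);
        db.putSibling("score", 1000);
        Resolver<Integer> highest = s -> Collections.max(s);
        Update<Integer> addBonus = original -> (original == null ? 0 : original) + 50;
        System.out.println(db.update("score", highest, addBonus));
    }
}
```

The update logic only ever sees one already-resolved value, which is the same division of responsibility that the Mutation and UpdateValue.Update examples rely on.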
import com.basho.riak.client.cap.Mutation;
public class GameLeaderboardMutation implements Mutation<GameLeaderboard>
{
private NameScorePair nsp;
public GameLeaderboardMutation(NameScorePair nsp)
{
this.nsp = nsp;
}
/*
* And at the heart of things is this method. After the data in Riak has
* been converted to GameLeaderboard Objects and any siblings resolved,
* Mutation.apply() is called and it is where you will do any and all modifications
*
* Here we add the NameScorePair we passed to the constructor to the
* GameLeaderboard object. After this our modified data will be stored back
* to Riak
*/
public GameLeaderboard apply(GameLeaderboard original)
{
original.addScore(nsp);
return original;
}
}
// Some time later in your main application code...
NameScorePair nsp = new NameScorePair("John", 1000);
GameLeaderboardMutation glbm = new GameLeaderboardMutation(nsp);
gl = b.store(new GameLeaderboard("SuperCoolGame"))
.withMutator(glbm)
.withResolver(new GameLeaderboardResolver())
.returnBody(true)
.execute();
import com.basho.riak.client.api.commands.kv.UpdateValue;
public class GameLeaderboardUpdate extends UpdateValue.Update<GameLeaderboard> {
private final NameScorePair nsp;
public GameLeaderboardUpdate(NameScorePair nsp) {
this.nsp = nsp;
}
/*
* And at the heart of things is this method. After the data in Riak has
* been converted to GameLeaderboard Objects and any siblings resolved,
* UpdateValue.Update<T>.apply() is called and it is where you will
* do any and all modifications.
*
* Here we add the NameScorePair we passed to the constructor to the
* GameLeaderboard object. After this our modified data will be stored back
* to Riak
*/
@Override
public GameLeaderboard apply(GameLeaderboard original) {
original.addScore(nsp);
return original;
}
}
// Some time later in your main application code...
NameScorePair nsp = new NameScorePair("John", 1000);
GameLeaderboardUpdate glbu = new GameLeaderboardUpdate(nsp);
UpdateValue updateValue = new UpdateValue.Builder(new Location(ns, fetchedGL.getGameName()))
.withUpdate(glbu)
.withStoreOption(StoreValue.Option.RETURN_BODY, true)
.build();
UpdateValue.Response updateResponse = client.execute(updateValue);
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
public class App
{
public static void main(String[] args) throws RiakException
{
IRiakClient riakClient = RiakFactory.pbcClient();
Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
// Store object
myBucket.store("TestKey", "foo").execute();
// Delete object
myBucket.delete("TestKey").execute();
riakClient.shutdown();
}
}
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class DeleteObject {
public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {
RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
Location loc = new Location(new Namespace("my_bucket"), "TestKey3");
// Store an object
StoreValue storeObj = new StoreValue.Builder("obj1")
.withLocation(loc).build();
client.execute(storeObj);
// To delete single value objects, just create a DeleteValue object
// with the Riak object's Location and execute it.
DeleteValue delObj = new DeleteValue.Builder(loc).build();
client.execute(delObj);
client.shutdown();
}
}
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
public class App
{
public static void main(String[] args) throws RiakException
{
IRiakClient riakClient = RiakFactory.httpClient();
// If the bucket does not exist in Riak, it will be created with the default properties
// when you use the fetchBucket() method.
Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
// By using the createBucket() method you can specify properties' values. If the bucket
// already exists in Riak the bucket properties will be updated.
Bucket myOtherBucket = riakClient.createBucket("TestBucket").nVal(2).r(1).execute();
// If you have an existing Bucket object, you can modify the values and update Riak
Bucket existingBucket = riakClient.fetchBucket("TestBucket").execute();
existingBucket = riakClient.updateBucket(existingBucket).nVal(3).r(2).execute();
riakClient.shutdown();
}
}
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.buckets.FetchBucketProperties;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.core.operations.FetchBucketPropsOperation;
import com.basho.riak.client.core.query.BucketProperties;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class BucketTypeProperties
{
public static void main(String [] args) throws UnknownHostException, ExecutionException, InterruptedException {
RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
Namespace ns = new Namespace("TestBucket");
// If the bucket does not exist in Riak, it will be created with the default properties when you query for them.
FetchBucketProperties fetchProps = new FetchBucketProperties.Builder(ns).build();
FetchBucketPropsOperation.Response fetchResponse = client.execute(fetchProps);
BucketProperties bp = fetchResponse.getBucketProperties();
// By using the StoreBucketProperties command,
// you can specify properties' values.
//
// If the bucket already exists in Riak, the bucket
// properties will be updated.
//
// Only those properties that you specify will be updated,
// there is no need to fetch the bucket properties to edit them.
StoreBucketProperties storeProps =
new StoreBucketProperties.Builder(ns)
.withNVal(2).withR(1).build();
client.execute(storeProps);
client.shutdown();
}
}