
Fetching Data from Riak

broach edited this page Apr 25, 2012 · 60 revisions

First, a word or two about Riak, the CAP theorem and eventual consistency.

Unless you're already familiar with the CAP theorem and eventual consistency, taking the time to read through at least *The Riak Fast Track* would be well worth your while.

It's OK, we'll wait.

OK! Now that you've read through that and understand that Riak is a system that favors availability and partition tolerance (AP) with eventual consistency (C), this might make some sense to you.

<a name="houston"/>
## Important note: _Houston, we have a problem_

I'm putting this first, even though it touches on classes we haven't discussed yet, because it's rather important. The current Java client has a small problem: if you want to do a read/modify/write cycle, you may not want to fetch data using the methods described on this page.

In the current Java client design, a store operation first fetches the data from Riak (see *Storing data in Riak*). Fetching the data yourself beforehand is therefore inefficient, and it also opens a window in which the copy you fetched is no longer what is stored in Riak: one or more writes may have happened between your fetch and the store's internal fetch. You now have "stale" data, and the default Mutation, combined with the default Converter and the vector clock from that internal fetch, will overwrite whatever is actually in Riak with what you have. The solution is to write your own Mutation and modify the data only inside that class, encapsulating the entire read/modify/write cycle within the store operation. Yes, that sounds backwards, and it takes a moment to wrap your head around. See the page on storing data.
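To make the Mutation approach concrete, here is a minimal, stdlib-only sketch. The `Mutation` interface below is a local stand-in with the same shape as the client's `com.basho.riak.client.cap.Mutation<T>` (a single `T apply(T original)` method), declared here only so the sketch compiles without the client jar. `Clobber` approximates the "replace it with my copy" behavior described above, and `AddScore` is a hypothetical mutation that applies one change to whatever the store operation actually fetched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MutationSketch
{
    // Local stand-in shaped like com.basho.riak.client.cap.Mutation<T>
    interface Mutation<T>
    {
        T apply(T original);
    }

    // Ignores what was fetched from Riak and returns our (possibly stale) copy
    static final class Clobber<T> implements Mutation<T>
    {
        private final T value;
        Clobber(T value) { this.value = value; }
        public T apply(T original) { return value; }
    }

    // Hypothetical mutation: append one score to whatever is currently stored
    static final class AddScore implements Mutation<List<Integer>>
    {
        private final int score;
        AddScore(int score) { this.score = score; }

        public List<Integer> apply(List<Integer> original)
        {
            // original is null when nothing is stored under the key yet
            List<Integer> updated = (original == null)
                ? new ArrayList<Integer>()
                : new ArrayList<Integer>(original);
            updated.add(score);
            return updated;
        }
    }

    public static void main(String[] args)
    {
        List<Integer> stale = Arrays.asList(10);        // what we fetched earlier
        List<Integer> current = Arrays.asList(10, 20);  // Riak after another client wrote 20

        List<Integer> mine = new ArrayList<Integer>(stale);
        mine.add(30);
        System.out.println(new Clobber<List<Integer>>(mine).apply(current)); // prints [10, 30]: the 20 is lost
        System.out.println(new AddScore(30).apply(current));                 // prints [10, 20, 30]
    }
}
```

The design point is that the change is expressed as a function of `original`, so it is applied to the freshly fetched value instead of replacing it.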

There is a way to avoid this: deal exclusively with the IRiakObject class and forgo having FetchObject (and StoreObject) convert to and from your POJO for you. You still pay for the unnecessary fetch performed when you call StoreObject.execute(), but you will not miss interleaved writes. This is because StoreObject handles IRiakObjects passed to it differently, using a different Converter. The vector clock from your fetch is not lost (it's contained in the IRiakObject you fetched) and is used when the data is stored to Riak, so a sibling is created rather than an overwrite occurring. This is covered in the last section below, *Fetch data with the intention of modifying it then storing it afterward*. If you only want to read data from Riak, the examples on this page are appropriate.
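Why does keeping the vector clock help? The toy sketch below (plain maps of actor id to counter; not Riak's actual vclock encoding, and the actor names are hypothetical) shows that a write based on a stale fetch and the interleaved write it raced with have clocks where neither descends from the other. That is how the conflict is detected, and with siblings allowed on the bucket both values are kept instead of one silently replacing the other.

```java
import java.util.HashMap;
import java.util.Map;

public class VClockSketch
{
    // a descends from b if a has seen at least every event b has seen
    static boolean descends(Map<String, Integer> a, Map<String, Integer> b)
    {
        for (Map.Entry<String, Integer> e : b.entrySet())
        {
            Integer seen = a.get(e.getKey());
            if (seen == null || seen < e.getValue())
                return false;
        }
        return true;
    }

    // Returns a copy of the clock with the given actor's counter bumped by one
    static Map<String, Integer> increment(Map<String, Integer> clock, String actor)
    {
        Map<String, Integer> next = new HashMap<String, Integer>(clock);
        Integer c = next.get(actor);
        next.put(actor, c == null ? 1 : c + 1);
        return next;
    }

    public static void main(String[] args)
    {
        Map<String, Integer> fetched = increment(new HashMap<String, Integer>(), "clientA");
        Map<String, Integer> stored = increment(fetched, "clientB"); // interleaved write
        Map<String, Integer> mine = increment(fetched, "clientA");   // our write, based on the stale fetch

        // Concurrent: neither descends from the other
        System.out.println(descends(mine, stored) + " " + descends(stored, mine)); // prints false false
    }
}
```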

This is something that will be addressed in a future version of the Java client.

## Fetching data from Riak with the Java client

In Riak, data is stored in buckets. Buckets have a number of options and tunable parameters, one of which is whether to allow sibling records. By default, a bucket does not allow siblings. The Riak Java client is built around this default: at the most basic level, you can simply say "fetch the data associated with this key" and get back the single object Riak currently stores under that key.

Of course, this does not reflect how you must use the client if your application does a typical read/modify/write cycle and multiple threads or instances of your application write concurrently. We'll discuss that in the advanced section below.

With that in mind, the following basic examples show how you can retrieve data from Riak.

* Basic Example #1: Fetch data as a String
* Basic Example #2: Fetch JSON data, map to POJO
* Basic Example #3: Changing query parameters

## Basic Fetch as a String
```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class App
{
    public static void main(String[] args) throws RiakException
    {
        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        IRiakObject myObject = myBucket.fetch("TestKey").execute();
        // execute() returns null if nothing is stored under this key
        if (myObject != null)
        {
            System.out.println(myObject.getValueAsString());
        }

        riakClient.shutdown();
    }
}
```
## Fetch JSON data, map to POJO

The Riak Java client provides a default Converter (see the advanced section below for more on this) that automatically maps JSON stored in Riak to a POJO class you provide. Please note the section *Houston, we have a problem* above: if you plan to modify this object and then store it back to Riak, you do not want to do this.

```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class App
{
    // Maps JSON such as: { "foo":"Some String", "bar":"some other string", "foobar":5 }
    // Declared static so Jackson can instantiate it without an enclosing App instance
    static class Pojo {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws RiakException
    {
        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        // myObject will be null if nothing is stored under this key
        Pojo myObject = myBucket.fetch("TestKey", Pojo.class).execute();
        if (myObject != null)
        {
            System.out.println(myObject.foo);
        }

        riakClient.shutdown();
    }
}
```
## Fetch data, changing query parameters for just this request
```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.cap.Quora;
import com.basho.riak.client.operations.FetchObject;

public class App
{
    public static void main(String[] args) throws RiakException
    {
        IRiakClient riakClient = RiakFactory.httpClient();
        Bucket myBucket = riakClient.fetchBucket("TestBucket").execute();
        FetchObject<IRiakObject> fetchObject = myBucket.fetch("TestKey");
        // r and pr set here apply to this request only
        IRiakObject myObject = fetchObject.r(Quora.ONE).pr(Quora.ONE).execute();
        // myObject will be null if nothing is stored under this key
        if (myObject != null)
        {
            System.out.println(myObject.getValueAsString());
        }
        riakClient.shutdown();
    }
}
```
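`r` is the number of replicas that must respond before a read is considered successful, and `pr` is how many of those must be primary (not fallback) replicas. The symbolic quorum values are resolved against the bucket's `n_val`. A stdlib-only sketch of that arithmetic, using a local `Quorum` enum as a stand-in for the client's `Quora` and assuming `n_val` = 3 (Riak's default):

```java
public class QuorumSketch
{
    // Local stand-in for the symbolic values ONE, QUORUM and ALL
    enum Quorum { ONE, QUORUM, ALL }

    // Number of replicas that must answer, given the bucket's n_val
    static int required(Quorum q, int nVal)
    {
        switch (q)
        {
            case ONE:    return 1;
            case QUORUM: return nVal / 2 + 1; // a strict majority
            case ALL:    return nVal;
            default:     throw new IllegalArgumentException(String.valueOf(q));
        }
    }

    public static void main(String[] args)
    {
        int nVal = 3; // assumed: Riak's default n_val
        for (Quorum q : Quorum.values())
        {
            System.out.println(q + " -> " + required(q, nVal) + " of " + nVal);
        }
    }
}
```

Lowering `r` to ONE, as in the example above, means the read can return as soon as a single replica answers: faster and more available, but more likely to see stale data.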


<hr>
<a name="advanced"/>
# The Hard Way
## Eventual Consistency: Resolvers and Converters

In many environments, you'll configure your buckets to allow siblings and write the code that resolves them.

If you haven't already, please read [Houston, we have a problem](#houston) above. If you are fetching data from Riak with the intent of modifying it then storing it back to Riak, you do not want to use the Converter class to serialize / deserialize your POJO inside the FetchObject (or possibly use the FetchObject at all). 

With that said, there are two interfaces you're going to be using:

* [ConflictResolver&lt;T&gt;](http://basho.github.com/riak-java-client/1.0.5/com/basho/riak/client/cap/ConflictResolver.html)<br>
    This interface is used to resolve sibling records returned by Riak.
* [Converter&lt;T&gt;](http://basho.github.com/riak-java-client/1.0.5/com/basho/riak/client/convert/Converter.html)<br>
    This interface is used to serialize/deserialize data to/from Riak.

Here's the anatomy of making a fetch request using the Bucket interface and the returned FetchObject:

<a name="figure1"/>
### Figure 1
![Riak fetch](http://dl.dropbox.com/u/74693818/RJC-fetch-v2.png)

There are three versions of fetch() available via the Bucket interface. The first takes only a (String) key as an argument and returns a FetchObject&lt;IRiakObject&gt;. If you wish to modify your data and then store it back to Riak, this is what you want to use. See the next section: [fetch data with the intention of modifying it then storing it afterward](#iriakobject).

If you are only interested in reading data from Riak and having it deserialized to your own POJO, the second and third versions fit the bill. The second takes a (String) key and your POJO class (e.g. MyPojo.class) as arguments. The generic type &lt;T&gt; is inferred from this (see above), giving you a FetchObject&lt;MyPojo&gt;. The third takes an instance of your POJO class that has a String field annotated with @RiakKey containing the key. Again, &lt;T&gt; is inferred from this and you get back a FetchObject&lt;MyPojo&gt;.

If you do not provide a ConflictResolver, an instance of [DefaultResolver&lt;T&gt;](http://basho.github.com/riak-java-client/1.0.5/com/basho/riak/client/cap/DefaultResolver.html) is provided. This is not really a resolver at all; it throws an exception if siblings are present. If you do not provide a Converter, the [JSONConverter&lt;T&gt;](http://basho.github.com/riak-java-client/1.0.5/com/basho/riak/client/convert/JSONConverter.html) is provided. This Converter uses the Jackson JSON library to deserialize your POJO from JSON stored in Riak.

### App.java
```java
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import java.util.Random;

public class App
{
    public static void main( String[] args ) throws RiakException
    {
        // We need some data, of course
        String playerNames[] = {"Steve","Brian","Bob" };
        Random generator = new Random();
        GameLeaderboard gl = new GameLeaderboard("SuperCoolGame");

        for (int i = 0; i < 5; i++)
        {
            NameScorePair nsp = new NameScorePair(playerNames[i % 3], generator.nextInt(100));
            gl.addScore(nsp);
        }

        // Store our initial leaderboard in Riak. store() fetches first, so it
        // needs our resolver too in case siblings already exist.
        IRiakClient myDefaultHttpClient = RiakFactory.httpClient();
        Bucket b = myDefaultHttpClient.createBucket("demo_bucket").allowSiblings(true).execute();
        b.store(gl).withResolver(new GameLeaderboardResolver()).execute();

        // Fetch it back, letting our resolver merge any siblings
        gl = b.fetch("SuperCoolGame", GameLeaderboard.class)
              .withResolver(new GameLeaderboardResolver())
              .execute();

        // Output the results!
        for ( NameScorePair n : gl.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }

        myDefaultHttpClient.shutdown();
    }
}
```

### GameLeaderboardResolver.java
```java
import com.basho.riak.client.cap.ConflictResolver;
import java.util.Collection;
import java.util.Iterator;

public class GameLeaderboardResolver implements ConflictResolver<GameLeaderboard>
{

    /*
     * Riak hands us a collection of GameLeaderboard objects (the siblings). Our job
     * is to reconcile those objects and return a single, resolved GameLeaderboard.
     *
     * In this example the logic is pretty straightforward. In our GameLeaderboard
     * class we created an addScores(Collection<NameScorePair>) method that does the
     * heavy lifting for us. By adding all the lists into one GameLeaderboard
     * via that method, we end up with the top 5 scores from all the siblings.
     *
     * Worth noting is that your ConflictResolver is *always* called, even if
     * there are no siblings, or even if there is no object in Riak.
     */

    public GameLeaderboard resolve(Collection<GameLeaderboard> siblings)
    {
        if (siblings.size() > 1)
        {
            // We have siblings, need to resolve them
            Iterator<GameLeaderboard> i = siblings.iterator();

            GameLeaderboard resolvedLeaderboard = new GameLeaderboard(i.next());

            while (i.hasNext())
            {
                resolvedLeaderboard.addScores(i.next().getScoreList());
            }

            return resolvedLeaderboard;
        }
        else if (siblings.size() == 1)
        {
            // Only one object - just return it
            return siblings.iterator().next();
        }
        else
        {
            // No object returned - return null
            return null;
        }
    }
}
```

### GameLeaderboard.java
```java
import com.basho.riak.client.convert.RiakKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;

public final class GameLeaderboard
{
    @RiakKey private String gameName;
    private TreeSet<NameScorePair> scoreList = new TreeSet<NameScorePair>();

    // required by Jackson for JSON serialization
    public GameLeaderboard() {}

    public GameLeaderboard(String gameName)
    {
        this.gameName = gameName;
    }

    public GameLeaderboard(GameLeaderboard other)
    {
        this.gameName = other.getGameName();
        this.addScores(other.getScoreList());
    }

    public void addScore(NameScorePair s)
    {
        scoreList.add(s);
        if (scoreList.size() > 5)
            scoreList.pollFirst();
    }

    public void addScores(Collection<NameScorePair> scores)
    {
        scoreList.addAll(scores);
        while (scoreList.size() > 5)
            scoreList.pollFirst();
    }

    public String getGameName()
    {
        return gameName;
    }

    public ArrayList<NameScorePair> getScoreList()
    {
        return new ArrayList<NameScorePair>(scoreList.descendingSet());
    }
}
```

### NameScorePair.java
```java
public class NameScorePair implements Comparable<NameScorePair>
{
   private String name;
   private int score;

   // Required by Jackson for JSON serialization
   public NameScorePair() {}

   public NameScorePair(String name, int score)
   {
      this.name = name;
      this.score = score;
   }

   // Order by score, then by name ignoring case, so that two different players
   // with the same score are not treated as duplicates by the TreeSet in
   // GameLeaderboard. Returns 0 only when equals() would return true.
   public int compareTo(NameScorePair t)
   {
      if (this.getScore() < t.getScore())
         return -1;
      else if (this.getScore() > t.getScore())
         return 1;
      else
         return this.getName().compareToIgnoreCase(t.getName());
   }

   // The name is left out of the hash because equals() compares it ignoring case;
   // hashing the raw name would give "Bob" and "bob" different hashes although equal.
   @Override
   public int hashCode()
   {
      int hash = 3;
      hash = 47 * hash + this.score;
      return hash;
   }

   @Override
   public boolean equals(Object o)
   {
      if (o == null)
      {
         return false;
      }
      else if (o instanceof NameScorePair)
      {
         return ((name.equalsIgnoreCase(((NameScorePair)o).getName())) &&
            (score == ((NameScorePair)o).getScore()));
      }
      else
         return false;
   }

   public int getScore()
   {
      return score;
   }

   public String getName()
   {
      return name;
   }
}
```
<a name="iriakobject"/>
## Fetch data with the intention of modifying it then storing it afterward

As noted in *Houston, we have a problem* above, if you want to do a read/modify/write cycle outside of the store operation, you need to forgo passing your own POJO class and instead use the Bucket.fetch() method that takes only a (String) key as an argument. This returns a `FetchObject<IRiakObject>`. You will still need to provide a ConflictResolver to resolve siblings, as the default one simply throws an exception if siblings are present. When you call FetchObject.execute(), an IRiakObject is returned to you which contains your data as its payload.

If you're storing JSON, use the JSON library of your choice to deserialize the byte[] held inside the IRiakObject, modify the data, then serialize it and put it back in the IRiakObject. The same applies if you're using a different serialization library, such as Kryo. Unfortunately, you will also have to do the same in your ConflictResolver.
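A minimal sketch of what this means, stdlib only. To avoid a JSON dependency it assumes a hypothetical payload format (one player name per line, UTF-8) and works on plain byte arrays; with the real client these bytes would come from `IRiakObject.getValue()` and go back via `setValue(byte[])`, and the `resolve` logic would live inside a `ConflictResolver<IRiakObject>`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

public class RawPayloadSketch
{
    // Hypothetical format: one player name per line, UTF-8 encoded
    static TreeSet<String> decode(byte[] payload)
    {
        TreeSet<String> names = new TreeSet<String>();
        for (String line : new String(payload, StandardCharsets.UTF_8).split("\n"))
        {
            if (!line.isEmpty())
                names.add(line);
        }
        return names;
    }

    static byte[] encode(Collection<String> names)
    {
        return String.join("\n", names).getBytes(StandardCharsets.UTF_8);
    }

    // Resolver logic: deserialize every sibling, merge (set union), serialize the result.
    // Returns null when there are no siblings, like GameLeaderboardResolver above.
    static byte[] resolve(Collection<byte[]> siblings)
    {
        if (siblings.isEmpty())
            return null;
        TreeSet<String> merged = new TreeSet<String>();
        for (byte[] sibling : siblings)
            merged.addAll(decode(sibling));
        return encode(merged);
    }

    public static void main(String[] args)
    {
        List<byte[]> siblings = new ArrayList<byte[]>();
        siblings.add(encode(Arrays.asList("Bob", "Steve")));
        siblings.add(encode(Arrays.asList("Brian", "Steve")));
        byte[] resolved = resolve(siblings);

        // Read/modify/write on the resolved payload: decode, change, re-encode
        TreeSet<String> names = decode(resolved);
        names.add("Alice");
        byte[] updated = encode(names);

        System.out.println(new String(updated, StandardCharsets.UTF_8).replace("\n", ",")); // prints Alice,Bob,Brian,Steve
    }
}
```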

(More to come - the cookbook is a work in progress)