Redis 1.x.x
Redis.scala represents an asynchronous client that can also be used to issue synchronous commands. It makes use of a pool of clients and features automatic pipelining.
import scredis._
// Initializes a Redis asynchronous client with default parameters (see reference.conf)
val redis = Redis()
You can provide your own Config file as follows.
import scredis._
import com.typesafe.config.ConfigFactory
val config = ConfigFactory.load("my-config-file.conf")
// Initializes a Redis asynchronous client with the provided configuration file
val myRedis = Redis(config)
// Or you can have scredis load your config file for you
val myRedis2 = Redis("my-config-file.conf")
Sometimes you need a different configuration for each execution environment. In that case, pass the path of the relevant configuration section as a second argument.
import scredis._
// Connects to redis.dev.my.com
val developmentRedis = Redis("my-config-file.conf", "development.scredis")
// Connects to redis.my.com
val productionRedis = Redis("my-config-file.conf", "production.scredis")
And in my-config-file.conf, you would put something like:
development.scredis {
  client {
    host = redis.dev.my.com
    password = development-password
  }
}
production.scredis {
  client {
    host = redis.my.com
    password = production-password
  }
}
In Redis.scala, all commands are asynchronous, except for those that cannot be made asynchronous because of their blocking nature, e.g. BLPOP, BRPOP, etc. See the next section for details on how you can run synchronous commands within Redis.scala.
import scredis._
import scala.concurrent.Future
import scala.util.{ Success, Failure }
val redis = Redis()
// import the callback executor or any other executor of your choice to handle callbacks
import redis.ec
redis.hmSet("user:1")(("name", "Alex"), ("age", "25"), ("cash", "50")) onComplete {
  case Success(_) => redis.hGet("user:1")("age") onComplete {
    case Success(age) => println(age)
    case Failure(e) => e.printStackTrace()
  }
  case Failure(e) => e.printStackTrace()
} // Prints Some(25)
val x: Future[Long] = for {
  cash <- redis.hGet("user:1")("cash")
  if cash.isDefined
  newCash <- redis.hIncrBy("user:1")("cash", cash.get.toLong)
} yield newCash
x onComplete {
  case Success(newCash) => println(newCash)
  case Failure(e) => e.printStackTrace()
} // Prints 100
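The `if cash.isDefined` guard in the for-comprehension above relies on the standard `Future.withFilter` semantics: when the guard is false, the resulting future fails with a `NoSuchElementException` instead of completing with a value. Here is a minimal sketch using plain Scala futures, with no scredis involved. `lookup` and `doubled` are hypothetical names, and `lookup` merely stands in for `redis.hGet`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for redis.hGet: returns the stored value, if any
def lookup(stored: Option[String]): Future[Option[String]] =
  Future.successful(stored)

// Same shape as the for-comprehension above: the guard filters out None
def doubled(stored: Option[String]): Future[Long] = for {
  cash <- lookup(stored)
  if cash.isDefined
} yield cash.get.toLong * 2

// The guard passes: the future completes with a value
val present = Try(Await.result(doubled(Some("50")), 1.second))
// The guard fails: the future fails with a NoSuchElementException
val missing = Try(Await.result(doubled(None), 1.second))
```

If a missing field should not count as a failure, one option is to pattern match on the `Option` inside the comprehension, or to add a `recover` to the resulting future.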
import scredis._
val redis = Redis()
// I need a result right now
val str = redis.sync(_.get("str"))
// This is equivalent
val str2 = redis.withClient(_.get("str"))
// This command is synchronous by definition and is therefore not accessible asynchronously
val msg = redis.sync(_.blPop(0, "queue"))
The internal execution context is what scredis uses to dispatch and process asynchronous commands. You do not have to create one, as scredis takes care of it automatically. However, advanced users may want to tune performance further by providing their own execution context.
The default execution context is a fixed thread pool of n threads where n corresponds to the max-threads parameter as defined in reference.conf.
import scredis._
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
// Defines a custom internal execution context
implicit val ec = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
// ec will be used by scredis for running asynchronous commands internally
val redis = Redis.withExecutionContext()
// Alternatively, you can use the dispatcher of an ActorSystem as the execution context
val system = ActorSystem()
val redisWithActorSystem = Redis.withExecutionContext("my-config")(system.dispatcher)
IMPORTANT NOTE: if you provide your own execution context, it is your responsibility to shut it down.
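As a minimal sketch of that cleanup, the following uses only the Scala and Java standard libraries. `customEc` is a hypothetical name, the `Future` stands in for work that scredis would run on the context, and when to shut down relative to closing the client is left to the application.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// A custom execution context backed by an executor service that we own
val customEc: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

// Stand-in for asynchronous work that would run on the custom context
val work: Future[Int] = Future(21 * 2)(customEc)
val result = Await.result(work, 5.seconds)

// Stop accepting new tasks, then wait for in-flight tasks to finish
customEc.shutdown()
val terminated = customEc.awaitTermination(5, TimeUnit.SECONDS)
```

When the dispatcher of an ActorSystem is used instead, the actor system itself has to be shut down with the method your Akka version provides.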
By default, asynchronous commands are not sent immediately but queued instead. Once every configured interval, or once enough commands have been queued (threshold), they are all sent together in a pipeline. This improves overall performance, especially when many commands are issued in a short period of time, since the number of client-server round-trips is reduced. However, there are cases where this can actually slow things down, e.g. when sending only one command, because scredis waits until the configured interval has elapsed before sending it to the server. To cope with this issue, scredis provides a way to bypass the queuing mechanism for selected commands. This is achieved with the force boolean command option.
To sum up, every asynchronous command will be queued unless:
- Automatic pipelining is disabled
- Force is set to true for that specific command
- The command is a transactional or pipeline command
See examples below.
import scredis._
val redis = Redis()
// Gets queued
val future = redis.get("str")
// Gets sent immediately
val future2 = redis.get("str")(CommandOptions(force = true))
// Gets sent immediately
val future3 = redis.pipelined { p =>
  p.ping()
  p.ping()
  p.ping()
}
You can also explicitly require the pipeline to be flushed on a global basis.
import scredis._
val redis = Redis()
// Gets queued
val future1 = redis.get("str")
// Gets queued
val future2 = redis.get("str")
// Gets queued
val future3 = redis.get("str")
// Send all queued commands right now
redis.flushAutomaticPipeline()
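To make the queuing rules easier to picture, here is a toy model of them. It is not scredis code and says nothing about the actual implementation: `ToyPipeliner`, `submit`, `flush` and `sent` are hypothetical names, and the interval timer is left out so that the example stays deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: commands are plain strings and "sending" appends to a log
final class ToyPipeliner(threshold: Int, enabled: Boolean = true) {
  private val queue = ArrayBuffer.empty[String]
  // Each element is one batch of commands sent in a single round-trip
  val sent = ArrayBuffer.empty[List[String]]

  def submit(command: String, force: Boolean = false): Unit =
    if (!enabled || force) {
      sent += List(command) // bypasses the queue entirely
    } else {
      queue += command
      if (queue.size >= threshold) flush()
    }

  // Sends everything that is currently queued as one batch
  def flush(): Unit =
    if (queue.nonEmpty) {
      sent += queue.toList
      queue.clear()
    }
}

val p = new ToyPipeliner(threshold = 3)
p.submit("GET a")               // queued
p.submit("GET b", force = true) // sent alone, right away
p.submit("GET c")               // queued
p.submit("GET d")               // threshold reached: a, c and d go out together
p.submit("GET e")               // queued
p.flush()                       // explicit flush sends e
```

In this model a forced command overtakes commands that were queued before it, which is also what the aggregated options example from RedisSpec.scala relies on.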
Now, what happens to CommandOptions? When auto-pipelining is enabled and force is false, the CommandOptions associated with the queued commands are aggregated using the MAX function. See the example below (taken from RedisSpec.scala).
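// Imports needed by the durations and Await calls in this excerpt
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
val c1 = redis.get("STR")(CommandOptions(
  timeout = 1 second,
  tries = 1,
  sleep = None
))
val c2 = redis.set("STR", "Hello World!")(CommandOptions(
  timeout = 3 seconds,
  tries = 3,
  sleep = Some(500 milliseconds)
))
val c3 = redis.get("STR")(CommandOptions(
  timeout = 2 seconds,
  tries = 2,
  sleep = Some(1 second)
))
// force = true: c4 bypasses the queue and is sent before c1, c2 and c3
val c4 = redis.get("STR")(CommandOptions(
  timeout = 2 seconds,
  tries = 2,
  sleep = Some(1 second),
  force = true
))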
Await.result(c4, Duration.Inf) must be('empty)
Await.result(c1, Duration.Inf) must be('empty)
Await.result(c2, Duration.Inf) must be(())
Await.result(c3, Duration.Inf) must be(Some("Hello World!"))
In the above test case, three commands (c1, c2 and c3) get queued for auto-pipelining and c4 gets sent immediately. The pipeline.sync() method will be called with the following aggregated CommandOptions.
CommandOptions(
  timeout = 3 seconds, // max(1 second, 3 seconds, 2 seconds)
  tries = 3, // max(1, 3, 2)
  sleep = Some(1 second) // max(0, 500 milliseconds, 1 second)
)
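The MAX aggregation described above can be sketched as a field-by-field maximum. The code below is an illustration only, not the scredis implementation: `Opts` is a hypothetical stand-in for CommandOptions reduced to three fields, `aggregate` is a made-up helper, and treating a missing sleep as zero is an assumption taken from the `max(0, ...)` comment above.

```scala
import scala.concurrent.duration._

// Hypothetical stand-in for CommandOptions, limited to the aggregated fields
final case class Opts(
  timeout: FiniteDuration,
  tries: Int,
  sleep: Option[FiniteDuration]
)

// Field-by-field maximum; expects at least one element
def aggregate(all: Seq[Opts]): Opts = {
  // Assumption: a missing sleep counts as zero
  val maxSleep = all.map(_.sleep.getOrElse(Duration.Zero)).max
  Opts(
    timeout = all.map(_.timeout).max,
    tries = all.map(_.tries).max,
    sleep = if (maxSleep > Duration.Zero) Some(maxSleep) else None
  )
}

// The options of c1, c2 and c3 from the test case above
val queued = Seq(
  Opts(1.second, 1, None),
  Opts(3.seconds, 3, Some(500.milliseconds)),
  Opts(2.seconds, 2, Some(1.second))
)
val aggregated = aggregate(queued)
```

With these inputs, `aggregated` carries a timeout of 3 seconds, 3 tries and a sleep of 1 second, matching the aggregated CommandOptions listed above.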