diff --git a/.circleci/config.yml b/.circleci/config.yml
index 617d9034..544ccacc 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -61,7 +61,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
- sudo apt-get install -y gcc-8 g++-8 libssl-dev
+ sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes
@@ -118,7 +118,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
- sudo apt-get install -y gcc-8 g++-8 libssl-dev
+ sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes
diff --git a/Makefile b/Makefile
index 9ae9e4d8..cbdc62be 100644
--- a/Makefile
+++ b/Makefile
@@ -1,3 +1,6 @@
+# test user ACL (consumed by the ACL test suites)
+USER_ACL = user alice on >p1pp0 ~* +@all
+
# STANDALONE REDIS NODE
define REDIS_STANDALONE_NODE_CONF
daemonize yes
@@ -7,6 +10,7 @@ logfile /tmp/redis_standalone_node_for_spark-redis.log
save ""
appendonly no
requirepass passwd
+$(USER_ACL)
endef
# STANDALONE REDIS NODE WITH SSL
@@ -18,6 +22,7 @@ logfile /tmp/redis_standalone_node_ssl_for_spark-redis.log
save ""
appendonly no
requirepass passwd
+$(USER_ACL)
tls-auth-clients no
tls-port 6380
tls-cert-file ./src/test/resources/tls/redis.crt
@@ -30,6 +35,7 @@ endef
define REDIS_CLUSTER_NODE1_CONF
daemonize yes
port 7379
+$(USER_ACL)
pidfile /tmp/redis_cluster_node1_for_spark-redis.pid
logfile /tmp/redis_cluster_node1_for_spark-redis.log
save ""
@@ -41,6 +47,7 @@ endef
define REDIS_CLUSTER_NODE2_CONF
daemonize yes
port 7380
+$(USER_ACL)
pidfile /tmp/redis_cluster_node2_for_spark-redis.pid
logfile /tmp/redis_cluster_node2_for_spark-redis.log
save ""
@@ -52,6 +59,7 @@ endef
define REDIS_CLUSTER_NODE3_CONF
daemonize yes
port 7381
+$(USER_ACL)
pidfile /tmp/redis_cluster_node3_for_spark-redis.pid
logfile /tmp/redis_cluster_node3_for_spark-redis.log
save ""
diff --git a/README.md b/README.md
index 6a74ad93..7c1cda22 100644
--- a/README.md
+++ b/README.md
@@ -20,12 +20,13 @@ Spark-Redis also supports Spark Streaming (DStreams) and Structured Streaming.
The library has several branches, each corresponds to a different supported Spark version. For example, 'branch-2.3' works with any Spark 2.3.x version.
The master branch contains the recent development for the next release.
-| Spark-Redis | Spark | Redis | Supported Scala Versions |
-| ----------------------------------------------------------------| ------------- | ---------------- | ------------------------ |
-| [master](https://github.com/RedisLabs/spark-redis/) | 3.0.x | >=2.9.0 | 2.12 |
-| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
-| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
-| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |
+| Spark-Redis | Spark | Redis | Supported Scala Versions |
+|---------------------------------------------------------------------------|-------| ---------------- | ------------------------ |
+| [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 |
+| [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 |
+| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
+| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
+| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |
## Known limitations
diff --git a/doc/configuration.md b/doc/configuration.md
index 51671c31..3efa217a 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -5,6 +5,7 @@ The supported configuration parameters are:
* `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
topology from the initial node, so there is no need to provide the rest of the cluster nodes.
* `spark.redis.port` - the initial node's TCP redis port.
+* `spark.redis.user` - the initial node's AUTH user
* `spark.redis.auth` - the initial node's AUTH password
* `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
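
With the new `spark.redis.user` parameter, a session can authenticate as a named ACL user instead of `default`. A minimal sketch of a session configured this way (host, port, and credentials are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Session-level connection settings; spark.redis.user pairs with
// spark.redis.auth to authenticate as a named ACL user.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-redis-acl")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .config("spark.redis.user", "alice")
  .config("spark.redis.auth", "p1pp0")
  .getOrCreate()
```
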
diff --git a/doc/dataframe.md b/doc/dataframe.md
index 3d62b78f..e23b2479 100644
--- a/doc/dataframe.md
+++ b/doc/dataframe.md
@@ -330,22 +330,23 @@ root
## DataFrame options
-| Name | Description | Type | Default |
-| -----------------------| ------------------------------------------------------------------------------------------| --------------------- | ------- |
-| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model)| `enum [binary, hash]` | `hash` |
-| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
-| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
+| Name | Description | Type | Default |
+|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------------- | ------- |
+| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model) | `enum [binary, hash]` | `hash` |
+| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
+| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
| key.column             | when writing - specifies unique column used as a Redis key, by default a key is auto-generated <br/> when reading - specifies column name to store hash key | `String`              | - |
-| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
-| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
-| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
-| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
-| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
-| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
-| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
-| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
-| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
-| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
+| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
+| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
+| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
+| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
+| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
+| host                   | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                   | `String`              | `localhost` |
+| port                   | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                   | `Int`                 | `6379`      |
+| user                   | overrides `spark.redis.user` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                   | `String`              | -           |
+| auth                   | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                   | `String`              | -           |
+| dbNum                  | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                     | `Int`                 | `0`         |
+| timeout                | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored)                                | `Int`                 | `2000`      |
## Known limitations
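
The new `user` option pairs with `auth` the same way `spark.redis.user` pairs with `spark.redis.auth`, and overrides the session-level setting per DataFrame. A minimal write sketch, assuming a local Spark 3.x session; table name and credentials are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("acl-df").getOrCreate()
import spark.implicits._

val df = Seq(("John", 30), ("Peter", 45)).toDF("name", "age")
df.write
  .format("org.apache.spark.sql.redis") // RedisFormat
  .option("table", "person")            // SqlOptionTableName
  .option("user", "alice")              // overrides spark.redis.user
  .option("auth", "p1pp0")              // overrides spark.redis.auth
  .save()
```
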
diff --git a/pom.xml b/pom.xml
index 71b0e6b4..80663f35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.redislabs</groupId>
   <artifactId>spark-redis_2.12</artifactId>
-  <version>3.0.0-SNAPSHOT</version>
+  <version>3.1.0-SNAPSHOT</version>
   <name>Spark-Redis</name>
   <description>A Spark library for Redis</description>
   <url>http://github.com/RedisLabs/spark-redis</url>
@@ -49,8 +49,8 @@
     <java.version>1.8</java.version>
     <scala.major.version>2.12</scala.major.version>
     <scala.complete.version>${scala.major.version}.0</scala.complete.version>
-    <jedis.version>3.4.1</jedis.version>
-    <spark.version>3.0.1</spark.version>
+    <jedis.version>3.9.0</jedis.version>
+    <spark.version>3.2.1</spark.version>
     <plugins.scalatest.version>1.0</plugins.scalatest.version>
@@ -271,11 +271,6 @@
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-pool2</artifactId>
-      <version>2.0</version>
-    </dependency>
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
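
The explicit commons-pool2 dependency is dropped because the newer jedis pulls a compatible commons-pool2 2.x in transitively. Once 3.1.0 is released, the bumped artifact would be consumed as follows (sbt coordinates, a sketch assuming an eventual 3.1.0 release):

```scala
// build.sbt (sketch): commons-pool2 no longer needs to be declared;
// it arrives transitively via jedis.
libraryDependencies += "com.redislabs" % "spark-redis_2.12" % "3.1.0"
```
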
diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
index 322d8c5f..a4a2d61f 100644
--- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
+++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,10 +1,10 @@
package com.redislabs.provider.redis
-import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
import redis.clients.jedis.exceptions.JedisConnectionException
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
-
import scala.collection.JavaConversions._
@@ -21,11 +21,11 @@ object ConnectionPool {
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false)
poolConfig.setTestWhileIdle(false)
- poolConfig.setMinEvictableIdleTimeMillis(60000)
- poolConfig.setTimeBetweenEvictionRunsMillis(30000)
+ poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1))
+ poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30))
poolConfig.setNumTestsPerEvictionRun(-1)
- new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
+ new JedisPool(poolConfig, re.host, re.port, re.timeout, re.user, re.auth, re.dbNum, re.ssl)
}
)
var sleepTime: Int = 4
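
Two changes land here: commons-pool2 2.x replaces the deprecated millisecond setters with `Duration`-based ones, and the `JedisPool` constructor gains the `user` argument so pooled connections authenticate as the ACL user. A minimal sketch of borrowing a connection through the updated pool, assuming the test credentials from the Makefile:

```scala
import com.redislabs.provider.redis.{ConnectionPool, RedisEndpoint}

// The endpoint now carries the ACL username, which ConnectionPool
// forwards to the user-aware JedisPool constructor.
val endpoint = RedisEndpoint(host = "127.0.0.1", port = 6379,
  user = "alice", auth = "p1pp0")
val jedis = ConnectionPool.connect(endpoint)
try println(jedis.ping()) finally jedis.close() // close() returns it to the pool
```
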
diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
index f4e80ada..34f41944 100644
--- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
+++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala
@@ -15,12 +15,14 @@ import scala.collection.JavaConversions._
*
* @param host the redis host or ip
* @param port the redis port
+ * @param user the authentication username
* @param auth the authentication password
* @param dbNum database number (should be avoided in general)
* @param ssl true to enable SSL connection. Defaults to false
*/
case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
port: Int = Protocol.DEFAULT_PORT,
+ user: String = null,
auth: String = null,
dbNum: Int = Protocol.DEFAULT_DATABASE,
timeout: Int = Protocol.DEFAULT_TIMEOUT,
@@ -36,6 +38,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
this(
conf.get("spark.redis.host", Protocol.DEFAULT_HOST),
conf.getInt("spark.redis.port", Protocol.DEFAULT_PORT),
+ conf.get("spark.redis.user", null),
conf.get("spark.redis.auth", null),
conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
@@ -54,6 +57,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
this(
parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)),
parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt,
+ parameters.getOrElse("user", conf.get("spark.redis.user", null)),
parameters.getOrElse("auth", conf.get("spark.redis.auth", null)),
parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt,
parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt,
@@ -64,17 +68,17 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
/**
* Constructor with Jedis URI
*
- * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
+ * @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
*/
def this(uri: URI) {
- this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
+ this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme)
}
/**
* Constructor with Jedis URI from String
*
- * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
+ * @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
*/
def this(uri: String) {
this(URI.create(uri))
@@ -280,8 +284,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt
// simply re-enter this function with the master host/port
- getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
- initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
+ getNonClusterNodes(initialHost = RedisEndpoint(
+ host = host,
+ port = port,
+ user = initialHost.user,
+ auth = initialHost.auth,
+ dbNum = initialHost.dbNum,
+ ssl = initialHost.ssl
+ ))
} else {
//this is a master - take its slaves
@@ -295,10 +305,17 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val nodes = master +: slaves
val range = nodes.length
- (0 until range).map(i =>
- RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
- initialHost.timeout, initialHost.ssl),
- 0, 16383, i, range)).toArray
+ (0 until range).map(i => {
+ val endpoint = RedisEndpoint(
+ host = nodes(i)._1,
+ port = nodes(i)._2,
+ user = initialHost.user,
+ auth = initialHost.auth,
+ dbNum = initialHost.dbNum,
+ timeout = initialHost.timeout,
+ ssl = initialHost.ssl)
+ RedisNode(endpoint, 0, 16383, i, range)
+ }).toArray
}
}
@@ -326,12 +343,15 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
val port = node.get(1).toString.toInt
- RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
- initialHost.timeout, initialHost.ssl),
- sPos,
- ePos,
- i,
- slotInfo.size - 2)
+ val endpoint = RedisEndpoint(
+ host = host,
+ port = port,
+ user = initialHost.user,
+ auth = initialHost.auth,
+ dbNum = initialHost.dbNum,
+ timeout = initialHost.timeout,
+ ssl = initialHost.ssl)
+ RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2)
})
}
}.toArray
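
With these changes the username travels through every construction path: the URI constructors parse it via `JedisURIHelper.getUser`, and topology discovery copies it onto each discovered node. A minimal sketch of the URI form, matching the updated scaladoc (credentials are illustrative):

```scala
import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}

// URI now carries the ACL username ahead of the password:
// redis://$user:$password@$host:$port/[dbnum]
val conf = new RedisConfig(new RedisEndpoint("redis://alice:p1pp0@127.0.0.1:6379/0"))

// The username propagates to every discovered node, standalone or cluster.
conf.hosts.foreach(node => println(s"${node.endpoint.host}:${node.endpoint.port}"))
```
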
diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 0b900c0f..b8c17706 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -401,7 +401,7 @@ object RedisContext extends Serializable {
pipeline.set(k, v)
}
else {
- pipeline.setex(k, ttl, v)
+ pipeline.setex(k, ttl.toLong, v)
}
}
conn.close()
@@ -422,7 +422,7 @@ object RedisContext extends Serializable {
val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (k, v)) =>
pipeline.hset(hashName, k, v)
}
- if (ttl > 0) pipeline.expire(hashName, ttl)
+ if (ttl > 0) pipeline.expire(hashName, ttl.toLong)
pipeline.sync()
conn.close()
}
@@ -448,7 +448,7 @@ object RedisContext extends Serializable {
foreachWithPipeline(conn, arr) { (pipeline, a) =>
val (key, hashFields) = a._2
pipeline.hmset(key, hashFields)
- if (ttl > 0) pipeline.expire(key, ttl)
+ if (ttl > 0) pipeline.expire(key, ttl.toLong)
}
}
}
@@ -478,7 +478,7 @@ object RedisContext extends Serializable {
foreachWithPipeline(conn, arr) { (pipeline, a) =>
val (key, hashFields) = a._2
pipeline.hmset(key, hashFields)
- if (ttl > 0) pipeline.expire(key, ttl)
+ if (ttl > 0) pipeline.expire(key, ttl.toLong)
}
}
}
@@ -498,7 +498,7 @@ object RedisContext extends Serializable {
val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (k, v)) =>
pipeline.zadd(zsetName, v.toDouble, k)
}
- if (ttl > 0) pipeline.expire(zsetName, ttl)
+ if (ttl > 0) pipeline.expire(zsetName, ttl.toLong)
pipeline.sync()
conn.close()
}
@@ -516,7 +516,7 @@ object RedisContext extends Serializable {
val pipeline = foreachWithPipelineNoLastSync(conn, arr) { (pipeline, v) =>
pipeline.sadd(setName, v)
}
- if (ttl > 0) pipeline.expire(setName, ttl)
+ if (ttl > 0) pipeline.expire(setName, ttl.toLong)
pipeline.sync()
conn.close()
}
@@ -537,7 +537,7 @@ object RedisContext extends Serializable {
val pipeline = foreachWithPipelineNoLastSync(conn, arr) { (pipeline, v) =>
pipeline.rpush(listName, v)
}
- if (ttl > 0) pipeline.expire(listName, ttl)
+ if (ttl > 0) pipeline.expire(listName, ttl.toLong)
pipeline.sync()
conn.close()
}
@@ -560,7 +560,7 @@ object RedisContext extends Serializable {
foreachWithPipeline(conn, arr) { (pipeline, a) =>
val (key, listVals) = a._2
pipeline.rpush(key, listVals: _*)
- if (ttl > 0) pipeline.expire(key, ttl)
+ if (ttl > 0) pipeline.expire(key, ttl.toLong)
}
}
}
@@ -583,7 +583,7 @@ object RedisContext extends Serializable {
foreachWithPipeline(conn, arr) { (pipeline, a) =>
val (key, listVals) = a._2
pipeline.rpush(key, listVals: _*)
- if (ttl > 0) pipeline.expire(key, ttl)
+ if (ttl > 0) pipeline.expire(key, ttl.toLong)
}
}
}
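
The `ttl.toLong` conversions adapt to the newer jedis, where `expire`/`setex` take the TTL as a `Long`; the public spark-redis API keeps its `Int` parameter, so callers are unaffected. Usage stays as before (a sketch assuming an active `SparkContext` `sc` with `spark.redis.*` settings configured):

```scala
import com.redislabs.provider.redis._ // enables sc.toRedisKV and friends

// TTL is still passed as an Int through the public API; the Long
// conversion happens at the pipeline call sites patched above.
val rdd = sc.parallelize(Seq(("key1", "v1"), ("key2", "v2")))
sc.toRedisKV(rdd, ttl = 60) // keys expire after 60 seconds
```
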
diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
index c9b0a981..13e45300 100644
--- a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala
@@ -16,7 +16,7 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = {
val keyBytes = key.getBytes(UTF_8)
if (ttl > 0) {
- pipeline.setex(keyBytes, ttl, value)
+ pipeline.setex(keyBytes, ttl.toLong, value)
} else {
pipeline.set(keyBytes, value)
}
diff --git a/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala
index a22c4614..03b7f97a 100644
--- a/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala
@@ -19,7 +19,7 @@ class HashRedisPersistence extends RedisPersistence[Any] {
val javaValue = value.asInstanceOf[Map[String, String]].asJava
pipeline.hmset(key, javaValue)
if (ttl > 0) {
- pipeline.expire(key, ttl)
+ pipeline.expire(key, ttl.toLong)
}
}
diff --git a/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala b/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala
index 0b2eb394..dfe80eb7 100644
--- a/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala
@@ -5,8 +5,8 @@ import redis.clients.jedis.util.JedisClusterCRC16
class RedisConfigSuite extends FunSuite with Matchers {
- val redisStandaloneConfig = new RedisConfig(RedisEndpoint("127.0.0.1", 6379, "passwd"))
- val redisClusterConfig = new RedisConfig(RedisEndpoint("127.0.0.1", 7379))
+ val redisStandaloneConfig = new RedisConfig(RedisEndpoint(host = "127.0.0.1", port = 6379, auth = "passwd"))
+ val redisClusterConfig = new RedisConfig(RedisEndpoint(host = "127.0.0.1", port = 7379))
test("getNodesBySlots") {
redisStandaloneConfig.getNodesBySlots(0, 16383).length shouldBe 1
@@ -23,7 +23,7 @@ class RedisConfigSuite extends FunSuite with Matchers {
}
test("getNodes") {
- redisStandaloneConfig.getNodes(RedisEndpoint("127.0.0.1", 6379, "passwd")).length shouldBe 1
- redisClusterConfig.getNodes(RedisEndpoint("127.0.0.1", 7379)).length shouldBe 7
+ redisStandaloneConfig.getNodes(RedisEndpoint(host = "127.0.0.1", port = 6379, auth = "passwd")).length shouldBe 1
+ redisClusterConfig.getNodes(RedisEndpoint(host = "127.0.0.1", port = 7379)).length shouldBe 7
}
}
diff --git a/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala
new file mode 100644
index 00000000..86982b5d
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala
@@ -0,0 +1,57 @@
+package com.redislabs.provider.redis.df
+
+import com.redislabs.provider.redis.util.Person.{TableNamePrefix, data}
+import com.redislabs.provider.redis.util.TestUtils.{generateTableName, interceptSparkErr}
+import org.apache.spark.sql.redis.{RedisFormat, SqlOptionTableName}
+import org.scalatest.Matchers
+import redis.clients.jedis.exceptions.JedisConnectionException
+
+/**
+ * Basic dataframe test with user/password authentication
+ */
+trait AclDataframeSuite extends RedisDataframeSuite with Matchers {
+
+ test("save and load dataframe") {
+ val tableName = generateTableName(TableNamePrefix)
+ val df = spark.createDataFrame(data)
+ df.write.format(RedisFormat)
+ .option(SqlOptionTableName, tableName)
+ .save()
+ val loadedDf = spark.read.format(RedisFormat)
+ .option(SqlOptionTableName, tableName)
+ .load()
+ .cache()
+ verifyDf(loadedDf)
+ }
+
+ test("incorrect password in dataframe options") {
+ interceptSparkErr[JedisConnectionException] {
+ val tableName = generateTableName(TableNamePrefix)
+ val df = spark.createDataFrame(data)
+ df.write.format(RedisFormat)
+ .option(SqlOptionTableName, tableName)
+ .option("user", user)
+ .option("auth", "wrong_password")
+ .save()
+ }
+ }
+
+ test("correct user/password in dataframe options") {
+ val tableName = generateTableName(TableNamePrefix)
+ val df = spark.createDataFrame(data)
+ df.write.format(RedisFormat)
+ .option(SqlOptionTableName, tableName)
+ .option("user", user)
+ .option("auth", userPassword)
+ .save()
+
+ val loadedDf = spark.read.format(RedisFormat)
+ .option(SqlOptionTableName, tableName)
+ .option("user", user)
+ .option("auth", userPassword)
+ .load()
+ .cache()
+ verifyDf(loadedDf)
+ }
+
+}
diff --git a/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala
new file mode 100644
index 00000000..a25f818f
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala
@@ -0,0 +1,6 @@
+package com.redislabs.provider.redis.df.acl
+
+import com.redislabs.provider.redis.df.AclDataframeSuite
+import com.redislabs.provider.redis.env.RedisClusterAclEnv
+
+class AclDataframeClusterSuite extends AclDataframeSuite with RedisClusterAclEnv
diff --git a/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala
new file mode 100644
index 00000000..7f8288c9
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala
@@ -0,0 +1,6 @@
+package com.redislabs.provider.redis.df.acl
+
+import com.redislabs.provider.redis.df.AclDataframeSuite
+import com.redislabs.provider.redis.env.RedisStandaloneAclEnv
+
+class AclDataframeStandaloneSuite extends AclDataframeSuite with RedisStandaloneAclEnv
diff --git a/src/test/scala/com/redislabs/provider/redis/env/Env.scala b/src/test/scala/com/redislabs/provider/redis/env/Env.scala
index 7cc9b85c..20fcd166 100644
--- a/src/test/scala/com/redislabs/provider/redis/env/Env.scala
+++ b/src/test/scala/com/redislabs/provider/redis/env/Env.scala
@@ -14,7 +14,12 @@ trait Env {
val redisHost = "127.0.0.1"
val redisPort = 6379
- val redisAuth = "passwd"
+  val redisAuth = "passwd" // password for 'default' user (AUTH <password>)
+
+ // user credentials
+ val user = "alice"
+ val userPassword = "p1pp0"
+
val redisConfig: RedisConfig
}
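
These credentials mirror the `USER_ACL` rule added to the Makefile. For an environment not provisioned by the Makefile, the same test user could be created programmatically, e.g. with the Jedis ACL API (a sketch, assuming a Jedis version with ACL support):

```scala
import redis.clients.jedis.Jedis

// Create the same user the Makefile provisions via USER_ACL:
// 'alice', enabled, password p1pp0, all keys, all commands.
val jedis = new Jedis("127.0.0.1", 6379)
jedis.auth("passwd") // authenticate as 'default' first
jedis.aclSetUser("alice", "on", ">p1pp0", "~*", "+@all")
jedis.close()
```
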
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala
new file mode 100644
index 00000000..7fbc3629
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala
@@ -0,0 +1,28 @@
+package com.redislabs.provider.redis.env
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.spark.SparkConf
+
+/**
+ * Cluster with user/password authentication
+ */
+trait RedisClusterAclEnv extends Env {
+
+ override val redisPort = 7379
+
+ override val conf: SparkConf = new SparkConf()
+ .setMaster("local[*]").setAppName(getClass.getName)
+ .set("spark.redis.host", redisHost)
+ .set("spark.redis.port", s"$redisPort")
+ .set("spark.redis.user", user)
+ .set("spark.redis.auth", userPassword)
+ .set("spark.streaming.stopGracefullyOnShutdown", "true")
+ .set("spark.driver.bindAddress", "127.0.0.1")
+
+ override val redisConfig: RedisConfig =
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = userPassword))
+}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala
new file mode 100644
index 00000000..fce17627
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala
@@ -0,0 +1,26 @@
+package com.redislabs.provider.redis.env
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.spark.SparkConf
+
+/**
+ * Standalone with user/password authentication
+ */
+trait RedisStandaloneAclEnv extends Env {
+
+ override val conf: SparkConf = new SparkConf()
+ .setMaster("local[*]").setAppName(getClass.getName)
+ .set("spark.redis.host", redisHost)
+ .set("spark.redis.port", s"$redisPort")
+ .set("spark.redis.user", user)
+ .set("spark.redis.auth", userPassword)
+ .set("spark.streaming.stopGracefullyOnShutdown", "true")
+ .set("spark.driver.bindAddress", "127.0.0.1")
+
+ override val redisConfig: RedisConfig =
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = userPassword))
+}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
index 88bd2759..b886e295 100644
--- a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
@@ -4,8 +4,8 @@ import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
import org.apache.spark.SparkConf
/**
- * @author The Viet Nguyen
- */
+ * @author The Viet Nguyen
+ */
trait RedisStandaloneEnv extends Env {
override val conf: SparkConf = new SparkConf()
@@ -18,5 +18,8 @@ trait RedisStandaloneEnv extends Env {
.set("spark.driver.bindAddress", "127.0.0.1")
override val redisConfig: RedisConfig =
- new RedisConfig(RedisEndpoint(redisHost, redisPort, redisAuth))
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ auth = redisAuth))
}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala
index 2945fffc..632336c7 100644
--- a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala
@@ -6,7 +6,7 @@ import org.apache.spark.SparkConf
trait RedisStandaloneSSLEnv extends Env {
override val redisPort = 6380
-
+
override val conf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName(getClass.getName)
.set("spark.redis.host", redisHost)
@@ -17,5 +17,5 @@ trait RedisStandaloneSSLEnv extends Env {
.set("spark.driver.bindAddress", "127.0.0.1")
override val redisConfig: RedisConfig =
- new RedisConfig(RedisEndpoint(redisHost, redisPort, redisAuth, ssl=true))
+ new RedisConfig(RedisEndpoint(redisHost, redisPort, auth = redisAuth, ssl = true))
}
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
index 6e08ac3c..17102052 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala
@@ -3,12 +3,14 @@ package com.redislabs.provider.redis.rdd
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import org.scalatest.Matchers
import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.util.TestUtils
+import redis.clients.jedis.exceptions.JedisConnectionException
import scala.collection.JavaConverters._
/**
- * More RDD tests
- */
+ * More RDD tests
+ */
trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {
implicit val redisConfig: RedisConfig
@@ -71,6 +73,33 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers {
verifyHash("hash2", map2)
}
+ test("connection fails with incorrect user/pass") {
+ assertThrows[JedisConnectionException] {
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = "wrong_password"))
+ }
+ }
+
+ test("connection with correct user/pass") {
+ val userConfig = new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = userPassword))
+
+ val someKey = TestUtils.generateRandomKey()
+ val jedis = userConfig.connectionForKey(someKey)
+ jedis.set(someKey, "123")
+ jedis.get(someKey) should be("123")
+
+ // test RDD operation
+ sc.fromRedisKeyPattern(someKey)(redisConfig = userConfig)
+ .collect()(0) should be(someKey)
+ }
+
def verifyList(list: String, vals: Seq[String]): Unit = {
withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
conn.lrange(list, 0, vals.size).asScala should be(vals.toList)
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala
new file mode 100644
index 00000000..4610f919
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala
@@ -0,0 +1,6 @@
+package com.redislabs.provider.redis.rdd.acl
+
+import com.redislabs.provider.redis.env.RedisClusterAclEnv
+import com.redislabs.provider.redis.rdd.RedisRddSuite
+
+class RedisRDDClusterAclSuite extends RedisRddSuite with RedisClusterAclEnv
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala
new file mode 100644
index 00000000..4da96619
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala
@@ -0,0 +1,6 @@
+package com.redislabs.provider.redis.rdd.acl
+
+import com.redislabs.provider.redis.env.RedisStandaloneAclEnv
+import com.redislabs.provider.redis.rdd.RedisRddSuite
+
+class RedisRDDStandaloneAclSuite extends RedisRddSuite with RedisStandaloneAclEnv