From 8f7d4a9aadd8931f88354e3228f132f7cfc77dc5 Mon Sep 17 00:00:00 2001 From: fe2s Date: Tue, 14 Dec 2021 19:59:46 -0500 Subject: [PATCH 1/5] bump version (prepare for 3.1.0 release) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 71b0e6b4..147abd50 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.redislabs spark-redis_2.12 - 3.0.0-SNAPSHOT + 3.1.0-SNAPSHOT Spark-Redis A Spark library for Redis http://github.com/RedisLabs/spark-redis From 47c826ebeadc4973b0df557aa6849e9a4e7621ea Mon Sep 17 00:00:00 2001 From: fe2s Date: Tue, 14 Dec 2021 19:59:46 -0500 Subject: [PATCH 2/5] issue #328: add user/password authentication (Redis 6 ACL feature) --- Makefile | 8 +++ doc/configuration.md | 1 + doc/dataframe.md | 31 +++++----- pom.xml | 7 +-- .../provider/redis/ConnectionPool.scala | 10 ++-- .../provider/redis/RedisConfig.scala | 50 +++++++++++----- .../provider/redis/redisFunctions.scala | 18 +++--- .../sql/redis/BinaryRedisPersistence.scala | 2 +- .../sql/redis/HashRedisPersistence.scala | 2 +- .../provider/redis/RedisConfigSuite.scala | 8 +-- .../provider/redis/df/AclDataframeSuite.scala | 57 +++++++++++++++++++ .../df/acl/AclDataframeClusterSuite.scala | 6 ++ .../df/acl/AclDataframeStandaloneSuite.scala | 6 ++ .../redislabs/provider/redis/env/Env.scala | 7 ++- .../redis/env/RedisClusterAclEnv.scala | 28 +++++++++ .../redis/env/RedisStandaloneAclEnv.scala | 26 +++++++++ .../redis/env/RedisStandaloneEnv.scala | 9 ++- .../redis/env/RedisStandaloneSSLEnv.scala | 4 +- .../redis/rdd/RedisRddExtraSuite.scala | 33 ++++++++++- .../rdd/acl/RedisRDDClusterAclSuite.scala | 6 ++ .../rdd/acl/RedisRDDStandaloneAclSuite.scala | 6 ++ 21 files changed, 261 insertions(+), 64 deletions(-) create mode 100644 src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala create mode 100644 src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala diff --git a/Makefile b/Makefile index 9ae9e4d8..cbdc62be 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,6 @@ +# user +USER_ACL = user alice on >p1pp0 ~* +@all + # STANDALONE REDIS NODE define REDIS_STANDALONE_NODE_CONF daemonize yes @@ -7,6 +10,7 @@ logfile /tmp/redis_standalone_node_for_spark-redis.log save "" appendonly no requirepass passwd +$(USER_ACL) endef # STANDALONE REDIS NODE WITH SSL @@ -18,6 +22,7 @@ logfile /tmp/redis_standalone_node_ssl_for_spark-redis.log save "" appendonly no requirepass passwd +$(USER_ACL) tls-auth-clients no tls-port 6380 tls-cert-file ./src/test/resources/tls/redis.crt @@ -30,6 +35,7 @@ endef define REDIS_CLUSTER_NODE1_CONF daemonize yes port 7379 +$(USER_ACL) pidfile /tmp/redis_cluster_node1_for_spark-redis.pid logfile /tmp/redis_cluster_node1_for_spark-redis.log save "" @@ -41,6 +47,7 @@ endef define REDIS_CLUSTER_NODE2_CONF daemonize yes port 7380 +$(USER_ACL) pidfile /tmp/redis_cluster_node2_for_spark-redis.pid logfile /tmp/redis_cluster_node2_for_spark-redis.log save "" @@ -52,6 +59,7 @@ endef define REDIS_CLUSTER_NODE3_CONF daemonize yes port 7381 
+$(USER_ACL) pidfile /tmp/redis_cluster_node3_for_spark-redis.pid logfile /tmp/redis_cluster_node3_for_spark-redis.log save "" diff --git a/doc/configuration.md b/doc/configuration.md index 51671c31..3efa217a 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -5,6 +5,7 @@ The supported configuration parameters are: * `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster topology from the initial node, so there is no need to provide the rest of the cluster nodes. * `spark.redis.port` - the initial node's TCP redis port. +* `spark.redis.user` - the initial node's AUTH user * `spark.redis.auth` - the initial node's AUTH password * `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode. * `spark.redis.timeout` - connection timeout in ms, 2000 ms by default diff --git a/doc/dataframe.md b/doc/dataframe.md index 3d62b78f..e23b2479 100644 --- a/doc/dataframe.md +++ b/doc/dataframe.md @@ -330,22 +330,23 @@ root ## DataFrame options -| Name | Description | Type | Default | -| -----------------------| ------------------------------------------------------------------------------------------| --------------------- | ------- | -| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model)| `enum [binary, hash]` | `hash` | -| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` | -| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` | +| Name | Description | Type | Default | +|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------------- | ------- | +| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model) | `enum [binary, hash]` | `hash` | +| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` | +| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` | | key.column | when writing - specifies unique column used as a Redis key, by default a key is auto-generated
when reading - specifies column name to store hash key | `String` | - |
-| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
-| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
-| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
-| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
-| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
-| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
-| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
-| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
-| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
-| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
+| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
+| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
+| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
+| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
+| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
+| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `String` | `localhost` |
+| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `Int` | `6379` |
+| user | overrides `spark.redis.user` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `String` | - |
+| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `String` | - |
+| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `Int` | `0` |
+| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored) | `Int` | `2000` |

## Known limitations

diff --git a/pom.xml b/pom.xml
index 147abd50..8c1fae5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
1.8
2.12
${scala.major.version}.0
- 3.4.1
+ 3.7.0
3.0.1
1.0
@@ -271,11 +271,6 @@
-
- org.apache.commons
- commons-pool2
- 2.0
-
redis.clients
jedis
diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
index 322d8c5f..a4a2d61f 100644
--- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
+++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
@@ -1,10 +1,10 @@ package com.redislabs.provider.redis
-import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
import redis.clients.jedis.exceptions.JedisConnectionException
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
-
import scala.collection.JavaConversions._
@@ -21,11 +21,11 @@ object ConnectionPool {
poolConfig.setTestOnBorrow(false)
poolConfig.setTestOnReturn(false) poolConfig.setTestWhileIdle(false) - poolConfig.setMinEvictableIdleTimeMillis(60000) - poolConfig.setTimeBetweenEvictionRunsMillis(30000) + poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1)) + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30)) poolConfig.setNumTestsPerEvictionRun(-1) - new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl) + new JedisPool(poolConfig, re.host, re.port, re.timeout, re.user, re.auth, re.dbNum, re.ssl) } ) var sleepTime: Int = 4 diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index f4e80ada..34f41944 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -15,12 +15,14 @@ import scala.collection.JavaConversions._ * * @param host the redis host or ip * @param port the redis port + * @param user the authentication username * @param auth the authentication password * @param dbNum database number (should be avoided in general) * @param ssl true to enable SSL connection. Defaults to false */ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, port: Int = Protocol.DEFAULT_PORT, + user: String = null, auth: String = null, dbNum: Int = Protocol.DEFAULT_DATABASE, timeout: Int = Protocol.DEFAULT_TIMEOUT, @@ -36,6 +38,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, this( conf.get("spark.redis.host", Protocol.DEFAULT_HOST), conf.getInt("spark.redis.port", Protocol.DEFAULT_PORT), + conf.get("spark.redis.user", null), conf.get("spark.redis.auth", null), conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE), conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT), @@ -54,6 +57,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, this( parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)), parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt, + parameters.getOrElse("user", conf.get("spark.redis.user", null)), parameters.getOrElse("auth", conf.get("spark.redis.auth", null)), parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt, parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt, @@ -64,17 +68,17 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, /** * Constructor with Jedis URI * - * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL + * @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL */ def this(uri: URI) { - this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri), + this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri), Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme) } /** * Constructor with Jedis URI from String * - * @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL + * @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. 
Use "rediss://" scheme for redis SSL */ def this(uri: String) { this(URI.create(uri)) @@ -280,8 +284,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt //simply re-enter this function witht he master host/port - getNonClusterNodes(initialHost = new RedisEndpoint(host, port, - initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl)) + getNonClusterNodes(initialHost = RedisEndpoint( + host = host, + port = port, + user = initialHost.user, + auth = initialHost.auth, + dbNum = initialHost.dbNum, + ssl = initialHost.ssl + )) } else { //this is a master - take its slaves @@ -295,10 +305,17 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val nodes = master +: slaves val range = nodes.length - (0 until range).map(i => - RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), - 0, 16383, i, range)).toArray + (0 until range).map(i => { + val endpoint = RedisEndpoint( + host = nodes(i)._1, + port = nodes(i)._2, + user = initialHost.user, + auth = initialHost.auth, + dbNum = initialHost.dbNum, + timeout = initialHost.timeout, + ssl = initialHost.ssl) + RedisNode(endpoint, 0, 16383, i, range) + }).toArray } } @@ -326,12 +343,15 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]] val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]]) val port = node.get(1).toString.toInt - RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), - sPos, - ePos, - i, - slotInfo.size - 2) + val endpoint = RedisEndpoint( + host = host, + port = port, + user = initialHost.user, + auth = initialHost.auth, + dbNum = initialHost.dbNum, + timeout = initialHost.timeout, + ssl = initialHost.ssl) + RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2) }) } }.toArray diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index 0b900c0f..b8c17706 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -401,7 +401,7 @@ object RedisContext extends Serializable { pipeline.set(k, v) } else { - pipeline.setex(k, ttl, v) + pipeline.setex(k, ttl.toLong, v) } } conn.close() @@ -422,7 +422,7 @@ object RedisContext extends Serializable { val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (k, v)) => pipeline.hset(hashName, k, v) } - if (ttl > 0) pipeline.expire(hashName, ttl) + if (ttl > 0) pipeline.expire(hashName, ttl.toLong) pipeline.sync() conn.close() } @@ -448,7 +448,7 @@ object RedisContext extends Serializable { foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, hashFields) = a._2 pipeline.hmset(key, hashFields) - if (ttl > 0) pipeline.expire(key, ttl) + if (ttl > 0) pipeline.expire(key, ttl.toLong) } } } @@ -478,7 +478,7 @@ object RedisContext extends Serializable { foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, hashFields) = a._2 pipeline.hmset(key, hashFields) - if (ttl > 0) pipeline.expire(key, ttl) + if (ttl > 0) pipeline.expire(key, ttl.toLong) } } } @@ -498,7 +498,7 @@ object RedisContext extends Serializable { val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (k, v)) => pipeline.zadd(zsetName, 
v.toDouble, k) } - if (ttl > 0) pipeline.expire(zsetName, ttl) + if (ttl > 0) pipeline.expire(zsetName, ttl.toLong) pipeline.sync() conn.close() } @@ -516,7 +516,7 @@ object RedisContext extends Serializable { val pipeline = foreachWithPipelineNoLastSync(conn, arr) { (pipeline, v) => pipeline.sadd(setName, v) } - if (ttl > 0) pipeline.expire(setName, ttl) + if (ttl > 0) pipeline.expire(setName, ttl.toLong) pipeline.sync() conn.close() } @@ -537,7 +537,7 @@ object RedisContext extends Serializable { val pipeline = foreachWithPipelineNoLastSync(conn, arr) { (pipeline, v) => pipeline.rpush(listName, v) } - if (ttl > 0) pipeline.expire(listName, ttl) + if (ttl > 0) pipeline.expire(listName, ttl.toLong) pipeline.sync() conn.close() } @@ -560,7 +560,7 @@ object RedisContext extends Serializable { foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, listVals) = a._2 pipeline.rpush(key, listVals: _*) - if (ttl > 0) pipeline.expire(key, ttl) + if (ttl > 0) pipeline.expire(key, ttl.toLong) } } } @@ -583,7 +583,7 @@ object RedisContext extends Serializable { foreachWithPipeline(conn, arr) { (pipeline, a) => val (key, listVals) = a._2 pipeline.rpush(key, listVals: _*) - if (ttl > 0) pipeline.expire(key, ttl) + if (ttl > 0) pipeline.expire(key, ttl.toLong) } } } diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala index c9b0a981..13e45300 100644 --- a/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala +++ b/src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala @@ -16,7 +16,7 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] { override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = { val keyBytes = key.getBytes(UTF_8) if (ttl > 0) { - pipeline.setex(keyBytes, ttl, value) + pipeline.setex(keyBytes, ttl.toLong, value) } else { pipeline.set(keyBytes, value) } diff --git a/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala index a22c4614..03b7f97a 100644 --- a/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala +++ b/src/main/scala/org/apache/spark/sql/redis/HashRedisPersistence.scala @@ -19,7 +19,7 @@ class HashRedisPersistence extends RedisPersistence[Any] { val javaValue = value.asInstanceOf[Map[String, String]].asJava pipeline.hmset(key, javaValue) if (ttl > 0) { - pipeline.expire(key, ttl) + pipeline.expire(key, ttl.toLong) } } diff --git a/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala b/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala index 0b2eb394..dfe80eb7 100644 --- a/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/RedisConfigSuite.scala @@ -5,8 +5,8 @@ import redis.clients.jedis.util.JedisClusterCRC16 class RedisConfigSuite extends FunSuite with Matchers { - val redisStandaloneConfig = new RedisConfig(RedisEndpoint("127.0.0.1", 6379, "passwd")) - val redisClusterConfig = new RedisConfig(RedisEndpoint("127.0.0.1", 7379)) + val redisStandaloneConfig = new RedisConfig(RedisEndpoint(host = "127.0.0.1", port = 6379, auth = "passwd")) + val redisClusterConfig = new RedisConfig(RedisEndpoint(host = "127.0.0.1", port = 7379)) test("getNodesBySlots") { redisStandaloneConfig.getNodesBySlots(0, 16383).length shouldBe 1 @@ -23,7 +23,7 @@ class RedisConfigSuite extends FunSuite with Matchers { } 
test("getNodes") { - redisStandaloneConfig.getNodes(RedisEndpoint("127.0.0.1", 6379, "passwd")).length shouldBe 1 - redisClusterConfig.getNodes(RedisEndpoint("127.0.0.1", 7379)).length shouldBe 7 + redisStandaloneConfig.getNodes(RedisEndpoint(host = "127.0.0.1", port = 6379, auth = "passwd")).length shouldBe 1 + redisClusterConfig.getNodes(RedisEndpoint(host = "127.0.0.1", port = 7379)).length shouldBe 7 } } diff --git a/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala new file mode 100644 index 00000000..86982b5d --- /dev/null +++ b/src/test/scala/com/redislabs/provider/redis/df/AclDataframeSuite.scala @@ -0,0 +1,57 @@ +package com.redislabs.provider.redis.df + +import com.redislabs.provider.redis.util.Person.{TableNamePrefix, data} +import com.redislabs.provider.redis.util.TestUtils.{generateTableName, interceptSparkErr} +import org.apache.spark.sql.redis.{RedisFormat, SqlOptionTableName} +import org.scalatest.Matchers +import redis.clients.jedis.exceptions.JedisConnectionException + +/** + * Basic dataframe test with user/password authentication + */ +trait AclDataframeSuite extends RedisDataframeSuite with Matchers { + + test("save and load dataframe") { + val tableName = generateTableName(TableNamePrefix) + val df = spark.createDataFrame(data) + df.write.format(RedisFormat) + .option(SqlOptionTableName, tableName) + .save() + val loadedDf = spark.read.format(RedisFormat) + .option(SqlOptionTableName, tableName) + .load() + .cache() + verifyDf(loadedDf) + } + + test("incorrect password in dataframe options") { + interceptSparkErr[JedisConnectionException] { + val tableName = generateTableName(TableNamePrefix) + val df = spark.createDataFrame(data) + df.write.format(RedisFormat) + .option(SqlOptionTableName, tableName) + .option("user", user) + .option("auth", "wrong_password") + .save() + } + } + + test("correct user/password in dataframe options") { + val tableName = generateTableName(TableNamePrefix) + val df = spark.createDataFrame(data) + df.write.format(RedisFormat) + .option(SqlOptionTableName, tableName) + .option("user", user) + .option("auth", userPassword) + .save() + + val loadedDf = spark.read.format(RedisFormat) + .option(SqlOptionTableName, tableName) + .option("user", user) + .option("auth", userPassword) + .load() + .cache() + verifyDf(loadedDf) + } + +} diff --git a/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala new file mode 100644 index 00000000..a25f818f --- /dev/null +++ b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeClusterSuite.scala @@ -0,0 +1,6 @@ +package com.redislabs.provider.redis.df.acl + +import com.redislabs.provider.redis.df.AclDataframeSuite +import com.redislabs.provider.redis.env.RedisClusterAclEnv + +class AclDataframeClusterSuite extends AclDataframeSuite with RedisClusterAclEnv diff --git a/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala new file mode 100644 index 00000000..7f8288c9 --- /dev/null +++ b/src/test/scala/com/redislabs/provider/redis/df/acl/AclDataframeStandaloneSuite.scala @@ -0,0 +1,6 @@ +package com.redislabs.provider.redis.df.acl + +import com.redislabs.provider.redis.df.AclDataframeSuite +import com.redislabs.provider.redis.env.RedisStandaloneAclEnv + +class 
AclDataframeStandaloneSuite extends AclDataframeSuite with RedisStandaloneAclEnv
diff --git a/src/test/scala/com/redislabs/provider/redis/env/Env.scala b/src/test/scala/com/redislabs/provider/redis/env/Env.scala
index 7cc9b85c..20fcd166 100644
--- a/src/test/scala/com/redislabs/provider/redis/env/Env.scala
+++ b/src/test/scala/com/redislabs/provider/redis/env/Env.scala
@@ -14,7 +14,12 @@ trait Env {
val redisHost = "127.0.0.1"
val redisPort = 6379
- val redisAuth = "passwd"
+ val redisAuth = "passwd" // password for 'default' user (AUTH <password>)
+
+ // user credentials
+ val user = "alice"
+ val userPassword = "p1pp0"
+
val redisConfig: RedisConfig
}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala
new file mode 100644
index 00000000..7fbc3629
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisClusterAclEnv.scala
@@ -0,0 +1,28 @@
+package com.redislabs.provider.redis.env
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.spark.SparkConf
+
+/**
+ * Cluster with user/password authentication
+ */
+trait RedisClusterAclEnv extends Env {
+
+ override val redisPort = 7379
+
+ override val conf: SparkConf = new SparkConf()
+ .setMaster("local[*]").setAppName(getClass.getName)
+ .set("spark.redis.host", redisHost)
+ .set("spark.redis.port", s"$redisPort")
+ .set("spark.redis.user", user)
+ .set("spark.redis.auth", userPassword)
+ .set("spark.streaming.stopGracefullyOnShutdown", "true")
+ .set("spark.driver.bindAddress", "127.0.0.1")
+
+ override val redisConfig: RedisConfig =
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = userPassword))
+}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala
new file mode 100644
index 00000000..fce17627
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneAclEnv.scala
@@ -0,0 +1,26 @@
+package com.redislabs.provider.redis.env
+
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
+import org.apache.spark.SparkConf
+
+/**
+ * Standalone with user/password authentication
+ */
+trait RedisStandaloneAclEnv extends Env {
+
+ override val conf: SparkConf = new SparkConf()
+ .setMaster("local[*]").setAppName(getClass.getName)
+ .set("spark.redis.host", redisHost)
+ .set("spark.redis.port", s"$redisPort")
+ .set("spark.redis.user", user)
+ .set("spark.redis.auth", userPassword)
+ .set("spark.streaming.stopGracefullyOnShutdown", "true")
+ .set("spark.driver.bindAddress", "127.0.0.1")
+
+ override val redisConfig: RedisConfig =
+ new RedisConfig(RedisEndpoint(
+ host = redisHost,
+ port = redisPort,
+ user = user,
+ auth = userPassword))
+}
diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
index 88bd2759..b886e295 100644
--- a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
+++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneEnv.scala
@@ -4,8 +4,8 @@ import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint}
import org.apache.spark.SparkConf
/**
- * @author The Viet Nguyen
- */
+ * @author The Viet Nguyen
+ */
trait RedisStandaloneEnv extends Env {
override val conf: SparkConf = new SparkConf()
@@ -18,5 +18,8 @@ trait RedisStandaloneEnv extends Env {
.set("spark.driver.bindAddress", "127.0.0.1") override val redisConfig: RedisConfig = - new RedisConfig(RedisEndpoint(redisHost, redisPort, redisAuth)) + new RedisConfig(RedisEndpoint( + host = redisHost, + port = redisPort, + auth = redisAuth)) } diff --git a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala index 2945fffc..632336c7 100644 --- a/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala +++ b/src/test/scala/com/redislabs/provider/redis/env/RedisStandaloneSSLEnv.scala @@ -6,7 +6,7 @@ import org.apache.spark.SparkConf trait RedisStandaloneSSLEnv extends Env { override val redisPort = 6380 - + override val conf: SparkConf = new SparkConf() .setMaster("local[*]").setAppName(getClass.getName) .set("spark.redis.host", redisHost) @@ -17,5 +17,5 @@ trait RedisStandaloneSSLEnv extends Env { .set("spark.driver.bindAddress", "127.0.0.1") override val redisConfig: RedisConfig = - new RedisConfig(RedisEndpoint(redisHost, redisPort, redisAuth, ssl=true)) + new RedisConfig(RedisEndpoint(redisHost, redisPort, auth = redisAuth, ssl = true)) } diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala index 6e08ac3c..17102052 100644 --- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala +++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRddExtraSuite.scala @@ -3,12 +3,14 @@ package com.redislabs.provider.redis.rdd import com.redislabs.provider.redis.util.ConnectionUtils.withConnection import org.scalatest.Matchers import com.redislabs.provider.redis._ +import com.redislabs.provider.redis.util.TestUtils +import redis.clients.jedis.exceptions.JedisConnectionException import scala.collection.JavaConverters._ /** - * More RDD tests - */ + * More RDD tests + */ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers { implicit val redisConfig: RedisConfig @@ -71,6 +73,33 @@ trait RedisRddExtraSuite extends SparkRedisSuite with Keys with Matchers { verifyHash("hash2", map2) } + test("connection fails with incorrect user/pass") { + assertThrows[JedisConnectionException] { + new RedisConfig(RedisEndpoint( + host = redisHost, + port = redisPort, + user = user, + auth = "wrong_password")) + } + } + + test("connection with correct user/pass") { + val userConfig = new RedisConfig(RedisEndpoint( + host = redisHost, + port = redisPort, + user = user, + auth = userPassword)) + + val someKey = TestUtils.generateRandomKey() + val jedis = userConfig.connectionForKey(someKey) + jedis.set(someKey, "123") + jedis.get(someKey) should be("123") + + // test RDD operation + sc.fromRedisKeyPattern(someKey)(redisConfig = userConfig) + .collect()(0) should be(someKey) + } + def verifyList(list: String, vals: Seq[String]): Unit = { withConnection(redisConfig.getHost(list).endpoint.connect()) { conn => conn.lrange(list, 0, vals.size).asScala should be(vals.toList) diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala new file mode 100644 index 00000000..4610f919 --- /dev/null +++ b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDClusterAclSuite.scala @@ -0,0 +1,6 @@ +package com.redislabs.provider.redis.rdd.acl + +import com.redislabs.provider.redis.env.RedisClusterAclEnv +import 
com.redislabs.provider.redis.rdd.RedisRddSuite
+
+class RedisRDDClusterAclSuite extends RedisRddSuite with RedisClusterAclEnv
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala
new file mode 100644
index 00000000..4da96619
--- /dev/null
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/acl/RedisRDDStandaloneAclSuite.scala
@@ -0,0 +1,6 @@
+package com.redislabs.provider.redis.rdd.acl
+
+import com.redislabs.provider.redis.env.RedisStandaloneAclEnv
+import com.redislabs.provider.redis.rdd.RedisRddSuite
+
+class RedisRDDStandaloneAclSuite extends RedisRddSuite with RedisStandaloneAclEnv

From 7db60ff9f7a75110f9f36705b8c962837aacd5e4 Mon Sep 17 00:00:00 2001
From: fe2s
Date: Sun, 12 Jun 2022 16:40:33 -0500
Subject: [PATCH 3/5] fix circleci: don't install gcc-8

---
.circleci/config.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 617d9034..1bd1d89b 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -118,7 +118,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
- sudo apt-get install -y gcc-8 g++-8 libssl-dev
+ sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes

From b873303f880aa4b9cbe2c2b4f22b1b3432a36460 Mon Sep 17 00:00:00 2001
From: fe2s
Date: Sun, 12 Jun 2022 16:42:41 -0500
Subject: [PATCH 4/5] fix circleci: don't install gcc-8

---
.circleci/config.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 1bd1d89b..544ccacc 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -61,7 +61,7 @@ jobs:
name: install Redis
command: |
sudo apt-get -y update
- sudo apt-get install -y gcc-8 g++-8 libssl-dev
+ sudo apt-get install -y libssl-dev
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
tar -xzvf redis-6.0.10.tar.gz
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes

From db233470fe728ae045fa15b3b9b3fd4306823225 Mon Sep 17 00:00:00 2001
From: fe2s
Date: Sun, 12 Jun 2022 16:59:26 -0500
Subject: [PATCH 5/5] update jedis to 3.9.0 and spark to 3.2.1

---
README.md | 13 +++++++------
pom.xml | 4 ++--
2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index 6a74ad93..7c1cda22 100644
--- a/README.md
+++ b/README.md
@@ -20,12 +20,13 @@ Spark-Redis also supports Spark Streaming (DStreams) and Structured Streaming.
The library has several branches, each corresponding to a different supported Spark version. For example, 'branch-2.3' works with any Spark 2.3.x version. The master branch contains the recent development for the next release.
-| Spark-Redis | Spark | Redis | Supported Scala Versions | -| ----------------------------------------------------------------| ------------- | ---------------- | ------------------------ | -| [master](https://github.com/RedisLabs/spark-redis/) | 3.0.x | >=2.9.0 | 2.12 | -| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 | -| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 | -| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 | +| Spark-Redis | Spark | Redis | Supported Scala Versions | +|---------------------------------------------------------------------------|-------| ---------------- | ------------------------ | +| [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 | +| [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 | +| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 | +| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 | +| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 | ## Known limitations diff --git a/pom.xml b/pom.xml index 8c1fae5c..80663f35 100644 --- a/pom.xml +++ b/pom.xml @@ -49,8 +49,8 @@ 1.8 2.12 ${scala.major.version}.0 - 3.7.0 - 3.0.1 + 3.9.0 + 3.2.1 1.0
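
Patch 2/5 above enables user/password (ACL) authentication end to end. A minimal usage sketch follows, assuming a Redis 6 server provisioned with the `alice` ACL user from the Makefile above; the host, port, and table name are illustrative and not part of the patches:

import org.apache.spark.sql.SparkSession

// Session-level credentials: with spark.redis.user set, the connector
// authenticates as the ACL user instead of the implicit 'default' user.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-redis-acl-example")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .config("spark.redis.user", "alice")
  .config("spark.redis.auth", "p1pp0")
  .getOrCreate()

import spark.implicits._

// Write a DataFrame to Redis hashes, keyed by the "name" column.
Seq(("John", 30), ("Peter", 45)).toDF("name", "age")
  .write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .save()

// Read it back with per-DataFrame credentials; these override the
// SparkSession-level settings (see the doc/dataframe.md options above).
val loaded = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .option("user", "alice")
  .option("auth", "p1pp0")
  .load()

loaded.show()

When `spark.redis.user` is left unset, behavior is unchanged: `spark.redis.auth` alone authenticates against the `default` user, as before.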