Skip to content

Commit

Permalink
APPS-136 HyperLogLog commands (#192)
Browse files Browse the repository at this point in the history
Co-authored-by: yrizhkov <[email protected]>
  • Loading branch information
korotkov-aerospike and reugn authored Apr 24, 2023
1 parent 36c3888 commit 5935e02
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 3 deletions.
15 changes: 14 additions & 1 deletion src/main/kotlin/com/aerospike/skyhook/command/RedisCommand.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ import com.aerospike.skyhook.command.CommandsDetails.multiCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.persistCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pexpireCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pexpireatCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pfaddCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pfcountCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pfmergeCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pingCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.psetexCommandDetails
import com.aerospike.skyhook.command.CommandsDetails.pttlCommandDetails
Expand Down Expand Up @@ -114,6 +117,7 @@ import com.aerospike.skyhook.listener.key.*
import com.aerospike.skyhook.listener.list.*
import com.aerospike.skyhook.listener.map.*
import com.aerospike.skyhook.listener.scan.*
import com.aerospike.skyhook.listener.hyperlog.*
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.redis.ArrayHeaderRedisMessage
import mu.KotlinLogging
Expand Down Expand Up @@ -245,7 +249,12 @@ enum class RedisCommand(
DISCARD(discardCommandDetails, ::DiscardCommandHandler),
EXEC(execCommandDetails, ::ExecCommandHandler),

COMMAND(commandCommandDetails, ::CommandCommandHandler);
COMMAND(commandCommandDetails, ::CommandCommandHandler),

PFADD(pfaddCommandDetails, ::PfaddCommandListener),
PFCOUNT(pfcountCommandDetails, ::PfcountCommandListener),
PFMERGE(pfmergeCommandDetails, ::PfmergeCommandListener)
;

companion object {
private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -393,6 +402,10 @@ object CommandsDetails {
val bgsaveCommandDetails = RedisCommandDetails("bgsave", -1, arrayListOf("admin", "noscript"), 0, 0, 0)
val commandCommandDetails = RedisCommandDetails("command", -1, arrayListOf("random", "loading", "stale"), 0, 0, 0)

val pfaddCommandDetails = RedisCommandDetails("pfadd", -2, arrayListOf("write", "denyoom", "fast"), 1 , 1 ,1)
val pfcountCommandDetails = RedisCommandDetails("pfcount", -2, arrayListOf("readonly", "may_replicate"), 1 , -1 ,1)
val pfmergeCommandDetails = RedisCommandDetails("pfmerge", -2, arrayListOf("write", "denyoom"), 1 , -1 ,1)

val authCommandDetails = RedisCommandDetails(
"auth",
-2,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.aerospike.skyhook.listener.hyperlog

import com.aerospike.client.Key
import com.aerospike.client.Record
import com.aerospike.client.listener.RecordListener
import com.aerospike.client.operation.HLLOperation
import com.aerospike.client.operation.HLLPolicy
import com.aerospike.skyhook.command.RequestCommand
import com.aerospike.skyhook.listener.BaseListener
import com.aerospike.skyhook.util.Typed
import io.netty.channel.ChannelHandlerContext

open class PfaddCommandListener(
ctx: ChannelHandlerContext
) : BaseListener(ctx), RecordListener {

override fun handle(cmd: RequestCommand) {
require(cmd.argCount > 2) { argValidationErrorMsg(cmd) }

val key = createKey(cmd.key)

val operation = HLLOperation.add(
HLLPolicy.Default,
aeroCtx.bin,
getValues(cmd),
16
)
client.operate(null, this, defaultWritePolicy, key, operation)
}

protected open fun getValues(cmd: RequestCommand) =
cmd.args.drop(2).map { Typed.getValue(it) }

override fun onSuccess(key: Key?, record: Record?) {
if (record == null) {
writeNullString()
flushCtxTransactionAware()
} else {
try {
val entitiesWritten = record.getLong(aeroCtx.bin)
writeLong(if (entitiesWritten > 0) 1L else 0L)
flushCtxTransactionAware()
} catch (e: Exception) {
closeCtx(e)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.aerospike.skyhook.listener.hyperlog

import com.aerospike.client.Key
import com.aerospike.client.Record
import com.aerospike.client.listener.RecordListener
import com.aerospike.client.operation.HLLOperation
import com.aerospike.skyhook.command.RequestCommand
import com.aerospike.skyhook.listener.BaseListener
import io.netty.channel.ChannelHandlerContext

class PfcountCommandListener(
ctx: ChannelHandlerContext
) : BaseListener(ctx), RecordListener {

override fun handle(cmd: RequestCommand) {
require(cmd.argCount > 1) { argValidationErrorMsg(cmd) }

if (cmd.args.size == 2) {
val key = createKey(cmd.args[1])
countSingleKey(key)
} else {
val keys = cmd.args.drop(1).map(::createKey)
countMultipleKeys(keys)
}
}

private fun countSingleKey(key: Key) {
val operation = HLLOperation.getCount(aeroCtx.bin)
client.operate(null, this, defaultWritePolicy, key, operation)
}

private fun countMultipleKeys(keys: List<Key>) {
val hllValuesByKey = keys
.associateWith { client.get(null, it) }
.filterValues { it != null }
.mapValues { it.value.getHLLValue(aeroCtx.bin) }

if (hllValuesByKey.isEmpty()) {
writeZero()
return
}

val operation = HLLOperation.getUnionCount(aeroCtx.bin, hllValuesByKey.values.toList())

client.operate(null, this, defaultWritePolicy, hllValuesByKey.keys.first(), operation)
}

override fun onSuccess(key: Key?, record: Record?) {
if (record == null) {
writeZero()
} else {
try {
writeLong(record.getLong(aeroCtx.bin))
flushCtxTransactionAware()
} catch (e: Exception) {
closeCtx(e)
}
}
}

private fun writeZero() {
writeLong(0L)
flushCtxTransactionAware()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.aerospike.skyhook.listener.hyperlog

import com.aerospike.client.Key
import com.aerospike.client.Record
import com.aerospike.client.listener.RecordListener
import com.aerospike.client.operation.HLLOperation
import com.aerospike.client.operation.HLLPolicy
import com.aerospike.skyhook.command.RequestCommand
import com.aerospike.skyhook.listener.BaseListener
import io.netty.channel.ChannelHandlerContext

class PfmergeCommandListener(
ctx: ChannelHandlerContext
) : BaseListener(ctx), RecordListener {

override fun handle(cmd: RequestCommand) {
require(cmd.argCount > 3) { argValidationErrorMsg(cmd) }

val key = createKey(cmd.key)

val hllValues = cmd.args.drop(2)
.map(::createKey)
.mapNotNull { client.get(null, it)}
.map { it.getHLLValue(aeroCtx.bin) }

val operationPut = HLLOperation.setUnion(HLLPolicy.Default, aeroCtx.bin, hllValues)

client.operate(null, this, defaultWritePolicy, key, operationPut)
}

override fun onSuccess(key: Key?, record: Record?) {
if (record == null) {
writeNullString()
flushCtxTransactionAware()
} else {
try {
writeOK()
flushCtxTransactionAware()
} catch (e: Exception) {
closeCtx(e)
}
}
}
}
121 changes: 121 additions & 0 deletions src/test/kotlin/com/aerospike/skyhook/HyperLogCommandsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.aerospike.skyhook

import com.aerospike.skyhook.command.RedisCommand
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals

class HyperLogCommandsTest() : SkyhookIntegrationTestBase() {

@Test
fun simpleAdd() {
writeCommand(RedisCommand.PFADD, "ids ABC")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFCOUNT, "ids")
assertEquals(1, readLong())
}

@Test
fun multipleAdd() {
writeCommand(RedisCommand.PFADD, "ids 1 2 3")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFCOUNT, "ids")
assertEquals(3, readLong())
}

@Test
fun duplicateAdd() {
writeCommand(RedisCommand.PFADD, "ids ABC")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFADD, "ids ABC")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFADD, "ids ABC ABC")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFCOUNT, "ids")
assertEquals(1, readLong())
}

@Test
fun redisDocumentationExample() {
writeCommand(RedisCommand.PFADD, "hll foo bar zap")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFADD, "hll zap zap zap")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFADD, "hll foo bar")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFCOUNT, "hll")
assertEquals(3, readLong())
writeCommand(RedisCommand.PFADD, "some-other-hll 1 2 3")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFCOUNT, "hll some-other-hll")
assertEquals(6, readLong())
}

@Test
fun union() {
writeCommand(RedisCommand.PFADD, "a 1 2")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFADD, "b 2 3")
assertEquals(1, readLong())

writeCommand(RedisCommand.PFCOUNT, "a b")
assertEquals(3, readLong())
writeCommand(RedisCommand.PFCOUNT, "a")
assertEquals(2, readLong())
writeCommand(RedisCommand.PFCOUNT, "b")
assertEquals(2, readLong())
}

@Test
fun countNonExistent() {
writeCommand(RedisCommand.PFCOUNT, "key")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFADD, "a 1")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFCOUNT, "key a")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFCOUNT, "a key")
assertEquals(1, readLong())
}

@Test
fun countAllNonExistent() {
writeCommand(RedisCommand.PFCOUNT, "key")
assertEquals(0, readLong())
writeCommand(RedisCommand.PFCOUNT, "key key2")
assertEquals(0, readLong())
}

@Test
fun countMany() {
val n = 10L
(0 until n).forEach {
writeCommand(RedisCommand.PFADD, "key${it} $it")
assertEquals(1, readLong())
}
val args = (0 until n).joinToString(" ") { "key${it}" }
writeCommand(RedisCommand.PFCOUNT, args)
assertEquals(n, readLong())
}

@Test
fun merge() {
writeCommand(RedisCommand.PFADD, "a 1 2")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFADD, "b 2 3")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFMERGE, "m a b")
assertEquals(ok, readString())
writeCommand(RedisCommand.PFCOUNT, "m")
assertEquals(3, readLong())
}

@Test
fun mergeNonExistent() {
writeCommand(RedisCommand.PFADD, "a 1 2")
assertEquals(1, readLong())
writeCommand(RedisCommand.PFMERGE, "m a b")
assertEquals(ok, readString())
writeCommand(RedisCommand.PFCOUNT, "m")
assertEquals(2, readLong())
}
}
2 changes: 1 addition & 1 deletion src/test/kotlin/com/aerospike/skyhook/ScanCommandsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ScanCommandsTest() : SkyhookIntegrationTestBase() {
assertEquals(ok, readString())

writeCommand("${RedisCommand.KEYS.name} k1*")
Thread.sleep(3000)
Thread.sleep(5000)
val r = readStringArray()
assertEquals("k11", r[0])
assertEquals("k1", r[1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ abstract class SkyhookIntegrationTestBase {
return Bin(config.bin, bin)
}

protected fun writeCommand(command: RedisCommand, args: String) {
writeCommand(command.name + " " + args)
}

protected fun writeCommand(command: String) {
val sb = StringBuilder()
val list = command.split(" ")
if (list.size > 1) {
sb.append("*${list.size}$eol")
command.split(" ").forEach { s ->
list.forEach { s ->
sb.append("$${s.length}$eol")
sb.append("$s$eol")
}
Expand Down

0 comments on commit 5935e02

Please sign in to comment.