diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java index d27448bda4afb..095f7a596e14c 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java @@ -49,7 +49,7 @@ configClass = RedisSinkConfig.class ) @Slf4j -public class RedisSink implements Sink { +public class RedisSink implements Sink { private RedisSinkConfig redisSinkConfig; @@ -61,7 +61,7 @@ public class RedisSink implements Sink { private int batchSize; - private List> incomingList; + private List> incomingList; private ScheduledExecutorService flushExecutor; @@ -84,7 +84,7 @@ public void open(Map config, SinkContext sinkContext) throws Exc } @Override - public void write(Record record) throws Exception { + public void write(Record record) throws Exception { int currentSize; synchronized (this) { incomingList.add(record); @@ -108,7 +108,7 @@ public void close() throws Exception { private void flush() { final Map recordsToSet = new ConcurrentHashMap<>(); - final List> recordsToFlush; + final List> recordsToFlush; synchronized (this) { if (incomingList.isEmpty()) { @@ -119,11 +119,11 @@ private void flush() { } if (CollectionUtils.isNotEmpty(recordsToFlush)) { - for (Record record: recordsToFlush) { + for (Record record: recordsToFlush) { try { // records with null keys or values will be ignored - byte[] key = toBytes("key", record.getKey().orElse(null)); - byte[] value = toBytes("value", record.getValue()); + byte[] key = record.getKey().isPresent() ? record.getKey().get().getBytes(StandardCharsets.UTF_8) : null; + byte[] value = record.getValue(); recordsToSet.put(key, value); } catch (Exception e) { record.fail(); @@ -155,19 +155,4 @@ private void flush() { log.error("Redis mset data interrupted exception ", e); } } - - private byte[] toBytes(String src, Object obj) { - final byte[] result; - if (obj instanceof String) { - String s = (String) obj; - result = s.getBytes(StandardCharsets.UTF_8); - } else if (obj instanceof byte[]) { - result = (byte[]) obj; - } else if (null == obj) { - result = null; - } else { - throw new IllegalArgumentException(String.format("The %s for the record must be String or Bytes.", src)); - } - return result; - } } diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java index 455ef89c1e164..ce6595da43950 100644 --- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java +++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java @@ -18,27 +18,18 @@ */ package org.apache.pulsar.io.redis.sink; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.io.redis.EmbeddedRedisUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * redis Sink test @@ -48,17 +39,6 @@ public class RedisSinkTest { private EmbeddedRedisUtils embeddedRedisUtils; - /** - * A Simple class to test redis class - */ - @Data - @ToString - @EqualsAndHashCode - public static class Foo { - private String field1; - private String field2; - } - @BeforeMethod public void setUp() throws Exception { embeddedRedisUtils = new EmbeddedRedisUtils(getClass().getSimpleName()); @@ -83,26 +63,7 @@ public void TestOpenAndWriteSink() throws Exception { RedisSink sink = new RedisSink(); // prepare a foo Record - Foo obj = new Foo(); - obj.setField1("FakeFiled1"); - obj.setField2("FakeFiled1"); - AvroSchema schema = AvroSchema.of(Foo.class); - - byte[] bytes = schema.encode(obj); - ByteBuf payload = Unpooled.copiedBuffer(bytes); - AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); - autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo())); - - Message message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema); - Record record = PulsarRecord.builder() - .message(message) - .topicName("fake_topic_name") - .build(); - - log.info("foo:{}, Message.getValue: {}, record.getValue: {}", - obj.toString(), - message.getValue().toString(), - record.getValue().toString()); + Record record = build("fakeTopic", "fakeKey", "fakeValue"); // open should success sink.open(configs, null); @@ -115,4 +76,29 @@ public void TestOpenAndWriteSink() throws Exception { Thread.sleep(1000); } + + private Record build(String topic, String key, String value) { + // prepare a SinkRecord + SinkRecord record = new SinkRecord<>(new Record() { + @Override + public Optional getKey() { + return Optional.empty(); + } + + @Override + public byte[] getValue() { + return value.getBytes(StandardCharsets.UTF_8); + } + + @Override + public Optional getDestinationTopic() { + if (topic != null) { + return Optional.of(topic); + } else { + return Optional.empty(); + } + } + }, value.getBytes(StandardCharsets.UTF_8)); + return record; + } }