[Issue 3966] [pulsar-io] Specify the RedisSink type as Bytes by default (apache#3972)

* Specify the RedisSink type as Bytes by default.

* Some minor fixes
murong00 authored and merlimat committed Apr 4, 2019
1 parent b51e435 commit b7903fa
Showing 2 changed files with 36 additions and 65 deletions.
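In short, the sink drops its type parameter (RedisSink&lt;T&gt; implements Sink&lt;T&gt; becomes RedisSink implements Sink&lt;byte[]&gt;): the optional String key is UTF-8 encoded, the value passes through as raw bytes, and the generic toBytes helper is removed. Below is a minimal sketch of the per-record mapping the new flush() performs; it is not part of the commit, and the class and method names are illustrative only.

import java.nio.charset.StandardCharsets;

import org.apache.pulsar.functions.api.Record;

final class RedisKeyValueSketch {

    // Key: the record's optional String key, UTF-8 encoded; null when the record carries no key.
    static byte[] keyOf(Record<byte[]> record) {
        return record.getKey().map(k -> k.getBytes(StandardCharsets.UTF_8)).orElse(null);
    }

    // Value: already raw bytes, since the sink is now Sink<byte[]>; passed through unchanged.
    static byte[] valueOf(Record<byte[]> record) {
        return record.getValue();
    }
}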
RedisSink.java
@@ -49,7 +49,7 @@
configClass = RedisSinkConfig.class
)
@Slf4j
public class RedisSink<T> implements Sink<T> {
public class RedisSink implements Sink<byte[]> {

private RedisSinkConfig redisSinkConfig;

@@ -61,7 +61,7 @@ public class RedisSink<T> implements Sink<T> {

private int batchSize;

private List<Record<T>> incomingList;
private List<Record<byte[]>> incomingList;

private ScheduledExecutorService flushExecutor;

@@ -84,7 +84,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}

@Override
public void write(Record<T> record) throws Exception {
public void write(Record<byte[]> record) throws Exception {
int currentSize;
synchronized (this) {
incomingList.add(record);
@@ -108,7 +108,7 @@ public void close() throws Exception {

private void flush() {
final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
final List<Record<T>> recordsToFlush;
final List<Record<byte[]>> recordsToFlush;

synchronized (this) {
if (incomingList.isEmpty()) {
@@ -119,11 +119,11 @@ private void flush() {
}

if (CollectionUtils.isNotEmpty(recordsToFlush)) {
for (Record<T> record: recordsToFlush) {
for (Record<byte[]> record: recordsToFlush) {
try {
// records with null keys or values will be ignored
byte[] key = toBytes("key", record.getKey().orElse(null));
byte[] value = toBytes("value", record.getValue());
byte[] key = record.getKey().isPresent() ? record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
byte[] value = record.getValue();
recordsToSet.put(key, value);
} catch (Exception e) {
record.fail();
@@ -155,19 +155,4 @@ private void flush() {
log.error("Redis mset data interrupted exception ", e);
}
}

private byte[] toBytes(String src, Object obj) {
final byte[] result;
if (obj instanceof String) {
String s = (String) obj;
result = s.getBytes(StandardCharsets.UTF_8);
} else if (obj instanceof byte[]) {
result = (byte[]) obj;
} else if (null == obj) {
result = null;
} else {
throw new IllegalArgumentException(String.format("The %s for the record must be String or Bytes.", src));
}
return result;
}
}
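One detail of the new inline conversion worth spelling out: recordsToSet is a ConcurrentHashMap, which throws NullPointerException for null keys or values, so a record that arrives without a key (or with a null value) lands in the catch block and is failed via record.fail() rather than written. A minimal standalone sketch of that interaction (not from the commit):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NullKeySketch {
    static void demo() {
        // ConcurrentHashMap rejects null keys and values, which is how records with
        // null keys or values end up in the catch block of RedisSink.flush().
        Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
        byte[] key = null; // a record that carries no key
        byte[] value = "fakeValue".getBytes(StandardCharsets.UTF_8);
        try {
            recordsToSet.put(key, value); // throws NullPointerException
        } catch (Exception e) {
            // in RedisSink.flush() this is where record.fail() is called
        }
    }
}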
RedisSinkTest.java
@@ -18,27 +18,18 @@
*/
package org.apache.pulsar.io.redis.sink;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* redis Sink test
@@ -48,17 +39,6 @@ public class RedisSinkTest {

private EmbeddedRedisUtils embeddedRedisUtils;

/**
* A Simple class to test redis class
*/
@Data
@ToString
@EqualsAndHashCode
public static class Foo {
private String field1;
private String field2;
}

@BeforeMethod
public void setUp() throws Exception {
embeddedRedisUtils = new EmbeddedRedisUtils(getClass().getSimpleName());
@@ -83,26 +63,7 @@ public void TestOpenAndWriteSink() throws Exception {
RedisSink sink = new RedisSink();

// prepare a foo Record
Foo obj = new Foo();
obj.setField1("FakeFiled1");
obj.setField2("FakeFiled1");
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);

byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));

Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();

log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
record.getValue().toString());
Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");

// open should success
sink.open(configs, null);
@@ -115,4 +76,29 @@ public void TestOpenAndWriteSink() throws Exception {
Thread.sleep(1000);

}

private Record<byte[]> build(String topic, String key, String value) {
// prepare a SinkRecord
SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
@Override
public Optional<String> getKey() {
return Optional.empty();
}

@Override
public byte[] getValue() {
return value.getBytes(StandardCharsets.UTF_8);
}

@Override
public Optional<String> getDestinationTopic() {
if (topic != null) {
return Optional.of(topic);
} else {
return Optional.empty();
}
}
}, value.getBytes(StandardCharsets.UTF_8));
return record;
}
}
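Note that the build() helper above always returns Optional.empty() for the key, so the test exercises only the value path. A hypothetical keyed record (illustrative, not part of the commit) that would also exercise the UTF-8 key encoding in flush() could be written inside the test against the same sink instance, using the imports the test already has:

// Hypothetical keyed Record<byte[]>; methods not overridden here keep their Record interface defaults.
Record<byte[]> keyedRecord = new Record<byte[]>() {
    @Override
    public Optional<String> getKey() {
        return Optional.of("fakeKey");
    }

    @Override
    public byte[] getValue() {
        return "fakeValue".getBytes(StandardCharsets.UTF_8);
    }
};
sink.write(keyedRecord);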
