Merge pull request #2 from mmolimar/feature/config_freaders
Configurable file readers
mmolimar authored Apr 2, 2017
2 parents 8dc8453 + 84e231c commit 7d6c6a0
Showing 14 changed files with 186 additions and 50 deletions.
23 changes: 22 additions & 1 deletion docs/source/config_options.rst
@@ -190,12 +190,33 @@ In order to configure custom properties for this reader, the name you must use i
* Default: 4096
* Importance: medium

``file_reader.sequence.field_name.key``
Custom field name for the output key to include in the Kafka message.

* Type: string
* Default: key
* Importance: low

``file_reader.sequence.field_name.value``
Custom field name for the output value to include in the Kafka message.

* Type: string
* Default: value
* Importance: low
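
A minimal sketch of wiring these properties when constructing the reader directly (the ``FileSystem`` handle and the file path below are illustrative assumptions, not part of this change)::

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.util.HashMap;
    import java.util.Map;

    FileSystem fs = FileSystem.get(new Configuration());  // default (local) filesystem
    Map<String, Object> readerConfig = new HashMap<>();
    // Rename the struct fields that carry the SequenceFile key and value.
    readerConfig.put("file_reader.sequence.field_name.key", "offset");
    readerConfig.put("file_reader.sequence.field_name.value", "line");
    SequenceFileReader reader = new SequenceFileReader(fs, new Path("/tmp/sample.seq"), readerConfig);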

.. _config_options-filereaders-text:

Text
--------------------------------------------

This reader does not have any additional configuration.
In order to configure custom properties for this reader, the name you must use is ``text``.

``file_reader.text.field_name.value``
Custom field name for the output value to include in the Kafka message.

* Type: string
* Default: value
* Importance: low
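
The text reader can be wired the same way (a sketch reusing the ``fs`` handle from the previous example; the path is illustrative)::

    Map<String, Object> readerConfig = new HashMap<>();
    readerConfig.put("file_reader.text.field_name.value", "line");
    TextFileReader reader = new TextFileReader(fs, new Path("/tmp/sample.txt"), readerConfig);
    // Each record is now a Struct with a single field named "line" instead of "value".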

.. _config_options-filereaders-delimited:

6 changes: 4 additions & 2 deletions docs/source/filereaders.rst
@@ -29,7 +29,8 @@ SequenceFile
the Hadoop file formats which are serialized in key/value pairs.

This reader can process this file format and build a Kafka message with the
key/value pair. These two values are named ``key`` and ``value`` in the message.
key/value pair. These two values are named ``key`` and ``value`` in the message
by default, but you can customize these field names.

More information about properties of this file reader
:ref:`here<config_options-filereaders-sequencefile>`.
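
For example, with the default field names a consumed record could be unpacked as follows (a sketch assuming a SequenceFile of ``IntWritable``/``Text`` pairs, as in the project's tests)::

    Struct record = reader.next();                   // org.apache.kafka.connect.data.Struct
    Integer key = (Integer) record.get("key");       // or the custom key field name
    String value = record.get("value").toString();   // or the custom value field name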
@@ -40,7 +41,8 @@ Text
Read plain text files.

Each line represents one record which will be in a field
named ``value`` in the message sent to Kafka.
named ``value`` in the message sent to Kafka by default, but you can
customize this field name.

Delimited text
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2 changes: 1 addition & 1 deletion pom.xml
@@ -4,7 +4,7 @@

<groupId>com.github.mmolimar.kafka.connect</groupId>
<artifactId>kafka-connect-fs</artifactId>
<version>1.0-SNAPSHOT</version>
<version>0.1.1</version>
<packaging>jar</packaging>

<name>kafka-connect-fs</name>
@@ -88,8 +88,8 @@ public long getRecordOffset() {
}

static class GenericRecordToStruct implements ReaderAdapter<GenericRecord> {
static final int CACHE_SIZE = 100;
AvroData avroData;
private static final int CACHE_SIZE = 100;
private final AvroData avroData;

public GenericRecordToStruct() {
this.avroData = new AvroData(CACHE_SIZE);
@@ -33,7 +33,7 @@ public DelimitedTextFileReader(FileSystem fs, Path filePath, Map<String, Object>

SchemaBuilder schemaBuilder = SchemaBuilder.struct();
if (hasNext()) {
String firstLine = inner.nextRecord();
String firstLine = inner.nextRecord().getValue();
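// The inner plain-text reader now returns TextRecord objects, so getValue() unwraps the raw line.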
String columns[] = firstLine.split(token);
IntStream.range(0, columns.length).forEach(index -> {
String columnName = hasHeader ? columns[index] : DEFAULT_COLUMN_NAME + "_" + ++index;
@@ -61,7 +61,7 @@ protected void configure(Map<String, Object> config) {
@Override
protected DelimitedRecord nextRecord() {
offset.inc();
return new DelimitedRecord(schema, inner.nextRecord().split(token));
return new DelimitedRecord(schema, inner.nextRecord().getValue().split(token));
}

@Override
@@ -123,8 +123,8 @@ public Struct apply(DelimitedRecord record) {
}

static class DelimitedRecord {
final Schema schema;
final String[] values;
private final Schema schema;
private final String[] values;

public DelimitedRecord(Schema schema, String[] values) {
this.schema = schema;
@@ -119,8 +119,8 @@ public long getRecordOffset() {
}

static class GenericRecordToStruct implements ReaderAdapter<GenericRecord> {
static final int CACHE_SIZE = 100;
AvroData avroData;
private static final int CACHE_SIZE = 100;
private final AvroData avroData;

public GenericRecordToStruct() {
this.avroData = new AvroData(CACHE_SIZE);
@@ -19,17 +19,23 @@

public class SequenceFileReader extends AbstractFileReader<SequenceFileReader.SequenceRecord<Writable, Writable>> {

public static final String FIELD_NAME_KEY_DEFAULT = "key";
public static final String FIELD_NAME_VALUE_DEFAULT = "value";

private static final int DEFAULT_BUFFER_SIZE = 4096;
private static final String FILE_READER_SEQUENCE = FILE_READER_PREFIX + "sequence.";
private static final String FILE_READER_SEQUENCE_FIELD_NAME_PREFIX = FILE_READER_SEQUENCE + "field_name.";

public static final String FILE_READER_BUFFER_SIZE = FILE_READER_SEQUENCE + "buffer_size";
public static final String FILE_READER_SEQUENCE_FIELD_NAME_KEY = FILE_READER_SEQUENCE_FIELD_NAME_PREFIX + "key";
public static final String FILE_READER_SEQUENCE_FIELD_NAME_VALUE = FILE_READER_SEQUENCE_FIELD_NAME_PREFIX + "value";

private static final String FIELD_KEY = "key";
private static final String FIELD_VALUE = "value";

private final SequenceFile.Reader reader;
private final Writable key, value;
private final Schema schema;
private final SeqOffset offset;
private final Schema schema;
private String keyFieldName, valueFieldName;
private long recordIndex, hasNextIndex;
private boolean hasNext;

@@ -42,14 +48,28 @@ public SequenceFileReader(FileSystem fs, Path filePath, Map<String, Object> conf
this.key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
this.value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
this.schema = SchemaBuilder.struct()
.field(FIELD_KEY, getSchema(key)).field(FIELD_VALUE, getSchema(value)).build();
.field(keyFieldName, getSchema(this.key))
.field(valueFieldName, getSchema(this.value))
.build();
this.offset = new SeqOffset(0);
this.recordIndex = this.hasNextIndex = -1;
this.hasNext = false;
}

@Override
protected void configure(Map<String, Object> config) {
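// Fall back to the default field names when the custom properties are absent or empty.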
if (config.get(FILE_READER_SEQUENCE_FIELD_NAME_KEY) == null ||
config.get(FILE_READER_SEQUENCE_FIELD_NAME_KEY).toString().equals("")) {
this.keyFieldName = FIELD_NAME_KEY_DEFAULT;
} else {
this.keyFieldName = config.get(FILE_READER_SEQUENCE_FIELD_NAME_KEY).toString();
}
if (config.get(FILE_READER_SEQUENCE_FIELD_NAME_VALUE) == null ||
config.get(FILE_READER_SEQUENCE_FIELD_NAME_VALUE).toString().equals("")) {
this.valueFieldName = FIELD_NAME_VALUE_DEFAULT;
} else {
this.valueFieldName = config.get(FILE_READER_SEQUENCE_FIELD_NAME_VALUE).toString();
}
}

private Schema getSchema(Writable writable) {
@@ -95,7 +115,7 @@ protected SequenceRecord<Writable, Writable> nextRecord() {
throw new NoSuchElementException("There are no more records in file: " + getFilePath());
}
recordIndex++;
return new SequenceRecord<Writable, Writable>(schema, key, value);
return new SequenceRecord<Writable, Writable>(schema, keyFieldName, key, valueFieldName, value);
}

@Override
@@ -149,8 +169,8 @@ static class SeqToStruct implements ReaderAdapter<SequenceRecord<Writable, Writa
@Override
public Struct apply(SequenceRecord<Writable, Writable> record) {
return new Struct(record.schema)
.put(FIELD_KEY, toSchemaValue(record.key))
.put(FIELD_VALUE, toSchemaValue(record.value));
.put(record.keyFieldName, toSchemaValue(record.key))
.put(record.valueFieldName, toSchemaValue(record.value));
}

private Object toSchemaValue(Writable writable) {
@@ -176,13 +196,17 @@ private Object toSchemaValue(Writable writable) {
}

static class SequenceRecord<T, U> {
final Schema schema;
final T key;
final U value;
private final Schema schema;
private final String keyFieldName;
private final T key;
private final String valueFieldName;
private final U value;

public SequenceRecord(Schema schema, T key, U value) {
public SequenceRecord(Schema schema, String keyFieldName, T key, String valueFieldName, U value) {
this.schema = schema;
this.keyFieldName = keyFieldName;
this.key = key;
this.valueFieldName = valueFieldName;
this.value = value;
}

@@ -14,14 +14,22 @@
import java.util.Map;
import java.util.NoSuchElementException;

public class TextFileReader extends AbstractFileReader<String> {
import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX;

public static final String FIELD_VALUE = "value";
public class TextFileReader extends AbstractFileReader<TextFileReader.TextRecord> {

public static final String FIELD_NAME_VALUE_DEFAULT = "value";

private static final String FILE_READER_TEXT = FILE_READER_PREFIX + "text.";
private static final String FILE_READER_SEQUENCE_FIELD_NAME_PREFIX = FILE_READER_TEXT + "field_name.";

public static final String FILE_READER_TEXT_FIELD_NAME_VALUE = FILE_READER_SEQUENCE_FIELD_NAME_PREFIX + "value";

private final TextOffset offset;
private String currentLine;
private boolean finished = false;
private LineNumberReader reader;
private Schema schema;

public TextFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
super(fs, filePath, new TxtToStruct(), config);
@@ -31,6 +39,16 @@ public TextFileReader(FileSystem fs, Path filePath, Map<String, Object> config)

@Override
protected void configure(Map<String, Object> config) {
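// Resolve the value field name (default "value") and build the record schema once up front.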
String valueFieldName;
if (config.get(FILE_READER_TEXT_FIELD_NAME_VALUE) == null ||
config.get(FILE_READER_TEXT_FIELD_NAME_VALUE).toString().equals("")) {
valueFieldName = FIELD_NAME_VALUE_DEFAULT;
} else {
valueFieldName = config.get(FILE_READER_TEXT_FIELD_NAME_VALUE).toString();
}
this.schema = SchemaBuilder.struct()
.field(valueFieldName, Schema.STRING_SCHEMA)
.build();
}

@Override
@@ -58,14 +76,14 @@ public boolean hasNext() {
}

@Override
protected String nextRecord() {
protected TextRecord nextRecord() {
if (!hasNext()) {
throw new NoSuchElementException("There are no more records in file: " + getFilePath());
}
String aux = currentLine;
currentLine = null;

return aux;
return new TextRecord(schema, aux);
}

@Override
@@ -117,13 +135,26 @@ public long getRecordOffset() {
}
}

static class TxtToStruct implements ReaderAdapter<String> {
final Schema schema = SchemaBuilder.struct()
.field(FIELD_VALUE, SchemaBuilder.STRING_SCHEMA).build();
static class TxtToStruct implements ReaderAdapter<TextRecord> {

@Override
public Struct apply(String record) {
return new Struct(schema).put(FIELD_VALUE, record);
public Struct apply(TextRecord record) {
return new Struct(record.schema)
.put(record.schema.fields().get(0), record.value);
}
}

static class TextRecord {
private final Schema schema;
private final String value;

public TextRecord(Schema schema, String value) {
this.schema = schema;
this.value = value;
}

public String getValue() {
return value;
}
}
}
@@ -10,25 +10,31 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kafka.connect.data.Struct;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class SequenceFileReaderTest extends HdfsFileReaderTestBase {

private static final String FIELD_KEY = "key";
private static final String FIELD_VALUE = "value";
private static final String FIELD_NAME_KEY = "key";
private static final String FIELD_NAME_VALUE = "value";

@BeforeClass
public static void setUp() throws IOException {
readerClass = SequenceFileReader.class;
dataFile = createDataFile();
readerConfig = new HashMap<>();
readerConfig = new HashMap<String, Object>() {{
put(SequenceFileReader.FILE_READER_SEQUENCE_FIELD_NAME_KEY, FIELD_NAME_KEY);
put(SequenceFileReader.FILE_READER_SEQUENCE_FIELD_NAME_VALUE, FIELD_NAME_VALUE);
}};
}

private static Path createDataFile() throws IOException {
@@ -63,14 +69,35 @@ private static Path createDataFile() throws IOException {
return path;
}

@Test
public void defaultFieldNames() throws Throwable {
Map<String, Object> customReaderCfg = new HashMap<String, Object>();
reader = getReader(fs, dataFile, customReaderCfg);
assertTrue(reader.getFilePath().equals(dataFile));

assertTrue(reader.hasNext());

int recordCount = 0;
while (reader.hasNext()) {
Struct record = reader.next();
checkData(SequenceFileReader.FIELD_NAME_KEY_DEFAULT, SequenceFileReader.FIELD_NAME_VALUE_DEFAULT, record, recordCount);
recordCount++;
}
assertEquals("The number of records in the file does not match", NUM_RECORDS, recordCount);
}

@Override
protected Offset getOffset(long offset) {
return new SequenceFileReader.SeqOffset(offset);
}

@Override
protected void checkData(Struct record, long index) {
assertTrue((Integer) record.get(FIELD_KEY) == index);
assertTrue(record.get(FIELD_VALUE).toString().startsWith(index + "_"));
checkData(FIELD_NAME_KEY, FIELD_NAME_VALUE, record, index);
}

private void checkData(String keyFieldName, String valueFieldName, Struct record, long index) {
assertTrue((Integer) record.get(keyFieldName) == index);
assertTrue(record.get(valueFieldName).toString().startsWith(index + "_"));
}
}
