ParquetFileReader - Maximum of two batches processed using file_reader.batch_size set to > 0 #100

Open · DNewt opened this issue Sep 29, 2022 · 0 comments · May be fixed by #103

Comments

DNewt commented Sep 29, 2022

Hey, I'm currently setting up the connector to read Parquet files from HDFS locally using Docker Compose. Can someone please verify that they have this reader (ParquetFileReader) working with file_reader.batch_size > 0?

Currently it reads a maximum of 2 batches; if those 2 batches are not enough to cover the entire file, it continuously seeks back to the offset of the second batch.

For example, below I have file_reader.batch_size set to 2 and a Parquet file with 24 records. The reader seeks to offset 2, then to 4, and then keeps seeking to 4 indefinitely. This behaviour occurs with both the SimplePolicy and the SleepyPolicy.

{
    "name":"FsSourceConnector",
    "config": {
        "connector.class":"com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
        "tasks.max":1,
        "fs.uris":"hdfs://192.168.84.89:9000/data",
        "topic":"test-topic-one",
        "policy.class":"com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy",
        "policy.recursive":true,
        "policy.regexp":".",
        "policy.batch_size":1,
        "poll.interval.ms": 30000,
        "policy.cleanup":"none",
        "policy.sleepy.sleep": 60000,
        "file_reader.class":"com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader",
        "file_reader.batch_size":2
    }
}
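
To make the expected behaviour concrete, here is a minimal sketch (illustrative only, not the connector's code) of how the committed offset should advance with this config, versus the stalled progression the logs below show:

public class OffsetProgression {
    public static void main(String[] args) {
        final long totalRecords = 24; // records in test_24_records.parquet
        final long batchSize = 2;     // file_reader.batch_size

        long offset = 0;
        int poll = 0;
        while (offset < totalRecords) {
            // Each poll should consume one batch and commit the new offset.
            offset = Math.min(offset + batchSize, totalRecords);
            System.out.printf("poll %2d -> committed offset %d%n", ++poll, offset);
        }
        // Expected commits: 2, 4, 6, ..., 24 (then eof=true).
        // Observed commits: 2, 4, 4, 4, ...  (stuck after the second batch).
    }
}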

Kafka version for all components: Tried 6.1.0 and 7.1.3 - same behaviour on both.
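
If it helps with reproduction, a file equivalent to test_24_records.parquet can be generated with parquet-avro along these lines (the single-int-column schema here is an assumption; the original file's schema isn't shown in this issue):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class MakeTestFile {
    public static void main(String[] args) throws Exception {
        // Assumed stand-in schema with a single int field.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(
                        new Path("hdfs://192.168.84.89:9000/data/test_24_records.parquet"))
                .withSchema(schema)
                .build()) {
            for (int i = 0; i < 24; i++) {
                GenericRecord rec = new GenericData.Record(schema);
                rec.put("id", i);
                writer.write(rec);
            }
        }
    }
}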

Note: I have broken the logs up for readability below; they were originally one big block, and nothing else has been altered.

First Read - Processes first 2 records

[2022-09-29 05:47:01,504] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 100
(io.confluent.connect.avro.AvroDataConfig)
[2022-09-29 05:47:01,520] INFO RecordReader initialized will read a total of 24 records. (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:47:01,520] INFO at row 0. reading next block (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:47:01,522] INFO block read in memory in 2 ms. row count = 24 (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:47:01,522] INFO FsSourceTask Processing records for file [path = hdfs://192.168.84.89:9000/data/test_24_records.parquet, length = 478, blocks = [[offset = 0, length = 478, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
[2022-09-29 05:48:01,015] DEBUG WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2022-09-29 05:48:01,015] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:48:01,015] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:48:01,016] DEBUG Submitting 1 entries to backing store. The offsets are: {{path=hdfs://192.168.84.89:9000/data/test_24_records.parquet}={offset=2, file-size=478, eof=false}} (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2022-09-29 05:48:01,017] INFO WorkerSourceTask{id=FsSourceConnector-0} Finished commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

Second Read - Seeks to offset 2

[2022-09-29 05:48:02,021] INFO SleepyPolicy Seeking to offset [2] for file [hdfs://192.168.84.89:9000/data/test_24_records.parquet]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
[2022-09-29 05:48:02,021] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 100
(io.confluent.connect.avro.AvroDataConfig)
[2022-09-29 05:48:02,034] INFO RecordReader initialized will read a total of 24 records. (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:48:02,034] INFO at row 0. reading next block (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:48:02,036] INFO block read in memory in 1 ms. row count = 24 (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:48:02,036] INFO FsSourceTask Processing records for file [path = hdfs://192.168.84.89:9000/data/test_24_records.parquet, length = 478, blocks = [[offset = 0, length = 478, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
[2022-09-29 05:49:01,017] DEBUG WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2022-09-29 05:49:01,018] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:49:01,018] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:49:01,018] DEBUG Submitting 1 entries to backing store. The offsets are: {{path=hdfs://192.168.84.89:9000/data/test_24_records.parquet}={offset=4, file-size=478, eof=false}} (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2022-09-29 05:49:01,019] INFO WorkerSourceTask{id=FsSourceConnector-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

Third Read - Seeks to offset 4

[2022-09-29 05:49:02,524] INFO SleepyPolicy Seeking to offset [4] for file [hdfs://192.168.84.89:9000/data/test_24_records.parquet]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
[2022-09-29 05:49:02,524] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 100
(io.confluent.connect.avro.AvroDataConfig)
[2022-09-29 05:49:02,541] INFO RecordReader initialized will read a total of 24 records. (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:49:02,541] INFO at row 0. reading next block (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:49:02,543] INFO block read in memory in 2 ms. row count = 24 (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:49:02,543] INFO FsSourceTask Processing records for file [path = hdfs://192.168.84.89:9000/data/test_24_records.parquet, length = 478, blocks = [[offset = 0, length = 478, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
[2022-09-29 05:50:01,020] DEBUG WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2022-09-29 05:50:01,020] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:50:01,020] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:50:01,020] DEBUG Submitting 1 entries to backing store. The offsets are: {{path=hdfs://192.168.84.89:9000/data/test_24_records.parquet}={offset=4, file-size=478, eof=false}} (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2022-09-29 05:50:01,022] INFO WorkerSourceTask{id=FsSourceConnector-0} Finished commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

Fourth Read - Seeks to offset 4

[2022-09-29 05:50:03,028] INFO SleepyPolicy Seeking to offset [4] for file [hdfs://192.168.84.89:9000/data/test_24_records.parquet]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
[2022-09-29 05:50:03,029] INFO AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 100
(io.confluent.connect.avro.AvroDataConfig)
[2022-09-29 05:50:03,045] INFO RecordReader initialized will read a total of 24 records. (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:50:03,045] INFO at row 0. reading next block (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:50:03,047] INFO block read in memory in 2 ms. row count = 24 (org.apache.parquet.hadoop.InternalParquetRecordReader)
[2022-09-29 05:50:03,048] INFO FsSourceTask Processing records for file [path = hdfs://192.168.84.89:9000/data/test_24_records.parquet, length = 478, blocks = [[offset = 0, length = 478, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
[2022-09-29 05:51:01,022] DEBUG WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2022-09-29 05:51:01,022] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:51:01,022] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2022-09-29 05:51:01,022] DEBUG Submitting 1 entries to backing store. The offsets are: {{path=hdfs://192.168.84.89:9000/data/test_24_records.parquet}={offset=4, file-size=478, eof=false}} (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2022-09-29 05:51:01,024] INFO WorkerSourceTask{id=FsSourceConnector-0} Finished commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

Fifth Read - Seeks to offset 4

[2022-09-29 05:51:03,532] INFO SleepyPolicy Seeking to offset [4] for file [hdfs://192.168.84.89:9000/data/test_24_records.parquet]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)

(repeats forever)
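
(As a possible interim workaround, assuming I am reading the connector docs correctly: file_reader.batch_size defaults to 0, which disables batching, so dropping the "file_reader.batch_size": 2 entry from the config above and letting each file be read in a single poll should sidestep the stall, at the cost of larger polls.)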

Before diving deeper into the connector source code, I thought I would post here to see if anyone has hit a similar issue.

Thanks!

jakedorne linked a pull request (#103) on Nov 6, 2022 that will close this issue