Faust workers crash due to KeyError from aiokafka #19

Open
DhruvaPatil98 opened this issue Feb 25, 2020 · 0 comments

DhruvaPatil98 commented Feb 25, 2020

(Not sure whether this should be raised on the faust repo. Since both the issue and the fix may be in aiokafka, I am raising it here. Let me know if I should move it to the faust repo.)

Steps to reproduce

When the number of Faust workers is changed from around 5 to between 6 and 10 while data is being streamed to the topics that the agents subscribe to, the workers crash with a KeyError.

The following script is used as app.py:

from faust import App

app = App(
    'app_main',
    broker='kafka://kafka:9094',
    store='rocksdb://',
)

PARTITIONS = 10

event_topic = []
event_table = []
for i in range(20):
    event_topic.append(app.topic(
        f'event_topic_write{i}',
        internal=True,
        partitions=PARTITIONS,
    ))

    event_table.append(app.Table(
        f'event_table{i}',
        partitions=PARTITIONS,
    ))

@app.agent(event_topic[0])
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[0][payload.key] = payload.value


if __name__ == '__main__':
    app.main()

The following script was used to stream events to the topics:

import json
import random
import string
from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

event_topic = []
for i in range(20):
    event_topic.append(f'event_topic_write{i}')


def randomString(stringLength=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))


while True:
    key_bytes = bytes(json.dumps(randomString()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')

    topic_name = random.choice(event_topic)

    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)

Expected behavior

The rebalance should finish successfully.

Actual behavior

Some workers crash with the KeyError shown in the traceback below.

Full traceback

[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table1-changelog', partition=0) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table4-changelog', partition=2) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table3-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table13-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table6-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table11-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table5-changelog', partition=3) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table9-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table14-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,182] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table19-changelog', partition=8) at offset 0 
[2020-02-21 11:04:39,186] [9] [DEBUG] <AIOKafkaConnection host=kafka port=9094> Request 357: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='app_main-event_table18-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576)]), (topic='app_main-event_table6-changelog', partitions=[(partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table0-changelog', partitions=[(partition=6, offset=133, max_bytes=1048576), (partition=0, offset=33, max_bytes=1048576), (partition=4, offset=20, max_bytes=1048576), (partition=5, offset=26, max_bytes=1048576), (partition=2, offset=35, max_bytes=1048576)]), (topic='app_main-event_table2-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table11-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table5-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table7-changelog', partitions=[(partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), (topic='app_main-event_table19-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table15-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table9-changelog', partitions=[(partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table1-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576)]), (topic='app_main-event_table10-changelog', partitions=[(partition=3, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table8-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table13-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table17-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576)]), (topic='app_main-event_table4-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table3-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table16-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table12-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table14-changelog', partitions=[(partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), 
(partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)])]) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=8) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=7) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=9) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=9) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=9) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=7) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=8) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=3) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=1) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=1) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=7) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=8) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=8) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=9) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=8) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=7) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=8) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=3) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=1) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=9) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=3) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=1) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=9) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=8) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=9) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=3) 
[2020-02-21 11:04:39,208] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=8) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=9) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=7) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=1) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=8) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=3) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=7) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=3) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=9) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=9) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=8) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=9) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=3) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=1) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=7) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=9) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=7) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=1) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=9) 
[2020-02-21 11:04:39,220] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=7) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=9) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=1) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=1) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=3) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=8) 
[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,263] [9] [DEBUG] Timer Recovery.stats woke up - iteration=70 time_spent_sleeping=5.114945699984673 drift=-0.11494569998467341 new_interval=4.9 since_epoch=355.7561254000175 
[2020-02-21 11:04:39,263] [9] [ERROR] [^---Fetcher]: Crashed reason=KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/application/faust_mod/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
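
The failing line in the traceback, `res_or_error = self._records[tp]`, is a plain dictionary lookup. The crash would therefore be consistent with the partition being dropped from the fetcher's record buffer (for example by the rebalance) after the list of partitions to read was built but before it was used. The following is a minimal, self-contained sketch of that pattern and of one defensive way to tolerate it. It is not aiokafka's code: `RecordBuffer`, `fetched_records_strict` and `fetched_records_tolerant` are hypothetical names used only for illustration, and the actual fix may look different.

```python
from collections import namedtuple

# Local stand-in for the TopicPartition type seen in the logs.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class RecordBuffer:
    """Toy per-partition record buffer (hypothetical, not aiokafka's Fetcher)."""

    def __init__(self):
        self._records = {}

    def add(self, tp, messages):
        self._records[tp] = list(messages)

    def revoke(self, tp):
        # Simulates a rebalance taking the partition away from this worker.
        self._records.pop(tp, None)

    def fetched_records_strict(self, partitions):
        # Same kind of lookup as in the traceback: raises KeyError if tp is gone.
        return {tp: self._records[tp] for tp in partitions}

    def fetched_records_tolerant(self, partitions):
        # Skips partitions that disappeared instead of raising.
        result = {}
        for tp in partitions:
            messages = self._records.get(tp)
            if messages is not None:
                result[tp] = messages
        return result


tp_kept = TopicPartition("app_main-event_table0-changelog", 8)
tp_gone = TopicPartition("app_main-event_table0-changelog", 9)

buffer = RecordBuffer()
buffer.add(tp_kept, [b"a"])
buffer.add(tp_gone, [b"b"])

snapshot = [tp_kept, tp_gone]  # partitions captured before the rebalance
buffer.revoke(tp_gone)         # the rebalance removes one of them

try:
    buffer.fetched_records_strict(snapshot)
    strict_failed = False
except KeyError:
    strict_failed = True

tolerant = buffer.fetched_records_tolerant(snapshot)
```

In this sketch the strict variant fails on the stale snapshot, while the tolerant variant returns records only for the partition that is still assigned. Whether silently skipping a missing partition is acceptable in the real fetcher is a design question for the maintainers.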

Versions

  • Python version - 3.6
  • Faust version - 1.10.1 to 1.10.3 and master, each with the corresponding version of robinhood aiokafka
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)
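
For anyone reproducing this, a small script along these lines could collect the client-side version details for the list above. It is only a sketch: the distribution names `faust`, `robinhood-aiokafka`, `kafka-python` and `python-rocksdb` are assumptions about what is installed, `installed_version` is a hypothetical helper, and the Kafka broker version cannot be read this way.

```python
import platform


def installed_version(dist_name):
    """Return the installed version of a distribution, or 'not installed'."""
    try:
        from importlib.metadata import PackageNotFoundError, version
    except ImportError:
        # Python < 3.8 (such as 3.6): fall back to setuptools' pkg_resources.
        import pkg_resources
        try:
            return pkg_resources.get_distribution(dist_name).version
        except pkg_resources.DistributionNotFound:
            return "not installed"
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return "not installed"


# Assumed distribution names; adjust to match the environment.
DISTRIBUTIONS = ["faust", "robinhood-aiokafka", "kafka-python", "python-rocksdb"]

report = {
    "python": platform.python_version(),
    "os": platform.platform(),
}
for name in DISTRIBUTIONS:
    report[name] = installed_version(name)

for key, value in report.items():
    print(f"{key}: {value}")
```
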
DhruvaPatil98 added a commit to DhruvaPatil98/aiokafka that referenced this issue Feb 25, 2020