
Support KTable-KTable Foreign-Key Join #52

Open
ybuasen opened this issue Jan 18, 2021 · 14 comments
ybuasen commented Jan 18, 2021

Description

The Java Kafka Streams library supports KTable-KTable foreign-key joins, as described at https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-fk-join.

A sample use case for the Java version is posted at https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html.

Will the same functionality be implemented in Streamiz Kafka .NET soon?

@LGouellec (Owner)

Hi @ybuasen,

Exactly, the Java library offers KTable-KTable foreign-key joins. This feature is not available yet in Streamiz.
I hope to ship it in release 1.3.0 (April/May 2021).

I'll leave this issue open to track the implementation progress.

Regards,


ybuasen commented Jan 19, 2021


@LGouellec Awesome!!! Looking forward to seeing it in action.

@sgiuliani-ovalmoney

Hi, is this feature currently available within the Streamiz library?
Regards

@LGouellec (Owner)

For now, this feature is not prioritized, but there is a workaround: combine `SelectKey(..)` with `Join(..)`.
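A minimal sketch of that workaround, pieced together from the API calls shown elsewhere in this thread. The topic names and the `ExtractForeignKey` helper are hypothetical placeholders, and string serdes are assumed throughout:

```csharp
// Workaround sketch: re-key the left side by the foreign key, then do a
// primary-key KTable-KTable join. "orders"/"customers" topics and
// ExtractForeignKey are illustrative only.
var builder = new StreamBuilder();

// Re-key the orders by the foreign key, then materialize them as a table.
// SelectKey forces a repartition, so both tables end up keyed the same way.
var ordersByCustomer = builder
    .Stream("orders", new StringSerDes(), new StringSerDes())
    .SelectKey((k, v) => ExtractForeignKey(v)) // e.g. parse the customer id out of the value
    .ToTable();

var customers = builder.Table("customers", new StringSerDes(), new StringSerDes());

// A plain primary-key join now works because both tables share the customer id as key.
ordersByCustomer
    .Join(customers, (order, customer) => $"{order}|{customer}")
    .ToStream()
    .To("orders-with-customers", new StringSerDes(), new StringSerDes());
```

Note the caveat raised later in this thread: if the foreign key is not unique across orders, re-keying a *table* collapses rows that share the new key, so the workaround only preserves the latest record per foreign key.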

@MladenTasevski

For streams you can join by remapping the key and then using the join, so I was trying to use `ToStream`, remap the key, and join. I am getting the error below with this code (which might make sense, since changing a table's keys can lead to inconsistencies and lost data when the key you remap to is not unique across records).
So can you join two KTables reliably by foreign key using your library?

```csharp
var table1 = builder.Table("topic1", new StringSerDes(), new StringSerDes(), InMemory<string, string>.As("table1"));
var table2 = builder.Table("topic2", new StringSerDes(), new StringSerDes(), InMemory<string, string>.As("table2"));

table1.ToStream().SelectKey((k, v) => v.name).ToTable()
    .Join(table2, (v1, v2) => $"{v1}-{v2}")
    .ToStream().To("final-topic", new StringSerDes(), new StringSerDes());
```

The error we are getting:

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-20ab114e-9933-47d5-a379-0839c9c2bcc7-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Erroneous state
         at Confluent.Kafka.Impl.SafeKafkaHandle.AssignImpl(IEnumerable`1 partitions, Func`3 assignMethodErr, Func`3 assignMethodError)
         at Confluent.Kafka.Consumer`2.Unassign()
         at Confluent.Kafka.Consumer`2.RebalanceCallback(IntPtr rk, ErrorCode err, IntPtr partitionsPtr, IntPtr opaque)
         at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr rk, IntPtr timeout_ms)
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
         at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout, Int64 maxRecords)
         at Streamiz.Kafka.Net.Processors.StreamThread.<>c__DisplayClass57_0.<Run>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()
```

@LGouellec (Owner)

Hi @MladenTasevski,

Could you provide more logs for your example, please?
It looks strange; I don't think it's related to the table foreign key.
It may be a different issue.

All details are welcome :)

Regards,

@MladenTasevski

Hello again @LGouellec

Here I am trying to join the orders table with the customer and book tables.
Both book and customer have a different PK from the orders table and from each other, so I have to SelectKey before each join.
When I try to do it by joining with only one order stream, it fails with this error:

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-35497002-748f-4946-97be-ae569c23265d-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Unknown partition
         at Confluent.Kafka.Impl.SafeKafkaHandle.QueryWatermarkOffsets(String topic, Int32 partition, Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.<OffsetsChangelogs>b__30_0(ChangelogMetadata _changelog)
         at System.Linq.Enumerable.SelectListIterator`2.MoveNext()
         at System.Linq.Enumerable.ToDictionary[TSource,TKey,TElement](IEnumerable`1 source, Func`2 keySelector, Func`2 elementSelector, IEqualityComparer`1 comparer)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.OffsetsChangelogs(IEnumerable`1 registeredChangelogs)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.InitChangelogs(IEnumerable`1 registeredChangelogs)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.Restore()
         at Streamiz.Kafka.Net.Processors.StreamThread.RestorePhase()
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()
fail: Streamiz.Kafka.Net.Processors.StreamStateManager[0]
      All stream threads have died. The instance will be in error state and should be closed
```

What more from the logs would you need?

Also, when I use SelectKey immediately before the join, it fails as well, so I only use SelectKey when initializing the stream or after joining the streams.
Is this approach safe, given that the FKs aren't a unique key for the orders table?

I am not getting any joins on customerOrders and bookOrders, even though I checked that there were entries with the same ID in both.

This is my current implementation:

```csharp
var orders = builder.Stream("orders", new StringSerDes(), new StringSerDes()).SelectKey((k, v) => v.bookId).ToTable();
var copyOfOrders = builder.Stream("copy_of_orders", new StringSerDes(), new StringSerDes()).SelectKey((k, v) => v.customerId).ToTable();
var book = builder.Stream("book", new StringSerDes(), new StringSerDes()).ToTable();
var customer = builder.Stream("customer", new StringSerDes(), new StringSerDes()).ToTable();

var customerOrdersStream = copyOfOrders.Join(customer, (v1, v2) => JoinCustomerWithBook(v1, v2))
    .ToStream().SelectKey((k, v) => v.orderId);
customerOrdersStream.To("CustomerOrders");
var customerOrders = customerOrdersStream.ToTable();

var bookOrdersStream = orders.Join(book, (v1, v2) => JoinBookWithOrder(v1, v2))
    .ToStream().SelectKey((k, v) => v.orderId);
bookOrdersStream.To("BookOrders");
var bookOrders = bookOrdersStream.ToTable();

customerOrders.Join(bookOrders, (v1, v2) => JoinOrders(v1, v2));
```

@LGouellec (Owner)

Hi @MladenTasevski,

Regarding this log :

```
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-app-35497002-748f-4946-97be-ae569c23265d-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.KafkaException: Local: Unknown partition
```

It seems your source topics are not co-partitioned. During a join operation, both topics need to have the same number of partitions, otherwise the join can't work.

I recommend reading this article; there are more public blog posts available as well.
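One way to check co-partitioning before deploying a topology is to compare partition counts with the Confluent.Kafka admin client. A sketch, with placeholder broker address and topic names:

```csharp
using System;
using Confluent.Kafka;

// Fetch cluster metadata and print the partition count of each topic
// involved in the join. "localhost:9092" and the topic names below are
// placeholders for this example.
var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var admin = new AdminClientBuilder(config).Build();

var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var topicName in new[] { "orders", "book", "customer" })
{
    var topic = metadata.Topics.Find(t => t.Topic == topicName);
    Console.WriteLine($"{topicName}: {topic?.Partitions.Count} partitions");
}
// All topics involved in a join must report the same partition count.
```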

Best regards,

@MladenTasevski

@LGouellec I didn't know about the partition requirement for joins, thank you for pointing that out. I've managed to join the KTables, but I am getting a lot of out-of-order records, and I think that accounts for the latency in the streams. I'm just joining and using SelectKey. What could be some reasons for the out-of-order records?

```
info: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      Finished restoring changelog KSTREAM-TOTABLE-STATE-STORE-0000000019 to store test-app-KSTREAM-TOTABLE-STATE-STORE-0000000019-changelog [[0]] with a total number of 59121 records
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Task 2-0 state transition from RESTORING to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Restored and ready to run
warn: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
      Detected out-of-order KTable update for KSTREAM-TOTABLE-STATE-STORE-0000000019 at offset 7424, partition [0].
```

@LGouellec (Owner)

@MladenTasevski
If you stream a topic, change the key (via `SelectKey(..)`), and then apply a stateful operation such as creating a KTable, a join, or Count/Agg/Reduce, the library will create a repartition topic to materialize the new key.

So your topology is split into two parts:
First: stream the source topic, change the key, publish into the repartition topic.
Second: stream the repartition topic and perform the stateful operation.

The internal consumer subscribes to both topics (the source topic and the repartition topic), so you are not guaranteed to consume messages first from the source topic and then from the repartition topic; consumption happens in parallel.

So out-of-order records can appear when the timestamp already present in the state store is newer than the current record's timestamp.

By the way, you can visualize your topology with this tool. Call this to get the output:

```csharp
streamBuilder.Build().Describe().ToString();
```

@MladenTasevski

@LGouellec Thank you for that explanation and the tool; it helped a lot in understanding what's going on. I was getting a lot of out-of-order records. I am doing an FK join of 3 tables which all have different PKs. The out-of-order records appear too often and are causing performance issues. I don't know if there's anything more to try; I may also be doing something incorrectly.

@LGouellec (Owner)

@MladenTasevski
What about the Kafka messages themselves? By default, Streamiz prioritizes processing by message timestamp.
Out-of-order records mean you process message 1 with timestamp 1 after having processed message 2 with timestamp 2.

In the producer source topics, are the timestamps of your messages set explicitly, or do you let the broker assign them?
Do you use a TimestampExtractor, or do you keep the default?
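For context, a hedged sketch of what a custom timestamp extractor could look like. The interface name and signature below assume Streamiz mirrors the Java TimestampExtractor contract; check the library's source for the exact shape before using this:

```csharp
using Confluent.Kafka;

// Hypothetical sketch: falls back to the record's broker/producer timestamp.
// ITimestampExtractor and the Extract signature are assumptions modeled on
// the Java TimestampExtractor interface; verify against the Streamiz API.
public class DefaultFallbackTimestampExtractor : ITimestampExtractor
{
    public long Extract(ConsumeResult<object, object> record, long partitionTime)
    {
        // Use the timestamp carried by the Kafka message itself.
        return record.Message.Timestamp.UnixTimestampMs;
    }
}
```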

@MladenTasevski

@LGouellec I keep the default value and let the broker do it. I haven't touched the TimestampExtractor.

@LGouellec (Owner)

Ok. Can you attach a dump of all your source topics to this issue? And your topology?

@LGouellec LGouellec added this to the 1.5.0 milestone Oct 25, 2022
@LGouellec LGouellec removed this from the 1.5.0 milestone Nov 7, 2022
@LGouellec LGouellec added the 1.6.0 label Jun 9, 2023
@LGouellec LGouellec added this to the 1.6.0 milestone Jul 6, 2023
@In-Wake In-Wake mentioned this issue Apr 3, 2024
8 tasks
@LGouellec LGouellec removed the 1.6.0 label May 9, 2024
@LGouellec LGouellec removed this from the 1.6.0 milestone May 9, 2024
@LGouellec LGouellec added this to the 1.8.0 milestone Oct 24, 2024