Can we run Kafka as a source for the Morpheus AutoEncoder pipeline? #1826
-
The above documentation shows that the AE pipeline does not have support for Kafka as a source.
My question is: "Do we still not have support for Kafka in the AE pipeline?" Thank you in advance!
-
@srisudha-alt That command is for Starter DFP, which will be removed soon in favor of Production DFP. Here is the current issue for its removal. It's a stale implementation, so we strongly encourage everyone to use Production DFP; see the Production DFP documentation for details. Morpheus makes it easy to swap out sources/stages, so you should be able to update our examples to use the KafkaSourceStage.
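To illustrate that last point, here is a minimal sketch (not from the original thread) of setting a Kafka source on a Morpheus pipeline. The broker address and topic name are placeholders, and the full DFP/AE stage configuration is omitted; check the KafkaSourceStage signature against your Morpheus version.

```python
# Sketch: swap a file-based source for Kafka in a Morpheus LinearPipeline.
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage

config = Config()
pipeline = LinearPipeline(config)

# Consume records from Kafka instead of reading files; each consumed batch is
# emitted downstream just like the output of a file source stage.
pipeline.set_source(KafkaSourceStage(
    config,
    bootstrap_servers="localhost:9092",  # placeholder broker
    input_topic="dfp-input",             # placeholder topic
    group_id="morpheus",
))

# ... add the remaining DFP/AE stages here, as in the published examples ...

pipeline.run()
```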
-
@efajardo-nv Thanks for the links! They have been useful. I am trying to build a production pipeline using Kafka as the source, as part of a full-length pipeline for anomaly detection. The pipeline looks like the below:

I got stuck at the very first step, where I am facing the error below, which basically shows an incompatibility between the output of stage 1 and the input of stage 2. Can I get some help here? I couldn't find examples of this flow.

====Pipeline Pre-build====
-
@srisudha-alt You are correct. The issue you're seeing is an incompatibility between the stage 1 output type and the stage 2 input type. The Production DFP example here uses the MultiFileSource stage, which outputs a list of files. We have another Production DFP example that builds a similar pipeline with Morpheus modules (instead of pure stages, as above) and uses a Kafka source. More information on modules and how to run the examples can be found here. Module-based pipelines use control messages, which are the expected input of the file batcher module. Please look at dfp_integrated_training_streaming_pipeline.py to see how the example pipeline is constructed.
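For context, a "control message" here is a small JSON document consumed from Kafka that describes tasks to run (for example, which files to load) rather than carrying the raw events themselves. A rough sketch of such a payload is below; the field names are an assumption based on the DFP examples, so verify the exact schema against the example scripts. It is shown pretty-printed for readability, but, as it turns out later in this thread, it must be published to Kafka as a single line.

```json
{
  "inputs": [
    {
      "tasks": [
        {
          "type": "load",
          "properties": {
            "loader_id": "fsspec",
            "files": ["/data/duo/*.json"]
          }
        }
      ]
    }
  ]
}
```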
-
@efajardo-nv Thank you for your inputs. I am trying to look into and run the integrated pipelines. I am able to run dfp_integrated_training_batch_pipeline and have understood the flow of module-based pipelines. However, the streaming pipeline produces the errors below:

FileToDF [training_pipe]: 0 messages [00:07, ? messages/s]
Error converting payload to ControlMessage : Extra data: line 1 column 10 (char 9)
Error converting payload to ControlMessage : Extra data: line 1 column 16 (char 15)
Error converting payload to ControlMessage : Extra data: line 1 column 17 (char 16)
Error converting payload to ControlMessage : Extra data: line 1 column 13 (char 12)
Error converting payload to ControlMessage : 'str' object has no attribute 'get'
Error converting payload to ControlMessage : Expecting value: line 1 column 6 (char 5)
Error converting payload to ControlMessage : Expecting value: line 1 column 4 (char 3)
Error converting payload to ControlMessage : Expecting value: line 1 column 5 (char 4)
Error converting payload to ControlMessage : Expecting property name enclosed in double quotes: line 1 column 6 (char 5)
Error converting payload to ControlMessage : Extra data: line 1 column 10 (char 9)
Error converting payload to ControlMessage : Extra data: line 1 column 16 (char 15)
Error converting payload to ControlMessage : Expecting value: line 1 column 4 (char 3)
Error converting payload to ControlMessage : Expecting value: line 1 column 5 (char 4)
Error converting payload to ControlMessage : Expecting value: line 1 column 3 (char 2)
Error converting payload to ControlMessage : Extra data: line 1 column 13 (char 12)
Error converting payload to ControlMessage : Extra data: line 1 column 16 (char 15)
Error converting payload to ControlMessage : Extra data: line 1 column 23 (char 22)
Error converting payload to ControlMessage : Extra data: line 1 column 25 (char 24)
Error converting payload to ControlMessage : Extra data: line 1 column 20 (char 19)
Error converting payload to ControlMessage : Extra data: line 1 column 13 (char 12)
Error converting payload to ControlMessage : Expecting value: line 1 column 5 (char 4)
Error converting payload to ControlMessage : Expecting value: line 1 column 4 (char 3)
Error converting payload to ControlMessage : Expecting value: line 1 column 3 (char 2)
Error converting payload to ControlMessage : Expecting value: line 1 column 4 (char 3)
Error converting payload to ControlMessage : Expecting property name enclosed in double quotes: line 1 column 5 (char 4)
Error converting payload to ControlMessage : Extra data: line 1 column 10 (char 9)
Error converting payload to ControlMessage : Expecting property name enclosed in double quotes: line 1 column 6 (char 5)
Error converting payload to ControlMessage : Extra data: line 1 column 10 (char 9)
Error converting payload to ControlMessage : Extra data: line 1 column 16 (char 15)
Error converting payload to ControlMessage : Extra data: line 1 column 17 (char 16)
Error converting payload to ControlMessage : Extra data: line 1 column 13 (char 12)
Error converting payload to ControlMessage : 'str' object has no attribute 'get'
Error converting payload to ControlMessage : Expecting value: line 1 column 6 (char 5)
Error converting payload to ControlMessage : Expecting value: line 1 column 4 (char 3)
Error converting payload to ControlMessage : Expecting value: line 1 column 5 (char 4)
Error converting payload to ControlMessage : Expecting property name enclosed in double quotes: line 1 column 6 (char 5)
Error converting payload to ControlMessage : Extra data: line 1 column 10 (char 9)
Error converting payload to ControlMessage : Extra data: line 1 column 16 (char 15)

Since I am trying the example pipeline with the example input, nothing has changed in the code or the input. Can you please suggest where I am going wrong here? Below is the command used to run the pipeline.

One more important thing: control messages seem to provide only metadata, such as the file path, while the actual messages are still read from that file path. Do we actually have support for reading the actual data messages from Kafka, instead of control messages?
-
@efajardo-nv To give better context on the issue we are trying to solve, I will explain the use case. We want to use the Morpheus framework to perform anomaly detection on network traffic data. We could run the pipelines successfully using this example, dfp_duo_pipeline.py, with input data from a CSV file. Now we want to use Kafka as the input source for the pipelines, to do real-time anomaly detection on the streaming traffic. Based on the discussion we have had so far, these are our takeaways:

Is it possible to have a 30-minute sync-up any time early this week to discuss this and work out a possible solution for this requirement?
-
@efajardo-nv Thanks for the inputs! Formatting the control message as single-line JSON resolved the issue!
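For anyone who lands here with the same errors: the "Extra data" and "Expecting value" messages above are exactly what json.loads raises when it is handed individual lines of a pretty-printed, multi-line JSON document instead of one complete document, so serializing each control message onto a single line avoids them. Below is a minimal sketch of publishing such a message; the broker address and topic name are placeholders, and the payload schema follows the (assumed) sketch earlier in the thread.

```python
# Sketch: publish a DFP control message to Kafka as compact single-line JSON.
import json

from confluent_kafka import Producer

# Payload schema is an assumption based on the Morpheus DFP examples; verify
# it against dfp_integrated_training_streaming_pipeline.py for your version.
control_message = {
    "inputs": [{
        "tasks": [{
            "type": "load",
            "properties": {"loader_id": "fsspec", "files": ["/data/duo/*.json"]},
        }],
    }],
}

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker

# json.dumps() without an `indent` argument emits one compact line; passing
# indent=... would pretty-print across lines and reproduce the errors above.
producer.produce("test_cm", value=json.dumps(control_message).encode("utf-8"))
producer.flush()
```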