stream split merge
Data streams often need to be split and merged, as shown in the figure below.
Generally, there are two types of split operations.
- Send the same type of tuple to different bolts
- Send different types of tuple to different bolts
For the first type (the same tuple sent to different bolts), the setup resembles the one-to-one send-receive mode: there is still only one sender (a spout or a bolt), but there are two or more receiving bolts. This scenario needs little extra work. Just define one more next-level bolt that subscribes to the same sender.
The following is example code:
Define topology:
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
        new SequenceSpout(), spoutParal);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
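To illustrate the effect of subscribing two bolts to the same sender, here is a minimal plain-Java sketch. It does not use the JStorm API: FanOutSketch, subscribe and emit are hypothetical names that only model the behavior, assuming each subscribing bolt receives its own copy of every tuple the sender emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of one sender with several subscribing bolts.
// FanOutSketch and its methods are hypothetical names, not JStorm API.
class FanOutSketch {
    // Each registered receiver plays the role of one subscribing bolt.
    private final List<Consumer<String>> receivers = new ArrayList<>();

    // Plays the role of one more setBolt(...).shuffleGrouping(senderName) call.
    void subscribe(Consumer<String> receiver) {
        receivers.add(receiver);
    }

    // The sender emits once; every subscribing bolt gets the tuple.
    void emit(String tuple) {
        for (Consumer<String> receiver : receivers) {
            receiver.accept(tuple);
        }
    }

    public static void main(String[] args) {
        FanOutSketch spout = new FanOutSketch();
        List<String> tradeBolt = new ArrayList<>();
        List<String> customerBolt = new ArrayList<>();
        spout.subscribe(tradeBolt::add);
        spout.subscribe(customerBolt::add);

        spout.emit("record-1");
        spout.emit("record-2");

        System.out.println("trade bolt saw " + tradeBolt);
        System.out.println("customer bolt saw " + customerBolt);
    }
}
```

In this model the sender code never changes when a receiver is added, which matches the point above that only the topology definition needs one more bolt.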
For the second type (different tuples sent to different bolts), JStorm uses a "stream" to define each data flow.
For example, suppose there is one sender and two receivers (Receiver-A and Receiver-B). Receiver-A subscribes to Stream-A, while Receiver-B subscribes to Stream-B. When the sender emits a tuple on Stream-A, only Receiver-A receives it, and vice versa.
The following is example code:
Define topology:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SPLIT_BOLT_NAME,      // --- Sender name
        SequenceTopologyDef.TRADE_STREAM_ID);     // --- The stream this bolt receives tuples from
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SPLIT_BOLT_NAME,      // --- Sender name
        SequenceTopologyDef.CUSTOMER_STREAM_ID);  // --- The stream this bolt receives tuples from
Send tuples:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    tpsCounter.count();
    Long tupleId = tuple.getLong(0);
    Object obj = tuple.getValue(1);
    if (obj instanceof TradeCustomer) {
        TradeCustomer tradeCustomer = (TradeCustomer) obj;
        Pair trade = tradeCustomer.getTrade();
        Pair customer = tradeCustomer.getCustomer();
        // Emit each half on its own stream, keeping the same tuple ID.
        collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
                new Values(tupleId, trade));
        collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
                new Values(tupleId, customer));
    } else if (obj != null) {
        LOG.info("Unknown type " + obj.getClass().getName());
    } else {
        LOG.info("Null value");
    }
}
Define the output streams:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
    declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
When receiving tuples, the receiver must check the source stream of each tuple.
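// The body handles a customer tuple, so the check is against the customer stream.
if (input.getSourceStreamId().equals(SequenceTopologyDef.CUSTOMER_STREAM_ID)) {
    customer = pair;
    customerTuple = input;
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        // The matching trade has not arrived yet; keep the customer and wait.
        customerMap.put(tupleId, input);
        return;
    }
    trade = (Pair) tradeTuple.getValue(1);
}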
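The stream-based split described above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not JStorm code: StreamRouterSketch, declareStream, subscribe and emit are hypothetical names that mimic declaring streams, subscribing a receiver to one stream, and emitting on a chosen stream. Rejecting undeclared streams is a choice of the sketch, not a statement about JStorm's behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of named output streams. StreamRouterSketch and its
// methods are hypothetical names for illustration, not JStorm API.
class StreamRouterSketch {
    // Stream ID -> receivers subscribed to that stream.
    private final Map<String, List<Consumer<List<Object>>>> subscribers = new HashMap<>();

    // Plays the role of declarer.declareStream(streamId, fields).
    void declareStream(String streamId) {
        subscribers.putIfAbsent(streamId, new ArrayList<>());
    }

    // Plays the role of shuffleGrouping(senderName, streamId) on a receiver.
    void subscribe(String streamId, Consumer<List<Object>> receiver) {
        receiversOf(streamId).add(receiver);
    }

    // Plays the role of collector.emit(streamId, new Values(...)).
    void emit(String streamId, Object... values) {
        for (Consumer<List<Object>> receiver : receiversOf(streamId)) {
            receiver.accept(Arrays.asList(values));
        }
    }

    // Rejecting undeclared streams is a choice made by this sketch.
    private List<Consumer<List<Object>>> receiversOf(String streamId) {
        List<Consumer<List<Object>>> list = subscribers.get(streamId);
        if (list == null) {
            throw new IllegalArgumentException("Undeclared stream: " + streamId);
        }
        return list;
    }

    public static void main(String[] args) {
        StreamRouterSketch splitBolt = new StreamRouterSketch();
        splitBolt.declareStream("trade");
        splitBolt.declareStream("customer");

        List<List<Object>> tradeBolt = new ArrayList<>();
        List<List<Object>> customerBolt = new ArrayList<>();
        splitBolt.subscribe("trade", tradeBolt::add);
        splitBolt.subscribe("customer", customerBolt::add);

        // Both halves of one record share the same ID but travel on different streams.
        splitBolt.emit("trade", 1L, "trade-part");
        splitBolt.emit("customer", 1L, "customer-part");

        System.out.println("trade bolt saw " + tradeBolt);
        System.out.println("customer bolt saw " + customerBolt);
    }
}
```

Each receiver in the sketch only ever sees tuples from the stream it subscribed to, which is the property the topology definition relies on.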
For the merge operation, JStorm lets a single bolt receive tuples from two or more sources: the bolt simply declares each of them as an input.
In the following example, the MergeRecord bolt receives tuples from SequenceTopologyDef.TRADE_BOLT_NAME and SequenceTopologyDef.CUSTOMER_BOLT_NAME at the same time.
Define topology:
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SPLIT_BOLT_NAME,
        SequenceTopologyDef.TRADE_STREAM_ID);
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
        SequenceTopologyDef.SPLIT_BOLT_NAME,
        SequenceTopologyDef.CUSTOMER_STREAM_ID);
builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
        .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)      // --- Define the two input sources
        .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
Sender: no changes are required.
Receiver: use the getSourceComponent() API to determine which source component sent the tuple.
if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME)) {
    customer = pair;
    customerTuple = input;
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        customerMap.put(tupleId, input);
        return;
    }
    trade = (Pair) tradeTuple.getValue(1);
} else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
    trade = pair;
    tradeTuple = input;
    customerTuple = customerMap.get(tupleId);
    if (customerTuple == null) {
        tradeMap.put(tupleId, input);
        return;
    }
    customer = (Pair) customerTuple.getValue(1);
}
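The buffering logic in the receiver above amounts to a join on the tuple ID: whichever half arrives first waits in a map until its partner shows up. The following plain-Java sketch models that idea without the JStorm API. MergeSketch, onTuple and the "trade"/"customer" source names are hypothetical, payloads are simplified to strings, and removing a buffered entry once it is matched is a choice of the sketch that the excerpt above does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of merging two inputs by tuple ID.
// MergeSketch and onTuple are hypothetical names, not JStorm API.
class MergeSketch {
    // Halves that arrived first and are still waiting for their partner.
    private final Map<Long, String> pendingTrades = new HashMap<>();
    private final Map<Long, String> pendingCustomers = new HashMap<>();

    // Returns the merged record once both halves are present, otherwise null.
    String onTuple(String sourceComponent, long tupleId, String payload) {
        if ("customer".equals(sourceComponent)) {
            String trade = pendingTrades.remove(tupleId);
            if (trade == null) {
                // The trade half has not arrived yet; buffer the customer half.
                pendingCustomers.put(tupleId, payload);
                return null;
            }
            return trade + "+" + payload;
        } else if ("trade".equals(sourceComponent)) {
            String customer = pendingCustomers.remove(tupleId);
            if (customer == null) {
                // The customer half has not arrived yet; buffer the trade half.
                pendingTrades.put(tupleId, payload);
                return null;
            }
            return payload + "+" + customer;
        }
        throw new IllegalArgumentException("Unknown source: " + sourceComponent);
    }

    public static void main(String[] args) {
        MergeSketch merge = new MergeSketch();
        System.out.println(merge.onTuple("trade", 1L, "trade-1"));       // first half, buffered
        System.out.println(merge.onTuple("customer", 1L, "customer-1")); // partner arrives, merged
    }
}
```

The merged result is the same regardless of which half arrives first, because each branch looks up the opposite map before buffering its own tuple.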
Refer to example code