I have a Storm cluster that reads from a Kinesis stream. A message looks like this:
{
_c: "a"
}
or like this:
{
_c: "b"
}
I would like to send a tuple with _c="a" to one bolt and _c="b" to a different bolt. How do I achieve this?
This is the bolt that parses the message from Kinesis into a JSON object using Gson:
@Override
public void execute(Tuple tuple) {
String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
ByteBuffer buffer = ByteBuffer.wrap(payload);
String data = null;
try {
data = decoder.decode(buffer).toString();
// The declared type must match the TypeToken; values are Object, not String.
HashMap<String, Object> map = new Gson().fromJson(data, new TypeToken<HashMap<String, Object>>() {}.getType());
this.outputCollector.emit(tuple, new Values(map));
this.outputCollector.ack(tuple);
} catch (CharacterCodingException e) {
this.outputCollector.fail(tuple);
}
}
Thanks
You can declare two output streams in your bolt and emit each tuple to one of them:
@Override
public void execute(Tuple tuple) {
// ...
// Some Code
// ...
// Compare string contents with equals(); == compares object references in Java.
if ("a".equals(_c)) {
collector.emit("stream1", tuple, new Values(_c));
} else {
collector.emit("stream2", tuple, new Values(_c));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("stream1", new Fields("_c"));
outputFieldsDeclarer.declareStream("stream2", new Fields("_c"));
}
In your topology, you can then pass the stream ID as the second argument to shuffleGrouping, so that each downstream bolt subscribes to only one of the streams:
topology.setBolt("FirstBolt",new FirstBolt(),1);
topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1");
topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2");
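If it helps, the stream choice can be kept separate from the Storm plumbing. The following is only a minimal sketch in plain Java: StreamRouter and streamFor are hypothetical names, not part of Storm, and the sketch assumes the parsed message is available as a map like the one produced by Gson in the question.

```java
import java.util.Map;

// Hypothetical helper (not part of Storm): maps a parsed message to an output stream ID.
final class StreamRouter {
    // Stream IDs must match the ones declared in declareOutputFields.
    static final String STREAM_A = "stream1";
    static final String STREAM_B = "stream2";

    private StreamRouter() {}

    // Returns the stream ID for the message, or null when _c is missing or unrecognized.
    static String streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_A;
        }
        if ("b".equals(c)) {
            return STREAM_B;
        }
        return null;
    }
}
```

Inside execute, the result could be passed straight to collector.emit(streamId, tuple, new Values(...)). Returning null for unknown values is a design choice of this sketch; it lets the bolt decide whether to fail or simply ack a message that belongs to neither stream.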
Another possibility is to send every tuple to both bolts and have each bolt check the value of _c, running its code only for the tuples it is responsible for.
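For that second approach, the check inside each bolt could look roughly like the sketch below. TypeFilter is a hypothetical name used only for illustration, and the message is again assumed to be a parsed map containing the _c key.

```java
import java.util.Map;

// Hypothetical per-bolt filter (not part of Storm): accepts only messages with one _c value.
final class TypeFilter {
    private final String accepted;

    TypeFilter(String accepted) {
        this.accepted = accepted;
    }

    // True when the message carries the _c value this bolt is responsible for.
    boolean accepts(Map<String, ?> message) {
        return accepted.equals(message.get("_c"));
    }
}
```

One bolt would hold new TypeFilter("a") and the other new TypeFilter("b"). Keep in mind that with this approach every tuple travels to both bolts, and a bolt should still ack the tuples it skips so they do not time out and get replayed.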