sending input from single spout to multiple bolts with Fields grouping in Apache Storm
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));
I have an input field "field1" added in fields grouping. By definition of fields grouping, all tweets with same "field1" should go to a single task of TweetCounter. The executors # set for TweetCounter bolt is 2.
However, if "field1" is the same in all the tuples of incoming stream, does this mean that even though I specified 2 executors for TweetCounter, the stream would only be sent to one of them and the other instance remains empty?
To go further with my particular use case, how can I use a single spout and send data to different bolts based on a particular value of an input field (field1)?
apache-storm
add a comment |
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));
I have an input field "field1" added in fields grouping. By definition of fields grouping, all tweets with same "field1" should go to a single task of TweetCounter. The executors # set for TweetCounter bolt is 2.
However, if "field1" is the same in all the tuples of incoming stream, does this mean that even though I specified 2 executors for TweetCounter, the stream would only be sent to one of them and the other instance remains empty?
To go further with my particular use case, how can I use a single spout and send data to different bolts based on a particular value of an input field (field1)?
apache-storm
add a comment |
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));
I have an input field "field1" added in fields grouping. By definition of fields grouping, all tweets with same "field1" should go to a single task of TweetCounter. The executors # set for TweetCounter bolt is 2.
However, if "field1" is the same in all the tuples of incoming stream, does this mean that even though I specified 2 executors for TweetCounter, the stream would only be sent to one of them and the other instance remains empty?
To go further with my particular use case, how can I use a single spout and send data to different bolts based on a particular value of an input field (field1)?
apache-storm
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));
I have an input field "field1" added in fields grouping. By definition of fields grouping, all tweets with same "field1" should go to a single task of TweetCounter. The executors # set for TweetCounter bolt is 2.
However, if "field1" is the same in all the tuples of incoming stream, does this mean that even though I specified 2 executors for TweetCounter, the stream would only be sent to one of them and the other instance remains empty?
To go further with my particular use case, how can I use a single spout and send data to different bolts based on a particular value of an input field (field1)?
apache-storm
apache-storm
edited Nov 15 '18 at 19:32
Sahil
asked Nov 15 '18 at 18:23
SahilSahil
134
134
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
It seems one way to solved this problem is to use Direct grouping where the source decides which component will receive the tuple. :
This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
You can see it's example uses here:
collector.emitDirect(getWordCountIndex(word),new Values(word));
where getWordCountIndex
returns the index of the component where this tuple will be processes.
add a comment |
An alternative to using emitDirect
as described in this answer is to implement your own stream grouping. The complexity is about the same, but it allows you to reuse grouping logic across multiple bolts.
For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping
as follows:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks)
choices.add(Arrays.asList(i));
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values)
int rightNow;
int size = choices.size();
while (true)
rightNow = current.incrementAndGet();
if (rightNow < size)
return choices.get(rightNow);
else if (rightNow == size)
current.set(0);
return choices.get(0);
// race condition with another thread, and we lost. try again
Storm will call prepare
to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt/spout where you're using this grouping, Storm will call chooseTasks
which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Be aware that groupings need to be Serializable
and thread safe.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53325710%2fsending-input-from-single-spout-to-multiple-bolts-with-fields-grouping-in-apache%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
It seems one way to solved this problem is to use Direct grouping where the source decides which component will receive the tuple. :
This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
You can see it's example uses here:
collector.emitDirect(getWordCountIndex(word),new Values(word));
where getWordCountIndex
returns the index of the component where this tuple will be processes.
add a comment |
It seems one way to solved this problem is to use Direct grouping where the source decides which component will receive the tuple. :
This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
You can see it's example uses here:
collector.emitDirect(getWordCountIndex(word),new Values(word));
where getWordCountIndex
returns the index of the component where this tuple will be processes.
add a comment |
It seems one way to solved this problem is to use Direct grouping where the source decides which component will receive the tuple. :
This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
You can see it's example uses here:
collector.emitDirect(getWordCountIndex(word),new Values(word));
where getWordCountIndex
returns the index of the component where this tuple will be processes.
It seems one way to solved this problem is to use Direct grouping where the source decides which component will receive the tuple. :
This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
You can see it's example uses here:
collector.emitDirect(getWordCountIndex(word),new Values(word));
where getWordCountIndex
returns the index of the component where this tuple will be processes.
answered Nov 17 '18 at 10:01
SaurabhSaurabh
30.3k18102169
30.3k18102169
add a comment |
add a comment |
An alternative to using emitDirect
as described in this answer is to implement your own stream grouping. The complexity is about the same, but it allows you to reuse grouping logic across multiple bolts.
For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping
as follows:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks)
choices.add(Arrays.asList(i));
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values)
int rightNow;
int size = choices.size();
while (true)
rightNow = current.incrementAndGet();
if (rightNow < size)
return choices.get(rightNow);
else if (rightNow == size)
current.set(0);
return choices.get(0);
// race condition with another thread, and we lost. try again
Storm will call prepare
to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt/spout where you're using this grouping, Storm will call chooseTasks
which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Be aware that groupings need to be Serializable
and thread safe.
add a comment |
An alternative to using emitDirect
as described in this answer is to implement your own stream grouping. The complexity is about the same, but it allows you to reuse grouping logic across multiple bolts.
For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping
as follows:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks)
choices.add(Arrays.asList(i));
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values)
int rightNow;
int size = choices.size();
while (true)
rightNow = current.incrementAndGet();
if (rightNow < size)
return choices.get(rightNow);
else if (rightNow == size)
current.set(0);
return choices.get(0);
// race condition with another thread, and we lost. try again
Storm will call prepare
to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt/spout where you're using this grouping, Storm will call chooseTasks
which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Be aware that groupings need to be Serializable
and thread safe.
add a comment |
An alternative to using emitDirect
as described in this answer is to implement your own stream grouping. The complexity is about the same, but it allows you to reuse grouping logic across multiple bolts.
For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping
as follows:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks)
choices.add(Arrays.asList(i));
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values)
int rightNow;
int size = choices.size();
while (true)
rightNow = current.incrementAndGet();
if (rightNow < size)
return choices.get(rightNow);
else if (rightNow == size)
current.set(0);
return choices.get(0);
// race condition with another thread, and we lost. try again
Storm will call prepare
to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt/spout where you're using this grouping, Storm will call chooseTasks
which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Be aware that groupings need to be Serializable
and thread safe.
An alternative to using emitDirect
as described in this answer is to implement your own stream grouping. The complexity is about the same, but it allows you to reuse grouping logic across multiple bolts.
For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping
as follows:
public class ShuffleGrouping implements CustomStreamGrouping, Serializable
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks)
choices.add(Arrays.asList(i));
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values)
int rightNow;
int size = choices.size();
while (true)
rightNow = current.incrementAndGet();
if (rightNow < size)
return choices.get(rightNow);
else if (rightNow == size)
current.set(0);
return choices.get(0);
// race condition with another thread, and we lost. try again
Storm will call prepare
to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt/spout where you're using this grouping, Storm will call chooseTasks
which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
.customGrouping("spout", new ShuffleGrouping());
Be aware that groupings need to be Serializable
and thread safe.
answered Nov 26 '18 at 18:59
Stig Rohde DøssingStig Rohde Døssing
1,716234
1,716234
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53325710%2fsending-input-from-single-spout-to-multiple-bolts-with-fields-grouping-in-apache%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown