Create Flux from messages on SQS queue

























I'm trying to create a Flux from incoming messages received from a queue.



For instance, if I'm using Amazon SQS, how can I write something like the following code:



Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);

messages.doOnNext(s -> log.info("message: " + s)).subscribe();



After experimentation, I found the following issues:



  • How do I keep requesting messages from the queue (loop forever)? Do I create one thread that has a loop that keeps on requesting from the queue?

  • How do I make the Flux cold? I don't want to request messages from SQS unless the consumer asks for it. This allows me to use backpressure.

My first pass at this problem yielded something like the following code, adapted from the Reactor documentation:



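Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(new MyEventListener<String>() {
        // Forward every element of an incoming chunk to the subscriber.
        public void onDataChunk(List<String> chunk) {
            for (String s : chunk) {
                sink.next(s);
            }
        }

        // Signal completion once the processor reports it is done.
        public void processComplete() {
            sink.complete();
        }
    });
});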


The idea is to create a single thread that keeps requesting messages in a loop and then, using an observer pattern like the one above, calls next() on each message received.
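To make the "cold, with backpressure" requirement concrete, here is a minimal sketch of demand-driven polling. It does not use Reactor: it is written against the JDK's java.util.concurrent.Flow interfaces, which follow the same request(n) contract as Reactive Streams, so it only illustrates the mechanism. PollingPublisher and the poll function are hypothetical names, not part of any SQS or Reactor API, and ending the stream on an empty poll is just one possible choice (a long-running reader would retry instead). The sketch is single-threaded and not thread-safe.

```java
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.IntFunction;

/**
 * Sketch of a cold, demand-driven publisher: the queue is polled only while
 * the subscriber has outstanding demand. Single-threaded; not thread-safe.
 */
final class PollingPublisher implements Flow.Publisher<String> {
    /** SQS returns at most 10 messages per receive call. */
    static final int MAX_BATCH = 10;

    // Hypothetical poll function: takes a maximum batch size and returns up to
    // that many messages, or an empty list when the queue has nothing to offer.
    private final IntFunction<List<String>> poll;

    PollingPublisher(IntFunction<List<String>> poll) {
        this.poll = poll;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // Nothing is polled here: work starts only when request(n) is called.
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Add to the outstanding demand, capping at Long.MAX_VALUE on overflow.
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (draining) {
                    return; // re-entrant call from onNext; the running loop picks it up
                }
                draining = true;
                try {
                    drain();
                } finally {
                    draining = false;
                }
            }

            private void drain() {
                while (demand > 0 && !done) {
                    // Never fetch more than the subscriber asked for.
                    int max = (int) Math.min(demand, MAX_BATCH);
                    List<String> batch = poll.apply(max);
                    if (batch.isEmpty()) {
                        // Choice made for this sketch: an empty poll ends the stream.
                        done = true;
                        subscriber.onComplete();
                        return;
                    }
                    for (String message : batch) {
                        if (done) {
                            return; // cancelled from inside onNext
                        }
                        demand--;
                        subscriber.onNext(message);
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

With this shape, a subscriber that calls request(3) causes a single poll for at most three messages, and no further poll happens until it requests more. In Reactor, Flux.create exposes the same demand signal through the sink's onRequest callback, which is probably where a real implementation would hook in the SQS call.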
































  • Typically SQS queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late-arriving messages. Finally, don't forget to delete the messages once you've processed them.

    – Dan Farrell
    Nov 15 '18 at 2:28
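The loop described in this comment could be sketched roughly as follows. MessageQueue, QueueMessage, QueueDrainer, and InMemoryQueue are hypothetical stand-ins invented for illustration, not the AWS SDK; a real reader would call the SDK's receive and delete operations where the interface methods are. The batch size of 10 reflects the SQS per-call maximum, and the one-second wait is an arbitrary "small" value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical message: a receipt handle used for deletion, plus the body. */
final class QueueMessage {
    final String receiptHandle;
    final String body;

    QueueMessage(String receiptHandle, String body) {
        this.receiptHandle = receiptHandle;
        this.body = body;
    }
}

/** Hypothetical stand-in for the two queue operations the loop needs. */
interface MessageQueue {
    List<QueueMessage> receive(int maxMessages, int waitSeconds);
    void delete(String receiptHandle);
}

final class QueueDrainer {
    static final int MAX_BATCH = 10;   // SQS returns at most 10 messages per receive call
    static final int WAIT_SECONDS = 1; // short wait: prefer ending the loop to waiting

    /** Polls until a receive comes back empty; returns the number of messages handled. */
    static int drain(MessageQueue queue, Consumer<String> handler) {
        int processed = 0;
        while (true) {
            List<QueueMessage> batch = queue.receive(MAX_BATCH, WAIT_SECONDS);
            if (batch.isEmpty()) {
                return processed; // nothing available: break out of the loop
            }
            for (QueueMessage message : batch) {
                // If the handler throws, the message is not deleted and stays on the queue.
                handler.accept(message.body);
                queue.delete(message.receiptHandle);
                processed++;
            }
        }
    }
}

/** In-memory fake so the sketch runs without AWS; it ignores waitSeconds. */
final class InMemoryQueue implements MessageQueue {
    private final Map<String, String> messages = new LinkedHashMap<>();
    private int nextId;

    void send(String body) {
        messages.put("rh-" + nextId++, body);
    }

    int size() {
        return messages.size();
    }

    @Override
    public List<QueueMessage> receive(int maxMessages, int waitSeconds) {
        List<QueueMessage> batch = new ArrayList<>();
        for (Map.Entry<String, String> entry : messages.entrySet()) {
            if (batch.size() == maxMessages) {
                break;
            }
            batch.add(new QueueMessage(entry.getKey(), entry.getValue()));
        }
        return batch;
    }

    @Override
    public void delete(String receiptHandle) {
        messages.remove(receiptHandle);
    }
}

final class QueueDrainerDemo {
    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue();
        for (int i = 0; i < 23; i++) {
            queue.send("message " + i);
        }
        int processed = QueueDrainer.drain(queue, body -> System.out.println("handled " + body));
        System.out.println(processed + " processed, " + queue.size() + " left");
    }
}
```

Deleting only after the handler returns means a failed message remains on the queue for a later attempt, at the cost of possible duplicate processing.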















project-reactor
















asked Nov 15 '18 at 2:19









fizixx






























