Reactive Producer Consumer Observable in Java
Let's say I have a very long string:

           trillions of chunks
                   |
                   v
    /asdf/........./bar/baz/foo
                   ^
                   |
    what I try to find is closer to the right:
    the data after 9999999th '/'

I need all the chunks of data up to this slash, but not any slashes. I see this as a stream and want to do the following:
- I start reading symbols from the back and count slashes.
- Anything but a slash I put into a last-in-first-out (LIFO) data structure.
- So as not to wait for the whole operation to finish, I start reading data from the LIFO data structure as it becomes available.
- I terminate after the 9999999th '/'.
Can something like this be accomplished with reactive streams, and how?
java string reactive-programming rx-java2
Please delete all of this and clearly explain your issue
– rileyjsumner
Nov 15 '18 at 22:12
thanks for your feedback but it sounds pretty clear to me and you did not ask for specific clarification. regarding your request for deletion: no.
– Shakka
Nov 16 '18 at 8:56
edited Nov 16 '18 at 9:01
Shakka
asked Nov 15 '18 at 22:07
Shakka
1 Answer
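I think the following code will achieve what you want:

    import java.util.function.Consumer;
    import java.util.stream.Collectors;
    import org.junit.Test; // JUnit 4 assumed; JUnit 5 uses org.junit.jupiter.api.Test
    import reactor.core.publisher.ConnectableFlux;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    @Test
    public void reactiveParser() throws InterruptedException {
        // Emit the input one character at a time; publish() makes the source shareable.
        ConnectableFlux<String> letters = Flux.create((Consumer<? super FluxSink<String>>) t -> {
            char[] chars = "sfdsfsdf/sdf/sdfs/dfsdfsd/fsd/fsd/fs/df/sdf".toCharArray();
            for (char c : chars) {
                t.next(String.valueOf(c));
            }
        }).publish();

        letters
            // Start a new window every time a "/" is emitted.
            .window(letters.filter(t -> t.equals("/")))
            .flatMap(t -> t.collectList())
            .map(t -> t.stream().collect(Collectors.joining()))
            .subscribe(t -> System.out.println(t));

        // Nothing is emitted until connect() is called, after the subscriber is in place.
        letters.connect();
    }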
The example above uses Project Reactor, which is a pretty cool way of doing reactive programming in Java.
There is plenty of optimization that can be done in the code above. Not using Strings to represent a single letter would be one of them.
But the basic idea is there. You create a flux/observable that emits letters as they come in and make that observable shareable (you have to window over the emitted values), and then just collect each window into a single string. The code above should give the following output:
sfdsfsdf
/sdf
/sdfs
/dfsdfsd
/fsd
/fsd
/fs
/df
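The trailing /sdf is not printed because the sink is never completed, so the last window stays open; calling t.complete() after the loop should flush it.
Of course, you have to use a non-blocking connection so the bytes can be read asynchronously.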
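The example above scans left to right, whereas the steps in the question read from the back and stop after a given number of slashes. Here is a minimal sketch of that backward scan in plain Java, without Reactor. It assumes the whole string is already in memory and indexable; the names `ReverseSegments` and `scanFromBack` are hypothetical, not part of any library. Because the string is indexable, remembering where the current segment ends stands in for the LIFO structure: each segment is handed to the callback in its original character order as soon as its leading slash is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ReverseSegments {

    /**
     * Scans text from the end, passes each slash-free segment to onSegment as soon as
     * it is complete, and stops once maxSlashes slashes have been seen.
     */
    static void scanFromBack(String text, int maxSlashes, Consumer<String> onSegment) {
        if (maxSlashes <= 0) {
            return;
        }
        int slashes = 0;
        int end = text.length(); // exclusive end index of the segment being read
        for (int i = text.length() - 1; i >= 0; i--) {
            if (text.charAt(i) == '/') {
                // The segment between this slash and the previous one is complete.
                onSegment.accept(text.substring(i + 1, end));
                end = i;
                if (++slashes == maxSlashes) {
                    return; // terminate after the requested slash
                }
            }
        }
        // Fewer slashes than requested: the text before the first slash is the last segment.
        onSegment.accept(text.substring(0, end));
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        scanFromBack("/asdf/bar/baz/foo", 2, seen::add);
        System.out.println(seen); // prints [foo, baz]
    }
}
```

In a reactive setting, the callback could be the `next` method of a sink like the one in the answer, so that segments reach the subscriber while the scan is still running.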
edited Nov 16 '18 at 13:11
answered Nov 16 '18 at 12:08
piotr szybicki