Reactive Producer Consumer Observable in Java

























Let's say I have a very long string:



 trillions of chunks
|
v
/asdf/........./bar/baz/foo
^
|
what I try to find is closer to the right:
the data after 9999999th '/'


I need all the chunks of data up to this slash, but not any slashes. I see this as a stream and want to do the following:



  1. I start to read symbols from the back and count slashes.

  2. Anything but a slash I put into a last-in-first-out (LIFO) data structure.

  3. In order not to wait for the whole operation to finish, I start reading data from the LIFO data structure as it becomes available.

  4. I terminate after the 9999999th '/'
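
The four steps above can be sketched in plain Java before bringing in a reactive library. This is only a minimal, sequential sketch: the class and method names are made up for illustration, the input is assumed to fit in memory as a `CharSequence`, and step 3 (consuming while the scan is still running) is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BackwardSlashScan {

    /**
     * Scans the input from the end and pushes every non-slash character
     * onto a LIFO stack, stopping once maxSlashes slashes have been seen.
     */
    static Deque<Character> scanFromBack(CharSequence input, long maxSlashes) {
        Deque<Character> lifo = new ArrayDeque<>();
        long slashes = 0;
        for (int i = input.length() - 1; i >= 0; i--) {
            char c = input.charAt(i);
            if (c == '/') {
                if (++slashes == maxSlashes) {
                    break; // step 4: terminate after the n-th slash
                }
            } else {
                lifo.push(c); // step 2: anything but a slash goes onto the stack
            }
        }
        return lifo;
    }

    /** Pops the stack, which restores the original left-to-right order. */
    static String drain(Deque<Character> lifo) {
        StringBuilder sb = new StringBuilder(lifo.size());
        while (!lifo.isEmpty()) {
            sb.append(lifo.pop());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical short input; the real one would be far too long for a String.
        String input = "/asdf/bar/baz/foo";
        System.out.println(drain(scanFromBack(input, 2))); // prints "bazfoo"
    }
}
```

Because the characters are pushed while reading backwards, popping them returns the data in its original order, which is why a LIFO structure fits here.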

Can something like this be accomplished with reactive streams, and if so, how?










  • Please delete all of this and clearly explain your issue

    – rileyjsumner
    Nov 15 '18 at 22:12











  • thanks for your feedback but it sounds pretty clear to me and you did not ask for specific clarification. regarding your request for deletion: no.

    – Shakka
    Nov 16 '18 at 8:56















java string reactive-programming rx-java2






edited Nov 16 '18 at 9:01

Shakka

asked Nov 15 '18 at 22:07

Shakka

1 Answer

I think the following code will achieve what you want:



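    import java.util.function.Consumer;
    import java.util.stream.Collectors;

    import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test
    import reactor.core.publisher.ConnectableFlux;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    public class ReactiveParserTest { // class name is a placeholder

        @Test
        public void reactiveParser() throws InterruptedException {
            // Emit the input one character at a time; publish() makes the stream shareable.
            ConnectableFlux<String> letters = Flux.create((Consumer<? super FluxSink<String>>) t -> {
                char[] chars = "sfdsfsdf/sdf/sdfs/dfsdfsd/fsd/fsd/fs/df/sdf".toCharArray();
                for (char c : chars) {
                    t.next(String.valueOf(c));
                }
                // complete() is never called, so the last window ("/sdf") stays open.
            }).publish();

            letters
                // Start a new window every time a "/" is emitted.
                .window(letters.filter(t -> t.equals("/")))
                .flatMap(t -> t.collectList())
                // Join the letters of each window into one string.
                .map(t -> t.stream().collect(Collectors.joining()))
                .subscribe(t -> {
                    System.out.println(t);
                });

            // Nothing is emitted until connect() is called.
            letters.connect();
        }
    }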



The example above uses Project Reactor, which is a pretty cool way of doing reactive programming in Java.



There is plenty of optimization that can be done in the code above. Not using Strings to represent a single letter would be one of them.



But the basic idea is there. You create a Flux/Observable that emits letters as they come in, make it shareable (the same stream is also used as the window boundary), and then collect each window into a single string. The code above should give the following output:



sfdsfsdf
/sdf
/sdfs
/dfsdfsd
/fsd
/fsd
/fs
/df


Of course, you have to use a non-blocking connection so that the bytes can be read asynchronously.
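
On the optimization mentioned above (not using a String per letter), here is a minimal sketch of the same windowing idea with primitive `char`s and a reusable `StringBuilder`. It is plain, synchronous Java rather than Reactor code, and `SlashWindows` and `emitChunks` are hypothetical names. Unlike the Reactor example, it assumes the source signals completion, so the trailing chunk is flushed as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SlashWindows {

    /**
     * Feeds the input one char at a time and hands each finished chunk to
     * onChunk as soon as the next '/' arrives. The slash opens the next
     * chunk, which mirrors the output listed in the answer.
     */
    static void emitChunks(CharSequence input, Consumer<String> onChunk) {
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i); // primitive char, no per-letter String
            if (c == '/') {
                onChunk.accept(current.toString()); // boundary: close the window
                current.setLength(0);               // reuse the buffer
            }
            current.append(c);
        }
        // Assumption: end of input counts as completion, so flush the last chunk.
        if (current.length() > 0) {
            onChunk.accept(current.toString());
        }
    }

    public static void main(String[] args) {
        List<String> chunks = new ArrayList<>();
        emitChunks("sfdsfsdf/sdf/sdfs", chunks::add);
        System.out.println(chunks); // prints [sfdsfsdf, /sdf, /sdfs]
    }
}
```

The callback plays the role of the subscriber: each chunk is delivered as soon as its closing slash is seen, without waiting for the rest of the input.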






edited Nov 16 '18 at 13:11

answered Nov 16 '18 at 12:08

piotr szybicki
