Kafka commitAsync Retries with Commit Order









up vote
0
down vote

favorite












I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":




A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.




A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.



Can anyone shed light on what this means or even better provide a toy example demonstrating this?










share|improve this question

























    up vote
    0
    down vote

    favorite












    I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":




    A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.




    A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.



    Can anyone shed light on what this means or even better provide a toy example demonstrating this?










    share|improve this question























      up vote
      0
      down vote

      favorite









      up vote
      0
      down vote

      favorite











      I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":




      A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.




      A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.



      Can anyone shed light on what this means or even better provide a toy example demonstrating this?










      share|improve this question













      I'm reading through Kafka the Definitive Guide and in the chapter on Consumers there is a blurb on "Retrying Async Commits":




      A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.




      A quick example by the author would have been great here for dense folks like me. I am particularly unclear about the portion I bolded above.



      Can anyone shed light on what this means or even better provide a toy example demonstrating this?







      apache-kafka commit offset kafka-consumer-api






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 10 at 15:46









      mathfish

      2016




      2016






















          1 Answer
          1






          active

          oldest

          votes

















          up vote
          0
          down vote



          accepted










          Here is what I think it is, but got to humble I could be wrong



           try 
          AtomicInteger atomicInteger = new AtomicInteger(0);
          while (true)
          ConsumerRecords<String, String> records = consumer.poll(5);
          for (ConsumerRecord<String, String> record : records)
          System.out.format("offset: %dn", record.offset());
          System.out.format("partition: %dn", record.partition());
          System.out.format("timestamp: %dn", record.timestamp());
          System.out.format("timeStampType: %sn", record.timestampType());
          System.out.format("topic: %sn", record.topic());
          System.out.format("key: %sn", record.key());
          System.out.format("value: %sn", record.value());


          consumer.commitAsync(new OffsetCommitCallback()
          private int marker = atomicInteger.incrementAndGet();
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
          Exception exception)
          if (exception != null)
          if (marker == atomicInteger.get()) consumer.commitAsync(this);
          else
          //Cant' try anymore


          );

          catch (WakeupException e)
          // ignore for shutdown
          finally
          consumer.commitSync(); //Block
          consumer.close();
          System.out.println("Closed consumer and we are done");






          share|improve this answer






















          • Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
            – mathfish
            Nov 11 at 11:23










          • Yes, that is a good point. I agree.
            – Daniel Hinojosa
            Nov 11 at 14:09










          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53240589%2fkafka-commitasync-retries-with-commit-order%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          0
          down vote



          accepted










          Here is what I think it is, but got to humble I could be wrong



           try 
          AtomicInteger atomicInteger = new AtomicInteger(0);
          while (true)
          ConsumerRecords<String, String> records = consumer.poll(5);
          for (ConsumerRecord<String, String> record : records)
          System.out.format("offset: %dn", record.offset());
          System.out.format("partition: %dn", record.partition());
          System.out.format("timestamp: %dn", record.timestamp());
          System.out.format("timeStampType: %sn", record.timestampType());
          System.out.format("topic: %sn", record.topic());
          System.out.format("key: %sn", record.key());
          System.out.format("value: %sn", record.value());


          consumer.commitAsync(new OffsetCommitCallback()
          private int marker = atomicInteger.incrementAndGet();
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
          Exception exception)
          if (exception != null)
          if (marker == atomicInteger.get()) consumer.commitAsync(this);
          else
          //Cant' try anymore


          );

          catch (WakeupException e)
          // ignore for shutdown
          finally
          consumer.commitSync(); //Block
          consumer.close();
          System.out.println("Closed consumer and we are done");






          share|improve this answer






















          • Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
            – mathfish
            Nov 11 at 11:23










          • Yes, that is a good point. I agree.
            – Daniel Hinojosa
            Nov 11 at 14:09














          up vote
          0
          down vote



          accepted










          Here is what I think it is, but got to humble I could be wrong



           try 
          AtomicInteger atomicInteger = new AtomicInteger(0);
          while (true)
          ConsumerRecords<String, String> records = consumer.poll(5);
          for (ConsumerRecord<String, String> record : records)
          System.out.format("offset: %dn", record.offset());
          System.out.format("partition: %dn", record.partition());
          System.out.format("timestamp: %dn", record.timestamp());
          System.out.format("timeStampType: %sn", record.timestampType());
          System.out.format("topic: %sn", record.topic());
          System.out.format("key: %sn", record.key());
          System.out.format("value: %sn", record.value());


          consumer.commitAsync(new OffsetCommitCallback()
          private int marker = atomicInteger.incrementAndGet();
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
          Exception exception)
          if (exception != null)
          if (marker == atomicInteger.get()) consumer.commitAsync(this);
          else
          //Cant' try anymore


          );

          catch (WakeupException e)
          // ignore for shutdown
          finally
          consumer.commitSync(); //Block
          consumer.close();
          System.out.println("Closed consumer and we are done");






          share|improve this answer






















          • Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
            – mathfish
            Nov 11 at 11:23










          • Yes, that is a good point. I agree.
            – Daniel Hinojosa
            Nov 11 at 14:09












          up vote
          0
          down vote



          accepted







          up vote
          0
          down vote



          accepted






          Here is what I think it is, but got to humble I could be wrong



           try 
          AtomicInteger atomicInteger = new AtomicInteger(0);
          while (true)
          ConsumerRecords<String, String> records = consumer.poll(5);
          for (ConsumerRecord<String, String> record : records)
          System.out.format("offset: %dn", record.offset());
          System.out.format("partition: %dn", record.partition());
          System.out.format("timestamp: %dn", record.timestamp());
          System.out.format("timeStampType: %sn", record.timestampType());
          System.out.format("topic: %sn", record.topic());
          System.out.format("key: %sn", record.key());
          System.out.format("value: %sn", record.value());


          consumer.commitAsync(new OffsetCommitCallback()
          private int marker = atomicInteger.incrementAndGet();
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
          Exception exception)
          if (exception != null)
          if (marker == atomicInteger.get()) consumer.commitAsync(this);
          else
          //Cant' try anymore


          );

          catch (WakeupException e)
          // ignore for shutdown
          finally
          consumer.commitSync(); //Block
          consumer.close();
          System.out.println("Closed consumer and we are done");






          share|improve this answer














          Here is what I think it is, but got to humble I could be wrong



           try 
          AtomicInteger atomicInteger = new AtomicInteger(0);
          while (true)
          ConsumerRecords<String, String> records = consumer.poll(5);
          for (ConsumerRecord<String, String> record : records)
          System.out.format("offset: %dn", record.offset());
          System.out.format("partition: %dn", record.partition());
          System.out.format("timestamp: %dn", record.timestamp());
          System.out.format("timeStampType: %sn", record.timestampType());
          System.out.format("topic: %sn", record.topic());
          System.out.format("key: %sn", record.key());
          System.out.format("value: %sn", record.value());


          consumer.commitAsync(new OffsetCommitCallback()
          private int marker = atomicInteger.incrementAndGet();
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
          Exception exception)
          if (exception != null)
          if (marker == atomicInteger.get()) consumer.commitAsync(this);
          else
          //Cant' try anymore


          );

          catch (WakeupException e)
          // ignore for shutdown
          finally
          consumer.commitSync(); //Block
          consumer.close();
          System.out.println("Closed consumer and we are done");







          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 12 at 14:26









          mathfish

          2016




          2016










          answered Nov 11 at 0:09









          Daniel Hinojosa

          79737




          79737











          • Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
            – mathfish
            Nov 11 at 11:23










          • Yes, that is a good point. I agree.
            – Daniel Hinojosa
            Nov 11 at 14:09
















          • Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
            – mathfish
            Nov 11 at 11:23










          • Yes, that is a good point. I agree.
            – Daniel Hinojosa
            Nov 11 at 14:09















          Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
          – mathfish
          Nov 11 at 11:23




          Thanks for the example! Makes sense but should the instantiation of the AtomicInteger happen before the loop to process the records, then increment this value in the loop, and finally set this value to the marker in the callback? I think without that you wouldn't be able to check if a later offset was committed since each offset would start with a value of 0 in this example. That seem correct?
          – mathfish
          Nov 11 at 11:23












          Yes, that is a good point. I agree.
          – Daniel Hinojosa
          Nov 11 at 14:09




          Yes, that is a good point. I agree.
          – Daniel Hinojosa
          Nov 11 at 14:09

















           

          draft saved


          draft discarded















































           


          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53240589%2fkafka-commitasync-retries-with-commit-order%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Top Tejano songwriter Luis Silva dead of heart attack at 64

          政党

          天津地下鉄3号線