Spring Boot 中使用@KafkaListener批量接收消息

Kafka 天涯孤鸟 21698℃ 0评论

之前介绍了如何在SpringBoot中集成Kafka,但是默认情况下,@KafkaListener都是一条一条消费,如果想要一次消费一个批量的话,我们都知道,在kafka原生的API可以通过poll(num)来获取一次获取num条消息:

那么使用在Springboot中使用@KafkaListener能否实现批量监听呢?
看了spring-kafka的官方文档介绍,可以知道自1.1版本之后,@KafkaListener开始支持批量消费,只需要设置batchListener参数为true
https://docs.spring.io/spring-kafka/reference/html/_reference.html

下面是我在项目中使用到的方法:

开始监听,批量消费后采用JPA的方式批量写入数据库,这里containerFactory = “batchFactory”要指定为批量消费

运行结果如下图,每批消费100条消息:

转载请注明:猫头鹰工作室 » Spring Boot 中使用@KafkaListener批量接收消息

喜欢 (21)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(10)个小伙伴在吐槽
  1. 如果消息队列里消息不足100条,那是会不会等到100条后才执行?
    answer2017-12-25 15:09 回复
    • 天涯孤鸟
      不会的,如果队列里的数据大于100,就会分批消费,每批100,当队列里的数据量小于100时,就会消费当前所有数据
      天涯孤鸟2017-12-28 17:51 回复
  2. 大神,我设置自动提交偏移量,最终为什么没有提交呢
    土豆2018-03-09 19:21 回复
    • 天涯孤鸟
      你使用的是哪种监听?MessageListener(会自动提交)还是AcknowledgeMessageListener(不会自动提交),如果是后者,需要手动调用ack.acknowledge()来提交偏移量。
      天涯孤鸟2018-03-10 15:06 回复
  3. 注解里好像不需要这个了配置了唉。。containerFactory = “batchFactory”
    Jason2018-04-24 10:09 回复
    • 天涯孤鸟
      如果使用默认的就无需指定,要是自定义的就要指定这个
      天涯孤鸟2018-04-25 19:30 回复
  4. 能将完整示例代码放一份到github上面么,我这里怎么有数据丢失的情况? 如果我设置每次取10条,如果在消费了几条后 我停止服务,再启动时,就后丢失上次 已经被poll出来,但是我还没消费的那几条,这个咋个处理? 如果数据从1开始编号, 每次拉取10条, 第一次就应该拉取1~10, 如果处理到第5条,我终止服务,再启动时就是从11开始了, 6~10这部分数据就丢了
    肖哥哥2018-07-09 14:54 回复
    • 这个就是这样的。你看他的代码。最后是在finally中提交偏移量的。所以如果他的jpa保存出现异常。最后也会提交偏移量的。你这种问题,需要借助queue或者reids来实现。先把所有的消息都存到queue里。然后提交偏移量。最后在慢慢消费queue里的消息。
      warren2018-08-12 13:48 回复
  5. "我们都知道,在kafka原生的API可以通过poll(num)来获取一次获取num条消息" 那个poll(long)里面的值是个timeout,不是条数
    Happykuan2018-08-23 16:16 回复
  6. 如果一下拉100条数据,在第二条失败时ack.acknowledge() 提交的偏移量是第二条的还是第100条的
    傲慢的乌龟2018-09-04 16:45 回复