Spring Kafka批量监听器应用重启
我正在测试Spring Kafka批处理侦听器,批处理确认模式,一次轮询3条记录,并将这些记录持久存储到数据库中。当Spring Boot应用程序重新启动时,我看到另外3条记录正在被使用和处理。
如果批处理大小更大(500),并且在关闭过程完成之前无法将记录持久化到数据库,该怎么办?我们如何确保这些消息不会丢失或处理这种情况?
2021-09-21 22:37:22,448 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797145
Disconnected from the target VM, address: '127.0.0.1:55506', transport: 'socket'
2021-09-21 22:38:02,388 WARN [HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool$HouseKeeper: HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=48s84ms).
2021-09-21 22:38:02,393 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 39.943579103
2021-09-21 22:38:02,494 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797148
2021-09-21 22:38:02,495 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 7.52304E-4
转载请注明出处:http://www.dannyxu.com/article/20230526/1349263.html
随机推荐
-
Spring Kafka Streams:缺少类型信息
我已经设置了一个简单的Kafka流Spring启动应用程序。Spring boot正在使用AutoConfiguration为卡夫卡创建工厂。消息是没有消息键的Json消息。使用下面的配置会导致在headers中显示No Type info...
-
spring Kafka连接到Kafka需要什么样的SSL配置?
我们在Kafka中只启用了ssl加密。客户端身份验证已关闭。我们有一个spring boot应用程序连接到它。现在,我们已经设置了spring.kafka.security.protocol = SSL“尽管如此,我们仍然得到Kafka侦听...
-
Spring Kafka @KafkaListener -重试发送失败消息并手动提交偏移量
我使用@KafkaListener来消费来自kafka主题,我有一个应用程序逻辑来处理不同消费者组中的多个消费者的每个记录。现在我的问题陈述是,-I必须将使用的消息发送到第三方rest端点。如果消息发送到rest-end点失败,则不应提交偏...
-
Spring Kafka重试日志记录
我有一个要求消费从kafka主题,做一些工作的记录,并产生到另一个主题与spring-kafka 2.1.7。其他要求是事务性的只有一次语义,重试和错误handling.On提交记录失败我应该做3次重试,记录每个重试消息重试主题和失败的所有...
-
Spring Kafka全球交易ID在程序结束后保持打开状态
我正在创建一个在Spring Boot下的Kafka Spring生产者,它将发送数据到Kafka,然后写入数据库;我希望所有的工作都在一个事务中。我刚接触Kafka,不是Spring方面的专家,我遇到了一些困难。任何指点都非常感谢。到目前...
-
Spring kafka运行时重新创建Kafka流拓扑
我有一个应用程序是基于spring boot,spring-kafka和kafka-streams。当应用程序启动时,它使用默认的主题列表创建kafka streams拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时...
-
Spring Kafka消费者将JSON解析为字符串
我正在使用Spring Kafka来消费来自主题的消息。下面是消费者将听到的对象的结构。{ identifier: ABCD1234, data: { id: 12345, sourc...
-
Spring-Kafka消费者不会自动接收消息
我有一个spring boot kafka消费者和一个spring boot kafka生产者,当我在终端中启动消费者并在启动生产者之后,消费者收到数据时,如果且仅当我按下任何键盘键时,我不知道为什么?这是我的配置。public Consu...
-
Spring Kafka -使用哪个批处理错误处理程序?
我刚开始使用spring-kafka 2.6.4。我创建了按批轮询消息的消费者工厂:@Bean public ConcurrentKafkaListenerContainerFactoryString, String ...
-
spring-kafka:在Spring Boot中使用交互式查询
在Kafka Streams + Spring Boot应用程序中配置(提供主机和端口信息)和访问交互式查询的惯用方式是什么?访问KafkaStreams实例以访问状态存储的正确方法是什么?我知道spring-cloud-stream中有I...