Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

news/2024/2/29 2:38:23

应用场景

上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,我们需要需求更好的办法,本文将介绍针对该类问题的一种处理方法:自定义错误处理逻辑。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {public static void main(String[] args) {SpringApplication.run(TestApplication.class, args);}@RestControllerstatic class TestController {@Autowiredprivate TestTopic testTopic;/*** 消息生产接口** @param message* @return*/@GetMapping("/sendMessage")public String messageWithMQ(@RequestParam String message) {testTopic.output().send(MessageBuilder.withPayload(message).build());return "ok";}}/*** 消息消费逻辑*/@Slf4j@Componentstatic class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received payload : " + payload);throw new RuntimeException("Message consumer failed!");}}interface TestTopic {String OUTPUT = "example-topic-output";String INPUT = "example-topic-input";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,跟上一篇文章的结果一样,消息消费失败,记录了日志,消息信息丢弃。

下面,针对消息消费失败,在TestListener中针对消息消费逻辑创建一段错误处理逻辑,比如:

@Slf4j
@Component
static class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received payload : " + payload);throw new RuntimeException("Message consumer failed!");}/*** 消息消费失败的降级处理逻辑** @param message*/@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")public void error(Message<?> message) {log.info("Message consumer failed, call fallback!");}}

通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

  • test-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.example-topic-input.destination的配置)
  • stream-exception-handler:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.example-topic-input.group的配置)

再启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中,此时可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Received: hello,
2018-12-11 12:00:38.541  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Message consumer failed, call fallback!

虽然消费逻辑中输出了消息内容之后抛出了异常,但是会进入到error函数中,执行错误处理逻辑(这里只是答应了一句话),用户可以根据需要读取消息内容以及异常详情做更进一步的细化处理。

深入思考

由于error逻辑是通过编码方式来实现的,所以这段逻辑相对来说比较死。通常,只有业务上有明确的错误处理逻辑的时候,这种方法才可以比较好的被应用到。不然能做的可能也只是将消息记录下来,然后具体的分析原因后再去做补救措施。所以这种方法也不是万能的,主要适用于有明确错误处理方案的方式来使用(这种场景并不多),另外。。。

注意:有坑! 这个方案在目前版本(2.0.x)其实还有一个坑,这种方式并不能很好的处理异常消息,会有部分消息得不到正确的处理,由于应用场景也不多,所以目前不推荐使用这种方法来做(完全可以用原始的异常捕获机制来处理,只是没有这种方式那么优雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中会修复,后续发布之后可以使用该功能,具体点击查看:Issue #1357。

而对于没有特定的错误处理方案的,也只能通过记录和后续处理来解决,可能这样的方式也只是比从日志中抓去简单那么一些,并没有得到很大的提升。但是,不要紧,因为下一篇我们将继续介绍其他更好的处理方案。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-2项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

本文首发:http://blog.didispace.com/spr...


https://www.jiucaihua.cn/news/show-536768.html

相关文章

QT初级学习12--设置程序发布图标、打开指定的浏览器页面、播放动画

一、设置程序图标 超级简单。第一步找一个自己想设置为图标的.ico后缀的文件&#xff0c;并将其放在工程目录&#xff1b;第二步&#xff0c;在xx.pro文件末尾加入RC_ICONS yourImageName.ico;重新编译后即可。 二、打开指定浏览器页面 要求点击某action后&#xff0c;程序…

QT :-1: error: fatal error: no input files

网上有很多其他的原因&#xff0c;我这里提供我的出错的原因。 build directory 这里我出现了中文路径&#xff0c;并且这个定位是我上个项目的位置&#xff0c;我修改为目前这个项目的位置后&#xff0c;且不存在中文路径了&#xff0c;就好了。 QT里面不能出现任何中文路径…

POJ 1650

1 #include <iostream>2 #include <cmath> //wo de 编译器里的这个abs的功能不能用啊&#xff01;3 using namespace std;4 int main()5 {6 double a;7 double per;//误差计数器&#xff1b;8 int son_num;//代表分子的枚举&#xff1b;9 …

【转载】solr初体验

【1】http://cxshun.iteye.com/blog/1039445 由于工作原因&#xff0c;这段时间接触到solr&#xff0c;一个基于lucene的企业级搜索引擎。不怎么了解它的童鞋可以去GOOGLE一下。 下面开始正题&#xff1a; 1&#xff09;要开始solr的学习&#xff0c;首先当然是要下载它啦&…

QT 14--程序关闭的提示

对于很多程序在点击关闭按钮后都会提示各种各样的信息&#xff0c;设置方法如下&#xff0c;不需要connect连接 任何事件都要写在protected 里&#xff0c;以方便继承&#xff1a; //头文件里定义 #include <QCloseEvent> protected: void colse(QCloseEvent* event);/…

一些思想

为什么80%的码农都做不了架构师&#xff1f;>>> 其实只是为了解决开发部署的问题&#xff0c;结果沾上了k8s&#xff0c;某一天&#xff0c;才发现&#xff0c;其实使用linux docker再加router再加dns-proxy就可以达到这样的需求&#xff0c;所以…… 其实有些事情…

jquery 中post 、get的同步问题

解决方法1&#xff1a; 在全局设置&#xff1a; Js代码 $.ajaxSetup({ async : false }); [js] view plaincopy $.ajaxSetup({ async : false }); 然后再使用post或get方法 Js代码 $.get("register/RegisterState", {test : 12}, function(…

QT 15--获取任何种类文件的某些文件属性:大小、创建时间、上次修改时间等等

1、 首先说一些&#xff0c;如果是mainwindow的QT工程&#xff0c;如果打算做自己手写ui 界面的话&#xff0c;该如何将自己写的内容添加到mainwindow界面呢&#xff1f;方法为&#xff1a; 新建一个widget类&#xff0c;然后将所有零件都用布局布置好后&#xff0c;只需将总布…

VTK资源整理

实现物体的平移和旋转&#xff0c;用vtkTransform.h新实例化一个类&#xff0c;然后再用模型的setUserTransform加进去即可,coneActor代表任意的图形对象 vtkSmartPointer<vtkTransform> trans vtkSmartPointer<vtkTransform>::New(); trans->PostMultiply();…

QT16--最近用到的功能总结

int 转为 QString类型 int i10; QString testQString::number(i);Qt 信号与槽的连接问题 Qt的信号函数里的变量&#xff0c;槽函数的变量&#xff0c;如果有的话&#xff0c;必须和信号函数里的变量保持一致。 QObject::connect(listWidget,SIGNAL(QlistWidgetItem*),this,S…
最新文章