位置: IT常识 - 正文

RabbitMQ个人实践

编辑:rootadmin
前言 MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。 RabbitMq 案例 Springboot整合RabbitMQ简单案例 基本概念 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Que ... 前言

推荐整理分享RabbitMQ个人实践,希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:,内容如对您有帮助,希望把文章链接给更多的朋友!

MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。

RabbitMq案例

Springboot整合RabbitMQ简单案例

基本概念

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息队列载体,每个消息都会被投入到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。Routing Key:路由关键字,exchange根据这个关键字进行消息投递。Producer:消息生产者,就是投递消息的程序。Consumer:消息消费者,就是接受消息的程序。

发布消息到RabbitMQ需要经过两步:

producer → exchangeexchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列工作流程

了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

创建Exchange创建Queue将Queue绑定进Exchange中(此处会设置routing key)生产者发布消息消费者订阅消息交换机(Exchange)

交换机可以绑定队列,绑定时可以给队列指定路由(Routing key)和参数(Arguments)

所有的消息发送都是经过交换机转发到队列的,而不是直接到队列中

交换机类型:

direct

根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)

fanout

路由无效,只要和该交换机绑定的队列,都能接收到消息

topic

允许路由使用*和#来进行模糊匹配

*表示一个单词

表示任意数量(零个或多个)单词

例如:如果队列的路由为com.# 那么往交换机发消息是,路由填com.ccc 队列就可以收到消息

headers

忽略路由,由参数(Arguments)来确定转发的队列

消息过期时间TTL

有两种方式设置TTL,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL,消息存活时间取两者的最小值。

创建队列时设置

RabbitMQ个人实践

是消息的存活时间,不是队列的存活时间,别搞混了。

@Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期 return new Queue("queueName",true, false, false, args);}

发送消息时设置

public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 设置消息5秒过期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);}死信队列

死信队列也是一个正常队列,只是当绑定了死信队列的队列满足相应条件,就会将满足条件的消息转移到死信队列中。

进入死信队列的条件:

消息被拒绝消息过期(超时)队列达到最大长度

死信队列的配置:

按照正常步骤定义一个队列(交换机、队列、绑定)

给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数

@Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "死信队列交换机名称"); args.put("x-dead-letter-routing-key", "死信队列路由"); return new Queue("queueName",true, false, false, args);}如何保证MQ消息正确送达与消费可靠性生产和推送

步骤:

发送消息前数据库保存MQ消息发送日志MQ消息发送后使用回调更新日志状态

实现:

上面我们讲了,发布消息到RabbitMQ需要经过两步:

producer → exchangeexchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

所以,发布消息的确认也分两个部分,以下是确认步骤:

修改MQ应答机制(yml)

spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 发送消息确认,producer -> exchange publisher-confirm-type: correlated # 发送消息确认,exchange -> queue publisher-returns: true

新增mq的回调方法

/** * PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。 * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。 * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。 * PostConstruct在构造函数之后执行,init()方法之前执行。 * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法) */@PostConstructprivate void regCallBack() { // producer -> exchange 成功或失败都会触发此回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 这个id是在消息发送的时候传入的 String id = correlationData.getId(); // 如果ack为true代表消息被mq成功接收 if (!ack) { // 应答失败,修改日志状态 System.out.println("exchange 应答失败,做失败处理!"); } else { // 应答成功,修改日志状态 System.out.println("exchange 成功处理"); } } }); // 这个回调只有exchange -> queue 失败时才会触发 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 发送失败"); } });}

修改MQ发送消息的方法,增加日志id的传递

String correlationId = "这是日志id";rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消费者需要correlationId才做这个处理 message.getMessageProperties().setCorrelationId(correlationId); return message; }}, new CorrelationData(correlationId));// 如果消费者不需要获取correlationId,则用下面这种即可rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));可靠性消费

步骤:

开启手动应答监听器增加手动应答逻辑

实现:

开启手动应答

spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 将自动应答ack模式改成手动应答

acknowledge-mode有三种类型:

nome:不进行ack,rabbitmq默认消费者正确处理所有请求munual:手动确认auto:自动确认消息(默认类型)。如果消费者抛出异常,则消息重回队列。

监听器增加手动应答逻辑

@RabbitListener(queues = {"队列名字"})public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相应处理,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("消息为:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消费成功,进行确认 channel.basicAck(tag, false); } catch (IOException e) { // 消费失败,重发 // requeue代表是否重发,为false则直接将消息丢弃,有死信就进入死信队列 channel.basicNack(tag, false, true); }}总结

本文介绍了RabbitMQ的一些概念和简单使用,有不少东西其实是没有讲清楚的,比如publisher-confirm-type和acknowledge-mode的几种类型的区别等等。主要是在官方文档找不到相关的细致描述,查文档的能力还是有待提高。。。

参考资料

RabbitMq 技术文档 - 腾讯云开发者社区-腾讯云 (tencent.com)

Spring AMQP

本文链接地址:https://www.jiuchutong.com/zhishi/311639.html 转载请保留说明!

上一篇:c语言中函数的声明和定义(c语言中函数的数据类型是指)

下一篇:基于sklearn的集成学习实战(sklearn实例)

  • 微信不绑定银行卡可以收转账吗(微信不绑定银行卡可以使用零钱吗)

    微信不绑定银行卡可以收转账吗(微信不绑定银行卡可以使用零钱吗)

  • 极光tv投屏弹幕怎么设置(极光tv投屏弹幕大小)

    极光tv投屏弹幕怎么设置(极光tv投屏弹幕大小)

  • realmev11电池容量是多少(realme手机电池容量)

    realmev11电池容量是多少(realme手机电池容量)

  • 华为P30怎么把竖屏视频变成横屏全屏(华为p30怎么把竖屏变横屏)

    华为P30怎么把竖屏视频变成横屏全屏(华为p30怎么把竖屏变横屏)

  • 小米10私密相册怎么打开(小米10私密相册为啥要云空间)

    小米10私密相册怎么打开(小米10私密相册为啥要云空间)

  • 华为手机可以一直更新吗(华为手机能放两个微信吗)

    华为手机可以一直更新吗(华为手机能放两个微信吗)

  • 亮度自动调节关闭了怎么还会自己打开(亮度自动调节关闭了怎么还会自动调节亮度oppo)

    亮度自动调节关闭了怎么还会自己打开(亮度自动调节关闭了怎么还会自动调节亮度oppo)

  • 钉钉云课堂考试退出会自动提交吗(钉钉云课堂考试能分屏吗)

    钉钉云课堂考试退出会自动提交吗(钉钉云课堂考试能分屏吗)

  • 微信朋友圈仅对你可见是什么意思(微信朋友圈仅对方可见,对方知道吗)

    微信朋友圈仅对你可见是什么意思(微信朋友圈仅对方可见,对方知道吗)

  • oppor11怎么强制开机(oppor11怎么强制重启手机)

    oppor11怎么强制开机(oppor11怎么强制重启手机)

  • 早期计算机是用来(早期计算机是用来运行什么的)

    早期计算机是用来(早期计算机是用来运行什么的)

  • 红米note8pro耗电太快怎么办(红米note8pro耗电快)

    红米note8pro耗电太快怎么办(红米note8pro耗电快)

  • 华为平板m6怎么插u盘(华为平板m6怎么设置学生模式)

    华为平板m6怎么插u盘(华为平板m6怎么设置学生模式)

  • 苹果11处理器多少(苹果11处理器多少核)

    苹果11处理器多少(苹果11处理器多少核)

  • iwatch能打电话吗(蜂窝版iwatch能打电话吗)

    iwatch能打电话吗(蜂窝版iwatch能打电话吗)

  • vivo nex3怎么设置护眼模式(vivoNEX3怎么设置控制中心)

    vivo nex3怎么设置护眼模式(vivoNEX3怎么设置控制中心)

  • 华为手机如何查激活时间(华为手机如何查找对方手机位置)

    华为手机如何查激活时间(华为手机如何查找对方手机位置)

  • 南京5g试点在哪(南京5g覆盖区域)

    南京5g试点在哪(南京5g覆盖区域)

  • 华为平板m5怎样切换电脑模式(华为平板m5怎样截屏)

    华为平板m5怎样切换电脑模式(华为平板m5怎样截屏)

  • 如何拉好友进微信群(怎样拉微信好友进微信群)

    如何拉好友进微信群(怎样拉微信好友进微信群)

  • 圣克鲁斯岛上的一只岛屿灰狐,加利福尼亚州海峡群岛国家公园 (© Ian Shive/Tandem Motion + Stills)(圣克鲁斯群岛海战百度百科)

    圣克鲁斯岛上的一只岛屿灰狐,加利福尼亚州海峡群岛国家公园 (© Ian Shive/Tandem Motion + Stills)(圣克鲁斯群岛海战百度百科)

  • 怎么证明自己是建档立卡贫困户
  • 平行结转分步法各步骤的费用
  • 丢失增值税发票已报税证明单怎么开
  • 质保金一年怎么写
  • 不能抵扣进项税额的发票类型
  • 个税为负数如何申报
  • 企业付的快递费是扣增值税还是进入费用扣除
  • 公司租房水电费如何入帐
  • 雇佣退休人员工伤赔偿标准
  • 国际船舶代理是什么意思
  • 过渡期损益会计分录
  • 企业收到供货单位提供的材料,如其价款大于
  • 商品过期可以向商家索要赔偿吗
  • 增值税税控系统的税务处理
  • 融资租赁租金收入计税基础
  • 购买烟怎么做账
  • 小规模纳税人的季度
  • 常用的索赔费用计算方法
  • a公司转给b公司投资款的会计分录
  • 建筑企业所得税怎么算举例说明
  • 小规模企业所得税按季度还是按年
  • 总分公司合并缴纳个税
  • 如何关闭edge浏览器下载保护
  • qq能上,网页打不开怎么回事
  • 应收账款平均余额怎么理解
  • mac屏幕共享要打开什么权限
  • 启动电脑显示器不亮怎么回事
  • 总公司人员的工资子公司发,如何报税?
  • 商品流通会计科目
  • 投资性房地产期末公允价值大于账面价值
  • 日出的时候是圣诞节
  • 没有抵扣联可以作废吗
  • php imagecopymerge
  • echarts地图tooltip
  • 建筑工程增值税怎么算
  • php数组按照某个字段排序
  • nvidia-smi failed to initialize
  • 营改增企业所得税税率
  • vue3 原理
  • 公章盖错在旁边加盖正确的说明
  • 自然人所属税务机关怎么选
  • 帝国cms采集发布的文章链接打不开
  • 个人对个人大额转账用途怎么写
  • 免税普票要交企业所得税吗
  • 一般纳税人什么情况可以开3%的发票
  • 购买商品并入库做什么会计分录
  • 用于餐厅的不锈钢餐具
  • 原材料 半成品
  • 营业税金及附加包括哪些
  • 冲回上月暂估入账的商品会计分录
  • 坏账准备与应收账款的影响有哪些
  • 暂估成本多久要冲掉
  • 固定资产融资租出对应入账科目
  • 社会保险费结算表怎么打印
  • 往来核算是指会计核算中的资金往来业务
  • 机票行程单可以打印几次
  • 融资租出固定资产的账务处理
  • 可供出售金融资产公允价值变动
  • 物流运输公司的运营盈利模式
  • mysql的20条优化方法
  • Select count(*)、Count(1)和Count(列)的区别及执行方式
  • 系和系怎么区分
  • wysafe.exe是什么
  • 如何彻底删除超级QQ秀
  • win8怎么删除输入法
  • win8程序和功能在哪
  • javascript学习指南
  • python利用for循环求1到100的奇数之和
  • iframe嵌套页面滚动事件
  • js正则匹配特殊符号
  • 深入理解javascript特性
  • 谷歌chrome浏览器网页版网址
  • Python的for和in
  • Python中的def
  • 小规模企业所得税怎么算
  • 稳岗补贴是否需要发放
  • 西安12345官网登录入口二维码
  • 土地买卖谈判开场陈述
  • 纳税服务存在的不足之处
  • 普惠性税收优惠政策例子
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设