位置: 编程技术 - 正文

利用Python学习RabbitMQ消息队列(用python学算法)

编辑:rootadmin

推荐整理分享利用Python学习RabbitMQ消息队列(用python学算法),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:python learn,python learn,python learning,python learn,python learning,python可以这样学,python learn,python learn,内容如对您有帮助,希望把文章链接给更多的朋友!

RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

rabbitmq基本管理命令:

一步启动Erlang node和Rabbit应用:sudo rabbitmq-server

在后台启动Rabbit node:sudo rabbitmq-server -detached

关闭整个节点(包括应用):sudo rabbitmqctl stop

Demo:

producer.py

consumer.py

一、概念:

Connection: 一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。建立在上述的TCP连接中。数据流动都是在Channel中进行的。一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

二、队列:

首先建立一个Connection,然后建立Channels,在channel上建立队列

建立时指定durable参数为真,队列将持久化;指定exclusive为真,队列为临时队列,关闭consumer后该队列将不再存在,一般情况下建立临时队列并不指定队列名称,rabbitmq将随机起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

区别:durable是队列持久化与否,如果为真,队列将在rabbitmq服务重启后仍存在,如果为假,rabbitmq服务重启前不会消失,与consumer关闭与否无关;

而exclusive是建立临时队列,当consumer关闭后,该队列就会被删除

三、exchange和bind

Exchange中durable参数指定exchange是否持久化,exchange参数指定exchange名称,type指定exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别指定要进行bind的exchange和queue,routing_key为可选参数。

Exchange的三种模式:

Direct:

任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串);

2.这种模式下不需要将Exchange进行任何绑定(bind)操作;

3.消息传递时需要一个“routing_key”,可以简单的理解为要发送到的队列名字;

4.如果vhost中不存在routing_key中指定的队列名,则该消息会被抛弃。

利用Python学习RabbitMQ消息队列(用python学算法)

Demo中虽然声明了一个exchange='yanfa'和queue='anheng'的bind,但是在后面发送消息时并没有使用该exchange和bind,而是采用了direct的模式,没有指定exchange,而是指定了routing_key的名称为队列名,消息将发送到指定队列。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

Fanout:

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式

2.这种模式不需要routing_key

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

Demo中创建了一个将一个exchange和一个queue进行fanout类型的bind.但是发送信息时没有用到它,如果要用到它,只要在发送消息时指定该exchange的名称即可,该exchange就会将消息发送到所有和它bind的队列中。在fanout模式下,指定的routing_key是无效的 。

Topic:

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。

2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

四、任务分发

1.Rabbitmq的任务是循环分发的,如果开启两个consumer,producer发送的信息是轮流发送到两个consume的。

2.在producer端使用cha.basic_publish()来发送消息,其中body参数就是要发送的消息,properties=pika.BasicProperties(delivery_mode = 2,)启用消息持久化,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.在接收端使用cha.basic_consume()无限循环监听,如果设置no-ack参数为真,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告诉rabbitmq消息已经正确处理。如果没有这条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分发:设置cha.basic_qos(prefetch_count=1),这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。

五、注意:

生产者和消费者都应该声明建立队列,网上教程上说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

可能因为版本问题,在我的测试中如果第二次声明建立的队列属性和第一次不完全相同,将报类似这种错, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次创建属性不同,将报这种错, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次声明建立队列也出现这个错误,说明之前存在名字相同的队列且本次声明的某些属性和之前声明不同,可通过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明建立另一名称的队列或删除原有队列,如果原有队列是非持久化的,可通过重启rabbitmq服务删除原有队列,如果原有队列是持久化的,只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限(先确认该vhost中没有其他有用队列)。

以上内容是小编给大家介绍的利用Python学习RabbitMQ消息队列,希望大家喜欢。

python抓取网页中图片并保存到本地 在上篇文章给大家分享PHP源码批量抓取远程网页图片并保存到本地的实现方法,感兴趣的朋友可以点击了解详情。#-*-coding:utf-8-*-importosimportuuidimporturllib2

Python随手笔记之标准类型内建函数 Python提供了一些内建函数用于基本对象类型:cmp(),repr(),str(),type()和等同于repr()的('')操作符(1)type()type的用法如下:type(object)接

用Python抢过年的火车票附源码 前言:大家跟我一起念,Python大法好,跟着本宝宝用Python抢火车票首先我们需要splinter安装:pipinstallsplinter-

标签: 用python学算法

本文链接地址:https://www.jiuchutong.com/biancheng/379930.html 转载请保留说明!

上一篇:MySQL中表的复制以及大型数据表的备份教程(mysql数据表复制)

下一篇:python抓取网页中图片并保存到本地(用python抓取网页数据的代码)

  • 销售商品结转成本的记账凭证
  • 企业所得税的亏损怎么弥补
  • 个人所得税哪里报税
  • 会计凭证归档后几年后可销毁
  • 企业隐藏收入会受怎样处罚
  • 工业固定资产折旧会计处理
  • 员工向公司借的备用金到年底未还清
  • 以旧换新的金子划算吗
  • 应收账款进行债务转让
  • 发票已认证未抵扣怎么办
  • 计提和缴纳税会计分录
  • 股权的溢价是什么意思
  • 保安公司增值服务
  • 所得税需要结转嘛
  • 进口设备的重置成本包括什么
  • 企业捐赠现金支出应在哪个项目反应
  • 上市公司非限售股股票转让收入
  • 转回给挂靠单位的钱怎么做分录
  • 会计速动比率计算公式
  • 餐饮业租赁要求
  • 发票总金额怎么算折扣
  • 专票只能开一万的额度开了三万的发票
  • 在售房地产土地使用税如何计算?
  • 销售退回涉及的科目
  • 支付的股权溢价款账务处理
  • 糖尿病患者在什么情况下无需
  • 企业所得税汇算清缴会计分录
  • 抵押房产的保险能退吗
  • win7怎么添加设备
  • 现金解款单的使用范围
  • 净资产和净利润一样吗
  • 哪些费用可以在开办费中列支
  • 微信小程序四人游戏
  • 企业可以超范围经营吗
  • php缓存机制有哪些
  • 安斯海滩上的角眼沙蟹,塞舌尔普拉兰岛 (© Ingo Schulz/Offset by Shutterstock)
  • 什么叫股权溢价之谜?
  • vue中的use
  • 发票校验码不清晰
  • php强制下载文件在哪
  • mysql自连接查询各个部门经理的工资
  • php判断是否存在某字符串
  • 商业一般纳税人辅导期截止时间
  • 应付账款主要账户有哪些
  • 红字发票冲销的申请表是税务局开吗
  • 一般纳税人简易征收的适用范围
  • 建筑公司跨区域经营预缴
  • 所得税核定征收的政策
  • 长期股权投资成本法会计处理
  • 企业购买二手车计提折旧年限
  • 房地产企业的会计核算
  • 无偿赠送他人产品,会计分录怎么写
  • 工程施工企业外管证怎么办理
  • 旅游业开具的是什么证明
  • 坐车时的保险费怎么算
  • 房屋租赁费交什么税
  • 合作社盈余返还总额是指哪些数据
  • 企业注销其他应付款怎么处理账务
  • 如何解决win10系统安装不兼容驱动的问题
  • xp电脑启动项怎么设置启动项
  • linux系统中的用户分为哪几类
  • vmware怎么放大虚拟机
  • android-x86安装
  • xp系统如何做
  • avsched32.exe是什么进程 avsched32进程有什么作用
  • 让你的好朋友评价你图片
  • cfg是什么格式,怎么打开
  • three.js怎么用
  • Windows下自动备份MongoDB的批处理脚本
  • python读取文件指定内容
  • 详解如何使用pcem安装windows 98 csdn
  • 用python做斐波那契数列
  • android 自定义dialog
  • 刀具路径轨迹模拟
  • google年会
  • jQuery UI结合Ajax创建可定制的Web界面
  • 四川国税发票查询验证
  • 拆迁户契税减免政策文件
  • 泉州企业医保哪里缴费
  • 单位首次交社保医疗要多久才能享受
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设