位置: IT常识 - 正文
推荐整理分享RabbitMQ(rabbitmq如何解决消息堆积问题),希望有所帮助,仅作参考,欢迎阅读内容。
文章相关热门搜索词:rabbitmq自动删除队列,rabbitmq五种消息模型,rabbitmq队列有几种,rabbitmq五种消息模型,rabbitmq如何保证消息不丢,rabbitmq五种消息模型,rabbitmq和kafka的区别,rabbitmq如何保证消息不丢,内容如对您有帮助,希望把文章链接给更多的朋友!
同步调用的优点:
时效性较强,可以立即得到结果同步调用的问题:
耦合度高性能和吞吐能力下降有额外的资源消耗有级联失败问题2.异步调用异步调用常见实现就是事件驱动模式
好处:
吞吐量提升:无需等待订阅者处理完成,响应更快速
故障隔离:服务没有直接调用,不存在级联失败问题
调用间没有阻塞,不会造成无效的资源占用
耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
架构复杂了,业务没有明显的流程线,不好管理需要依赖于Broker的可靠、安全、性能3.MQ实现MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
几种常见MQ的对比:
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScala&Java协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
二、Linux安装RabbitMQ1.1.下载镜像方式一:在线拉取
docker pull rabbitmq:3-management方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar1.2.安装MQ执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management注意5672是MQ消息通信的端口,15672是ui端口
三、RabbitMQ1.种类大致分为两种
1.**无交换机的 **基本消息队列 和 工作消息队列2.有交换机的发布订阅,路由,主题2.基于SpringAMQP使用2.1基础消息队列先使用测试类生成队列
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("116.62.32.68"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); }}引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>生产者编写yaml文件的MQ地址,端口号,用户名密码
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码编写生产者代码
@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringAmqpTest { //类似于redisTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue(){ //队列名 String queueName = "simple.queue"; //消息 String message = "hello,SpringAmqp"; //发送消息到队列 rabbitTemplate.convertAndSend(queueName,message); }}消费者编写yaml文件的MQ地址,端口号,用户名密码
同上
编写消费者代码
@Componentpublic class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); }}2.2工作消息队列消费者:
//工作队列@RabbitListener(queues = "simple.queue")public void listenWorkQueue1Message(String msg) throws InterruptedException { System.out.println("spring 消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2Message(String msg) throws InterruptedException { System.out.println("spring 消费者2接收到消息:【" + msg + "】");}logging: pattern: dateformat: MM-dd HH:mm:ss:SSSspring: rabbitmq: host: # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: direct: prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个2.3广播Fanout多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。
广播:只要队列绑定了交换机,就发送消息到队列
1.编写消费者:使用注解
上一篇:Qt4.7.4下单独编译QtWebkit(qt 5.15 编译)
下一篇:js中Promise的状态探究(js中promise的三种状态)
友情链接: 武汉网站建设