1、RabbitMQ介绍目录l 什么是MQl MQ有什么优势l 哪些情况下建议使用MQl 什么是RabbitMQl 选择RabbitMQ理由l RabbitMQ服务场景l RabbitMQ结构图l RabbitMQ名词解释目录l RabbitMQ客户端使用流程(productor/cunsumer)l Productor范例代码及注意事项l Consumer范例代码及注意事项l 开发中注意事项及重点关注异常处理l RabbitMQ服务端配置及重点参数l RabbitMQ与Spring整合范例代码 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方
2、法(消息传递),一般用作进程间通讯 MQ有什么优势? MQ本身是异步的,往队列里发送消息后无需等待,不同于通信协议。如HTTP协议(同步),客户端发出请求后必须等待服务器回应 哪些情况下建议使用MQ 高并发应用来不及处理,实时性要求不高 多应用之间异步通信,且耗时操作什么是RabbitMQ RabbitMQ是由Erlang(爱立信公司)语言开发,实现Advanced Message Queuing Protocol (AMQP高级消息队列协议)的消息中间件。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。选择RabbitMQ理由Reliability可靠性 E
3、xchange交换机、Queue队列、Message消息持久化、高可用性Flexible Routing 灵活路由Clustering 集群 分为Disc(硬盘)与RAM(内存),保证至少一台DiscHighly Available Queues高可用队列 与集群结合使用,设置队列间的消息同步Management UI管理界面异常情况下RabbitMQ处理方式(单机丢失/网络丢失/掉电/队列爆满) 单机丢失RabbitMQ支持集群,多台机器队列同步,丢失消息可从其他机器上获取 网络丢失 掉电RabbitMQ支持持久化,数据保存在硬盘上 队列爆满RabbitMQ支持流控机制,可修改内存大小,默认
4、为机器内存的40%RabbitMQ服务场景 应用程序之间无需即时返回且耗时操作(异步) Work Queues(消息均匀分配消息给消费者) Publish/Subscribe(广播模式,消息分发给所有的消费者) Routing(消费者接收消息由路由规则决定,简单路由名) Topics(消费者接收消息由路由规则决定,路由规则名比较复杂) RPC远程调用(同步)RabbitMQ结构图RabbitMQ名词解释 Broker:消息队列服务器实体,例如RabbitMQ服务 Vhost:虚拟主机,默认为“/”,一个broker里可以有多个vhost,区分不同用户权限,类似java的命令空间 Connect
5、ion:应用程序与broker连接,可有多个连接 Channel:消息通道,connection中可建立多个channel,每个channel代表一个会话任务,所有操作都在channel中进行。RabbitMQ名词解释 Exchange:消息交换机,channel中可有多个,用于投递消息。应用程序发送消息时先把消息给交换机,由交换机投递给队列,不是直接给队列 Queue:队列,用于存放消息 Message:消息,应用程序需要发送的数据 Bind:根据routingKey绑定exchange与queue规则,决定消息发送的方向RabbitMQ对象间关系broker可多个Connection可多个
6、Channel可多个Exchange可多个QueuemessageExchange主要3种类型 Fanout:不处理路由键(没有routingKey ),只需把队列绑定到交换机上。发送到交换机的消息都会转发到与该交换机绑定的所有队列上,类似于广播,转发消息是最快的Exchange主要3种类型 Direct:处理路由键(有routingKey )。将一队列绑定到交换机上,该消息需与一个特定的路由键( routingKey )完全匹配Exchange主要3种类型 Topic:与direct类似,功能更强,支持模糊绑定 *表示通配一个词 #表示通配0个或多个词RabbitMQ客户端使用流程(prod
7、uctor/cunsumer)Productor范例代码及注意事项 /以exchange为direct为例 package com.rabbitmq.test.ow.demo2; import java.io.IOException; import com.rabbitmq.client.BlockedListener; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbit
8、mq.client.MessageProperties; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.impl.AMQCommand; import com.rabbitmq.client.impl.AMQImpl; /* * Title: Producer.java * Description: 【生产者样例】 * author: zengqiang.yang * date: 2013-12
9、-25 * * 版权所有 (c) 2013,天翼电子商务有限公司 */ public class Producer /交换机命名规范:e_模块_其他private static final String EXCHANGE_NAME = e_tyb_test;/routingkey命名规范:r_模块_其他private static final String ROUTING_KEY = r_tyb_test;public static void main(String argv) throws Exception /注意:factory应为单例,不要每次取消息新建一次对象ConnectionFa
10、ctory factory = new ConnectionFactory();factory.setHost(192.168.225.190);factory.setPort(5672);/ 默认端口Productor范例代码及注意事项factory.setUsername(guest);/ 默认用户名factory.setPassword(guest);/ 默认密码factory.setVirtualHost(/);/ 默认虚拟主机,区分权限/ 设置心跳时间,防止长时间未活动被防火墙杀死,默认600秒,单位:秒 /factory.setRequestedHeartbeat(60*4); /
11、连接超时时间,单位:毫秒 /factory.setConnectionTimeout(1000*2);/注意:connection应为单例,不要每次取消息新建一次对象Connection connection = factory.newConnection();/监听connection关闭异常 connection.addShutdownListener(new ShutdownListener() Override public void shutdownCompleted(ShutdownSignalException cause) /connection异常 if (cause.isH
12、ardError() System.out.println(connection异常: + cause.getMessage() + ); /程序引起的异常,如:connection.close() if (cause.isInitiatedByApplication() System.out.println(connection关闭异常,重连.begin.); / connection = factory.newConnection(); System.out.println(connection关闭异常,重连.end.); else/rabbitmq服务引起的异常 AMQCommand a
13、mqCommand = (AMQCommand)cause.getReason(); if( amqCommand.getMethod() instanceof AMQImpl.Connection.Close ) AMQImpl.Connection.Close close = (AMQImpl.Connection.Close)amqCommand.getMethod(); if( 320=close.getReplyCode() )/rabbitmq服务器强制关闭 System.out.println(connection关闭异常,请检查rabbitmq服务器是否正常启动!); );Pr
14、oductor范例代码及注意事项 /监听connection阻塞异常 connection.addBlockedListener(new BlockedListener() Override public void handleUnblocked() throws IOException System.out.println(connection已解除阻塞!); Override public void handleBlocked(String reason) throws IOException System.out.println(connection阻塞原因:+reason+,请检查内存
15、是否够!); ); /注意:channel应为单例,不要每次取消息新建一次对象Channel channel = connection.createChannel();/监听channel关闭异常channel.addShutdownListener(new ShutdownListener() Override public void shutdownCompleted(ShutdownSignalException cause) /channel异常 if (!cause.isHardError() System.out.println(channel异常: + cause.getMess
16、age() + ); );Productor范例代码及注意事项/* * 创建交换机 * exchange交换机名 * type交换机类型fanout:广播模式,所有消费者都能收到生产者发送的消息,速度更快,不需设置routingkey * direct:只有与routingkey配置的消费者才能收到消息 * topic:与direct相同,只是支持模糊配置,类似正则表达式,功能更强, * *表示通配一个词,#表示通配0个或多个词(注意:是词,不是字母) * * durabledurable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; * durable=fal
17、se,相反 * */channel.exchangeDeclare(EXCHANGE_NAME, direct,true);/模拟发送消息for (int i = 0; i 100; i+) String message = Hello World!; message += i; /* * exchange交换机名, 为默认交换机,direct类型 * routingKeyexchange为direct、topic类型时指定routingKey,exchange为fanout类型时指定queueName队列名 * props MessageProperties.PERSISTENT_TEXT_
18、PLAIN:消息持久化,rabbitmq服务重启消息不会丢失;null:非持久化 * body发送消息 */ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(); System.out.println( x Sent + message + ); /关闭连接 / channel.close(); / connection.close(); Productor范例代码及注意事项 ConnectionFactory、Connectio
19、n、Channel注意单例控制 默认心跳时间为10分钟,发现个人账户环境防火墙为5分钟,有可能被防火墙杀死,建议设置4分钟connectionFactory.setRequestedHeartbeat(4*60); 添加Shutdown、Blocked异常监听,Shutdown后重连机制、Blocked后的日志输出Consumer范例代码及注意事项 /以exchange为direct为例 package com.rabbitmq.test.ow.demo2; import java.io.IOException; import java.util.HashMap; import java.ut
20、il.Map; import com.rabbitmq.client.BlockedListener; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSi
21、gnalException; import com.rabbitmq.client.impl.AMQCommand; import com.rabbitmq.client.impl.AMQImpl; /* * Title: Consumer.java * Description: 【消费者样例】 * author: zengqiang.yang * date: 2013-12-25 * * 版权所有 (c) 2013,天翼电子商务有限公司 */Consumer范例代码及注意事项 public class Consumer / 交换机命名规范:e_模块_其他private static fina
22、l String EXCHANGE_NAME = e_tyb_test;/ 队列命名规范:q_模块_其他private static final String QUEUE_NAME = q_tyb_test;/ routingkey命名规范:r_模块_其他private static final String ROUTING_KEY = r_tyb_test;public static void main(String argv) throws Exception / 注意:factory应为单例,不要每次取消息新建一次对象 ConnectionFactory factory = new Co
23、nnectionFactory(); factory.setHost(192.168.225.190); factory.setPort(5672);/ 默认端口 factory.setUsername(guest);/ 默认用户名 factory.setPassword(guest);/ 默认密码 factory.setVirtualHost(/);/ 默认虚拟主机,区分权限 / 设置心跳时间,防止长时间未活动被防火墙杀死,默认600秒,单位:秒 / factory.setRequestedHeartbeat(60*4); Consumer范例代码及注意事项/连接超时时间,单位:毫秒 /fa
24、ctory.setConnectionTimeout(1000*2);/ 注意:connection应为单例,不要每次取消息新建一次对象Connection connection = factory.newConnection();/监听connection关闭异常 connection.addShutdownListener(new ShutdownListener() Override public void shutdownCompleted(ShutdownSignalException cause) /connection异常 if (cause.isHardError() Syst
25、em.out.println(connection异常: + cause.getMessage() + ); /程序引起的异常,如:connection.close() if (cause.isInitiatedByApplication() System.out.println(connection关闭异常,重连.begin.); / connection = factory.newConnection(); System.out.println(connection关闭异常,重连.end.); Consumer范例代码及注意事项else/rabbitmq服务引起的异常服务引起的异常 AMQ
26、Command amqCommand = (AMQCommand)cause.getReason(); if( amqCommand.getMethod() instanceof AMQImpl.Connection.Close ) AMQImpl.Connection.Close close = (AMQImpl.Connection.Close)amqCommand.getMethod(); if( 320=close.getReplyCode() )/rabbitmq服务器强制关闭服务器强制关闭 System.out.println(connection关闭异常,请检查rabbitmq服
27、务器是否正常启动!); ); /监听connection阻塞异常 connection.addBlockedListener(new BlockedListener() Override public void handleUnblocked() throws IOException System.out.println(connection已解除阻塞!); Override public void handleBlocked(String reason) throws IOException System.out.println(connection阻塞原因:+reason+,请检查内存是否
28、够!); );Consumer范例代码及注意事项/ 注意:channel应为单例,不要每次取消息新建一次对象Channel channel = connection.createChannel();/监听channel关闭异常channel.addShutdownListener(new ShutdownListener() Override public void shutdownCompleted(ShutdownSignalException cause) /channel异常 if (!cause.isHardError() System.out.println(channel异常:
29、+ cause.getMessage() + ); );/ 同一时间一个消费者只能接收一条消息,web管理界面队列Unacked数值,如多个队列数值累加channel.basicQos(1);Consumer范例代码及注意事项 /* * 创建交换机 * exchange交换机名 * type交换机类型fanout:广播模式,所有消费者都能收到生产者发送的消息,速度更快,不需设置routingkey * direct:只有与routingkey配置的消费者才能收到消息 * topic:与direct相同,只是支持模糊配置,类似正则表达式,功能更强, * *表示通配一个词,#表示通配0个或多个词(
30、注意:是词,不是字母) * * durabledurable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; * durable=false,相反 * */ channel.exchangeDeclare(EXCHANGE_NAME, direct,true); /* * 创建队列 * queue 队列名 * durable持久化队列,true:服务器重启队列不会丢失;false:反之 * exclusive 排他性,true:首次申明的connection连接下可见; * false:所有connection连接下都可见 * connection关闭后队列自动删
31、除,忽略队列持久化 * autoDeletetrue:无消费者时,队列自动删除;false:无消费者时,队列不会自动删除 * arguments可指定队列里消息总数 */ Map args = new HashMap(); / args.put(x-max-length, 100);/设置队列里的最大消息数 / args.put(x-message-ttl, 1000*10);/设置消息过期时间,时间一过消息自动删除,单位:毫秒 / channel.queueDeclare(QUEUE_NAME, true, false, false, args); channel.queueDeclare(
32、QUEUE_NAME, true, false, false, null);Consumer范例代码及注意事项channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);System.out.println( * Waiting for messages. To exit press CTRL+C);/ 声明一个消费者QueueingConsumer consumer = new QueueingConsumer(channel); /* * queue队列名 * autoAcktrue:消息从队列删除,不管是否正确处理;false:消息
33、不从队列删除,需要ack响应 * callback消费者 */ channel.basicConsume(QUEUE_NAME, false, consumer); while (true) / 循环获取消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(); System.out.println( x Received + message + ); doWork(message); System.out.println( x
34、Done); / 确认,消息从队列中删除 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); / 拒绝,消息不从队列删除/ channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); private static void doWork(String task) throws InterruptedException for (char ch : task.toCharArray() if (ch = ) Thread.sleep(1000)
35、; Consumer范例代码及注意事项 ConnectionFactory、Connection、Channel注意单例控制 默认心跳时间为10分钟,发现个人账户环境防火墙为5分钟,有可能被防火墙杀死,建议设置4分钟connectionFactory.setRequestedHeartbeat(4*60); 添加Shutdown、Blocked异常监听,Shutdown后重连机制、Blocked后的日志输出 对于比较重要的消息,消费者启动应在生产者之前,避免生产者发送消息时消费者未启动消息丢失开发中注意事项及重点关注异常处理ConnectionFactory、 Connection、 Chan
36、nel切勿每发送一条消息创建一次,因用单例控制Connection默认心跳/默认600秒(10分钟) connectionFactory. setRequestedHeartbeat(240);Exchange持久化/durable = true持久化, durable = false非持久化channel.exchangeDeclare(EXCHANGE_NAME, “direct”,true);开发中注意事项及重点关注异常处理Queue持久化/durable = true持久化, durable = false非持久化channel.queueDeclare(QUEUE_NAME, tru
37、e, false, false, null);Message持久化/props=MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes();开发中注意事项及重点关注异常处理Queue队列排他性/ exclusive = true首次申明的connection连接下可见, / exclusive = false所有connection连接下都可见channel.queueDecla
38、re(QUEUE_NAME, true, true, false, null);Queue队列里消息数大小设置Map args = new HashMap(); args.put(x-max-length, 50); /设置队列里的最大消息数 channel.queueDeclare(QUEUE_NAME, true, false, false, args);开发中注意事项及重点关注异常处理Queue队列里消息有效性Map args = new HashMap();/设置消息过期时间,时间一过消息自动删除,单位:毫秒args.put(x-message-ttl, 1000*10);channe
39、l.queueDeclare(QUEUE_NAME, true, false, false, args);开发中注意事项及重点关注异常处理 关闭异常ShutdownSignalException 通过Connection. addShutdownListener监听,可分为程序引起(如:connection.close())和服务器引起两大异常,程序引起建议捕获异常后有重连机制 阻塞异常 通过connection.addBlockedListener监听,当使用内存接近最大内存时,消息会阻塞( handleBlocked方法 捕获),可通过调整内存大小解除阻塞(handleUnblocked方
40、法捕获)RabbitMQ服务端配置及重点参数 添加用户并指定密码 ./rabbitmqctl add_user 用户名 密码 设置用户权限 ./rabbitmqctl set_user_tags 用户名 权限组 权限组4类ManagementPolicymakerMonitoringAdministrator Management 查看rabbitmq服务总消息数、当前与多少机器相连(connection)、建了多少通道(channel)、建立了多少队列数等 Policymaker 比management多了一个Admin选项卡中的policies标签,可添加镜像队列,满足集群情况下队列的同步
41、 Monitoring 比management多了Overview选项卡中的Nodes、Ports and contexts标签,可查看当前打开socket连接数、rabbitmq内存、硬盘使用情况;amqp端口(默认5672)及Web management 管理界面端口(默认15672,55672是rabbitmq 3.0之前版本端口号) Administrator 最高权限,用户、虚拟主机、策略等都可设置RabbitMQ服务端配置及重点参数 添加vhost ./rabbitmqctl add_vhost vhost名 设置vhost权限 ./rabbitmqctl set_permissi
42、ons -p vhost名 用户名 .* .* .*“ Vhost下的所有资源都有读写权限RabbitMQ服务端配置及重点参数 添加镜像队列策略 ./rabbitmqctl -n 节点名 set_policy -p vhost名 策略名 q_tyb_ ha-mode:all,ha-sync-mode:automatic q_tyb_ 设置队列名规则 ha-mode:all 所有机器都接收(集群环境下) ha-sync-mode:automatic 某一台机器停止重启后,自动同步队列里的消息RabbitMQ服务端配置及重点参数 最大打开文件数及socket连接数 如ConnectionFacto
43、ry、Connection、Channel未控制单例,每发送一条消息就创建一次连接,file、socket数也随之增加,file与socket总数比例大约80%RabbitMQ服务端配置及重点参数 rabbitmq内存、硬盘使用空间 rabbitmq最大内存默认为机器内存的40% rabbitmq磁盘空间默认为rabbitmq数据所在分区的可用空闲空间RabbitMQ服务端配置及重点参数 Exchange交换机的持久化 交换机持久化,rabbitmq服务重启exchange依然存在,数据保存在硬盘上RabbitMQ服务端配置及重点参数 Queue队列的持久化 队列持久化,rabbitmq服务重
44、启queue依然存在,数据保存在硬盘上RabbitMQ服务端配置及重点参数 队列最大消息数(可选)当队列里消息数到达最大值时,后面消息会替换原先的消息,队列里消息是先进先出原则代码Map args = new HashMap();args.put(x-max-length, 50); /设置队列里的最大消息数channel.queueDeclare(QUEUE_NAME, true, false, false, args); 示意图RabbitMQ服务端配置及重点参数消息有效性(可选) 消息进入队列过了指定时间(单位:毫秒),自动删除消息 代码Map args = new HashMap();
45、/设置消息过期时间,时间一过消息自动删除,单位:毫秒args.put(x-message-ttl, 1000*10);channel.queueDeclare(QUEUE_NAME, true, false, false, args); 示意图RabbitMQ服务端配置及重点参数 多机集群队列同步 如果一台RabbitMQ服务挂了,其他机器上会有一份副本。RabbitMQ服务端配置及重点参数 队列命名规范,如:q_模块_其他 影响队列同步,多机集群间队列的同步需要使用policy策略来保证RabbitMQ与Spring整合范例代码 生产者applicationContext-rabbitmq-
46、producer.xml配置文件 RabbitMQ与Spring整合范例代码 RabbitMQ与Spring整合范例代码 /生产者 package com.rabbitmq.test.ow.spring; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.; /* * Title: Producer.java * Description: 【rabb
47、itmq与spring整合-生产者】 * author: zengqiang.yang * date: 2013-12-30 * * 版权所有 (c) 2013,天翼电子商务有限公司 */ public class Producer public static void main(String args) ApplicationContext context = new (src/main/resources/applicationContext- rabbitmq-producer.xml);RabbitMQ与Spring整合范例代码 AmqpTemplate amqpTemplate =
48、(AmqpTemplate)context.getBean(amqpTemplate); /模拟发送消息 for (int i = 0; i 100; i+) String message = Hello World!; message += i; /* * r_tyb_test2 routingkey * message 发送消息 */ amqpTemplate.convertAndSend(r_tyb_test2,message); System.out.println(Sent:+message); RabbitMQ与Spring整合范例代码 消费者applicationContext-
49、rabbitmq-consumer.xml配置文件 RabbitMQ与Spring整合范例代码 RabbitMQ与Spring整合范例代码 /消费者 package com.rabbitmq.test.ow.spring; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.context.ApplicationContext; import org.springframework.contex
50、t.support.; /* * Title: ConsumerLitener.java * Description: 【rabbitmq与spring整合-消费者】 * author: zengqiang.yang * date: 2013-12-30 * * 版权所有 (c) 2013,天翼电子商务有限公司 */ public class ConsumerLitener implements MessageListener Override public void onMessage(Message arg0) RabbitMQ与Spring整合范例代码 String message =