一丶介绍
rabbitMQJar是对rabbitmq的一种jar包封装,使其能简单的被java程序调用。在spring的rabbitTemplate基础上进一步简化使用。此项目参考了https://github.com/littlersmall/rabbitmq-access项目的多线程消费者与消息重发机制。也非常感谢作者的开源支持。
二丶封装功能支持
- 支持发送字符串类型消息与java对象消息。
- 支持多线程消费者。(固定线程池)
- 支持消息重发,保证消息可靠到达。
三丶缺点
- 消息重发机制缺陷:
因发送消息至MQ后,需要MQ返回ACK确认发送成功,但在发送ACK返回前突然断网,则不知道是否发送成功。一定时间未收到ACK,则认定发送失败重发,而或许已经发送成功,只是还未有ACK确认。因此可能会造成消息多发。经过rabbitmq-access作者测试发送300w条数据,发送结束后,实际发送数据301.2w条。详细性能见:https://www.jianshu.com/p/4112d78a8753
四丶rabbitMQJar API
- Rabbit:
Rabbit对象里封装了设置连接,创建Exchange丶queue,接收消息功能。
- SenderInter:
发送消息接口。
- SenderBuilder:
发送消息对象创建者。可以创建一个发送消息对象。
- MessageProcessInter:
消息处理接口,用于接收到消息后给用户自己处理。
- 设置连接属性接口:
1
| public static void setConnection(String host,int port,String username,String password); --属于Rabbit类方法
|
- 启动连接接口:
1
| Rabbit rabbit = Rabbit.start();
|
- 创建Exchange:
1 2 3 4 5 6
| void createExchange(String exchange,String routingKey,String type,boolean durable,boolean autoDelete,Connection connection) type: direct丶fanout丶 topic durable: 是否持久化exchange,即重启是否保留这个exchange。 autoDelete: 没有消费者的时候,服务器是否删除exchange. Connection: rabbitTemplate的连接对象。 --属于Rabbit对象方法
|
- 创建队列并绑定exchange:
1 2 3 4 5 6
| void createQueueAndBindExchange(String queue,boolean durable,boolean exclusive,boolean autoDelete,Connection connection); durable: 在服务器重启时,队列是否不删除. exclusive: 是否为当前连接的专用队列。(专用则在连接断开后,会自动删除该队列) autoDelete: 当没有任何消费者使用时,自动删除该队列。 Connection: rabbitTemplate的连接对象。 --属于Rabbit对象方法
|
- 创建发送者:
1 2 3
| SenderInter buildMessageSender(String exchange,String routingKey,String queue,Rabbit rabbit); 其中queue参数可为空串,表示只往exchange发消息,不直接发送到队列,而是y与exchange绑定的队列可以接收到消息。 --属于SenderBuilder类方法
|
- 发送消息:
1 2 3
| void send(String message); void send(Object message); --属于SenderInter接口方法
|
- 接收消息:
1 2 3 4
| void threadPoolConsume(int threadCount,int interval,String queue,MessageProcessInter messageProcess); threadCount: 消费者线程数量。 interval: 每个线程消费消息的间隔时间。 messageProcess: 消息处理对象。(用户自定义对象继承MessageProcessInter接口)。
|
五丶例子
- 导入jar包:rabbitMQ.jar。
- 设置并启动连接:
1 2
| Rabbit.setConnection("127.0.0.1", 5672, "username", "password"); Rabbit rabbit = Rabbit.start();
|
- 创建Exchange:
1
| rabbit.createExchange("exchange", "routingKey","type",true,false,rabbit.getConnection());
|
- 创建Queue:
1
| rabbit.createQueueAndBindExchange("queue",true,false,false,rabbit.getConnection());
|
- 创建消息发送者:
1 2 3 4 5
| SenderInter sender=SenderBuilder.buildMessageSender("exchange","",rabbit);
sender.send("我是rabbit");
sender.send(new ConsumeFlag(true,"i am rabbit"));
|
- 创建消息处理类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.pk.rabbit; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import com.pk.rabbit.common.SuccessFlag; import com.pk.rabbit.handle.MessageProcessInter;
public class MyProcess implements MessageProcessInter {
@Override public SuccessFlag process(Message message) { MessageConverter messageConverter = new Jackson2JsonMessageConverter(); try { System.out.println(new String(message.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return new SuccessFlag(true, ""); } }
|
- 创建消息处理对象:
1
| MyProcess mp=new MyProcess();
|
- 开启线程池消费者:
1
| rabbit.threadPoolConsume(2,1,"queue",mp);
|
github: https://github.com/spcBackToLife/rabbitJar
原来是这样登录的!