由于RabbitMQ是用Erlang语言编写的,必须要先安装Erlang。安装成功以后,会提供默认的VHost、Exchange。
具体操作RabbitMQ可以通过它自带的控制台,在浏览器打开控制台http://43.105.136.120:15672,登录的默认账号密码都是guest
可以在控制台实现对MQ的监控,以及交换机、队列的创建等工作
1.引入依赖
创建Maven工程,pom.xml引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
2.Producer
public class MyProducer {
private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
public static void main(String[] args) throws Exception {
// 工厂模式
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("43.105.136.120"); // 连接IP
factory.setPort(5672); // 端口
factory.setVirtualHost("/"); // Vhost
factory.setUsername("guest"); // 用户名
factory.setPassword("guest"); // 密码
Connection connection = factory.newConnection(); // 获取连接
Channel channel = connection.createChannel(); // 获取消息通道
String msg = "Hello world, RabbitMQ"; // 消息
/**
* 发送消息 basicPublish
* String exchange, 交换机
* String routingKey, 路由键
* BasicProperties props, 消息属性(需要单独声明)
* byte[] body,消息体
*/
channel.basicPublish(EXCHANGE_NAME, "my.best", null, msg.getBytes());
channel.close();
connection.close();
}
}
关于消息属性(BasicProperties),它有如下属性:
| 属性名 | 用处 |
|---|---|
| contentType | 消息体的MIME类型,如application/json |
| contentEncoding | 消息的编码类型,如是否压缩 |
| messageId | 消息的唯一性标识,由应用进行设置 |
| correlationId | 一般用作关联消息的message-id,常用于消息的响应 |
| timestamp | 消息的创建时刻,整型,精确到秒 |
| expiration | 消息的过期时刻,字符串,但是呈现格式为整型,精确到秒 |
| deliveryMode | 消息的持久化类型 ,1为非持久化,2为持久化,性能影响巨大 |
| appId | 应用程序的类型和版本号 |
| userId | 标识已登录用户,极少使用 |
| type | 消息类型名称,完全由应用决定如何使用该字段 |
| replyTo | 构建回复消息的私有响应队列 |
| headers | 键/值对表,用户自定义任意的键和值 |
| priority | 指定队列中消息的优先级 |
3.Consumer
public class MyConsumer {
// 交换机
private static final String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
// 消息队列
private static final String QUEUE_NAME = "SIMPLE_QUEUE";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("43.105.136.120");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override // 回调函数
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
// 接收到的消息是字节数组形式,需要转成字符串
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
if (msg.contains("拒收")){
// 拒绝消息
// requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉
// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("异常")){
// 批量拒绝
// requeue:是否重新入队列
// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
// 手工应答
// 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
};
/**
* 开始获取消息,push模式
* String queue,
* boolean autoAck, 上面开启了手动应答basicAck,所以这里是false;当没basicAck一般为true
* Consumer callback
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
4.声明交换机、队列
声明队列,交换机,及它们的绑定关系,这些代码既可以写在Producer,也可以写在Consumer,也可以直接在RabbitMQ的控制台完成。但必须保证在Producer向交换机发消息,Consumer在队列取消息前完成创建工作。
/**
* 声明交换机
* String exchange, 交换机
* String type, 交换机类型(direct,topic,fanout)
* boolean durable, 是否持久化。代表交换机在服务器重启后是否还存在
* boolean autoDelete, 自动删除
* Map<String,Object> arguments,交换机其他属性,如x-message-ttl,x-max-length
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
/**
* 声明队列
* String queue, 队列
* boolean durable, 是否持久化。代表队列在服务器重启后是否还存在
* boolean exclusive, 是否排他性队列。排他性队列只能在声明它的Connection中使用(可以在同一个Connection的不同的channel中使用),连接断开时自动删除
* boolean autoDelete, 是否自动删除队列。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。
* Map<String,Object> arguments, 队列其他属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列绑定到交换机上,绑定的依据是路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "my.best");
上面的 Map<String, Object> arguments:队列的其他属性,例如
| 参数名 | 目的 |
|---|---|
| x-dead-letter-exchange | 死信交换器 |
| x-dead-letter-routing-key | 死信消息的可选路由键 |
| x-expires | 队列在多久没有消费者访问以后会被删除 |
| x-max-length | 队列的最大消息数 |
| x-max-length-bytes | 队列的最大容量,单位 Byte |
| x-message-ttl | 毫秒为单位的消息过期时间,队列级别 |
| x-max-priority | 队列中消息的最大优先级,消息的优先级不能超过它 |
本文标题:【RabbitMQ】基本使用:Java操作RabbitMQ(amqp-client)
本文链接:https://blog.quwenai.cn/post/9993.html
版权声明:本文不使用任何协议授权,您可以任何形式自由转载或使用。








还没有评论,来说两句吧...