rabbitmq安装无法进页面(RabbitMQ安装与使用)
rabbitmq安装无法进页面(RabbitMQ安装与使用)RABBITMQ_DEFAULT_USER=user // 用户名 RABBITMQ_DEFAULT_PASS=password // 密码rabbitMQ使用引入依赖$ docker pull rabbitmq:3-management $ docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq:3-management也可以通过添加环境变量修改登录的用户名和密码rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个qu
简介RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
- 左侧的P 代表生产者
- 中间的是RabbitMQ,包括交换机和队列
- 右侧的C 代表消费者
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
- 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
- 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:*路由键 * 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
- 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
什么是交换机
rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchange的4种type去定义的。
特点
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
- 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
- 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
- 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
- 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
- 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
- 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
- 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
$ docker pull rabbitmq:3-management
$ docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq:3-management
也可以通过添加环境变量修改登录的用户名和密码
RABBITMQ_DEFAULT_USER=user // 用户名
RABBITMQ_DEFAULT_PASS=password // 密码
rabbitMQ使用
引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
生产者
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672; // RabbitMQ服务端默认端口号为5672
public static void main(String[] args) throws IOException TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("username");
factory.setPassword("passwd");
Connection connection = factory.newConnection(); // 建立连接
Channel channel = connection.createChannel(); // 创建信道
// 创建一个type="direct"、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME "direct" true false null);
// 创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME true false false null);
// 将交换器和队列通过路由绑定
channel.queueBind(QUEUE_NAME EXCHANGE_NAME ROUTING_KEY);
// 发送一条持久化的消息:hello world!
String message = "hello world!";
channel.basicPublish(EXCHANGE_NAME ROUTING_KEY
MessageProperties.PERSISTENT_TEXT_PLAIN
message.getBytes());
// 关闭资源
channel.close();
connection.close();
}
}
消费者
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException TimeoutException InterruptedException {
Address[] addresses = new Address[] {
new Address(IP_ADDRESS PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("username");
factory.setPassword("passwd");
// 这里的连接方式与生产者的demo略有不同,注意区分
Connection connection = factory.newConnection(addresses); // 创建连接
final Channel channel = connection.createChannel(); // 创建信道
channel.basicQos(64); // 设置客户端最多接受未被ack的消息的个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag Envelope envelope
AMQP.BasicProperties properties byte[] body) throws IOException {
System.out.println("recv message: " new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag() false);
}
};
channel.basicConsume(QUEUE_NAME consumer);
// 等待回调函数执行完毕后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}