快捷搜索:  汽车  科技

rabbitmq安装无法进页面(RabbitMQ安装与使用)

rabbitmq安装无法进页面(RabbitMQ安装与使用)RABBITMQ_DEFAULT_USER=user // 用户名 RABBITMQ_DEFAULT_PASS=password // 密码rabbitMQ使用引入依赖$ docker pull rabbitmq:3-management $ docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq:3-management也可以通过添加环境变量修改登录的用户名和密码rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个qu

简介

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

rabbitmq安装无法进页面(RabbitMQ安装与使用)(1)

  • 左侧的P 代表生产者
  • 中间的是RabbitMQ,包括交换机和队列
  • 右侧的C 代表消费者

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:*路由键 * 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
exchange交换机机制

什么是交换机

rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchange的4种type去定义的。

特点

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  2. 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
  3. 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  4. 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  5. 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  6. 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  7. 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  8. 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  9. 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
安装

$ docker pull rabbitmq:3-management $ docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq:3-management

也可以通过添加环境变量修改登录的用户名和密码

RABBITMQ_DEFAULT_USER=user // 用户名 RABBITMQ_DEFAULT_PASS=password // 密码rabbitMQ使用

引入依赖

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</version> </dependency>

生产者

public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "127.0.0.1"; private static final int PORT = 5672; // RabbitMQ服务端默认端口号为5672 public static void main(String[] args) throws IOException TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("username"); factory.setPassword("passwd"); Connection connection = factory.newConnection(); // 建立连接 Channel channel = connection.createChannel(); // 创建信道 // 创建一个type="direct"、持久化的、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME "direct" true false null); // 创建一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME true false false null); // 将交换器和队列通过路由绑定 channel.queueBind(QUEUE_NAME EXCHANGE_NAME ROUTING_KEY); // 发送一条持久化的消息:hello world! String message = "hello world!"; channel.basicPublish(EXCHANGE_NAME ROUTING_KEY MessageProperties.PERSISTENT_TEXT_PLAIN message.getBytes()); // 关闭资源 channel.close(); connection.close(); } }

消费者

public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "127.0.0.1"; private static final int PORT = 5672; public static void main(String[] args) throws IOException TimeoutException InterruptedException { Address[] addresses = new Address[] { new Address(IP_ADDRESS PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("username"); factory.setPassword("passwd"); // 这里的连接方式与生产者的demo略有不同,注意区分 Connection connection = factory.newConnection(addresses); // 创建连接 final Channel channel = connection.createChannel(); // 创建信道 channel.basicQos(64); // 设置客户端最多接受未被ack的消息的个数 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag Envelope envelope AMQP.BasicProperties properties byte[] body) throws IOException { System.out.println("recv message: " new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() false); } }; channel.basicConsume(QUEUE_NAME consumer); // 等待回调函数执行完毕后,关闭资源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } }

猜您喜欢: