MENU

RabbitMQ入门教程

• June 1, 2020 • Read: 86 • Note,分布式系统

MQ?

MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

主要用途:不同进程Process/线程Thread之间通信。

RabbitMQ

你可以注意到一句非常“狂”的话,RabbitMQ is the most widely deployed open source message broker.

确实是这样哈,目前市面上用的最多的就是RabbitMQ。

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.

The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.

RabbitMQ是一个消息代理(也称中间件):它接受和转发消息。你可以把它想象成邮局:当你把要邮寄的邮件放在邮筒里时,你可以确定送信先生或女士最终会将邮件发送给你的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员。

区别在于:RabbitMQ不会帮你处理里面的内容(官方原话的纸张是为了让读者更好的理解),而是帮你接受,存储和转发。

安装RabbitMQ

这里我推荐大家开启虚拟机,然后使用docker来安装RabbitMQ,不要用Windows版本。

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

启动和停止

docker start rabbitmq
docker stop rabbitmq 

这里为什么有两个端口呢

15672:WEB界面的端口,启动RabbitMQ后,使用ip+15672就可以访问了。

5672:通信端口(比如使用JAVA连接肯定是使用这个端口啦)

访问:你的ip:15672,如果你是本机,localhost:15672,如果你在虚拟机(Linux)不知道ip,输入命令ifconfig即可查看

默认的管理员用户密码都为guest

JAVA(Hello World)

In this part of the tutorial we'll write two programs in Java; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Java API, concentrating on this very simple thing just to get started. It's a "Hello World" of messaging.

In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.

我们将用Java编写两个程序

发送单个消息的producer(生产者)和接收消息并打印出消息的consumer(消费者)。官方文档说会掩盖JavaAPI的一些细节,专注于这个非常简单的事情,以便开始。这是一个"Hello World"的消息。但我是一个细节的人,所以我会处理得比官方细节。

在上图中,"P"是我们的生产者,"C"是我们的消费者。中间的框是队列-RabbitMQ代表消费者保留的消息缓冲区。

设计生产者

我们将新建一个Maven项目

引入依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>

创建生产者类,name->producer,注意,factory.setHost填写的是你安装的IP地址

package com.xn2001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 乐心湖
 * @date 2020/5/31 0:26
 **/
public class producer {

    //命名队列
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建到服务器的工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.123.128");
        //创建连接
        Connection connection = factory.newConnection();
        //创建一个通道,这是大多数API用于完成工作的位置。
        Channel channel = connection.createChannel();
        //声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "乐心湖好帅";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

运行程序,可以看到这个消息队列已经发送了过来。

这个1代表有一个消息还没给消费者接收到。我们点进去hello,

设计消费者

新建一个类,name->consumer

package com.xn2001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author 乐心湖
 * @date 2020/5/31 0:51
 **/
public class consumer{
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //由于它将异步推送我们的消息,这里我们以对象的形式提供回调,该对象将缓冲消息,直到我们使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [√] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

官方这里有一段话

Why don't we use a try-with-resource statement to automatically close the channel and the connection? By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.

这里我们为什么不尝试使用关闭通道和连接呢,如果这样做,这个程序就会运行一遍就过去了,不能处在活跃状态,那如何接受消息呢。

换句话说,我们必须让通道和连接保持活跃,这样就能时刻监听到消息。

我们启动consumer,就可以收到消息了。

我们将封装一个工具类,RabbitMQUtil

package com.xn2001.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author 乐心湖
 * @date 2020/5/31 13:01
 **/
public class RabbitMQUtil {

    private static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.111.129");
    }

    //定义连接对象的方法
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //定义一个关闭通道和连接的方法
    public static void closeChannelAndConnection(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

把生产者和消费者重新写一下

package com.xn2001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xn2001.util.RabbitMQUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 乐心湖
 * @date 2020/5/31 0:26
 **/
public class producer {

    //命名队列
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        //创建到服务器的工厂
        //ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("192.168.111.129");

        //创建连接
        //Connection connection = factory.newConnection();
        //创建一个通道,这是大多数API用于完成工作的位置。
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "乐心湖好帅";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [√] Sent '" + message + "'");
        RabbitMQUtil.closeChannelAndConnection(channel,connection);
    }
}
package com.xn2001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.xn2001.util.RabbitMQUtil;

/**
 * @author 乐心湖
 * @date 2020/5/31 0:51
 **/
public class comsumr {

    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        //ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("192.168.111.129");
        //Connection connection = factory.newConnection();
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [√] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

解释说明

package com.xn2001;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 乐心湖
 * @date 2020/6/1 15:52
 **/
public class Test {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("192.168.111.111");
        //若你的端口不是默认的5672,就需要设置,否则不写也可。
        factory.setPort(5672);
        //若用户不是使用guest,请往下看
        //设置连接到哪一个虚拟机
        factory.setVirtualHost("/ems");
        //设置访问该虚拟机的用户和密码
        factory.setUsername("ems");
        factory.setPassword("123456");
        //获取连接对象
        Connection connection = factory.newConnection();
        //创建通道连接
        Channel channel = connection.createChannel();
        /**
         * 通道绑定消息队列
         * @param queue 消息队列的名称,不存在时自动创建
         * @param durable 队列是否持久化,持久化后当重新启动rabbitmq时队列依旧存在
         * @param exclusive 是否独占队列,只允许一个在用,一般设置为false
         * @param autoDelete 消费完消息后自动删除这个消息队列
         * @param arguments
         */
        channel.queueDeclare("hello",false,false,false,null);
        /**
         * 发布消息
         * @param exchange 交换机名称
         * @param routingKey 消息队列名称
         * @param props 传递消息的额外设置,例如消息需要持久化,可以设置为MessageProperties.PERSISTENT_TEXT_PLAIN
         * @param body 消息具体内容
         */
        channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());

        //关闭
        channel.close();
        connection.close();
    }
}

注意:上面我仅展示出了生产者这些常用的设置,消费者理论上几乎是一样的。

同时你需要关注的是,生产者和消费者的消息队列参数必须一致,这样才能匹配得上。


本站所有未注明转载的文章均为原创,并采用CC BY-NV-SA 4.0 授权协议,转载请注明来源。

Last Modified: June 2, 2020
Archives QR Code
QR Code for this page
Tipping QR Code
Leave a Comment