一、简要先容

  • 开源AMQP实现,Erlang语言编写,支持多种客户端
  • 分布式、高可用、持久化、可靠、平安
  • 支持多种协议:AMQP、STOMP、MQTT、HTTP
  • 适用于多系统之间的营业解耦的新闻中间件

二、基本概念

1、exchange:交换器,卖力吸收新闻,转发新闻至绑定的行列,有四种类型:

  • direct:完全匹配的路由
  • topic:模式匹配的路由
  • fanout:广播模式
  • headers:键值对匹配路由

Exchange属性:

  • 持久化:若是启用,那么rabbit服务重启之后仍然存在
  • 自动删除:若是启用,那么交换器将会在其绑定的行列都被删除掉之后自动删除掉自身

2、Queue:行列,rabbitmq的内部工具,用于存储新闻,其属性类似于Exchange,同样可以设置是否持久化、自动删除等。

消费者从Queue中获取新闻并消费。多个消费者可以订阅同一个Queue,这时Queue中的新闻会被平均分摊给多个消费者举行处置,而不是每个消费者都收到所有的新闻并处置。

3、Binding:绑定,凭据路由规则绑定交换器与行列

4、Routing:路由键,路由的关键字

三、新闻的可靠性

  • Message acknowledgment:新闻确认,在新闻确认机制下,收到回执才会删除新闻,未收到回执而断开了毗邻,新闻会转发给其他消费者,若是遗忘回执,会导致新闻聚积,消费者重启后会重复消费这些新闻并重复执行营业逻辑。

  • Message durability:新闻持久化,设置新闻持久化可以制止绝大部分新闻丢失,好比rabbitmq服务重启,然则接纳非持久化可以提升行列的处置效率。若是要确保新闻的持久化,那么新闻对应的Exchange和Queue同样要设置为持久化。

  • Prefetch count,每次发送给消费者新闻的数目,默以为1

另外,若是需要可靠性营业,需要设置持久化和ack机制,若是系统高吞吐,可以设置为非持久化、noack、自动删除机制。

四、简朴应用

模拟这样一个营业场景,用户下单乐成后,需要给用户增添积分,同时还需要给用户发送下单乐成的新闻,这是在电商营业中很常见的一个营业场景。

若是系统是微服务架构,可能用户下单功能在订单服务,给用户增添积分的功能在积分服务,给用户发送通知新闻的功能在通知服务,各个服务之间解耦,互不影响。那么要实现上述的营业场景,新闻中间件rabbitmq是一个很好的选择。

缘故原由如下:

  • 高性能,它的实现语言是天生具备高并发高可用的erlang 语言
  • 支持新闻的持久化,纵然服务器挂了,也不会丢失新闻
  • 新闻应答(ack)机制,消费者消费完新闻后发送一个新闻应答,rabbitmq才会删除新闻,确保新闻的可靠性
  • 支持高可用集群
  • 天真的路由

实现思绪:

,

联博统计

www.9cx.net采用以太坊区块链高度哈希值作为统计数据,联博以太坊统计数据开源、公平、无任何作弊可能性。联博统计免费提供API接口,支持多语言接入。

,

用户下单乐成后,rabbitmq发送一条新闻至EXCHANGE.ORDER_CREATE交换器,该交换器绑定了两个行列,QUEUE.ORDER_INCREASESCOREQUEUE.ORDER_NOTIFY,消费者订阅这两个行列划分用来处置增添积分、发送用户通知。若是后续日志系统还需要纪录下单的相关日志,那么我们只需要再界说一个行列并将其绑定到EXCHANGE.ORDER_CREATE即可。

下单发rabbitmq新闻

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会舞蹈的机器人
 * @date: 2017/10/13 10:46
 * @description: 模拟用户下单之后发送rabbitmq新闻
 */
public class OrderCreator {
    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 新闻内容
    private static String msg = "create order success";

    /**
     * 模拟建立订单后发送mq新闻
     */
    public void createOrder() {
        System.out.println("下单乐成,最先发送rabbitmq新闻");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection;
        Channel channel;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 持久化
            boolean durable = true;
            // topic类型
            String type = "topic";
            // 声明交换器,若是交换器不存在则建立之
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messgeId = UUID.randomUUID().toString();
            // deliveryMode>=2示意设置新闻持久化
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
            // 公布新闻
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

积分系统订阅新闻

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会舞蹈的机器人
 * @date: 2017/10/13 16:02
 * @description: rabbitmq消费者,模拟下单乐成后给用户增添积分
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 增添积分行列名称
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";

    public void consume() {
        // 初始化rabbitmq毗邻信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 声明交换器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 声明行列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交换器与行列绑定并设置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消费新闻,callback是该类,关闭自动确认新闻,在完成营业逻辑后手动确认确认
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《积分系统》收到订单新闻:" + msg + ",给用户增添积分......");
        // 手动确认新闻
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);该方式会抛弃掉行列中的这条新闻
         * channel.basicReject(envelope.getDeliveryTag(), true);该方式会把新闻重新放回行列
         * 一样平常系统会设定一个重试次数,若是跨越重试次数,则会抛弃新闻,反之则会把新闻再放入行列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}

通知系统订阅新闻

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 会舞蹈的机器人
 * @date: 2017/10/13 16:20
 * @description: rabbitmq消费者,模拟下单乐成后给用户发送通知
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // 交换器名称
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 通知用户下单乐成通知行列名称
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";

    public void consume() {
        // 初始化rabbitmq毗邻信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 声明交换器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 声明行列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交换器与行列绑定并设置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消费新闻,callback是该类,关闭自动确认新闻,在完成营业逻辑后手动确认确认
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《通知系统》收到订单新闻:" + msg + ",最先给用户发送通知......");
        // 手动确认新闻
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);该方式会抛弃掉行列中的这条新闻
         * channel.basicReject(envelope.getDeliveryTag(), true);该方式会把新闻重新放回行列
         * 一样平常系统会设定一个重试次数,若是跨越重试次数,则会抛弃新闻,反之则会把新闻再放入行列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

测试

package com.robot.rabbitmq;

/**
 * @author: 会舞蹈的机器人
 * @date: 2017/10/13 16:27
 * @description:
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}

输出:

下单乐成,最先发送rabbitmq新闻
《积分系统》收到订单新闻:create order success,给用户增添积分......
《通知系统》收到订单新闻:create order success,最先给用户发送通知......
下单乐成,最先发送rabbitmq新闻
《积分系统》收到订单新闻:create order success,给用户增添积分......
《通知系统》收到订单新闻:create order success,最先给用户发送通知......
下单乐成,最先发送rabbitmq新闻
《积分系统》收到订单新闻:create order success,给用户增添积分......
《通知系统》收到订单新闻:create order success,最先给用户发送通知......

原文转载:https://www.jianshu.com/p/2f55cd7a3e1c
作者:会舞蹈的机器人