RabbitMQ 快速入门/工作模式

RabbitMQ 工作模式

官网目前有7个https://www.rabbitmq.com/getstarted.html

暂时先不使用springboot的注解使用 先通过官网的Spring案例来讲

"Hello World!"

The simplest thing that does something 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

10

模式说明

一个P 一个C。

应用场景

入门练习的适合用把

代码编写

消费者端

// 先创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 通过set 一些配置信息
factory.setHost(<ip>);
factory.setPort(5672);
factory.setUsername(<rabbitmq-username>);
factory.setPassword(<rabbitmq-password>);
factory.setVirTualHost(<VirTualHost>);


// 以上可以单独的封装一个工具类完完成rabbitmq的链接目的是完成TCP的长连接
// 直接查看下一个的工具类


// create chanel
Channel channel = conn.createChannel();
//create and define Queue  
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare("<queue_name>",false,false,false,null);

// get date from rabbitmq server
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.queueDeclare("<queue_name>",false,new Reciver(channel));




class  Reciver extends DefaultConsumer {

    private Channel channel;
    
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

         String message = new String(body);
         System.out.println("消费者接收到的消息:"+message);

         System.out.println("消息的TagId:"+envelope.getDeliveryTag());
         
        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

rabbitmq连接工具类

private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
    connectionFactory.setHost("<ip>");
    connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
    connectionFactory.setUsername("<rabbitmq-username>");
    connectionFactory.setPassword("rabbitmq-password");
    connectionFactory.setVirtualHost("/VirTualHost");
}
public static Connection getConnection(){
    Connection conn = null;
    try {
        conn = connectionFactory.newConnection();
        return conn;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

生产者端口

//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();

//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(<queue_name>,false, false, false, null);

String message = "要发送的消息内容";
//四个参数
//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", <queue_name>, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");

Work queues

和基础模式相比多了多个消费者

Distributing tasks among workers (the competing consumers pattern) 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

11

模式说明

与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

应用场景

对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

比如 订单系统 需要进行下游系统的某些验证问题(短信验证码 短信服务部署多个,只需要有一个节点成功发送即可)

image-20220507220609098

代码编写

短信用户的实体类

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class SMS{
    private String name;
    private String mobile;
    private String content;
}

消息的生产者

public class WorkQueueProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // get connection
        Connection connection = RabbitUtils.getConnection();
        // declare channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(<queue_name>, false, false, false, null);
        //具体上游业务的信息生产处理
        for(int i = 1 ; i <= 100 ; i++) {
            SMS sms = SMS.builder()
                        .name("cus"+i)
                        .mobile(getPhoneNumber())
                        .content("success booking")
                        .build();
            String jsonSMS = new Gson().toJson(sms);
            channel.basicPublish("" , <queue_name> , null , jsonSMS.getBytes());
        }
        System.out.println("发送数据成功");
        //关闭管道和连接
        channel.close();
        connection.close();
    }
    
    public static int getNum(int start,int end){
         return (int)(Math.random()*(end-start+1)+start)       
    }
    
    public static String getPhoneNumber(){
        String[] telFirst="123,131,136,135,167".split(",");
        int index=getNum(0,telFirst.length-1);
        String first=telFirst[index];
        String second = String.valueOf(getNum(1,888)+10000).substring(1);
        String third = String.valueOf(getNum(1,9100)+10000).substring(1);
        return first+second+third;
    }
    
}

消息的消费者 其中1个消费者 可以有多个

public class WorkQueueConsumer1 {

    public static void main(String[] args) throws IOException {

        // 获取连接和声明管道
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        //队列的声明
        channel.queueDeclare(<queue_name>, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        //监听队列 手动完成返回状态
        channel.basicConsume(<queue_name> , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                //获取信息后可以做具体业务逻辑
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Publish/Subscribe

和队列模式相比 多出类一个交换机 消息会先发送到交换机里面

Sending messages to many consumers at once 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

13

模式说明

1.在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

2.交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

3.发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

应用场景

数据提供商和供应商 【天气】/同一个公司内部 跨部门的系统之间的调用

代码编写

发布订阅模式的消费者

public class PubsubProduce {
    //自动try
    @SneakyThrows
    public static void main(String[] args) throws Exception {
        
        Connection connection = RabbitUtils.getConnection();
        String input = new Scanner(System.in).next();
        Channel channel = connection.createChannel();
    
        //第一个参数交换机名字   其他参数和之前的一样
        channel.basicPublish(<exchange_name>,"" , null , input.getBytes());
    
        channel.close();
        connection.close();
}
}

发布订阅模式的生产者

public class PubSubConsumer {

    public static void main(String[] args) throws IOException {
        //获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        //获取虚拟连接
        final Channel channel = connection.createChannel();
        //声明队列信息的声明
        channel.queueDeclare(<queue_name>, false, false, false, null);

        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(<queue_name>,<exchange_name>, "");
        channel.basicQos(1);
        
        channel.basicConsume(<queue_name>, false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 业务
                
                System.out.println("[PubSubConsumer1]:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }

}

Routing

通过路由规则(key) 将交换机把信息指定发送到对应的队列里面

Receiving messages selectively 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

12

模式说明

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

  • C1:消费者,其所在队列指定了需要 routing key 为 orange 的消息

  • C2:消费者,其所在队列指定了需要 routing key 为 black、green 的消息

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

应用场景

定制化消息推送

代码编写

消息的生产者

public class routeProducer {


    public static void main(String[] args) throws Exception {

        Map area = new LinkedHashMap<String, String>();
        area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
        area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
        area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据");
        area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");

        area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
        area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
        area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
        area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");


        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //第一个参数交换机名字   第二个参数作为 消息的routing key
                    channel.basicPublish(<exchange_name>,me.getKey() , null , me.getValue().getBytes());

        }

        channel.close();
        connection.close();
    }
}

消息的消费者1

public class RouteConsumer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(<queue_name>, false, false, false, null);
        
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key

        channel.queueBind(<queue_name>, <exchange_name>, "china.hebei.shijiazhuang.20201128");
        channel.basicQos(1);
        channel.basicConsume(<queue_name>, false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("RouteConsumer1:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });

    }

}

消息的消费者2

public class RouteConsumer2 {

    public static void main(String[] args) throws IOException {
        //获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        //获取虚拟连接
        final Channel channel = connection.createChannel();
        //声明队列信息
        channel.queueDeclare(<queue_name>, false, false, false, null);

        //指定队列与交换机以及routing key之间的关系

        channel.queueBind(<queue_name>, <exchange_name>, "us.cal.lsj.20201127");
        channel.queueBind(<queue_name>, <exchange_name>, "china.hubei.wuhan.20201127");
        channel.queueBind(<queue_name>, <exchange_name>, "us.cal.lsj.20201128");
        channel.queueBind(<queue_name>, <exchange_name>, "china.henan.zhengzhou.20201012");

        channel.basicQos(1);
        channel.basicConsume(<queue_name> , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("RouteConsumer2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Topics

在路由的工作模式基础上 通过特定的通配符完整特定的消息发送

Receiving messages based on a pattern (topics) 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

14-1652259206132

模式说明

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

🍖通配符规则:

👍# 匹配一个或多个词

例如: jenkins.# 能够匹配 jenkins.job.work 或者 jenkins.job

👍* 匹配不多不少恰好1个词

例如: item.* 只能匹配 item.insert 不能匹配 item.xxxxx.xxx
15

如上图所示:

红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

代码编写

消息的生产者

public class routeProducer {


    public static void main(String[] args) throws Exception {

        Map area = new LinkedHashMap<String, String>();
        area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据");
        area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据");
        area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据");
        area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据");
        
        area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据");
        area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据");
        area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据");
        area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据");


        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();

        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //第一个参数交换机名字   第二个参数作为 消息的routing key
                    channel.basicPublish(<exchange_name>,me.getKey() , null , me.getValue().getBytes());

        }

        channel.close();
        connection.close();
    }
}

消息的消费者1

public class RouteConsumer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(<queue_name>, false, false, false, null);
        
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key 表示匹配所有以us开头的消息key
        channel.queueBind(<queue_name>, <exchange_name>, "us.#");

        channel.basicQos(1);
        channel.basicConsume(<queue_name>, false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("RouteConsumer1:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });

    }

}

消息的消费者2

public class RouteConsumer2 {

    public static void main(String[] args) throws IOException {
        //获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        //获取虚拟连接
        final Channel channel = connection.createChannel();
        //声明队列信息
        channel.queueDeclare(<queue_name>, false, false, false, null);

        //指定队列与交换机以及routing key之间的关系
        channel.queueBind(<queue_name>, <exchange_name>, "*.*.*.20201127");
        
        channel.basicQos(1);
        channel.basicConsume(<queue_name> , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("RouteConsumer2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }

}

RPC

Request/reply pattern example

Publisher Confirms

Reliable publishing with publisher confirms