RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,**生产者只能将消息发送到交换机(exchange)**,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
Exchanges的类型
**直接(direct)**:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 abc ,则只有被标记为 abc 的消息才被转发,不会转发 abc.def,也不会转发 dog.ghi,只会转发 abc。
**主题(topic)*:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc. 只会匹配到 abc.def。
**标题(headers)**:不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则 x-match 有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
**扇出(fanout)**:不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的
默认exchange:即空字符串
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
Fanout交换机
类似于广播,生产者发布消息,绑定的消费者都可以收到消息
两个消费者
/**
* @author shaoshao
* @Date 2022/11/22 19:20
* @Description:
*/
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明一个临时队列 当消费者与队列断开时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("ReceiveLogs01"+new String(message.getBody()));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
/**
* @author shaoshao
* @Date 2022/11/22 19:20
* @Description:
*/
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明一个临时队列 当消费者与队列断开时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("ReceiveLogs02"+new String(message.getBody()));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
生产者
/**
* @author shaoshao
* @Date 2022/11/22 19:34
* @Description:
*/
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机 消费者声明了这里可以不声明
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
注意
先启动两个消费者再启动生产者。
生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息
Direct exchange
生产者只发给特定的消费者
两个消费者
package com.ssm.rabbitmq.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
/**
* @author shaoshao
* @Date 2022/11/22 19:56
* @Description:
*/
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_DIRECT_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_DIRECT_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk", false, false, false, null);
channel.queueBind("disk", EXCHANGE_DIRECT_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsDirect01" + new String(message.getBody()));
};
channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
});
}
}
package com.ssm.rabbitmq.six;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
/**
* @author shaoshao
* @Date 2022/11/22 19:56
* @Description:
*/
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_DIRECT_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_DIRECT_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console", false, false, false, null);
channel.queueBind("console", EXCHANGE_DIRECT_NAME, "info");
channel.queueBind("console", EXCHANGE_DIRECT_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsDirect02" + new String(message.getBody()));
};
channel.basicConsume("console", true, deliverCallback, consumerTag -> {
});
}
}
生产者
package com.ssm.rabbitmq.six;
import com.rabbitmq.client.Channel;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
import java.util.Scanner;
/**
* @author shaoshao
* @Date 2022/11/22 19:34
* @Description:
*/
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
System.out.println(" 生产者发出消息:" + message);
}
}
}
Topics exchange
- 发送到类型是 topic 交换机的消息的
routing_key
不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词 - *(星号)可以代替一个位置
- #(井号)可以替代零个或多个位置
案例
绑定关系如下:
- Q1–>绑定的是
- 中间带 orange 带 3 个单词的字符串
(*.orange.*)
- 中间带 orange 带 3 个单词的字符串
- Q2–>绑定的是
- 最后一个单词是 rabbit 的 3 个单词
(*.*.rabbit)
- 第一个单词是 lazy 的多个单词
(lazy.#)
- 最后一个单词是 rabbit 的 3 个单词
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
例子 | 说明 |
---|---|
quick.orange.rabbit | 被队列 Q1Q2 接收到 |
azy.orange.elephant | 被队列 Q1Q2 接收到 |
quick.orange.fox | 被队列 Q1 接收到 |
lazy.brown.fox | 被队列 Q2 接收到 |
lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
当一个队列绑定键是 #,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就是 direct 了
两个消费者
package com.ssm.rabbitmq.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
import java.sql.SQLOutput;
import java.util.Queue;
/**
* @author shaoshao
* @Date 2022/11/22 20:27
* @Description: 消费者C1
*/
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q1", false, false, false, null);
channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsTopic01" + new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + "Q1" + "绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume("Q1", true, deliverCallback, consumerTag -> {
});
}
}
package com.ssm.rabbitmq.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
/**
* @author shaoshao
* @Date 2022/11/22 20:27
* @Description: 消费者C2
*/
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsTopic01" + new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + "Q2" + "绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume("Q2", true, deliverCallback, consumerTag -> {
});
}
}
生产者
package com.ssm.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.ssm.rabbitmq.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @author shaoshao
* @Date 2022/11/22 20:43
* @Description: 消息发送方
*/
public class EmitLogsTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*/
HashMap<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
}
}
}