一、准备工作
1、说明
让PHP支持rabbitmq有两种方式,一种是安装amqp扩展,另一种是在项目中composer安装组件php-amqplib/php-amqplib。
选择的方法不同,代码写出来也是不一样的,但本质上都是调用的rabbitmq对外提供的一些方法。
2、理解RabbitMQ的消息流转过程
关于exchange、routing key、channel 等消息队列相关概念,可以查看文章 AMQP协议
3、搭建rabbitmq
可以查看文章 Docker快速构建RabbitMQ集群
如果想结合haproxy+keepalived做rabbitmq的负载均衡,可以查看文章 Docker快速构建HaProxy集群,并配置好rabbitmq的负载均衡 和 引入KeepAlived
三、amqp composer组件方式时,使用PHP向RabbitMQ推送消息
1、PHP代码
public function actionProduceMessage()
{
$config = ['host' => '192.168.78.200', 'port' => 5600, 'user' => 'root', 'password' => 123456, 'vhost' => '/'];
// 定义要使用的交换机名称
$exchange = 'exchange_1';
// 定义队列名称
$queue = 'create_goods';
// 定义使用的路由键
$routingKey = 'goods';
// 创建与MQ的连接
$connection = new AMQPStreamConnection(...array_values($config));
// 创建一个通道
$channel = $connection->channel();
/**
* 声明交换机
* 第一个参数 exchange 交换机名称
* 第二个参数 type 交换机类型 direct-直连,fanout-广播 topic-主题 headers-消息体header匹配
* 第三个参数 passive 是否检测同名交换机
* 第四个参数 durable 是否开启交换机持久化
* 第五个参数 auto_delete 通道关闭后是否删除交换机
*/
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
/**
* 声明队列
* 第一个参数 queue 队列名称
* 第二个参数 passive 是否检测同名队列
* 第三个参数 durable 是否开启队列持久化
* 第四个参数 exclusive 队列是否可以可以被其他队列访问
* 第五个参数 auto_delete 通道关闭后是否删除队列
*/
$channel->queue_declare($queue, false, true, false, false);
/**
* 将队列与交换机、路由键绑定
* 第一个参数 queue 指定队列
* 第二个参数 exchange 指定交换机
* 第三个参数 routingKey 指定路由键
*/
$channel->queue_bind($queue, $exchange, $routingKey);
// 组装生产的消息
$messageBody = json_encode([
'sku_id' => 11101,
'sku_sn' => 'sn2001',
'price' => 12000
]);
// 创建要发送的消息对象
$queueMessage = new AMQPMessage($messageBody, [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
/**
* 推送消息
* 第一个参数 msg 要发送的消息对象 object
* 第二个参数 exchange 指定使用的交换机
* 第三个参数 routingKey 指定路由键
*/
$channel->basic_publish($queueMessage, $exchange, $routingKey);
// 关闭通道
$channel->close();
// 关闭连接
$connection->close();
// 返回消息
return json_encode([
'status' => 1,
'msg' => 'publish queue message success'
]);
}
2、执行结果
可以看到,我们的消息已经推送到rabbitmq集群上了,负载均衡将这条消息分发到了mq3节点。
我们点击 create_goods
,进入消息详情,可以看到我们推送的具体消息
3、总结
到这里,就已经完成了向rabbitmq推送消息。