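RabbitMQ is a message-queue broker. It supports plugins that add further messaging protocols.
This post tests real-time message push using the STOMP plugins.
RabbitMQ is installed with docker-compose. The Dockerfile is as follows: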
FROM rabbitmq:alpine
RUN rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
EXPOSE 4369 5671 5672 15671 15672 15674 25672
docker-compose.yml
rabbitmq:
  build: docker/rabbitmq
  environment:
    - RABBITMQ_DEFAULT_USER=admin
    - RABBITMQ_DEFAULT_PASS=admin
  restart: always
  ports:
    - "15672:15672"
    - "15674:15674"
    - "5672:5672"
  container_name: rabbitmq
The producer is written in PHP.
First install the AMQP client library with Composer:
composer require php-amqplib/php-amqplib
Producer code:
<?php
// Composer autoloader; the path assumes this script sits next to vendor/.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Log in with the account defined in docker-compose.yml (admin/admin).
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
$channel = $connection->channel();

$exchangeName = 'topic_exchange';
$exchangeType = 'topic';
$queueName = 'topic_queue';
$routingKey = 'topic.routing_key';

// Declare a durable topic exchange and a durable queue, then bind them.
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// Publish the current timestamp once per second, 1000 times.
for ($i = 0; $i < 1000; $i++) {
    $data = date('Y-m-d H:i:s');
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, $exchangeName, $routingKey);
    sleep(1);
}

$channel->close();
$connection->close();
Front-end JavaScript
The page needs to include stomp.js (the code below also uses jQuery).
<script>
var has_had_focus = false;

// Returns a print function that appends lines to the <div> inside el_name.
// If send is given, it is called with the input value on form submit.
var pipe = function(el_name, send) {
    var div = $(el_name + ' div');
    var inp = $(el_name + ' input');
    var form = $(el_name + ' form');
    var print = function(m, p) {
        p = (p === undefined) ? '' : JSON.stringify(p);
        div.append($("<code>").text(m + ' ' + p));
        div.scrollTop(div.scrollTop() + 10000);
    };
    if (send) {
        form.submit(function() {
            send(inp.val());
            inp.val('');
            return false;
        });
    }
    return print;
};

// Stomp.js boilerplate: connect to the Web STOMP endpoint on port 15674.
var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws');
client.debug = pipe('#second');

// /amq/queue/<name> addresses an existing queue, so the name must match
// the queue declared by the producer (topic_queue).
var print_first = pipe('#first', function(data) {
    client.send('/amq/queue/topic_queue', {"content-type": "text/plain"}, data);
});
var on_connect = function(x) {
    var id = client.subscribe("/amq/queue/topic_queue", function(d) {
        print_first(d.body);
    });
};
var on_error = function() {
    console.log('error');
};
// Log in with the account defined in docker-compose.yml; '/' is the vhost.
client.connect('admin', 'admin', on_connect, on_error, '/');

$('#first input').focus(function() {
    if (!has_had_focus) {
        has_had_focus = true;
        $(this).val("");
    }
});
</script>
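Problem found: with this publish/subscribe setup, if several front-end clients subscribe to the same queue, each message published by the back end is received by only one of them. RabbitMQ distributes the messages of a queue among its consumers rather than copying them to each one, and once a front-end client has received a message it is removed from the queue.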
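One possible way around this, offered as a sketch under assumptions and not something tested in the setup above: the RabbitMQ STOMP plugin also accepts destinations of the form /exchange/<exchange>/<routing key>. For a subscription to such a destination the broker creates an exclusive, auto-delete queue bound to the exchange, so every front-end client should receive its own copy of each message. The helper names exchangeDestination and subscribeToExchange are hypothetical; the exchange and routing key are taken from the producer above and are assumed to contain no '/' characters.

```javascript
// Build a RabbitMQ STOMP exchange destination: /exchange/<name>/<routing key>.
// Assumption: neither part contains '/', so no percent-encoding is done here.
function exchangeDestination(exchange, routingKey) {
    return '/exchange/' + exchange + '/' + routingKey;
}

// Hypothetical helper: subscribe through the exchange instead of a shared
// queue. `client` is any connected object with a stomp.js-style
// subscribe(destination, callback) method.
function subscribeToExchange(client, exchange, routingKey, onBody) {
    var destination = exchangeDestination(exchange, routingKey);
    return client.subscribe(destination, function(frame) {
        // Hand only the message body to the caller.
        onBody(frame.body);
    });
}
```

In the page above, on_connect could then call subscribeToExchange(client, 'topic_exchange', 'topic.routing_key', print_first) in place of the /amq/queue/ subscription. The trade-off is that these per-subscriber queues disappear when the client disconnects, so messages published while a client is offline are not delivered to it later.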