消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。
然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。
最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
publisher->exchange->router->queue->consumer
https://www.rabbitmq.com/download.html
<?php
/**
* @desc RabbitMesQueue.php
* @author robertzhai
*/
class RabbitMesQueue
{
private static $exchange_instance;
private static $channel_instance;
private static $amqp_connection;
private static $queue_instance;
//交换机名
const EXCHANGE_NAME = 'app.rabbitmq.durable';
//路由key
const KEY_ROUTE = 'usertestdata';
const QUEUE_NAME = 'worker.rabbitmq';
public function __construct($conn_args)
{
if (self::$exchange_instance &&
self::$exchange_instance instanceof AMQPExchange
) {
return;
}
$conn_args = array(
'host' => $conn_args['host'],
'port' => $conn_args['port'],
'login' => $conn_args['login'],
'password' => $conn_args['password'],
'vhost' => '/'
);
$e_name = self::EXCHANGE_NAME; //交换机名
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
log_message('error', ' Cannot connect to the broker! ' . PHP_EOL);
} else {
log_message('debug',' succ connect to the broker! ');
}
self::$amqp_connection = $conn;
$channel = new AMQPChannel($conn);
self::$channel_instance = $channel;
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->declareExchange();
self::$exchange_instance = $ex;
}
public function producer($message, $connected = true)
{
log_message('debug', ' in producer ' . PHP_EOL);
log_message('debug', ' in producer message: ' . json_encode($message) . PHP_EOL);
$k_route = self::KEY_ROUTE; //路由key
//消息内容
if(!$message){
return false;
}
if(is_array($message) || is_object($message)) {
$message = json_encode($message);
}
log_message('debug', 'producer message:' . $message . PHP_EOL);
$send_result = self::$exchange_instance->publish($message, $k_route);
if(!$connected && self::$amqp_connection->isConnected()) {
self::$amqp_connection->disconnect();
}
if ($send_result) {
log_message('debug', "Send Message $message succ" . "\n");
return true;
} else {
log_message('error', "Send Message $message fail" . "\n");
return false;
}
}
public function __destruct()
{
if(self::$amqp_connection->isConnected()) {
self::$amqp_connection->disconnect();
self::$amqp_connection = null;
}
if(self::$exchange_instance && self::$exchange_instance) {
self::$exchange_instance = null;
}
}
private function init_queue() {
if(!self::$queue_instance ||
!(self::$queue_instance instanceof AMQPQueue) ) {
//创建队列
$queue = new AMQPQueue(self::$channel_instance);
$queue->setName(self::QUEUE_NAME);
$queue->setFlags(AMQP_DURABLE); //持久化
$queue->declareQueue();
//绑定交换机与队列,并指定路由键
$boolBind = $queue->bind(self::EXCHANGE_NAME, self::KEY_ROUTE);
if(!$boolBind){
log_message('error', " queue->bind(self::EXCHANGE_NAME, self::KEY_ROUTE) FAIL");
exit(" queue->bind(self::EXCHANGE_NAME, self::KEY_ROUTE) FAIL");
}
self::$queue_instance = $queue;
}
}
public function consumer() {
$this->init_queue();
return self::$queue_instance->get();
}
/**
* @return AMQPConnection
*/
public function ack($envelope)
{
$this->init_queue();
//手动发送ACK应答
return self::$queue_instance->ack($envelope->getDeliveryTag());
}
}
echo "start check ... ";
ps -fe|grep '/etc/init.d/rabbitmq-server' |grep -v grep
if [ $? -ne 0 ]
then
echo "start process....."
/etc/init.d/rabbitmq-server start
sleep 10
else
echo "runing....."
fi
echo "check succ";