共计 1243 个字符,预计需要花费 4 分钟才能阅读完成。
composer包安装
composer require php-amqplib/php-amqplib
Amqp类的封装
下面是 RabbitMQ 消费端的封装类,修改对应的命名空间和连接配置后即可直接使用
namespace app\common\service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * Thin wrapper around php-amqplib for consuming messages from RabbitMQ.
 *
 * Connection settings are exposed as RABBITMQ_* global constants, which are
 * defined on first instantiation from env() values with hard-coded fallbacks.
 */
class Amqp
{
    /**
     * Define the RABBITMQ_* connection constants unless they already exist.
     *
     * Each constant takes the matching env() value; when that value is falsy
     * (`?:`), the literal default on the right-hand side is used instead.
     */
    public function __construct()
    {
        defined('RABBITMQ_HOST') || define('RABBITMQ_HOST', env('RABBITMQ.HOST') ?: '127.0.0.1');
        defined('RABBITMQ_PORT') || define('RABBITMQ_PORT', env('RABBITMQ.PORT') ?: 5672);
        defined('RABBITMQ_USER') || define('RABBITMQ_USER', env('RABBITMQ.USER') ?: 'admin');
        defined('RABBITMQ_PASS') || define('RABBITMQ_PASS', env('RABBITMQ.PASS') ?: 'admin');
        defined('RABBITMQ_VHOST') || define('RABBITMQ_VHOST', env('RABBITMQ.VHOST') ?: 'lovebread-test');
    }

    /**
     * Consume messages from a queue until the channel stops consuming.
     *
     * Opens a connection and a channel, registers $callback as the consumer
     * under the tag 'PHP-API', then loops on wait() for as long as the channel
     * reports it is consuming. The channel and connection are closed once the
     * loop ends.
     *
     * On failure the script is terminated with the exception message.
     *
     * @param string $queueName Name of the queue to consume from.
     * @param mixed  $callback  Consumer callback handed to basic_consume();
     *                          presumably receives the delivered message — see ack().
     *
     * @return void
     */
    public function receive($queueName, $callback = [])
    {
        try {
            // Connect to the RabbitMQ server.
            $connection = new AMQPStreamConnection(
                RABBITMQ_HOST,
                RABBITMQ_PORT,
                RABBITMQ_USER,
                RABBITMQ_PASS,
                RABBITMQ_VHOST
            );

            // Open a channel on that connection.
            $channel = $connection->channel();

            // Register the consumer. The four `false` flags are, in order:
            // no_local, no_ack, exclusive, nowait. With no_ack = false the
            // broker expects an explicit acknowledgement (see ack()).
            $channel->basic_consume($queueName, 'PHP-API', false, false, false, false, $callback);

            // Keep processing deliveries while a consumer is registered;
            // wait() blocks when no message is available.
            while ($channel->is_consuming()) {
                $channel->wait();
            }

            $channel->close();
            $connection->close();
        } catch (\Exception $e) {
            // Must be the fully-qualified global \Exception: this file lives in
            // a namespace, so an unqualified `Exception` would resolve to a
            // non-existent namespaced class and this handler would never run.
            die($e->getMessage());
        }
    }

    /**
     * Acknowledge a delivered message on the channel it arrived on.
     *
     * @param object $msg Delivered message exposing a `delivery_info` array
     *                    with the keys 'channel' and 'delivery_tag'.
     *
     * @return void
     */
    public static function ack($msg)
    {
        $info = $msg->delivery_info;
        $info['channel']->basic_ack($info['delivery_tag']);
    }
}
正文完