PHP使用消息队列RABBITMQ

515次阅读
没有评论

共计 1243 个字符,预计需要花费 4 分钟才能阅读完成。

PHP使用消息队列RABBITMQ

composer包安装

composer require php-amqplib/php-amqplib

Amqp类的封装

RABBITMQ的使用,封装类,修改对应的命名空间和配置之后可直接使用

namespace app\common\service;

use PhpAmqpLib\Connection\AMQPStreamConnection;

class Amqp
{
    public function __construct()
    {

        defined('RABBITMQ_HOST')? null : define('RABBITMQ_HOST', env('RABBITMQ.HOST') ?: '127.0.0.1');
        defined('RABBITMQ_PORT')? null : define('RABBITMQ_PORT', env('RABBITMQ.PORT') ?: 5672);
        defined('RABBITMQ_USER')? null : define('RABBITMQ_USER', env('RABBITMQ.USER') ?: 'admin');
        defined('RABBITMQ_PASS')? null : define('RABBITMQ_PASS', env('RABBITMQ.PASS') ?: 'admin');
        defined('RABBITMQ_VHOST')? null : define('RABBITMQ_VHOST',env('RABBITMQ.VHOST') ?:'lovebread-test');
    }

    public function receive($queueName, $callback = [])
    {
        try {
            $connection = new AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER, RABBITMQ_PASS, RABBITMQ_VHOST); // 建立连接到RabbitMQ服务器
            $channel = $connection->channel(); // 建立通道

            $channel->basic_consume($queueName, 'PHP-API', false, false, false, false, $callback); // 消费消息
            //dump($channel->is_consuming());
            while ($channel->is_consuming()) { // 循环消费消息,没有阻塞等待
                $channel->wait();
            }
            $channel->close();      // 通道关闭
            $connection->close();   // 连接关闭
        } catch (Exception $e) {
            die($e->getMessage());
        }

    }

    public static function ack($msg)
    {
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }


}

                                                    
正文完
 
BlackBeans
版权声明:本站原创文章,由 BlackBeans 2023-10-08发表,共计1243字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。