TP6 queue队列基本使用

关于队列的模式

1:sync 同步执行
2:database 配合数据库完成
3:redis 配合redis完成
小提醒:异步执行 最好使用redis作为配合

直接开始上代码

首先在项目根目录下执行composer require topthink/think-queue

修改config下queue文件

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'robot',
            'host'       => env('redis.host','127.0.0.1'),
            'port'       => env('redis.port',6379),
            'password'   => env('redis.password',''),
            'select'     => env('redis.select',0),
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

我这里是连接到redis

投递消息

在app下创建文件夹job//一般都叫这个
创建文件 我这里叫sendMessage

<?php

namespace app\job;

use think\facade\Queue;

class SendMessage
{
    public function sendMessage(array $data){//data 是你要处理的数据
        $jobHandlerClassName = 'app\job\Message';
        //当前任务归属的队列名称,如果为新队列,会自动创建
        //php think queue:work --queue orderJobQueue
        //php think queue:work --queue orderJobQueue --daemon
        $jobQueueName = "sendMessage";
        //将该任务推送到消息队列,等待对应的消费者去执行
        //这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系
        //立即执行
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        //延迟10秒后执行
        //$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName);
        if ($isPushed !== false) {
            return false;
        }
        return true;
    }

}

消费消息队列

<?php

namespace app\job;

use app\api\service\MessageService;
use think\facade\Log;
use think\queue\Job;

class Message
{
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job
     * @param array $data
     * @author Poison
     */
    public function fire(Job $job, array $data)
    {
        //有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        $jobId =  $job->getJobId();
        $isJobDone = $this->message($data, $jobId);
        if ($isJobDone) {
            //如果任务执行成功,记得删除任务
            $job->delete();
        } else {
            //通过这个方法可以检查这个任务已经重试了几次了
            if ($job->attempts() > 3){
                Log::error('试了3次了');
                $job->delete();

                //也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array $data
     * @author Poison
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone(array $data): bool
    {
        return true;
    }

    /**
     * @param array $data
     * @author Poison
     */
    public function message(array $data,  $jobId): bool
    {
        //对你的订单这些进行一个处理
        (new MessageService())->initMessage($data);
        return true;
    }

}

接口处理

public function index(): Response
    {
        date_default_timezone_set('PRC');
        $params = input('post.');
        trace($params);
        //投递进队列
        (new SendMessage())->sendMessage($params);
        return echoJson(['code'=>200]);
    }

运行php think queue:listen --queue sendMessage

喜欢的点个关注呗 有问题留言 我看到会回复的

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐