一、业务背景

        项目使用的是thinkphp5.1框架,需要用到think-queue的延时队列功能,在使用时发现延时队列不生效。

        在检查日志后发现,一直在刷新以下错误:

        

二、问题定位

        接下来进行手动出列操作,发现提示以上错误,并且只有延时队列会出现,所以是移动延迟任务时出现的错误,定位以下出列代码有问题:

    public function pop($queue = null)
    {
        $original = $queue ?: $this->options['default'];

        $queue = $this->getQueue($queue);

        $this->migrateExpiredJobs($queue . ':delayed', $queue, false);

        if (!is_null($this->options['expire'])) {
            $this->migrateExpiredJobs($queue . ':reserved', $queue);
        }

        $job = $this->redis->lPop($queue);

        if (false !== $job) {
            $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);

            return new RedisJob($this, $job, $original);
        }
    }

三、问题分析

        在执行出列时,会检测是否有到时间的延时队列消息,如果有,会移动延迟任务,并同时操作2个不同的key。        

    /**
     * 移动延迟任务
     *
     * @param string $from
     * @param string $to
     * @param bool   $attempt
     */
    public function migrateExpiredJobs($from, $to, $attempt = true)
    {
        $this->redis->watch($from);

        $jobs = $this->getExpiredJobs(
            $from, $time = time()
        );
        if (count($jobs) > 0) {
            $this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
                $this->removeExpiredJobs($from, $time);
                $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
            });
        }
        $this->redis->unwatch();
    }

将到期的延时队列从redis有序集合中删除(zRemRangeByScore),

    /**
     * 删除过期任务
     *
     * @param  string $from
     * @param  int    $time
     * @return void
     */
    protected function removeExpiredJobs($from, $time)
    {
        $this->redis->zRemRangeByScore($from, '-inf', $time);
    }

并且加入(rpush)新的redis队列。

    /**
     * 重新发布到期任务
     *
     * @param  string  $to
     * @param  array   $jobs
     * @param  boolean $attempt
     */
    protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
    {
        if ($attempt) {
            foreach ($jobs as &$job) {
                $attempts = json_decode($job, true)['attempts'];
                $job      = $this->setMeta($job, 'attempts', $attempts + 1);
            }
        }
        call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
    }

然后在执行exec()方法操作第2个节点时,一直提示connect close。

    /**
     * redis事务
     * @param \Closure $closure
     */
    protected function transaction(\Closure $closure)
    {
        $this->redis->multi();
        try {
            call_user_func($closure);
            if (!$this->redis->exec()) {
                $this->redis->discard();
            }
        } catch (Exception $e) {
            $this->redis->discard();
        }
    }

日志里面的错误信息是由于代码内有异常捕捉,所以会继续执行下去,一直到执行unwach()方法,提示:NOAUTH Authentication required.

整个移动延时任务redis执行顺序如下:

1、watch

2、multi

3、操作key1

4、操作key2

5、exec

6、unwatch

问题到这里中断了,为什么在执行exec()方法的时候会提示connect close?

这时候自然而然的想到了redis版本的问题,我们使用的是阿里云redis集群版,在询问了阿里云技术人员后,对方告知以下信息:

由于redis集群版一次连接只会连接一个节点,然后2个key的slot(槽点)不一致,导致在操作第2个key的时候,连接不存在,所以提示connect close。

三、解决方案

         根据查找的资料,解决方式是建议更换阿里云redis版本为主从版。

        参考资料:阿里云Redis报NOAUTH Authentication required - 简书

        但是,这不符合我们目前的实际场景,我们没办法更换阿里云redis版本,所以只能根据分析出来的问题继续寻找新的解决方案。

        根据已知信息,只需要解决两个key不同槽点的问题,就能解决连接问题。

        根据redis官方资料:https://redis.io/commands/cluster-keyslot 

        在两个不同key后面增加{},{}里面的值如果一致,则2个key经过hash之后得到的slot(槽点)就一致。

        由于该项目使用队列场景业务比较简单,所以简单粗暴的在获取队列名的时候强制给每组队列名称加上了{相同值},问题得到解决。

        这部分代码会导致使用到的槽点大量减少,降低集群的效果。请根据自己业务自行修改该部分代码。

    /**
     * 获取队列名
     *
     * @param  string|null $queue
     * @return string
     */
    protected function getQueue($queue)
    {
        $queue = $queue ?: $this->options['default'];
        return 'queues:' . $queue . '{' . substr($queue, -1). '}';
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐