1. Redis延迟队列

Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。但需要无限循环检查任务,会消耗系统资源

class RedisDelayQueue(object):
    """Simple Queue with Redis Backend
    dq = RedisDelayQueue('delay:commtrans')
    dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)})

    print(dq.get())
    """

    def __init__(self, name, namespace='queue'):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db = get_redis_engine(database_name='spdb')
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        """Return the approximate size of the queue."""
        return self.__db.zcard(self.key)

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0

    def rem(self, value):
        return self.__db.zrem(self.key, value)

    def get(self):
        # 获取任务,以0和当前时间为区间,返回一条在当前区间的记录
        items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
        if items:
            item = items[0]
            if self.rem(item):  # 解决并发问题  如能删就让谁取走
                return json.loads(item)
        return None

    def put(self, interval, item):
        """:param interval 延时秒数"""
        # 以时间作为score,对任务队列按时间戳从小到大排序
        """Put item into the queue."""
        d = json.dumps(item)
        return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐