Redis实现延迟队列
Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。但需要无限循环检查任务,会消耗系统资源
·
1. Redis延迟队列
Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。但需要无限循环检查任务,会消耗系统资源
class RedisDelayQueue(object):
"""Simple Queue with Redis Backend
dq = RedisDelayQueue('delay:commtrans')
dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)})
print(dq.get())
"""
def __init__(self, name, namespace='queue'):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = get_redis_engine(database_name='spdb')
self.key = '%s:%s' % (namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.zcard(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def rem(self, value):
return self.__db.zrem(self.key, value)
def get(self):
# 获取任务,以0和当前时间为区间,返回一条在当前区间的记录
items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
if items:
item = items[0]
if self.rem(item): # 解决并发问题 如能删就让谁取走
return json.loads(item)
return None
def put(self, interval, item):
""":param interval 延时秒数"""
# 以时间作为score,对任务队列按时间戳从小到大排序
"""Put item into the queue."""
d = json.dumps(item)
return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
更多推荐
已为社区贡献1条内容
所有评论(0)