FastAPI异步Redis-aioredis集成

在这里插入图片描述

一、前言

FastAPI是一个高性能的异步框架,集成Redis时,使用的时候异步Redisaioredis

二、aioredis是什么?

asyncio (PEP 3156) Redis客户端库。

该库旨在基于asyncio为Redis提供简单而清晰的接口。

文档地址:https://aioredis.readthedocs.io/en/latest/

三、安装

pip install aioredis

四、在FastAPI中集成aioredis

  • 使用FastAPI启动前后处理程序进行创建连接和关闭连接
  • import aioredis

4.1 在FastAPI创建前创建Redis连接

  • FastAPI创建前创建Redis连接
  • app.state添加新的属性值redis,用来存放Redis实例
  • 全局都可使用app.state.redis获取Redis实例
@app.on_event("startup")
async def startup_event():
    app.state.redis = await aioredis.create_redis(address=("127.0.0.1", 6379), db=3, encoding="utf-8")
    print(f"redis成功--->>{app.state.redis}")

4.2 关闭连接

  • FastAPI关闭时关闭Redis连接
  • 使用app.state.redis直接进行关闭操作
@app.on_event("shutdown")
async def shutdown_event():
    app.state.redis.close()
    await app.state.redis.wait_closed()

4.3 把Redis挂载到app上

1)创建注册Redis的方法
def register_redis(app: FastAPI):
    @app.on_event("startup")
    async def startup_event():
        app.state.redis = await aioredis.create_redis(address=("127.0.0.1", 6379), db=3, encoding="utf-8")
        print(f"redis成功--->>{app.state.redis}")

    @app.on_event("shutdown")
    async def shutdown_event():
        app.state.redis.close()
        await app.state.redis.wait_closed()

2)挂载到app上
from fastapi import FastAPI

app = FastAPI()
# 进行挂载
register_redis(app)

4.4 接口中使用

@app.get("/test")
async def test(q: str):
    key = time.time()
    await app.state.redis.set(key=key, value=q)
    # 添加数据,5秒后自动清除
    await app.state.redis.setex(key="vvv", seconds=5, value="临时存在")
    value = await app.state.redis.get(key=key)
    return {key: value}

4.5 在子路由中使用

  • 添加参数request: Request
  • 通过request.app.state.redis获取Redis实例
  • 通过获取到的实例,进行操作Redis
router = APIRouter()

@router.get("/api/test")
async def test(request: Request, value: str):
    api_time = f"api-{time.time()}"
    await request.app.state.redis.setex(key=api_time, seconds=5.5, value=value)
    value = await request.app.state.redis.get(key=api_time)
    return {api_time: value}

app.include_router(router)

在使用过程中遇到的问题

在实际工作中使用,发现在register_redis注册方法中使用startup_event添加app.state.redis值,在使用中却提示request.app.state.redis方法不存在。
最后的一个解决方法,是去掉startup_event前置方法,直接在register_redis注册方法中添加:

def register_redis(app: FastAPI):
    app.state.redis = await aioredis.create_redis(address=("127.0.0.1", 6379), db=3, encoding="utf-8")
    print(f"redis成功--->>{app.state.redis}")
        
    @app.on_event("shutdown")
    async def shutdown_event():
        app.state.redis.close()
        await app.state.redis.wait_closed()
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐