基于Redis Stream的Python异步队列库

一个基于Redis Stream构建的Python异步队列库,支持Asyncio、推迟任务、死信队列和多消费者功能。

基于最新的Redis版本构建的异步队列库,受启发于arq仓库: https://github.com/Wh1isper/brqLICENSE:BSD-3-Clause license特性:- 异步Asyncio支持- 推迟任务、死信队列- 多消费者支持#### 生产者代码案例pythonimport osfrom brq.producer import Producerfrom brq.tools import get_redis_client, get_redis_urlasync def main(): redis_url = get_redis_url( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", 6379)), db=int(os.getenv("REDIS_DB", 0)), cluster=bool(os.getenv("REDIS_CLUSTER", False)), tls=bool(os.getenv("REDIS_TLS", False)), ) async with get_redis_client(redis_url) as async_redis_client: await Producer(async_redis_client).run_job("echo", ["hello"])if __name__ == "__main__": import asyncio asyncio.run(main())#### 消费者代码案例pythonimport osfrom brq.consumer import Consumerfrom brq.daemon import Daemonfrom brq.tools import get_redis_client, get_redis_urlasync def echo(message): print(message)async def main(): redis_url = get_redis_url( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", 6379)), db=int(os.getenv("REDIS_DB", 0)), cluster=bool(os.getenv("REDIS_CLUSTER", False)), tls=bool(os.getenv("REDIS_TLS", False)), ) async with get_redis_client(redis_url) as async_redis_client: daemon = Daemon(Consumer(async_redis_client, echo)) await daemon.run_forever()if __name__ == "__main__": import asyncio asyncio.run(main())