python 协程 自定义互斥锁

cnblogs 2024-07-26 08:09:22 阅读 86

最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。

如果仅仅使用async with asyncio.Lock()的话。会使所有请求都排队处理。

1 import asyncio

2 import datetime

3

4 lock = asyncio.Lock()

5

6

7 async def place_order(user_id, order_id):

8 async with lock:

9 # 模拟下单处理

10 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")

11 await asyncio.sleep(1) # 假设处理需要 1 秒

12 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")

13

14

15 # 定义一个测试函数

16 async def test():

17 # 创建四个任务,模拟两个 UserID 的并发请求

18 tasks = [

19 asyncio.create_task(place_order(1, 101)),

20 asyncio.create_task(place_order(1, 102)),

21 asyncio.create_task(place_order(2, 201)),

22 asyncio.create_task(place_order(2, 202)),

23 ]

24 # 等待所有任务完成

25 await asyncio.gather(*tasks)

26

27

28 if __name__ == '__main__':

29 # 运行测试函数

30 asyncio.run(test())

这显然不是想要的结果,第二种方案是定义一个字典,key使用user_id,value为asyncio.Lock(),每次执行前从字典里面获取lock,相同的user_id将会使用同一个lock,那就实现了功能。

import asyncio

import datetime

locks = {}

async def place_order(user_id, order_id):

if user_id not in locks:

locks[user_id] = asyncio.Lock()

async with locks[user_id]:

# 模拟下单处理

print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")

await asyncio.sleep(1) # 假设处理需要 1 秒

print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")

# 定义一个测试函数

async def test():

# 创建四个任务,模拟两个 UserID 的并发请求

tasks = [

asyncio.create_task(place_order(1, 101)),

asyncio.create_task(place_order(1, 102)),

asyncio.create_task(place_order(2, 201)),

asyncio.create_task(place_order(2, 202)),

]

# 等待所有任务完成

await asyncio.gather(*tasks)

if __name__ == '__main__':

# 运行测试函数

asyncio.run(test())

但是这个方案会有缺点是,user_id执行完成之后没有释放资源,当请求的user_id变多之后,势必会造成占用过多的资源。继续改进方案,将locks的value加一个计数器,当获取lock时计数器加1,使用完之后计数器-1,当计数器变为小于等于0时,释放locks对应的key。最后将这个功能封装为一个类方便其他地方调用。

import asyncio

mutex_locks = {}

class MutexObj:

def __init__(self):

self.lock = asyncio.Lock()

self.count = 0

class Mutex:

def __init__(self, key: str):

if key not in mutex_locks:

mutex_locks[key] = MutexObj()

self.__mutex_obj = mutex_locks[key]

self.__key = key

def lock(self):

"""

获取锁

:return:

"""

self.__mutex_obj.count += 1

return self.__mutex_obj.lock

def release(self):

"""

释放锁

:return:

"""

self.__mutex_obj.count -= 1

if self.__mutex_obj.count <= 0:

del mutex_locks[self.__key]

import asyncio

import datetime

from utils.mutex import Mutex, mutex_locks

locks = {}

async def place_order(user_id, order_id):

mutex = Mutex(user_id)

async with mutex.lock():

try:

# 模拟下单处理

print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")

await asyncio.sleep(1) # 假设处理需要 1 秒

print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")

except Exception as ex:

print(ex)

finally:

mutex.release()

print('=====================')

print(mutex_locks)

print('=====================')

# 定义一个测试函数

async def test():

# 创建四个任务,模拟两个 UserID 的并发请求

tasks = [

asyncio.create_task(place_order(1, 101)),

asyncio.create_task(place_order(1, 102)),

asyncio.create_task(place_order(2, 201)),

asyncio.create_task(place_order(2, 202)),

]

# 等待所有任务完成

await asyncio.gather(*tasks)

if __name__ == '__main__':

# 运行测试函数

asyncio.run(test())

至此实现了我的需求,此方案只考虑了单应用场景,如果是分布式部署,需要更换方案如redis锁,这里暂且不考虑。如果有其他实现方式,欢迎留言交流。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。