0%

Python-Mysql异步并发处理数据库

Python-MySQL 异步并发处理数据库模板

安装库:

# aiomysql:基于 asyncio 的异步 MySQL 数据库驱动库

pip3 install aiomysql

代码:

import asyncio
import aiomysql
import hashlib

class MySQLDatabase:
    """Small asynchronous helper around an ``aiomysql`` connection pool.

    Typical life cycle: construct, ``await create_pool()``, run any number of
    ``fetch_data`` / ``update_data`` coroutines (possibly concurrently via
    ``asyncio.gather``), then ``await close_pool()``.

    Args:
        host: MySQL server address.
        port: MySQL server port.
        user: Login user name.
        password: Login password.
        db_name: Database (schema) to select after connecting.
        loop: Optional event loop to hand to ``aiomysql``. When omitted, the
            loop that is running when ``create_pool`` is awaited is used.
        maxsize: Upper bound on pooled connections.
        minsize: Number of connections opened up front.
    """

    def __init__(self, host, port, user, password, db_name, loop=None, maxsize=20, minsize=1):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db_name = db_name
        # The loop is resolved lazily in create_pool(). Calling
        # asyncio.get_event_loop() here is deprecated when no loop is running
        # and may pick a loop different from the one asyncio.run() creates.
        self.loop = loop
        self.maxsize = maxsize
        self.minsize = minsize
        self.pool = None

    def _require_pool(self):
        """Return the live pool, or raise if ``create_pool`` has not been awaited."""
        if self.pool is None:
            raise RuntimeError(
                "connection pool is not initialised; await create_pool() first"
            )
        return self.pool

    async def create_pool(self):
        """Open the connection pool and store it on ``self.pool``."""
        if self.loop is None:
            # We are inside a coroutine, so a running loop always exists here.
            self.loop = asyncio.get_running_loop()
        self.pool = await aiomysql.create_pool(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            db=self.db_name,
            loop=self.loop,
            autocommit=True,
            charset='utf8',
            maxsize=self.maxsize,
            minsize=self.minsize,
        )

    async def fetch_data(self, query, params=None):
        """Run a read query and return every row as a dict.

        Args:
            query: SQL text, using ``%s`` placeholders for values.
            params: Values for the placeholders; may be omitted when the
                query has none.

        Returns:
            The result of ``cursor.fetchall()`` on a ``DictCursor``.

        Raises:
            RuntimeError: If the pool has not been created.
        """
        pool = self._require_pool()
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, params)
                return await cur.fetchall()

    async def update_data(self, query, params):
        """Run a write statement (INSERT/UPDATE/DELETE) and commit it.

        Args:
            query: SQL text, using ``%s`` placeholders for values.
            params: Values for the placeholders.

        Raises:
            RuntimeError: If the pool has not been created.
        """
        pool = self._require_pool()
        print("正在执行>>> ", params)
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
            # The pool is opened with autocommit=True, so this commit is only
            # a safety net in case that setting is ever changed.
            await conn.commit()

    async def close_pool(self):
        """Close the pool and wait for its connections to shut down.

        Safe to call when the pool was never created or is already closed.
        """
        if self.pool is None:
            return
        # Detach first so a second call (or a later query) sees a closed state.
        pool, self.pool = self.pool, None
        pool.close()
        await pool.wait_closed()


# Connection settings consumed by the usage example below; replace them with
# real values before running.
# NOTE(review): TABLE is a reserved word in MySQL, so the placeholder name
# "table" must be replaced (or backtick-quoted) before a query using it runs.
Table = "table" # Name of the table that is queried and updated
_localhost = "127.0.0.1" # Database server address
Username = "admin" # Database user name
Password = "123456" # Database password
DB_Name = "test" # Database (schema) name
# Usage example

def _quote_identifier(name):
    """Backtick-quote a (possibly dotted) MySQL identifier such as ``db.table``."""
    return ".".join(
        "`" + part.strip("`").replace("`", "``") + "`"
        for part in name.split(".")
    )


async def main():
    """Demonstrate querying rows and then updating them concurrently.

    Opens a pool, selects the row(s) with ``id = 1`` from ``Table``, issues
    one UPDATE per returned row via ``asyncio.gather``, and always closes the
    pool afterwards, even when a query fails.
    """
    # Database configuration.
    db = MySQLDatabase(host=_localhost, port=3306, user=Username, password=Password, db_name=DB_Name)

    # Create the connection pool. Done outside the try block so that the
    # cleanup below only runs once a pool actually exists.
    await db.create_pool()

    # Identifiers cannot be sent as %s parameters, so the table name is
    # interpolated; quoting it keeps reserved words such as "table" valid.
    table = _quote_identifier(Table)

    try:
        # Example: run several SELECTs concurrently.
        #   query_tasks = [
        #       db.fetch_data("SELECT * FROM your_table WHERE id = %s", (1,)),
        #       db.fetch_data("SELECT * FROM your_table WHERE id = %s", (2,)),
        #       # ... more query tasks can be added here
        #   ]
        #   results = await asyncio.gather(*query_tasks)
        #   for result in results:
        #       print("Fetched Data:", result)

        # Query data. params may be omitted for a query without placeholders
        # (e.g. a plain "SELECT * FROM ..." that fetches every row).
        query = f"SELECT * FROM {table} WHERE id = %s"
        params = (1,)
        result = await db.fetch_data(query, params)
        # print("Fetched Data:", result)

        # Build one update coroutine per fetched row; they are started
        # together by gather() below.
        update_sql = f"UPDATE {table} SET title = %s WHERE id = %s"
        update_tasks = [
            db.update_data(update_sql, ("这是一个测试title", item["id"]))
            for item in result
        ]

        # Example: a hand-written list of concurrent updates.
        #   update_tasks = [
        #       db.update_data("UPDATE your_table SET column_name = %s WHERE id = %s", ('new_value1', 1)),
        #       db.update_data("UPDATE your_table SET column_name = %s WHERE id = %s", ('new_value2', 2)),
        #   ]

        # Run all queued updates concurrently and wait for them to finish.
        await asyncio.gather(*update_tasks)
        # print("All Data Updated")
    finally:
        # Close the connection pool even if a query above raised.
        await db.close_pool()


# Entry point: run the main() coroutine to completion on a new event loop.
# NOTE: there is no `if __name__ == "__main__"` guard, so this also executes
# when the module is imported.
asyncio.run(main())