import asyncio import aiomysql import hashlib
class MySQLDatabase: def __init__(self, host, port, user, password, db_name, loop=None, maxsize=20, minsize=1): self.host = host self.port = port self.user = user self.password = password self.db_name = db_name self.loop = loop or asyncio.get_event_loop() self.maxsize = maxsize self.minsize = minsize self.pool = None
async def create_pool(self): self.pool = await aiomysql.create_pool( host=self.host, port=self.port, user=self.user, password=self.password, db=self.db_name, loop=self.loop, autocommit=True, charset='utf8', maxsize=self.maxsize, minsize=self.minsize, )
async def fetch_data(self, query, params=None): async with self.pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(query, params) result = await cur.fetchall() return result
async def update_data(self, query, params): print("正在执行>>> ", params) async with self.pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, params) await conn.commit()
async def close_pool(self): self.pool.close() await self.pool.wait_closed()
Table = "table" _localhost = "127.0.0.1" Username = "admin" Password = "123456" DB_Name = "test"
async def main(): db = MySQLDatabase(host=_localhost, port=3306, user=Username, password=Password, db_name=DB_Name) await db.create_pool() query = f"SELECT * FROM {Table} WHERE id = %s" params = (1,) result = await db.fetch_data(query, params) update_tasks = [] for item in result: update_tasks.append(db.update_data(f"UPDATE {Table} SET title = %s WHERE id = %s", ("这是一个测试title", item["id"])),) await asyncio.gather(*update_tasks) await db.close_pool()
asyncio.run(main())
|