0%

python_异步aiomysql数据库操作指南

python_异步aiomysql数据库操作指南

一般情况下,使用 pymysql 操作数据库就足够了;但 pymysql 是同步阻塞的,在超大型数据库上执行耗时操作时,就显得有点力不从心了。

所以我们需要一个可以异步操作 MySQL 数据库的模块(aiomysql),特此记录一下异步 MySQL 数据库操作的基本用法。

import asyncio
import aiomysql

class MySql:
    """Small helper that runs single SQL statements through ``aiomysql``.

    ``run_loop`` is the synchronous entry point: it drives ``main`` on the
    current event loop, which opens a connection pool, runs one statement
    via ``select`` or ``insert``, and closes the pool again.
    """

    async def select(self, loop, sql, pool, args=None):
        """Execute *sql* and return its rows as dicts keyed by column name.

        Args:
            loop: Not used by this method; kept so existing callers still work.
            sql: SQL statement to execute.
            pool: Object whose ``acquire()`` yields a connection as an async
                context manager.
            args: Optional parameters forwarded to ``cursor.execute`` so that
                values can be bound by the driver instead of being formatted
                into *sql* by hand.

        Returns:
            A list with one ``{column_name: value}`` dict per row. When the
            statement produced no result set (``cursor.description`` is
            ``None``), whatever ``cursor.fetchone()`` returns.
        """
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql, args)
                # An explicit check replaces the former bare ``except:``,
                # which also swallowed cancellation and real fetch errors.
                if cur.description is None:
                    return await cur.fetchone()
                columns_name = [column[0] for column in cur.description]
                rows = await cur.fetchall()
                return [dict(zip(columns_name, row)) for row in rows]

    async def insert(self, loop, sql, pool, args=None):
        """Execute a write statement and commit it.

        Args:
            loop: Not used by this method; kept so existing callers still work.
            sql: SQL statement to execute.
            pool: Object whose ``acquire()`` yields a connection as an async
                context manager.
            args: Optional parameters forwarded to ``cursor.execute``.

        Returns:
            None.
        """
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql, args)
                await conn.commit()

    async def main(self, loop, sql_str, key_s='find', args=None):
        """Open a pool, run one statement, and always close the pool.

        Args:
            loop: Event loop handed to ``aiomysql.create_pool``.
            sql_str: SQL statement to execute.
            key_s: ``'find'`` runs ``select``; ``'insert'`` runs ``insert``.
            args: Optional parameters forwarded to the statement.

        Returns:
            A one-element list holding the result of the chosen operation,
            or ``None`` when *key_s* is neither ``'find'`` nor ``'insert'``.
        """
        # NOTE(review): connection settings and credentials are hard-coded;
        # consider loading them from configuration or the environment.
        pool = await aiomysql.create_pool(
            host='192.168.700.200',
            port=3306,
            user='cores',
            password='hJ3TsdrA',
            db='cores',
            autocommit=True,
            loop=loop)
        try:
            if key_s == 'find':
                return [await self.select(loop=loop, sql=sql_str, pool=pool, args=args)]
            if key_s == 'insert':
                return [await self.insert(loop=loop, sql=sql_str, pool=pool, args=args)]
            return None
        finally:
            # Release the pooled connections even if the statement failed;
            # previously the pool was never closed.
            pool.close()
            await pool.wait_closed()

    def run_loop(self, sql_str, key_s='find', args=None):
        """Synchronously run one statement on the current event loop.

        Args:
            sql_str: SQL statement to execute.
            key_s: ``'find'`` for a query, ``'insert'`` for a write.
            args: Optional parameters forwarded to the statement.

        Returns:
            The operation's result (a list of row dicts for ``'find'``,
            ``None`` for ``'insert'``), or ``''`` when *key_s* is not a
            recognised operation.
        """
        cur_loop = asyncio.get_event_loop()
        results = cur_loop.run_until_complete(
            self.main(cur_loop, sql_str, key_s=key_s, args=args))
        if results is None:
            return ''
        return results[0]

if __name__ == '__main__':
    # Example usage: run a lookup query through the synchronous wrapper.
    man_status = MySql().run_loop(
        "select * from wemic where title='测试数据'",
        key_s='find',
    )