import asyncio import aiomysql
class MySql: async def select(self, loop, sql, pool): async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) try: res = cur.description columns_name = [i[0] for i in res] res = await cur.fetchall() res_dict = [dict(zip(columns_name, row_value)) for row_value in res] return res_dict except: r = await cur.fetchone() return r
async def insert(self, loop, sql, pool): async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) await conn.commit()
async def main(self, loop, sql_str, key_s='find'): pool = await aiomysql.create_pool( host='192.168.700.200', port=3306, user='cores', password='hJ3TsdrA', db='cores', autocommit=True, loop=loop) if key_s == 'find': c1 = MySql().select(loop=loop, sql=sql_str, pool=pool) tasks = [asyncio.ensure_future(c1)] return await asyncio.gather(*tasks) if key_s == 'insert': c2 = MySql().insert(loop=loop, sql=sql_str, pool=pool) tasks = [asyncio.ensure_future(c2)] return await asyncio.gather(*tasks)
def run_loop(self, sql_str, key_s='find'): cur_loop = asyncio.get_event_loop() azz = cur_loop.run_until_complete(MySql().main(cur_loop, sql_str, key_s=key_s)) if azz == None: return '' else: return azz[0]
if __name__ == '__main__': man_status = MySql().run_loop(f"select * from wemic where title='测试数据'")
|