-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathasync_db.py
141 lines (126 loc) · 4.08 KB
/
async_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
# Author : 程序员阿江-Relakkes
# Email : [email protected]
# CreatedTime : 2024/07/29 02:41
# Desc : aiomysql transaction version (async MySQL helper that runs statements inside one explicit transaction)
from typing import Dict, List, NoReturn, Optional
import aiomysql
from aiomysql import Connection
class AsyncDbTransaction:
    """Run SQL statements on a single aiomysql connection inside one transaction.

    Typical usage is ``await begin()``, any number of query/execute calls,
    then ``await commit()`` (or ``await rollback()`` on failure). The
    connection is opened by ``begin()`` and closed by ``commit()`` /
    ``rollback()``.
    """

    def __init__(self, db_config: Dict):
        """
        :param db_config: keyword arguments forwarded to ``aiomysql.connect``
            when ``begin()`` is called
        """
        self.conn: Optional[Connection] = None
        self._db_config = db_config

    @staticmethod
    def _normalize_args(args: tuple):
        """Collapse ``*args`` into the single ``args`` value ``cursor.execute`` expects.

        * no arguments  -> ``None`` (the statement is sent without formatting)
        * one argument  -> that argument unchanged (a tuple/list/dict of
          parameters, or a single scalar), which is what the previous
          ``execute(sql, *args)`` call forwarded
        * several       -> the tuple of all arguments, so callers can write
          ``query(sql, a, b)`` instead of getting a ``TypeError``
        """
        if not args:
            return None
        if len(args) == 1:
            return args[0]
        return args

    async def query(self, sql: str, *args) -> List[Dict]:
        """
        Run a SELECT-style statement and return every row.

        :param sql: SQL text, using ``%s`` placeholders for parameters
        :param args: statement parameters (one sequence/dict, or several scalars)
        :return: list of rows, each row a dict keyed by column name
        """
        async with self.conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, self._normalize_args(args))
            rows = await cur.fetchall()
        # fetchall() may hand back a tuple; always give callers a list.
        return list(rows)

    async def get(self, sql: str, *args) -> Dict:
        """
        Run a statement and return only its first row.

        :param sql: SQL text, using ``%s`` placeholders for parameters
        :param args: statement parameters (one sequence/dict, or several scalars)
        :return: the first row as a dict, or an empty dict when there is no row
        """
        async with self.conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, self._normalize_args(args))
            # fetchone() yields None when there is no row, so no blanket
            # exception handler is needed to detect the empty case.
            row = await cur.fetchone()
        return row if row else dict()

    async def execute(self, sql: str, *args) -> int:
        """
        Run a write statement (INSERT / UPDATE / DELETE ...).

        :param sql: SQL text, using ``%s`` placeholders for parameters
        :param args: statement parameters (one sequence/dict, or several scalars)
        :return: the value returned by ``cursor.execute`` (affected row count)
        """
        async with self.conn.cursor() as cur:
            return await cur.execute(sql, self._normalize_args(args))

    async def is_in_table(self, table_name: str, field: str, value: str) -> bool:
        """
        Check whether the table holds at least one row with ``field == value``.

        :param table_name: table to search (inserted into the SQL text as-is,
            so it must come from trusted code, not user input)
        :param field: column to compare (inserted into the SQL text as-is)
        :param value: value to look for; sent as a bound parameter
        :return: True when a matching row exists
        """
        # Identifiers cannot be bound as parameters, but the value can: binding
        # it prevents SQL injection and quoting errors.
        sql = f'SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1'
        row = await self.get(sql, (value,))
        return bool(row)

    async def item_to_table(self, table_name: str, item: Dict) -> int:
        """
        Insert one record into the given table.

        :param table_name: target table (inserted into the SQL text as-is)
        :param item: mapping of column name -> value for the new row
        :return: ``lastrowid`` reported by the cursor after the insert
        """
        # Backtick-quote column names so MySQL keywords can be used as columns.
        columns = ','.join(f'`{name}`' for name in item)
        placeholders = ','.join(['%s'] * len(item))
        sql = f'INSERT INTO {table_name} ({columns}) VALUES({placeholders})'
        async with self.conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, list(item.values()))
            return cur.lastrowid

    async def update_table(self, table_name: str, updates: Dict, field_where: str, value_where: str) -> int:
        """
        Update the rows where ``field_where == value_where``.

        :param table_name: table to update (inserted into the SQL text as-is)
        :param updates: mapping of column name -> new value
        :param field_where: column used in the WHERE clause (inserted as-is)
        :param value_where: value the WHERE column must equal; sent as a
            bound parameter
        :return: affected row count; 0 when ``updates`` is empty
        """
        if not updates:
            # An empty SET clause is invalid SQL; there is nothing to change.
            return 0
        assignments = ','.join(f'`{column}`=%s' for column in updates)
        # Bind the WHERE value together with the new column values instead of
        # formatting it into the SQL string.
        values = list(updates.values())
        values.append(value_where)
        sql = f'UPDATE {table_name} SET {assignments} WHERE {field_where}=%s'
        async with self.conn.cursor() as cur:
            return await cur.execute(sql, values)

    async def begin(self) -> None:
        """
        Open a new connection and start a MySQL transaction on it.

        :return: None
        """
        # Merge instead of passing duplicate keywords, so a config that already
        # contains "autocommit" or "loop" does not raise TypeError.
        options = {**self._db_config, 'autocommit': False}
        options.setdefault('loop', None)
        conn = await aiomysql.connect(**options)
        try:
            await conn.autocommit(False)
            await conn.begin()
        except BaseException:
            # Do not leak the freshly opened connection if BEGIN fails.
            conn.close()
            raise
        self.conn = conn

    async def commit(self) -> None:
        """
        Commit the transaction and close the connection.

        If the commit itself raises, the connection is left open so the caller
        can still call ``rollback()``, which closes it.

        :return: None
        """
        await self.conn.commit()
        self.conn.close()

    async def rollback(self) -> None:
        """
        Roll back the transaction and close the connection.

        :return: None
        """
        try:
            await self.conn.rollback()
        finally:
            # Close even when the rollback fails; nothing else can use it.
            self.conn.close()