-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathInteractiveBrokersDB.py
131 lines (117 loc) · 3.92 KB
/
InteractiveBrokersDB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
import sys
import mysql.connector
import traceback
class InteractiveBrokersDB:
def __init__(self):
self.UserName = ""
self.Password = ""
self.DBHost = ""
self.DBPort = 0
self.DBName = ""
pass
def loadSettings(self, filename:str):
with open(filename, 'r') as f:
for line in f:
parts = line.split('=')
if parts[0] == "user_name":
self.UserName = str(parts[1]).strip()
if parts[0] == "password":
self.Password = str(parts[1]).strip()
if parts[0] == "db_host":
self.DBHost = str(parts[1]).strip()
if parts[0] == "db_port":
self.DBPort = int(parts[1])
if parts[0] == "db_name":
self.DBName = str(parts[1]).strip()
pass
pass
def getDB(self):
db_conn = None
db_conn = mysql.connector.connect(
host=self.DBHost,
port=self.DBPort,
user=self.UserName,
passwd=self.Password,
pool_name="db_conns",
pool_size=32,
db=self.DBName)
db_conn.autocommit = True
return db_conn
# Specify variables using %s or %(name)s parameter style.
def executeSql(self, sql, parms=None):
tries = 0
id = -1
cursor = None
error_msg = ""
while tries < 5:
try:
mydb = self.getDB()
cursor = mydb.cursor(buffered=True)
tries += 1
if not parms == None:
cursor.execute(sql, parms)
else:
cursor.execute(sql)
id = cursor.rowcount
mydb.commit()
break
except Exception as ex:
error_msg = str(ex)
if "not available" in str(ex) or "bytearray index" in str(ex):
time.sleep(0.120)
else:
print("executeSql - {}".format(ex))
for strTrace in traceback.format_stack():
print(strTrace)
print("Parms: {}".format(parms))
print("SQL: {}".format(sql))
print("MySQL Tries: {}".format(tries))
if cursor != None:
cursor.close()
mydb.close()
if tries > 4:
for strTrace in traceback.format_stack():
print(strTrace)
print("mysql failed to execute - {}".format(error_msg))
print(sql)
print()
return id
def executeQuery(self, sql, parms=None):
results = None
mydb = self.getDB()
#mydb.reset_session()
cursor = mydb.cursor(buffered=True, dictionary=True)
if not parms == None:
cursor.execute(sql, parms)
else:
cursor.execute(sql)
results = cursor.fetchall()
mydb.commit()
cursor.close()
mydb.close()
return results
if __name__ == "__main__":
settingsFile = "Settings.ini"
db = InteractiveBrokersDB()
db.loadSettings(settingsFile)
testCase = 1
if testCase == 0:
sql = "INSERT INTO `interactive_brokers`.`symbols` "
sql += "( "
sql += "`name`, "
sql += "`symbol`, "
sql += "`active`) "
sql += "VALUES "
sql += "( "
sql += "'Microsoft', "
sql += "'MSFT', "
sql += "true); "
results = db.executeSql(sql)
if testCase == 1:
sql = "Select * from `interactive_brokers`.`symbols`;"
results = db.executeQuery(sql)
for row in results:
print("row: {}".format(row))
print("\n\nFinished.")
pass