forked from jhm-/claimtracker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlconnector.py
155 lines (134 loc) · 5.25 KB
/
sqlconnector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
# Copyright (c) 2020 Andrew Carne <[email protected]>
# This file provides methods for connecting to an SQL database, reading a list of tenure IDs from a
# table, and updating/replacing tenure data in the same or another table.
#
# Currently it is hardcoded to require MS SQL Server.
#
# The input table specification is flexible, but the output table is currently hardcoded:
# Parcels_Audit
# Parcel_ID int NOT NULL PRIMARY KEY,
# Area_ha decimal(8,2) NULL,
# OwnerRegistration nvarchar(50) NOT NULL,
# RegistrationDate datetime NOT NULL,
# ParcelName nvarchar(20) NOT NULL,
# RegTitleNumber nvarchar(30) NOT NULL,
# NextDueDate datetime NULL,
# JurisdictionMajor_ID int NOT NULL
import sqlalchemy
import urllib
import arcweb_data
import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Float, func
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class DbDefinition:
user = ""
password = ""
address = ""
trusted_conn = False
driver = "{SQL Server Native Client 11.0}"
database = ""
def connection_string(self):
if self.trusted_conn:
params = urllib.parse.quote_plus("DRIVER=" + self.driver + ";" +
"SERVER=" + self.address + ";" +
"DATABASE=" + self.database + ";" +
"Trusted_Connection=yes")
return "mssql+pyodbc:///?odbc_connect={}".format(params)
else:
params = urllib.parse.quote_plus("DRIVER=" + self.driver + ";" +
"SERVER=" + self.address + ";" +
"DATABASE=" + self.database + ";" +
"UID=" + self.user + ";" +
"PWD=" + self.password)
return "mssql+pyodbc:///?odbc_connect={}".format(params)
class TableDefinition:
name = ""
keyCol = ""
jurisdictionCol = ""
required_cols = ["RegDate",
"Owner",
"Area_ha",
"ParcelName",
"RegTitleNumber",
"NextDueDate"]
column_map = dict()
def update_tenure(db: DbDefinition, inTable: TableDefinition, outTable: TableDefinition,
jurisdiction: str, operation=0):
"""
Updates tenure in an SQL database
:param db: input database specification
:param out_dbdef: output database specification
:param inTable: input table specification
:param outTable: output table specification
:param jurisdiction: jurisdiction code ("NV", "YK", "NWT")
:param operation: 0 = replace data in outTable; 1 = append data to outTable
:return:
"""
db_engine = sqlalchemy.create_engine(db.connection_string())
conn = db_engine.connect()
if jurisdiction == "NV":
data_func = arcweb_data.get_data_NV
jurisdiction_id = 4
elif jurisdiction == "YK":
data_func = arcweb_data.get_data_YK
jurisdiction_id = 1
elif jurisdiction == "NWT":
data_func = arcweb_data.get_data_NWT
jurisdiction_id = 2
elif jurisdiction == "NU":
data_func = arcweb_data.get_data_NU
jurisdiction_id = 3
elif jurisdiction == "BC":
data_func = arcweb_data.get_data_BC
jurisdiction_id = 7
else:
raise NotImplementedError
rows = conn.execute("SELECT " + inTable.keyCol + " FROM " + inTable.name + " WHERE " + inTable.jurisdictionCol +
"=" + str(jurisdiction_id))
tenure_list = [r[inTable.keyCol] for r in rows]
Session = sessionmaker(bind=db_engine)
s = Session()
if operation == 0:
conn.execute("DELETE FROM " + outTable.name)
i = 0
else:
i = s.query(func.max(AuditParcel.Parcel_ID)).scalar() + 1
print("Fetching data for " + str(len(tenure_list)) + " tenures...")
print("[.", end="")
start = 0
batch_size = 25
while start < len(tenure_list):
end = start + batch_size
if end > len(tenure_list):
end = len(tenure_list)
tenure_data = data_func(tenure_list[start:end])
for t in tenure_data:
parcel = AuditParcel()
parcel.Parcel_ID = i
parcel.Area_ha = t["Area_ha"]
parcel.OwnerRegistration = t["Owner"]
parcel.RegistrationDate = t["RegDate"]
parcel.RegTitleNumber = t["RegTitleNumber"]
parcel.NextDueDate = t["NextDueDate"]
parcel.ParcelName = t["ParcelName"]
parcel.JurisdictionMajor_ID = jurisdiction_id
s.add(parcel)
i = i + 1
s.commit()
time.sleep(0.1)
start = start + batch_size
print(".", end="")
print("]\nDone!")
class AuditParcel(Base):
__tablename__ = "Parcels_Audit"
Parcel_ID = Column(Integer, primary_key=True, autoincrement=False)
RegistrationDate = Column(DateTime)
OwnerRegistration = Column(String)
Area_ha = Column(Float)
ParcelName = Column(String)
RegTitleNumber = Column(String)
NextDueDate = Column(DateTime)
JurisdictionMajor_ID = Column(Integer)