Skip to content

Commit

Permalink
wip: SQL results
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Kicinski <[email protected]>
  • Loading branch information
kuba-moo committed Jun 7, 2024
1 parent 481f8e9 commit a224013
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 6 deletions.
66 changes: 66 additions & 0 deletions contest/backend/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@


from flask import Flask
from flask import Response
from flask import request
import json
import psycopg2
import couchdb
import os
import re
import datetime


Expand All @@ -16,6 +20,10 @@
couch = couchdb.Server(f'http://{user}:{pwd}@127.0.0.1:5984')
res_db = couch["results"]

db_name = os.getenv('DB_NAME')
psql = psycopg2.connect(database=db_name)
psql.autocommit = True


def branches_to_rows(br_cnt):
data = res_db.view('branch/rows', None,
Expand Down Expand Up @@ -63,3 +71,61 @@ def results():
print(f"Query for {br_cnt} branches, {need_rows} records took: {str(t3-t1)} ({str(t2-t1)}+{str(t3-t2)})")

return data


def branches_to_rows2(br_cnt):
global psql

cnt = 0
with psql.cursor() as cur:
q = f"SELECT branch,count(*) FROM results GROUP BY branch ORDER BY branch DESC LIMIT {br_cnt}"
cur.execute(q)
for r in cur.fetchall():
cnt += r[1]
return cnt


@app.route('/results2')
def results2():
global psql

br_name = request.args.get('branch-name')
if br_name:
if re.match(r'^[\w-_ ]+$', br_name) is None:
return {}

t1 = datetime.datetime.now()
with psql.cursor() as cur:
cur.execute(f"SELECT unparsed FROM results WHERE branch = '{br_name}' LIMIT 100")
rows = [json.loads(r[0]) for r in cur.fetchall()]
t2 = datetime.datetime.now()
print("Query for exact branch took: ", str(t2-t1))
return rows

t1 = datetime.datetime.now()

br_cnt = request.args.get('branches')
try:
br_cnt = int(br_cnt)
except:
br_cnt = None
if not br_cnt:
br_cnt = 10

raw = request.args.get('raw')

need_rows = branches_to_rows2(br_cnt)
t2 = datetime.datetime.now()
with psql.cursor() as cur:
cur.execute(f"SELECT unparsed FROM results ORDER BY branch DESC LIMIT {need_rows}")
if raw:
rows = "[" + ",".join([r[0] for r in cur.fetchall()]) + "]"
else:
rows = [json.loads(r[0]) for r in cur.fetchall()]

t3 = datetime.datetime.now()
print(f"Query for {br_cnt} branches, {need_rows} records took: {str(t3-t1)} ({str(t2-t1)}+{str(t3-t2)})")

if raw:
return Response(rows, mimetype='application/json')
return rows
88 changes: 82 additions & 6 deletions contest/results-fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datetime
import json
import os
import psycopg2
import requests
import time
import uuid
Expand All @@ -23,8 +24,9 @@
url_pfx=relative/within/server
combined=name-of-manifest.json
[db]
results-name=db-name
branches-name=db-name
db=db-name
results-name=table-name
branches-name=table-name
user=name
pwd=pass
"""
Expand All @@ -38,11 +40,18 @@ def __init__(self):
# "fetched" is more of a "need state rebuild"
self.fetched = True

self.tbl_res = self.config.get("db", "results-name", fallback="results")
self.tbl_brn = self.config.get("db", "branches-name", fallback="branches")

user = self.config.get("db", "user")
pwd = self.config.get("db", "pwd")
server = couchdb.Server(f'http://{user}:{pwd}@127.0.0.1:5984')
self.res_db = server[self.config.get("db", "results-name", fallback="results")]
self.brn_db = server[self.config.get("db", "branches-name", fallback="branches")]
self.res_db = server[self.tbl_res]
self.brn_db = server[self.tbl_brn]

db_name = self.config.get("db", "db")
self.psql_conn = psycopg2.connect(database=db_name)
self.psql_conn.autocommit = True

def _one(self, rows):
rows = list(rows)
Expand All @@ -51,13 +60,77 @@ def _one(self, rows):
return rows[0]

def get_branch(self, name):
try:
with self.psql_conn.cursor() as cur:
cur.execute(f"SELECT info FROM {self.tbl_brn} WHERE branch = '{name}'")
rows = cur.fetchall()
return json.loads(rows[0][0])
except Exception as e:
print("PSQL branch GET FAIL!")
print(e)

branch_info = self.brn_db.find({
'selector': {
'branch': name
}
})
return self._one(branch_info)

def psql_run_selector(self, cur, remote, run):
return cur.mogrify("WHERE branch = %s AND remote = %s AND executor = %s",
(run['branch'], remote["name"], run["executor"],)).decode('utf-8')

def psql_has_wip(self, remote, run):
with self.psql_conn.cursor() as cur:
cur.execute(f"SELECT branch FROM {self.tbl_res} " + self.psql_run_selector(cur, remote, run))
rows = cur.fetchall()
return len(rows) > 0

def insert_result_psql(self, cur, data):
arg = cur.mogrify("(%s,%s,%s,%s,%s,%s)", (data["branch"], data["remote"], data["executor"],
data["start"], data["end"], json.dumps(data)))
cur.execute(f"INSERT INTO {self.tbl_res} VALUES " + arg.decode('utf-8'))

def insert_wip_psql(self, remote, run, branch_info):
try:
if self.psql_has_wip(remote, run):
# no point, we have no interesting info to add
return

data = run.copy()
data["remote"] = remote["name"]
when = datetime.datetime.fromisoformat(branch_info['date'])
data["start"] = str(when)
when += datetime.timedelta(hours=2, minutes=58)
data["end"] = str(when)
data["results"] = None

with self.psql_conn.cursor() as cur:
self.insert_result_psql(cur, data)
print("PSQL WIP save success!")
except Exception as e:
print("PSQL WIP save FAIL!")
print(e)

def insert_real_psql(self, remote, run):
data = run.copy()
data["remote"] = remote["name"]

try:
with self.psql_conn.cursor() as cur:
if self.psql_has_wip(remote, run):
self.insert_result_psql(cur, data)
else:
vals = cur.mogrify("SET t_start = %s, t_end = %s, unparsed = %s",
(data["start"], data["end"], json.dumps(data))).decode('utf-8')
selector = self.psql_run_selector(cur, remote, run)
q = f"UPDATE {self.tbl_res} " + vals + selector
cur.execute(q)
print("PSQL FULL save success!")
except Exception as e:
print("PSQL FULL save FAIL!")
print(e)

def get_wip_row(self, remote, run):
rows = self.res_db.find({
'selector': {
Expand All @@ -71,10 +144,12 @@ def get_wip_row(self, remote, run):
return row

def insert_wip(self, remote, run):
existing = self.get_wip_row(remote, run)

branch_info = self.get_branch(run["branch"])

self.insert_wip_psql(remote, run, branch_info)

existing = self.get_wip_row(remote, run)

data = run.copy()
if existing:
data['_id'] = existing['_id']
Expand All @@ -91,6 +166,7 @@ def insert_wip(self, remote, run):
self.res_db.save(data)

def insert_real(self, remote, run):
self.insert_real_psql(remote, run)
existing = self.get_wip_row(remote, run)

data = run.copy()
Expand Down

0 comments on commit a224013

Please sign in to comment.