Skip to content

Commit

Permalink
NiFi scripts: ann manager updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Dec 16, 2024
1 parent b863045 commit dd13201
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
11 changes: 8 additions & 3 deletions nifi/user-scripts/annotation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ def main():
output_stream = []
_sqlite_connection_rw = create_connection(db_file_path, read_only_mode=False)


_cursor = _sqlite_connection_ro.cursor() if _sqlite_connection_ro else _sqlite_connection_rw.cursor()

for record in records:
if OPERATION_MODE == "check":
document_id = str(record[DOCUMENT_ID_FIELD_NAME])
query = "SELECT id, elasticsearch_id FROM annotations WHERE elasticsearch_id LIKE '%" + document_id + "%' LIMIT 1"
result = connect_and_query(query, db_file_path, sqlite_connection=_sqlite_connection_ro, keep_conn_open=True)
result = connect_and_query(query, db_file_path, sqlite_connection=_sqlite_connection_ro, cursor=_cursor, keep_conn_open=True)

if len(result) < 1:
output_stream["content"].append(record)
Expand All @@ -91,14 +94,15 @@ def main():
document_id = str(record["meta." + DOCUMENT_ID_FIELD_NAME])
nlp_id = str(record["nlp.id"])
query = "INSERT OR REPLACE INTO annotations (elasticsearch_id) VALUES (" + '"' + document_id + "_" + nlp_id + '"' + ")"
result = connect_and_query(query, db_file_path, sqlite_connection=_sqlite_connection_rw, sql_script_mode=True, keep_conn_open=True)
result = connect_and_query(query, db_file_path, sqlite_connection=_sqlite_connection_rw, sql_script_mode=True, cursor=_cursor, keep_conn_open=True)
output_stream.append(record)

if _cursor is not None:
_cursor.close()
if _sqlite_connection_ro is not None:
_sqlite_connection_ro.close()
if _sqlite_connection_rw is not None:
_sqlite_connection_rw.close()

except Exception as exception:
time = datetime.datetime.now()
with open(log_file_path, "a+") as log_file:
Expand All @@ -107,4 +111,5 @@ def main():

return output_stream


sys.stdout.write(json.dumps(main()))
13 changes: 7 additions & 6 deletions nifi/user-scripts/utils/sqlite_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sqlite3


def connect_and_query(query: str, db_file_path: str, sqlite_connection: sqlite3.Connection = None, sql_script_mode: bool = False, keep_conn_open=False) -> List:
def connect_and_query(query: str, db_file_path: str, sqlite_connection: sqlite3.Connection = None, cursor: sqlite3.Cursor = None, sql_script_mode: bool = False, keep_conn_open=False) -> List:
""" Executes whatever query.
Args:
Expand All @@ -20,24 +20,25 @@ def connect_and_query(query: str, db_file_path: str, sqlite_connection: sqlite3.
result = []

try:
if sqlite_connection is not None:
sqlite_connection = sqlite_connection
else:
if sqlite_connection is None:
sqlite_connection = create_connection(db_file_path)

cursor = sqlite_connection.cursor()
if cursor is None:
cursor = sqlite_connection.cursor()

if not sql_script_mode:
cursor.execute(query)
result = cursor.fetchall()
else:
cursor.executescript(query)
sqlite_connection.commit()
cursor.close()
except sqlite3.Error as error:
raise sqlite3.Error(error)
finally:
if sqlite_connection and not keep_conn_open:
sqlite_connection.close()
if cursor and not keep_conn_open:
cursor.close()

return result

Expand Down

0 comments on commit dd13201

Please sign in to comment.