Skip to content

Commit

Permalink
NiFi: ann manager script updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed May 11, 2024
1 parent b1872ac commit af43087
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
9 changes: 5 additions & 4 deletions nifi/user-scripts/annotation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ def main():
records = json_data_records["content"]

# if we are parsing an annotation only (post-NLP)
if type(records) == dict:
if type(records) is dict:
records = [records]
output_stream = []

last_doc_id = ""
inserted_doc_ids = []

for record in records:
if OPERATION_MODE == "check":
document_id = str(record[DOCUMENT_ID_FIELD_NAME])
Expand All @@ -82,10 +83,10 @@ def main():
elif OPERATION_MODE == "insert":
document_id = str(record["meta." + DOCUMENT_ID_FIELD_NAME])

if last_doc_id != document_id:
if document_id not in inserted_doc_ids:
query = "INSERT OR REPLACE INTO annotations (elasticsearch_id) VALUES (" + '"' + document_id + '"' + ")"
result = connect_and_query(query, db_file_path, sql_script_mode=True)

inserted_doc_ids.append(document_id)
if len(result) == 0:
output_stream = record

Expand Down
18 changes: 14 additions & 4 deletions nifi/user-scripts/utils/sqlite_query.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sqlite3

def connect_and_query(query, db_file_path, sql_script_mode=False):
def connect_and_query(query: str, db_file_path: str, sql_script_mode: bool = False):
"""
Executes whatever query.
Expand Down Expand Up @@ -28,12 +28,22 @@ def connect_and_query(query, db_file_path, sql_script_mode=False):

return result

def check_db_exists(table_name, db_file_path):
def check_db_exists(table_name: str, db_file_path: str):
query = "PRAGMA table_info(" + table_name + ");"
return connect_and_query(query=query, db_file_path=db_file_path)

def create_db_from_file(sqlite_file_path, db_file_path):
def create_db_from_file(sqlite_file_path: str, db_file_path: str) -> sqlite3.Cursor:
"""
Args:
sqlite_file_path (str): sqlite db folder
db_file_path (str): sqlite db file name
Returns:
sqlite3.Cursor: result of query
"""
query = ""
with open(sqlite_file_path, mode="r") as sql_file:
query = sql_file.read()
return connect_and_query(query=query, db_file_path=db_file_path, sql_script_mode=True)
return connect_and_query(query=query, db_file_path=db_file_path,
sql_script_mode=True)

0 comments on commit af43087

Please sign in to comment.