Skip to content

Commit

Permalink
NiFi scripts: json to avro parser logging update.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Jan 23, 2025
1 parent 93abf2c commit 3037bba
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Empty file.
25 changes: 20 additions & 5 deletions nifi/user-scripts/parse-json-to-avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,22 @@

from pydoc import locate

records_stream = json.loads(sys.stdin.read())
from datetime import datetime

# Prefix of the dump file written when the incoming payload is not valid JSON;
# the run timestamp and ".log" are appended to it below.
log_file_path = "/opt/nifi/user-scripts/logs/parse_json/parse-json-to-avro_file_"

# Run timestamp as a string, used to name the dump file. The module-level name
# `time` is kept as a string, exactly as it was after the original rebinding,
# but the datetime class is no longer imported under that same name.
time = str(datetime.now().timestamp())

# Read the whole payload first so it can be dumped verbatim if parsing fails.
input_stream = sys.stdin.read()
records_stream = None

try:
    records_stream = json.loads(input_stream)
except Exception as e:
    _log_file_path = log_file_path + time + ".log"
    # Explicit encoding so a non-ASCII payload cannot fail on the locale default.
    with open(_log_file_path, "a+", encoding="utf-8") as log_file:
        log_file.write(input_stream)
    # The dump file stays a verbatim copy of the input; the reason for the
    # failure goes to stderr together with the dump location.
    sys.stderr.write(
        "parse-json-to-avro: input is not valid JSON (" + str(e) + "); "
        "raw input written to " + _log_file_path + "\n"
    )
    # Stop here with the real error instead of continuing with
    # records_stream = None and failing later on records_stream[0].
    raise


schema = {
"type": "record",
Expand All @@ -24,16 +39,16 @@
# Field names are taken from the keys of the first record only.
fields = list(records_stream[0].keys())

for field_name in fields:
# Every field is declared as a union: null, a long with the timestamp-millis
# logical type, or a string.
# NOTE(review): the append is listed twice and the two lines differ only in
# whitespace inside the type dict — presumably the old and new side of the
# diff; confirm before treating this as runnable code.
schema["fields"].append({"name": field_name, "type": ["null", { "type" : "long", "logicalType" : "timestamp-millis"}, "string"]})
schema["fields"].append({"name": field_name, "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}, "string"]})

# Build the Avro schema object from the JSON-serialised schema dict.
# NOTE(review): this statement is also listed twice — presumably both diff sides.
avro_schema = avro.schema.parse(json.dumps(schema))
avro_schema = avro.schema.parse(json.dumps(schema))

# Random hex identifier used to name the temporary Avro file.
file_id = str(uuid.uuid4().hex)

# Path of the temporary Avro file; os.path.join receives a single, already
# concatenated string here.
tmp_file_path = os.path.join("/opt/nifi/user-scripts/tmp/" + file_id + ".avro")

with open(tmp_file_path, mode="wb+") as tmp_file:
writer = DataFileWriter(tmp_file, DatumWriter(), avro_schema)
writer = DataFileWriter(tmp_file, DatumWriter(), avro_schema)

for _record in records_stream:
writer.append(_record)
Expand All @@ -50,4 +65,4 @@
# Delete the temporary Avro file if it is present on disk.
if os.path.isfile(tmp_file_path):
os.remove(tmp_file_path)

# Write tmp_file_data to stdout through the binary buffer.
# NOTE(review): tmp_file_data is assigned outside the visible hunk — presumably
# the bytes read back from the temporary file; confirm against the full script.
# NOTE(review): the write is listed twice — presumably the old and new side of
# the diff (an end-of-file newline change); confirm before running.
sys.stdout.buffer.write(tmp_file_data)
sys.stdout.buffer.write(tmp_file_data)

0 comments on commit 3037bba

Please sign in to comment.