Skip to content

Commit

Permalink
NiFi: added sample script for flowfile content fields to attr conv.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Nov 29, 2023
1 parent 72c1ce0 commit 3fde2b6
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions nifi/user-scripts/flowfile_to_attribute_with_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import traceback
import io
import json
import avro
import copy

# jython packages
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, OutputStreamCallback
import org.apache.nifi.logging.ComponentLog
from org.python.core.util import StringUtil

# other packages, normally available to python 2.7
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

"""
This script converts a flow file with avro/json content with all the record's fields that has to flow file attributes.
"""

# NOTE: `session` is injected by NiFi's ExecuteScript processor at runtime.

# Incoming flow file being processed (fetched below via session.get()).
global flowFile

# Processor property values, resolved in the main guard at the bottom of
# the script; read inside PyStreamCallback.process().
global OPERATION_MODE
global FIELD_NAMES_TO_KEEP_AS_CONTENT

flowFile = session.get()

# One new flow file is created per record; all are transferred together
# to REL_SUCCESS after the stream callback completes.
output_flowFiles = []

class WriteContentCallback(OutputStreamCallback):
    """NiFi OutputStreamCallback that writes a fixed text payload.

    Used with session.write() to replace a flow file's content with the
    value of a single record field.
    """

    def __init__(self, content):
        # Text/bytes to emit when NiFi invokes process().
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            # BUG FIX: the original passed file=sys.stdout, but `sys` was
            # never imported — the resulting NameError masked the real
            # failure. Default print_exc() (stderr) is sufficient here.
            traceback.print_exc()
            raise

class PyStreamCallback(StreamCallback):
    """Splits a flow file of avro/json records into per-record flow files.

    For every record, a child flow file is created whose attributes are the
    record's fields. If FIELD_NAMES_TO_KEEP_AS_CONTENT names one of those
    fields, its value also becomes the child's content. Children are
    appended to the module-level output_flowFiles list; the caller
    transfers them to REL_SUCCESS.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        bytes_arr = IOUtils.toByteArray(inputStream)
        bytes_io = io.BytesIO(bytes_arr)

        if OPERATION_MODE == "json":
            records = json.loads(bytes_io.read())
        elif OPERATION_MODE == "avro":
            records = DataFileReader(bytes_io, DatumReader())
        else:
            # BUG FIX: the original left `records` unbound here and then
            # crashed with a NameError; fail with a clear message instead.
            raise ValueError("unsupported operation_mode: %s" % OPERATION_MODE)

        # BUG FIX: the original read `records.meta` inside this loop, which
        # only exists on the avro DataFileReader — json mode crashed with
        # an AttributeError. The extracted metadata/schema were never used,
        # so that dead code is removed entirely.
        for record in records:
            new_flow_file = session.create(flowFile)

            for k, v in record.iteritems():  # Jython 2.7 dict API
                new_flow_file = session.putAttribute(new_flow_file, k, str(v))
                # BUG FIX: the original used `is not ""` — an identity
                # test with unreliable semantics; use real (in)equality.
                if FIELD_NAMES_TO_KEEP_AS_CONTENT != "" and k == FIELD_NAMES_TO_KEEP_AS_CONTENT:
                    new_flow_file = session.write(new_flow_file, WriteContentCallback(str(v).encode("UTF-8")))

            output_flowFiles.append(new_flow_file)

        # Only the avro reader holds a resource that needs closing.
        if isinstance(records, DataFileReader):
            records.close()


if flowFile is not None:
    # Processor property: operation mode. Possible values:
    # - "avro": convert avro record fields to attributes, keeping only the
    #   configured field as content
    # - "json": convert json record fields to attributes
    OPERATION_MODE_PROPERTY_NAME = "operation_mode"

    # Processor property: name of the single field whose value should be
    # kept as the child flow file's content.
    FIELD_NAMES_TO_KEEP_AS_CONTENT_PROPERTY_NAME = "keep_fields_as_content"

    OPERATION_MODE = str(context.getProperty(OPERATION_MODE_PROPERTY_NAME)).lower()

    # BUG FIX: when the property is unset, getProperty() returns None and
    # str(None).lower() is "none" — the original "" / None checks never
    # matched, so the documented default of "avro" was never applied.
    if OPERATION_MODE in ("", "none"):
        OPERATION_MODE = "avro"

    FIELD_NAMES_TO_KEEP_AS_CONTENT = str(context.getProperty(FIELD_NAMES_TO_KEEP_AS_CONTENT_PROPERTY_NAME)).lower()
    # Same unset-property normalization as above, so the "keep as content"
    # check in PyStreamCallback is genuinely disabled when unconfigured.
    # (Assumes no record field is literally named "none" — TODO confirm.)
    if FIELD_NAMES_TO_KEEP_AS_CONTENT == "none":
        FIELD_NAMES_TO_KEEP_AS_CONTENT = ""

    try:
        # Expand the incoming flow file into per-record child flow files.
        flowFile = session.write(flowFile, PyStreamCallback())

        session.transfer(output_flowFiles, REL_SUCCESS)
        # The original has been fully expanded into children; drop it.
        session.remove(flowFile)
    except Exception as exception:
        log.error(traceback.format_exc())
        session.transfer(flowFile, REL_FAILURE)
else:
    # No flow file queued: nothing to do. (The original transferred the
    # None flowFile to REL_FAILURE, which NiFi would likely reject.)
    pass

0 comments on commit 3fde2b6

Please sign in to comment.