From 26744886030e7a73a6080326cda609157d523a8d Mon Sep 17 00:00:00 2001 From: Matt Casters Date: Thu, 23 Nov 2023 12:54:50 +0100 Subject: [PATCH] Fix for issue #3056 : NullPointerException on pipelines --- .../hop/pipeline/transform/BaseTransform.java | 58 +++++++++++++++++-- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java index a53cedd91af..4cbe10c34f0 100644 --- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java +++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java @@ -1577,6 +1577,7 @@ private Object[] handleGetRow() throws HopException { row = inputRowSet.getRowImmediate(); } if (row != null) { + obtainInputRowMeta(row, inputRowSet); incrementLinesRead(); } } else { @@ -1629,6 +1630,7 @@ private Object[] handleGetRow() throws HopException { row = inputRowSet.getRowWait(waitingTime.get(), TimeUnit.MILLISECONDS); boolean timeout = false; if (row != null) { + obtainInputRowMeta(row, inputRowSet); incrementLinesRead(); blockPointer++; waitingTime.reset(); @@ -1663,6 +1665,7 @@ private Object[] handleGetRow() throws HopException { inputRowSetsLock.writeLock().unlock(); } } else { + obtainInputRowMeta(row, inputRowSet); incrementLinesRead(); } } else { @@ -1691,17 +1694,12 @@ private Object[] handleGetRow() throws HopException { nextInputStream(); inputRowSet = currentInputStream(); row = getRowFrom(inputRowSet); + obtainInputRowMeta(row, inputRowSet); } } finally { inputRowSetsLock.readLock().unlock(); } - // Also set the meta data on the first occurrence. - // or if prevTransforms.length > 1 inputRowMeta can be changed - if (inputRowMeta == null || prevTransforms.length > 1) { - inputRowMeta = inputRowSet.getRowMeta(); - } - if (row != null) { // OK, before we return the row, let's see if we need to check on mixing // row compositions... @@ -1721,6 +1719,54 @@ private Object[] handleGetRow() throws HopException { return row; } + /** + * The first non-null row we get we'll lock in the row metadata. + * For scenarios with multiple inputs, we move the metadata around (e.g. Merge Rows). + * + * @param row The input row (not null!) + * @param inputRowSet The row set we're reading from right now + */ + private void obtainInputRowMeta(Object[] row, IRowSet inputRowSet) { + if (row==null) { + return; + } + + // Set the row metadata on the first occurrence. + // If prevTransforms.length > 1, inputRowMeta can be changed as well. + // + if (inputRowMeta == null || prevTransforms.length > 1) { + inputRowMeta = inputRowSet.getRowMeta(); + } + + // Extra sanity check + // + if (row!=null && inputRowMeta == null) { + int nr = 0; + for (IRowSet rowSet : inputRowSets) { + log.logMinimal( + "===> Input row set #" + + nr + + ", done? " + + rowSet.isDone() + + ", size=" + + rowSet.size() + + ", metadata? " + + (rowSet.getRowMeta() != null)); + nr++; + } + log.logMinimal("===> Current input row set nr=" + currentInputRowSetNr); + + throw new RuntimeException( + "No row metadata obtained for row " + + Arrays.toString(row) + + Const.CR + + "inputRowSet.getRowMeta()=" + + inputRowSet.getRowMeta() + + ", inputRowSets.size()=" + + inputRowSets.size()); + } + } + /** * IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call * {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}