Skip to content

Commit

Permalink
Fix for issue apache#3056 : NullPointerException on pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcasters committed Nov 23, 2023
1 parent 4c35db5 commit 2674488
Showing 1 changed file with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowImmediate();
}
if (row != null) {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
Expand Down Expand Up @@ -1629,6 +1630,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowWait(waitingTime.get(), TimeUnit.MILLISECONDS);
boolean timeout = false;
if (row != null) {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
blockPointer++;
waitingTime.reset();
Expand Down Expand Up @@ -1663,6 +1665,7 @@ private Object[] handleGetRow() throws HopException {
inputRowSetsLock.writeLock().unlock();
}
} else {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
Expand Down Expand Up @@ -1691,17 +1694,12 @@ private Object[] handleGetRow() throws HopException {
nextInputStream();
inputRowSet = currentInputStream();
row = getRowFrom(inputRowSet);
obtainInputRowMeta(row, inputRowSet);
}
} finally {
inputRowSetsLock.readLock().unlock();
}

// Also set the meta data on the first occurrence.
// or if prevTransforms.length > 1 inputRowMeta can be changed
if (inputRowMeta == null || prevTransforms.length > 1) {
inputRowMeta = inputRowSet.getRowMeta();
}

if (row != null) {
// OK, before we return the row, let's see if we need to check on mixing
// row compositions...
Expand All @@ -1721,6 +1719,54 @@ private Object[] handleGetRow() throws HopException {
return row;
}

/**
* The first non-null row we get we'll lock in the row metadata.
* For scenarios with multiple inputs, we move the metadata around (e.g. Merge Rows).
*
* @param row The input row (not null!)
* @param inputRowSet The row set we're reading from right now
*/
private void obtainInputRowMeta(Object[] row, IRowSet inputRowSet) {
if (row==null) {
return;
}

// Set the row metadata on the first occurrence.
// If prevTransforms.length > 1, inputRowMeta can be changed as well.
//
if (inputRowMeta == null || prevTransforms.length > 1) {
inputRowMeta = inputRowSet.getRowMeta();
}

// Extra sanity check
//
if (row!=null && inputRowMeta == null) {
int nr = 0;
for (IRowSet rowSet : inputRowSets) {
log.logMinimal(
"===> Input row set #"
+ nr
+ ", done? "
+ rowSet.isDone()
+ ", size="
+ rowSet.size()
+ ", metadata? "
+ (rowSet.getRowMeta() != null));
nr++;
}
log.logMinimal("===> Current input row set nr=" + currentInputRowSetNr);

throw new RuntimeException(
"No row metadata obtained for row "
+ Arrays.toString(row)
+ Const.CR
+ "inputRowSet.getRowMeta()="
+ inputRowSet.getRowMeta()
+ ", inputRowSets.size()="
+ inputRowSets.size());
}
}

/**
* IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call
* {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}
Expand Down

0 comments on commit 2674488

Please sign in to comment.