Skip to content

Commit

Permalink
Fix to fix to Insert (!)
Browse files Browse the repository at this point in the history
  • Loading branch information
sylvainhalle committed Nov 10, 2023
1 parent 364501b commit 80f0eb7
Showing 1 changed file with 14 additions and 71 deletions.
85 changes: 14 additions & 71 deletions Core/src/ca/uqac/lif/cep/tmf/Insert.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,29 +104,16 @@ public Object getState()
return m_sentPad;
}

public Pullable getPullableOutput(int index)
{
if (m_outputPullables[index] == null)
{
m_outputPullables[index] = new InsertPullable(index);
}
return m_outputPullables[index];
}

/**
* A {@link Pullable} object specific to the behavior of {@link Insert}.
* It overrides the default behavior of {@link SynchronousProcessor}'s
* pullable object, which first pulls an input event from each input pipe
* before calling {@code compute}. In the case of {@link Insert}, input
* pipes are not pulled if the pad has not been emitted yet.
* A {@link Pullable} object that does not pull events from upstream before
* the events to insert have been emitted. This makes it possible to connect
* the output of a processor to a path that leads back to one of its inputs,
* provided that an {@link Insert} processor lies somewhere on that path.
*
* @since 0.11.2
*/
protected class InsertPullable extends OutputPullable
{
/**
* Creates a new instance of the pullable object.
* @param index The index of the output pipe this pullable is associated
* with
*/
public InsertPullable(int index)
{
super(index);
Expand All @@ -135,63 +122,19 @@ public InsertPullable(int index)
@Override
public boolean hasNext()
{
if (m_sentPad)
{
return super.hasNext();
}
for (int i = 0; i < m_times; i++)
if (!m_sentPad)
{
for (int j = 0; j < m_pad.length; j++)
for (int i = 0; i < m_times; i++)
{
m_outputQueues[j].add(m_pad[j]);
for (int j = 0; j < m_pad.length; j++)
{
m_outputQueues[j].add(m_pad[j]);
}
}
m_sentPad = true;
return true;
}
return true;
return super.hasNext();
}
}

@Override
public Pullable getPullableOutput(int index)
{
if (m_outputPullables[index] == null)
{
m_outputPullables[index] = new InsertPullable(index);
}
return m_outputPullables[index];
}

/**
* A {@link Pullable} object that does not pull events from upstream before
* the events to insert have been emitted. This makes it possible to connect
* the output of a processor to a path that leads back to one of its inputs,
* provided that an {@link Insert} processor lies somewhere on that path.
*
* @since 0.11.2
*/
protected class InsertPullable extends OutputPullable
{
public InsertPullable(int index)
{
super(index);
}

@Override
public boolean hasNext()
{
if (!m_sentPad)
{
for (int i = 0; i < m_times; i++)
{
for (int j = 0; j < m_pad.length; j++)
{
m_outputQueues[j].add(m_pad[j]);
}
}
m_sentPad = true;
return true;
}
return super.hasNext();
}
}
}

0 comments on commit 80f0eb7

Please sign in to comment.