Skip to content

Commit

Permalink
Implement Abort #3
Browse files Browse the repository at this point in the history
  • Loading branch information
cortex93 committed May 19, 2020
1 parent a25c6ac commit e8e644a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,24 @@ public void Pop_WhenCalled_DeletesKeyAndData()
string result2 = Encoding.ASCII.GetString(resultBytes2);
Assert.AreEqual("b", result2);
}

[Test]
public void Discard_WhenCalled_DeletesKeyAndData()
{
// arrange
var catalogue = new FragmentCatalogue();
byte[] b1 = Encoding.ASCII.GetBytes("a");
byte[] b2 = Encoding.ASCII.GetBytes("b");

// act
catalogue.Push(1, 1, b1);
catalogue.Discard(1, 1); // discard a
catalogue.Push(1, 1, b2);

// assert
byte[] resultBytes2 = catalogue.Pop(1, 1); // b
string result2 = Encoding.ASCII.GetString(resultBytes2);
Assert.AreEqual("b", result2);
}
}
}
8 changes: 8 additions & 0 deletions HAProxy.StreamProcessingOffload.Agent/FragmentCatalogue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public byte[] Pop(long streamId, long frameId)
}
}

public void Discard(long streamId, long frameId)
{
lock (this.lockObj)
{
DeleteKeyAndData(streamId, frameId);
}
}

private void DeleteKeyAndData(long streamId, long frameId)
{
string key = CombineStreamIdAndFrameId(streamId, frameId);
Expand Down
77 changes: 43 additions & 34 deletions HAProxy.StreamProcessingOffload.Agent/FrameProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class FrameProcessor : IFrameProcessor
/// </summary>
public FrameProcessor()
{
this.LogFunc = (s) => {};
this.LogFunc = (s) => { };
this.MaxFrameSize = 16380;
this.agentCapabilities = new string[] { "fragmentation" };
}
Expand Down Expand Up @@ -89,7 +89,8 @@ public void HandleStream(Stream stream, Func<NotifyFrame, IList<SpoeAction>> not
{
closeConnection = true;
}
else{
else
{
string haproxyCapabilitiesString =
(string)((KeyValueListPayload)frame.Payload).KeyValueItems.First(item => item.Key == "capabilities").Value.Value;
haproxyCapabilities = haproxyCapabilitiesString.Split(',');
Expand Down Expand Up @@ -134,47 +135,55 @@ public void HandleStream(Stream stream, Func<NotifyFrame, IList<SpoeAction>> not
}
break;
case FrameType.Unset:
// The Unset frame continues the data, but we only need
// to append its payload, not its metadata
fragments.Push(
frame.Metadata.StreamId.Value,
frame.Metadata.FrameId.Value,
frame.Payload.Bytes);

if (frame.Metadata.Flags.Fin)
if (frame.Metadata.Flags.Abort)
{
byte[] data = fragments.Pop(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value);
Frame newNotifyFrame = ParseFrame(data);
newNotifyFrame.Metadata.Flags.Fin = true;
fragments.Discard(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value);
sendAgentDisconnect = true;
}
else
{
// The Unset frame continues the data, but we only need
// to append its payload, not its metadata
fragments.Push(
frame.Metadata.StreamId.Value,
frame.Metadata.FrameId.Value,
frame.Payload.Bytes);

if (this.EnableLogging)
if (frame.Metadata.Flags.Fin)
{
this.LogFunc(newNotifyFrame.ToString());
}
byte[] data = fragments.Pop(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value);
Frame newNotifyFrame = ParseFrame(data);
newNotifyFrame.Metadata.Flags.Fin = true;

var actions = notifyHandler((NotifyFrame)newNotifyFrame);
if (this.EnableLogging)
{
this.LogFunc(newNotifyFrame.ToString());
}

var ackFrame = new AckFrame(
frame.Metadata.StreamId.Value,
frame.Metadata.FrameId.Value,
actions ?? new List<SpoeAction>());
var actions = notifyHandler((NotifyFrame)newNotifyFrame);

// fragment if necessary
if (haproxyCapabilities.Contains("fragmentation"))
{
var fragmentedFrames = ((AckFrame)ackFrame).FragmentFrame(this.MaxFrameSize);
var ackFrame = new AckFrame(
frame.Metadata.StreamId.Value,
frame.Metadata.FrameId.Value,
actions ?? new List<SpoeAction>());

foreach (var f in fragmentedFrames)
// fragment if necessary
if (haproxyCapabilities.Contains("fragmentation"))
{
responseFrames.Enqueue(f);
var fragmentedFrames = ((AckFrame)ackFrame).FragmentFrame(this.MaxFrameSize);

foreach (var f in fragmentedFrames)
{
responseFrames.Enqueue(f);
}
}
else
{
responseFrames.Enqueue(ackFrame);
}
}
else
{
responseFrames.Enqueue(ackFrame);
}

sendAgentDisconnect = true;
sendAgentDisconnect = true;
}
}
break;
}
Expand Down Expand Up @@ -307,7 +316,7 @@ private Frame NewFrameFromType(FrameType frameType, Status status = Status.Norma
{
Frame frame;

switch(frameType)
switch (frameType)
{
case FrameType.HaproxyDisconnect:
frame = new HaproxyDisconnectFrame();
Expand Down

0 comments on commit e8e644a

Please sign in to comment.