diff --git a/HAProxy.StreamProcessingOffload.Agent.Tests/FragmentCatalogueTests.cs b/HAProxy.StreamProcessingOffload.Agent.Tests/FragmentCatalogueTests.cs index d6a0daa..3c4a490 100644 --- a/HAProxy.StreamProcessingOffload.Agent.Tests/FragmentCatalogueTests.cs +++ b/HAProxy.StreamProcessingOffload.Agent.Tests/FragmentCatalogueTests.cs @@ -75,5 +75,24 @@ public void Pop_WhenCalled_DeletesKeyAndData() string result2 = Encoding.ASCII.GetString(resultBytes2); Assert.AreEqual("b", result2); } + + [Test] + public void Discard_WhenCalled_DeletesKeyAndData() + { + // arrange + var catalogue = new FragmentCatalogue(); + byte[] b1 = Encoding.ASCII.GetBytes("a"); + byte[] b2 = Encoding.ASCII.GetBytes("b"); + + // act + catalogue.Push(1, 1, b1); + catalogue.Discard(1, 1); // discard a + catalogue.Push(1, 1, b2); + + // assert + byte[] resultBytes2 = catalogue.Pop(1, 1); // b + string result2 = Encoding.ASCII.GetString(resultBytes2); + Assert.AreEqual("b", result2); + } } } \ No newline at end of file diff --git a/HAProxy.StreamProcessingOffload.Agent/FragmentCatalogue.cs b/HAProxy.StreamProcessingOffload.Agent/FragmentCatalogue.cs index ba043dd..067882c 100644 --- a/HAProxy.StreamProcessingOffload.Agent/FragmentCatalogue.cs +++ b/HAProxy.StreamProcessingOffload.Agent/FragmentCatalogue.cs @@ -55,6 +55,14 @@ public byte[] Pop(long streamId, long frameId) } } + public void Discard(long streamId, long frameId) + { + lock (this.lockObj) + { + DeleteKeyAndData(streamId, frameId); + } + } + private void DeleteKeyAndData(long streamId, long frameId) { string key = CombineStreamIdAndFrameId(streamId, frameId); diff --git a/HAProxy.StreamProcessingOffload.Agent/FrameProcessor.cs b/HAProxy.StreamProcessingOffload.Agent/FrameProcessor.cs index dd61074..3fc5037 100644 --- a/HAProxy.StreamProcessingOffload.Agent/FrameProcessor.cs +++ b/HAProxy.StreamProcessingOffload.Agent/FrameProcessor.cs @@ -25,7 +25,7 @@ public class FrameProcessor : IFrameProcessor /// public FrameProcessor() { - this.LogFunc = (s) => {}; + this.LogFunc = (s) => { }; this.MaxFrameSize = 16380; this.agentCapabilities = new string[] { "fragmentation" }; } @@ -89,7 +89,8 @@ public void HandleStream(Stream stream, Func> not { closeConnection = true; } - else{ + else + { string haproxyCapabilitiesString = (string)((KeyValueListPayload)frame.Payload).KeyValueItems.First(item => item.Key == "capabilities").Value.Value; haproxyCapabilities = haproxyCapabilitiesString.Split(','); @@ -134,47 +135,55 @@ public void HandleStream(Stream stream, Func> not } break; case FrameType.Unset: - // The Unset frame continues the data, but we only need - // to append its payload, not its metadata - fragments.Push( - frame.Metadata.StreamId.Value, - frame.Metadata.FrameId.Value, - frame.Payload.Bytes); - - if (frame.Metadata.Flags.Fin) + if (frame.Metadata.Flags.Abort) { - byte[] data = fragments.Pop(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value); - Frame newNotifyFrame = ParseFrame(data); - newNotifyFrame.Metadata.Flags.Fin = true; + fragments.Discard(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value); + sendAgentDisconnect = true; + } + else + { + // The Unset frame continues the data, but we only need + // to append its payload, not its metadata + fragments.Push( + frame.Metadata.StreamId.Value, + frame.Metadata.FrameId.Value, + frame.Payload.Bytes); - if (this.EnableLogging) + if (frame.Metadata.Flags.Fin) { - this.LogFunc(newNotifyFrame.ToString()); - } + byte[] data = fragments.Pop(frame.Metadata.StreamId.Value, frame.Metadata.FrameId.Value); + Frame newNotifyFrame = ParseFrame(data); + newNotifyFrame.Metadata.Flags.Fin = true; - var actions = notifyHandler((NotifyFrame)newNotifyFrame); + if (this.EnableLogging) + { + this.LogFunc(newNotifyFrame.ToString()); + } - var ackFrame = new AckFrame( - frame.Metadata.StreamId.Value, - frame.Metadata.FrameId.Value, - actions ?? new List()); + var actions = notifyHandler((NotifyFrame)newNotifyFrame); - // fragment if necessary - if (haproxyCapabilities.Contains("fragmentation")) - { - var fragmentedFrames = ((AckFrame)ackFrame).FragmentFrame(this.MaxFrameSize); + var ackFrame = new AckFrame( + frame.Metadata.StreamId.Value, + frame.Metadata.FrameId.Value, + actions ?? new List()); - foreach (var f in fragmentedFrames) + // fragment if necessary + if (haproxyCapabilities.Contains("fragmentation")) { - responseFrames.Enqueue(f); + var fragmentedFrames = ((AckFrame)ackFrame).FragmentFrame(this.MaxFrameSize); + + foreach (var f in fragmentedFrames) + { + responseFrames.Enqueue(f); + } + } + else + { + responseFrames.Enqueue(ackFrame); } - } - else - { - responseFrames.Enqueue(ackFrame); - } - sendAgentDisconnect = true; + sendAgentDisconnect = true; + } } break; } @@ -307,7 +316,7 @@ private Frame NewFrameFromType(FrameType frameType, Status status = Status.Norma { Frame frame; - switch(frameType) + switch (frameType) { case FrameType.HaproxyDisconnect: frame = new HaproxyDisconnectFrame();