Skip to content

Commit

Permalink
[MWB] - pub-sub spike - more poc code
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeclayton committed Jan 28, 2025
1 parent 8273130 commit c6d1202
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/modules/MouseWithoutBorders/App/Class/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using ManagedCommon;
using Microsoft.PowerToys.Settings.UI.Library.Utilities;
using Microsoft.PowerToys.Telemetry;
using MouseWithoutBorders.Messaging;
using Newtonsoft.Json;
using StreamJsonRpc;

Expand All @@ -52,6 +53,9 @@ internal static class Program

public static bool ShowServiceModeErrorTooltip;

public static PacketProducer PacketProducer
=> Program.CreatePacketProducer();

[STAThread]
private static void Main()
{
Expand Down Expand Up @@ -229,6 +233,8 @@ private static void Main()

Application.Run(formScreen);

Program.PacketProducer.Queue.Complete();

etwTrace?.Dispose();
}
catch (Exception e)
Expand All @@ -237,6 +243,25 @@ private static void Main()
}
}

private static PacketProducer CreatePacketProducer()
{
// make a producer that we'll use to push messages onto a queue
var producer = new PacketProducer();

// create a demo consumer to log the types of packages,
// subscribe it to the producer's queue and then start it
PacketConsumer loggingConsumer = new(
(DATA packet, CancellationToken cancellationToken) =>
{
Logger.Log($"received packet of type {packet.Type}");
return Task.CompletedTask;
});
producer.Queue.Subscribe(loggingConsumer);
_ = Task.Run(() => loggingConsumer.StartAsync());

return producer;
}

private interface ISettingsSyncHelper
{
[JsonObject(MemberSerialization.OptIn)]
Expand Down
5 changes: 5 additions & 0 deletions src/modules/MouseWithoutBorders/App/Core/Receiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ private static bool PreProcess(DATA package)

internal static void ProcessPackage(DATA package, TcpSk tcp)
{
#pragma warning disable VSTHRD002
Program.PacketProducer.WriteAsync(package)
.AsTask().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002

if (!PreProcess(package))
{
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async Task DrainAsync(CancellationToken cancellationToken = default)
}
}

public void Stop()
public void Complete()
{
this.Channel.Writer.Complete();
}
Expand Down
21 changes: 21 additions & 0 deletions src/modules/MouseWithoutBorders/App/Messaging/PacketQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,25 @@ public bool TryWrite(DATA packet)

return result;
}

public void Complete()
{
foreach (var consumer in this.Consumers)
{
consumer.Complete();
}
}

/// <summary>
/// Reads and processes all messages currently on the queue until it is empty.
/// Any messages that arrive while draining will be read and processed as well.
/// Does *not* "Complete" the queue, just leaves it empty.
/// </summary>
public async Task DrainAsync(CancellationToken cancellationToken = default)
{
foreach (var consumer in this.Consumers)
{
await consumer.DrainAsync(cancellationToken);
}
}
}

0 comments on commit c6d1202

Please sign in to comment.