diff --git a/RecordParser/Extensions/EnumerableExtensions.cs b/RecordParser/Extensions/EnumerableExtensions.cs new file mode 100644 index 0000000..8ee5fd7 --- /dev/null +++ b/RecordParser/Extensions/EnumerableExtensions.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; + +namespace RecordParser.Extensions +{ + internal static class EnumerableExtensions + { + public static IEnumerable> Batch(this IEnumerable source, int batchSize) + { + using var e = source.GetEnumerator(); + var hasMore = true; + bool MoveNext() => hasMore && (hasMore = e.MoveNext()); + + while (MoveNext()) + yield return Chunk(batchSize); + + IEnumerable Chunk(int countdown) + { + do + yield return e.Current; + while (--countdown > 0 && MoveNext()); + } + } + } +} diff --git a/RecordParser/Extensions/FileWriter/WriterExtensions.cs b/RecordParser/Extensions/FileWriter/WriterExtensions.cs index 83767a3..04f8345 100644 --- a/RecordParser/Extensions/FileWriter/WriterExtensions.cs +++ b/RecordParser/Extensions/FileWriter/WriterExtensions.cs @@ -55,63 +55,36 @@ public static void WriteRecords(this TextWriter textWriter, IEnumerable it } } - private class BufferContext - { - public int pow; - public char[] buffer; - public object lockObj; - } - private static void WriteParallel(TextWriter textWriter, IEnumerable items, TryFormat tryFormat, ParallelismOptions options) { - var initialPool = 20; - var pool = new Stack(initialPool); + var poolSize = 10_000; + var pool = new char[poolSize][]; - for (var index = 0; index < initialPool; index++) - pool.Push(ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow))); + for (var index = 0; index < poolSize; index++) + pool[index] = new char[(int)Math.Pow(2, initialPow)]; - var xs = items.AsParallel(options).Select((item, i) => + foreach (var xx in items.Batch(poolSize)) { - var buffer = Pop() ?? ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow)); - retry: - - if (tryFormat(item, buffer, out var charsWritten)) + var xs = xx.AsParallel(options).Select((item, i) => { - return (buffer, charsWritten); - } - else - { - ArrayPool.Shared.Return(buffer); - buffer = ArrayPool.Shared.Rent(buffer.Length * 2); - goto retry; - } - }); - - foreach (var x in xs) - { - textWriter.WriteLine(x.buffer, 0, x.charsWritten); - Push(x.buffer); - } - - foreach (var x in pool) - { - ArrayPool.Shared.Return(x); - } - - pool.Clear(); + var buffer = pool[i]; + retry: - char[] Pop() - { - char[] x; - lock (pool) - pool.TryPop(out x); - return x; - } + if (tryFormat(item, buffer, out var charsWritten)) + { + return (buffer, charsWritten); + } + else + { + buffer = pool[i] = new char[buffer.Length * 2]; + goto retry; + } + }); - void Push(char[] item) - { - lock (pool) - pool.Push(item); + foreach (var x in xs) + { + textWriter.WriteLine(x.buffer, 0, x.charsWritten); + } } }