diff --git a/Bonsai.Core/Reactive/RepeatWhile.cs b/Bonsai.Core/Reactive/RepeatWhile.cs new file mode 100644 index 000000000..f68c4bbf0 --- /dev/null +++ b/Bonsai.Core/Reactive/RepeatWhile.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Xml.Serialization; +using System.ComponentModel; +using System.Linq.Expressions; +using System.Reactive.Linq; +using Bonsai.Expressions; +using System.Reactive; +using Rx = System.Reactive.Subjects; + +namespace Bonsai.Reactive +{ + /// + /// Represents an expression builder which repeats an observable sequence until + /// the condition specified by the encapsulated workflow becomes false. + /// + [XmlType(Namespace = Constants.ReactiveXmlNamespace)] + [Description("Repeats the observable sequence until the condition specified by the encapsulated workflow becomes false.")] + public class RepeatWhile : SingleArgumentWorkflowExpressionBuilder + { + /// + /// Initializes a new instance of the class. + /// + public RepeatWhile() + : this(new ExpressionBuilderGraph()) + { + } + + /// + /// Initializes a new instance of the class + /// with the specified expression builder workflow. + /// + /// + /// The expression builder workflow instance that will be used by this builder + /// to generate the output expression tree. + /// + public RepeatWhile(ExpressionBuilderGraph workflow) + : base(workflow) + { + } + + /// + public override Expression Build(IEnumerable arguments) + { + var source = arguments.Single(); + var sourceType = source.Type.GetGenericArguments()[0]; + var inputParameter = Expression.Parameter(typeof(IObservable)); + return BuildWorkflow(arguments.Take(1), inputParameter, selectorBody => + { + var selector = Expression.Lambda(selectorBody, inputParameter); + var selectorObservableType = selector.ReturnType.GetGenericArguments()[0]; + if (selectorObservableType != typeof(bool)) + { + throw new InvalidOperationException("The specified condition workflow must have a single boolean output."); + } + + return Expression.Call( + typeof(RepeatWhile), + nameof(Process), + new[] { sourceType }, + source, selector); + }); + } + + static IObservable Process(IObservable source, Func, IObservable> condition) + { + return Observable.Using( + () => new Rx.BehaviorSubject(false), + repeat => Observable.Using( + () => new Rx.Subject(), + completed => MergeDependencies( + source.DoWhile(() => + { + completed.OnNext(Unit.Default); + return repeat.Value; + }), condition(completed).Do(repeat).IgnoreElements().Select(_ => default(TSource))))); + } + } +}