diff --git a/Package.swift b/Package.swift index 0a7c540a..7dd238db 100644 --- a/Package.swift +++ b/Package.swift @@ -71,7 +71,7 @@ let package = Package( ], swiftSettings: [ .unsafeFlags(["-enable-testing"]), - .define("WASI", .when(platforms: [.wasi])) + .define("WASI", .when(platforms: [.wasi])), ] ) ], diff --git a/README.md b/README.md index 772f51dc..32d45cc4 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # OpenCombine -[![codecov](https://codecov.io/gh/OpenSwiftUIProject/OpenCombine/branch/master/graph/badge.svg)](https://codecov.io/gh/OpenSwiftUIProject/OpenCombine) +[![codecov](https://codecov.io/gh/OpenSwiftUIProject/OpenCombine/graph/badge.svg?token=BJSI3J7RZQ)](https://codecov.io/gh/OpenSwiftUIProject/OpenCombine) ![Language](https://img.shields.io/badge/Swift-5.9-orange.svg) Open-source implementation of Apple's [Combine](https://developer.apple.com/documentation/combine) framework for processing values over time. diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index f9484f82..56777d59 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -244,396 +244,6 @@ extension Publisher { public func collect(_ strategy: Publishers.TimeGroupingStrategy, options: S.SchedulerOptions? = nil) -> Publishers.CollectByTime where S : Scheduler } -extension Publishers { - - /// A publisher created by applying the merge function to two upstream publishers. - public struct Merge : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure, A.Output == B.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public init(_ a: A, _ b: B) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, B.Failure == S.Failure, B.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge3 where P : Publisher, B.Failure == P.Failure, B.Output == P.Output - - public func merge(with z: Z, _ y: Y) -> Publishers.Merge4 where Z : Publisher, Y : Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output - - public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge5 where Z : Publisher, Y : Publisher, X : Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge6 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V) -> Publishers.Merge7 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, V : Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V, _ u: U) -> Publishers.Merge8 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, V : Publisher, U : Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output, V.Failure == U.Failure, V.Output == U.Output - } - - /// A publisher created by applying the merge function to three upstream publishers. - public struct Merge3 : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public init(_ a: A, _ b: B, _ c: C) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, C.Failure == S.Failure, C.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge4 where P : Publisher, C.Failure == P.Failure, C.Output == P.Output - - public func merge(with z: Z, _ y: Y) -> Publishers.Merge5 where Z : Publisher, Y : Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output - - public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge6 where Z : Publisher, Y : Publisher, X : Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge7 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V) -> Publishers.Merge8 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, V : Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output - } - - /// A publisher created by applying the merge function to four upstream publishers. - public struct Merge4 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public init(_ a: A, _ b: B, _ c: C, _ d: D) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, D.Failure == S.Failure, D.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge5 where P : Publisher, D.Failure == P.Failure, D.Output == P.Output - - public func merge(with z: Z, _ y: Y) -> Publishers.Merge6 where Z : Publisher, Y : Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output - - public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge7 where Z : Publisher, Y : Publisher, X : Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output - - public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge8 where Z : Publisher, Y : Publisher, X : Publisher, W : Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output - } - - /// A publisher created by applying the merge function to five upstream publishers. - public struct Merge5 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, E : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public let e: E - - public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, E.Failure == S.Failure, E.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge6 where P : Publisher, E.Failure == P.Failure, E.Output == P.Output - - public func merge(with z: Z, _ y: Y) -> Publishers.Merge7 where Z : Publisher, Y : Publisher, E.Failure == Z.Failure, E.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output - - public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge8 where Z : Publisher, Y : Publisher, X : Publisher, E.Failure == Z.Failure, E.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output - } - - /// A publisher created by applying the merge function to six upstream publishers. - public struct Merge6 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public let e: E - - public let f: F - - public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, F.Failure == S.Failure, F.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge7 where P : Publisher, F.Failure == P.Failure, F.Output == P.Output - - public func merge(with z: Z, _ y: Y) -> Publishers.Merge8 where Z : Publisher, Y : Publisher, F.Failure == Z.Failure, F.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output - } - - /// A publisher created by applying the merge function to seven upstream publishers. - public struct Merge7 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, G : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public let e: E - - public let f: F - - public let g: G - - public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, G.Failure == S.Failure, G.Output == S.Input - - public func merge

(with other: P) -> Publishers.Merge8 where P : Publisher, G.Failure == P.Failure, G.Output == P.Output - } - - /// A publisher created by applying the merge function to eight upstream publishers. - public struct Merge8 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, G : Publisher, H : Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output, G.Failure == H.Failure, G.Output == H.Output { - - /// The kind of values published by this publisher. - public typealias Output = A.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public let e: E - - public let f: F - - public let g: G - - public let h: H - - public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G, _ h: H) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, H.Failure == S.Failure, H.Output == S.Input - } - - public struct MergeMany : Publisher where Upstream : Publisher { - - /// The kind of values published by this publisher. - public typealias Output = Upstream.Output - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = Upstream.Failure - - public let publishers: [Upstream] - - public init(_ upstream: Upstream...) - - public init(_ upstream: S) where Upstream == S.Element, S : Sequence - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input - - public func merge(with other: Upstream) -> Publishers.MergeMany - } -} - -extension Publisher { - - /// Combines elements from this publisher with those from another publisher, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - Parameter other: Another publisher. - /// - Returns: A publisher that emits an event when either upstream publisher emits an event. - public func merge

(with other: P) -> Publishers.Merge where P : Publisher, Self.Failure == P.Failure, Self.Output == P.Output - - /// Combines elements from this publisher with those from two other publishers, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits - /// an event. - public func merge(with b: B, _ c: C) -> Publishers.Merge3 where B : Publisher, C : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output - - /// Combines elements from this publisher with those from three other publishers, delivering - /// an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - d: A fourth publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits an event. - public func merge(with b: B, _ c: C, _ d: D) -> Publishers.Merge4 where B : Publisher, C : Publisher, D : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output - - /// Combines elements from this publisher with those from four other publishers, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - d: A fourth publisher. - /// - e: A fifth publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits an event. - public func merge(with b: B, _ c: C, _ d: D, _ e: E) -> Publishers.Merge5 where B : Publisher, C : Publisher, D : Publisher, E : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output - - /// Combines elements from this publisher with those from five other publishers, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - d: A fourth publisher. - /// - e: A fifth publisher. - /// - f: A sixth publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits an event. - public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F) -> Publishers.Merge6 where B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output - - /// Combines elements from this publisher with those from six other publishers, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - d: A fourth publisher. - /// - e: A fifth publisher. - /// - f: A sixth publisher. - /// - g: A seventh publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits an event. - public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G) -> Publishers.Merge7 where B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, G : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output - - /// Combines elements from this publisher with those from seven other publishers, delivering an interleaved sequence of elements. - /// - /// The merged publisher continues to emit elements until all upstream publishers finish. If an upstream publisher produces an error, the merged publisher fails with that error. - /// - /// - Parameters: - /// - b: A second publisher. - /// - c: A third publisher. - /// - d: A fourth publisher. - /// - e: A fifth publisher. - /// - f: A sixth publisher. - /// - g: A seventh publisher. - /// - h: An eighth publisher. - /// - Returns: A publisher that emits an event when any upstream publisher emits an event. - public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G, _ h: H) -> Publishers.Merge8 where B : Publisher, C : Publisher, D : Publisher, E : Publisher, F : Publisher, G : Publisher, H : Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output, G.Failure == H.Failure, G.Output == H.Output - - /// Combines elements from this publisher with those from another publisher of the same type, delivering an interleaved sequence of elements. - /// - /// - Parameter other: Another publisher of this publisher's type. - /// - Returns: A publisher that emits an event when either upstream publisher emits - /// an event. - public func merge(with other: Self) -> Publishers.MergeMany -} - extension Publishers { /// A publisher that attempts to recreate its subscription to a failed upstream publisher. @@ -811,93 +421,3 @@ extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatab /// - rhs: Another value to compare. public static func == (lhs: Publishers.CombineLatest4, rhs: Publishers.CombineLatest4) -> Bool } - -extension Publishers.Merge : Equatable where A : Equatable, B : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality.. - /// - Returns: `true` if the two merging - rhs: Another merging publisher to compare for equality. - public static func == (lhs: Publishers.Merge, rhs: Publishers.Merge) -> Bool -} - -extension Publishers.Merge3 : Equatable where A : Equatable, B : Equatable, C : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge3, rhs: Publishers.Merge3) -> Bool -} - -extension Publishers.Merge4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge4, rhs: Publishers.Merge4) -> Bool -} - -extension Publishers.Merge5 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge5, rhs: Publishers.Merge5) -> Bool -} - -extension Publishers.Merge6 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable, F : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge6, rhs: Publishers.Merge6) -> Bool -} - -extension Publishers.Merge7 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable, F : Equatable, G : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge7, rhs: Publishers.Merge7) -> Bool -} - -extension Publishers.Merge8 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable, E : Equatable, F : Equatable, G : Equatable, H : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A merging publisher to compare for equality. - /// - rhs: Another merging publisher to compare for equality. - /// - Returns: `true` if the two merging publishers have equal source publishers, `false` otherwise. - public static func == (lhs: Publishers.Merge8, rhs: Publishers.Merge8) -> Bool -} - -extension Publishers.MergeMany : Equatable where Upstream : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.MergeMany, rhs: Publishers.MergeMany) -> Bool -} diff --git a/Sources/OpenCombine/Publishers/Publishers.Merge.swift b/Sources/OpenCombine/Publishers/Publishers.Merge.swift new file mode 100644 index 00000000..1f2a1ec5 --- /dev/null +++ b/Sources/OpenCombine/Publishers/Publishers.Merge.swift @@ -0,0 +1,1268 @@ +// +// Publishers.Merge.swift +// +// +// Created by Kyle on 2023/11/21. +// Audited for Combine 2023 + +#if canImport(COpenCombineHelpers) +import COpenCombineHelpers +#endif + +extension Publisher { + /// Combines elements from this publisher with those from another publisher, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:)-394v9`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:)``. To combine elements from multiple upstream publishers, use ``Publisher/zip(_:)``. + /// + /// In this example, as ``Publisher/merge(with:)-394v9`` receives input from either upstream publisher, it republishes it to the downstream: + /// + /// let publisher = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// + /// cancellable = publisher + /// .merge(with: pub2) + /// .sink { print("\($0)", terminator: " " )} + /// + /// publisher.send(2) + /// pub2.send(2) + /// publisher.send(3) + /// pub2.send(22) + /// publisher.send(45) + /// pub2.send(22) + /// publisher.send(17) + /// + /// // Prints: "2 2 3 22 45 22 17" + /// + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameter other: Another publisher. + /// - Returns: A publisher that emits an event when either upstream publisher emits an event. + public func merge

(with other: P) -> Publishers.Merge where P: Publisher, Self.Failure == P.Failure, Self.Output == P.Output { + Publishers.Merge(self, other) + } + + /// Combines elements from this publisher with those from two other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:)-5crqg``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:)-2p498``. + /// + /// In this example, as ``Publisher/merge(with:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC) + /// .sink { print("\($0)", terminator: " " )} + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// + /// // Prints: "1 40 90 2 50 100" + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C) -> Publishers.Merge3 where B: Publisher, C: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output { + Publishers.Merge3(self, b, c) + } + + /// Combines elements from this publisher with those from three other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. + /// + /// In this example, as ``Publisher/merge(with:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// let pubD = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC, pubD) + /// .sink { print("\($0)", terminator: " " )} + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubD.send(-1) + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// pubD.send(-2) + /// + /// // Prints: "1 40 90 -1 2 50 100 -2 " + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - d: A fourth publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C, _ d: D) -> Publishers.Merge4 where B: Publisher, C: Publisher, D: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output { + Publishers.Merge4(self, b, c, d) + } + + /// Combines elements from this publisher with those from four other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. + /// + /// In this example, as ``Publisher/merge(with:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// let pubD = PassthroughSubject() + /// let pubE = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC, pubD, pubE) + /// .sink { print("\($0)", terminator: " " ) } + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubD.send(-1) + /// pubE.send(33) + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// pubD.send(-2) + /// pubE.send(33) + /// + /// // Prints: "1 40 90 -1 33 2 50 100 -2 33" + /// + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - d: A fourth publisher. + /// - e: A fifth publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C, _ d: D, _ e: E) -> Publishers.Merge5 where B: Publisher, C: Publisher, D: Publisher, E: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output { + Publishers.Merge5(self, b, c, d, e) + } + + /// Combines elements from this publisher with those from five other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. + /// + /// In this example, as ``Publisher/merge(with:_:_:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// let pubD = PassthroughSubject() + /// let pubE = PassthroughSubject() + /// let pubF = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC, pubD, pubE, pubF) + /// .sink { print("\($0)", terminator: " " ) } + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubD.send(-1) + /// pubE.send(33) + /// pubF.send(44) + /// + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// pubD.send(-2) + /// pubE.send(33) + /// pubF.send(33) + /// + /// //Prints: "1 40 90 -1 33 44 2 50 100 -2 33 33" + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - d: A fourth publisher. + /// - e: A fifth publisher. + /// - f: A sixth publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F) -> Publishers.Merge6 where B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output { + Publishers.Merge6(self, b, c, d, e, f) + } + + /// Combines elements from this publisher with those from six other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. + /// + /// In this example, as ``Publisher/merge(with:_:_:_:_:_:)`` receives input from the upstream publishers; it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// let pubD = PassthroughSubject() + /// let pubE = PassthroughSubject() + /// let pubF = PassthroughSubject() + /// let pubG = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC, pubD, pubE, pubE, pubG) + /// .sink { print("\($0)", terminator: " " ) } + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubD.send(-1) + /// pubE.send(33) + /// pubF.send(44) + /// pubG.send(54) + /// + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// pubD.send(-2) + /// pubE.send(33) + /// pubF.send(33) + /// pubG.send(54) + /// + /// //Prints: "1 40 90 -1 33 44 54 2 50 100 -2 33 33 54" + /// + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - d: A fourth publisher. + /// - e: A fifth publisher. + /// - f: A sixth publisher. + /// - g: A seventh publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G) -> Publishers.Merge7 where B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, G: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output { + Publishers.Merge7(self, b, c, d, e, f, g) + } + + /// Combines elements from this publisher with those from seven other publishers, delivering an interleaved sequence of elements. + /// + /// Use ``Publisher/merge(with:_:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. + /// + /// In this example, as ``Publisher/merge(with:_:_:_:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: + /// + /// let pubA = PassthroughSubject() + /// let pubB = PassthroughSubject() + /// let pubC = PassthroughSubject() + /// let pubD = PassthroughSubject() + /// let pubE = PassthroughSubject() + /// let pubF = PassthroughSubject() + /// let pubG = PassthroughSubject() + /// let pubH = PassthroughSubject() + /// + /// cancellable = pubA + /// .merge(with: pubB, pubC, pubD, pubE, pubF, pubG, pubH) + /// .sink { print("\($0)", terminator: " " ) } + /// + /// pubA.send(1) + /// pubB.send(40) + /// pubC.send(90) + /// pubD.send(-1) + /// pubE.send(33) + /// pubF.send(44) + /// pubG.send(54) + /// pubH.send(1000) + /// + /// pubA.send(2) + /// pubB.send(50) + /// pubC.send(100) + /// pubD.send(-2) + /// pubE.send(33) + /// pubF.send(33) + /// pubG.send(54) + /// pubH.send(1001) + /// + /// //Prints: "1 40 90 -1 33 44 54 1000 2 50 100 -2 33 33 54 1001" + /// + /// The merged publisher continues to emit elements until all upstream publishers finish. + /// If an upstream publisher produces an error, the merged publisher fails with that error. + /// + /// - Parameters: + /// - b: A second publisher. + /// - c: A third publisher. + /// - d: A fourth publisher. + /// - e: A fifth publisher. + /// - f: A sixth publisher. + /// - g: A seventh publisher. + /// - h: An eighth publisher. + /// - Returns: A publisher that emits an event when any upstream publisher emits an event. + public func merge(with b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G, _ h: H) -> Publishers.Merge8 where B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, G: Publisher, H: Publisher, Self.Failure == B.Failure, Self.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output, G.Failure == H.Failure, G.Output == H.Output { + Publishers.Merge8(self, b, c, d, e, f, g, h) + } + + /// Combines elements from this publisher with those from another publisher of the same type, delivering an interleaved sequence of elements. + /// + /// - Parameter other: Another publisher of this publisher’s type. + /// - Returns: A publisher that emits an event when either upstream publisher emits an event. + public func merge(with other: Self) -> Publishers.MergeMany { + Publishers.MergeMany([self, other]) + } +} + +extension Publishers { + /// A publisher created by applying the merge function to two upstream publishers. + public struct Merge: Publisher where A: Publisher, B: Publisher, A.Failure == B.Failure, A.Output == B.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// Creates a publisher created by applying the merge function to two upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + public init(_ a: A, _ b: B) { + self.a = a + self.b = b + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, B.Failure == S.Failure, B.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 2) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + } + + public func merge

(with p: P) -> Publishers.Merge3 where P: Publisher, B.Failure == P.Failure, B.Output == P.Output { + Merge3(a, b, p) + } + + public func merge(with z: Z, _ y: Y) -> Publishers.Merge4 where Z: Publisher, Y: Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output { + Merge4(a, b, z, y) + } + + public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge5 where Z: Publisher, Y: Publisher, X: Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output { + Merge5(a, b, z, y, x) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge6 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output { + Merge6(a, b, z, y, x, w) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V) -> Publishers.Merge7 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, V: Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output { + Merge7(a, b, z, y, x, w, v) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V, _ u: U) -> Publishers.Merge8 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, V: Publisher, U: Publisher, B.Failure == Z.Failure, B.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output, V.Failure == U.Failure, V.Output == U.Output { + Merge8(a, b, z, y, x, w, v, u) + } + } + + /// A publisher created by applying the merge function to three upstream publishers. + public struct Merge3: Publisher where A: Publisher, B: Publisher, C: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// Creates a publisher created by applying the merge function to three upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + public init(_ a: A, _ b: B, _ c: C) { + self.a = a + self.b = b + self.c = c + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, C.Failure == S.Failure, C.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 3) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + } + + public func merge

(with other: P) -> Publishers.Merge4 where P: Publisher, C.Failure == P.Failure, C.Output == P.Output { + Merge4(a, b, c, other) + } + + public func merge(with z: Z, _ y: Y) -> Publishers.Merge5 where Z: Publisher, Y: Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output { + Merge5(a, b, c, z, y) + } + + public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge6 where Z: Publisher, Y: Publisher, X: Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output { + Merge6(a, b, c, z, y, x) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge7 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output { + Merge7(a, b, c, z, y, x, w) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W, _ v: V) -> Publishers.Merge8 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, V: Publisher, C.Failure == Z.Failure, C.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output, W.Failure == V.Failure, W.Output == V.Output { + Merge8(a, b, c, z, y, x, w, v) + } + } + + /// A publisher created by applying the merge function to four upstream publishers. + public struct Merge4: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// A fourth publisher to merge. + public let d: D + + /// Creates a publisher created by applying the merge function to four upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + /// - d: A fourth publisher to merge. + public init(_ a: A, _ b: B, _ c: C, _ d: D) { + self.a = a + self.b = b + self.c = c + self.d = d + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, D.Failure == S.Failure, D.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 4) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + d.subscribe(Inner.Side(index: 3, merger: merger)) + } + + public func merge

(with other: P) -> Publishers.Merge5 where P: Publisher, D.Failure == P.Failure, D.Output == P.Output { + Merge5(a, b, c, d, other) + } + + public func merge(with z: Z, _ y: Y) -> Publishers.Merge6 where Z: Publisher, Y: Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output { + Merge6(a, b, c, d, z, y) + } + + public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge7 where Z: Publisher, Y: Publisher, X: Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output { + Merge7(a, b, c, d, z, y, x) + } + + public func merge(with z: Z, _ y: Y, _ x: X, _ w: W) -> Publishers.Merge8 where Z: Publisher, Y: Publisher, X: Publisher, W: Publisher, D.Failure == Z.Failure, D.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output, X.Failure == W.Failure, X.Output == W.Output { + Merge8(a, b, c, d, z, y, x, w) + } + } + + /// A publisher created by applying the merge function to five upstream publishers. + public struct Merge5: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// A fourth publisher to merge. + public let d: D + + /// A fifth publisher to merge. + public let e: E + + /// Creates a publisher created by applying the merge function to five upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + /// - d: A fourth publisher to merge. + /// - e: A fifth publisher to merge. + public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E) { + self.a = a + self.b = b + self.c = c + self.d = d + self.e = e + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, E.Failure == S.Failure, E.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 5) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + d.subscribe(Inner.Side(index: 3, merger: merger)) + e.subscribe(Inner.Side(index: 4, merger: merger)) + } + + public func merge

(with other: P) -> Publishers.Merge6 where P: Publisher, E.Failure == P.Failure, E.Output == P.Output { + Merge6(a, b, c, d, e, other) + } + + public func merge(with z: Z, _ y: Y) -> Publishers.Merge7 where Z: Publisher, Y: Publisher, E.Failure == Z.Failure, E.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output { + Merge7(a, b, c, d, e, z, y) + } + + public func merge(with z: Z, _ y: Y, _ x: X) -> Publishers.Merge8 where Z: Publisher, Y: Publisher, X: Publisher, E.Failure == Z.Failure, E.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output, Y.Failure == X.Failure, Y.Output == X.Output { + Merge8(a, b, c, d, e, z, y, x) + } + } + + /// A publisher created by applying the merge function to six upstream publishers. + public struct Merge6: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// A fourth publisher to merge. + public let d: D + + /// A fifth publisher to merge. + public let e: E + + /// A sixth publisher to merge. + public let f: F + + /// publisher created by applying the merge function to six upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + /// - d: A fourth publisher to merge. + /// - e: A fifth publisher to merge. + /// - f: A sixth publisher to merge. + public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F) { + self.a = a + self.b = b + self.c = c + self.d = d + self.e = e + self.f = f + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, F.Failure == S.Failure, F.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 6) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + d.subscribe(Inner.Side(index: 3, merger: merger)) + e.subscribe(Inner.Side(index: 4, merger: merger)) + f.subscribe(Inner.Side(index: 5, merger: merger)) + } + + public func merge

(with other: P) -> Publishers.Merge7 where P: Publisher, F.Failure == P.Failure, F.Output == P.Output { + Merge7(a, b, c, d, e, f, other) + } + + public func merge(with z: Z, _ y: Y) -> Publishers.Merge8 where Z: Publisher, Y: Publisher, F.Failure == Z.Failure, F.Output == Z.Output, Z.Failure == Y.Failure, Z.Output == Y.Output { + Merge8(a, b, c, d, e, f, z, y) + } + } + + /// A publisher created by applying the merge function to seven upstream publishers. + public struct Merge7: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, G: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// A fourth publisher to merge. + public let d: D + + /// A fifth publisher to merge. + public let e: E + + /// A sixth publisher to merge. + public let f: F + + /// An seventh publisher to merge. + public let g: G + + /// Creates a publisher created by applying the merge function to seven upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + /// - d: A fourth publisher to merge. + /// - e: A fifth publisher to merge. + /// - f: A sixth publisher to merge. + /// - g: An seventh publisher to merge. + public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G) { + self.a = a + self.b = b + self.c = c + self.d = d + self.e = e + self.f = f + self.g = g + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, G.Failure == S.Failure, G.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 7) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + d.subscribe(Inner.Side(index: 3, merger: merger)) + e.subscribe(Inner.Side(index: 4, merger: merger)) + f.subscribe(Inner.Side(index: 5, merger: merger)) + g.subscribe(Inner.Side(index: 6, merger: merger)) + } + + public func merge

(with other: P) -> Publishers.Merge8 where P: Publisher, G.Failure == P.Failure, G.Output == P.Output { + Merge8(a, b, c, d, e, f, g, other) + } + } + + /// A publisher created by applying the merge function to eight upstream publishers. + public struct Merge8: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, E: Publisher, F: Publisher, G: Publisher, H: Publisher, A.Failure == B.Failure, A.Output == B.Output, B.Failure == C.Failure, B.Output == C.Output, C.Failure == D.Failure, C.Output == D.Output, D.Failure == E.Failure, D.Output == E.Output, E.Failure == F.Failure, E.Output == F.Output, F.Failure == G.Failure, F.Output == G.Output, G.Failure == H.Failure, G.Output == H.Output { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = A.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure + + /// A publisher to merge. + public let a: A + + /// A second publisher to merge. + public let b: B + + /// A third publisher to merge. + public let c: C + + /// A fourth publisher to merge. + public let d: D + + /// A fifth publisher to merge. + public let e: E + + /// A sixth publisher to merge. + public let f: F + + /// An seventh publisher to merge. + public let g: G + + /// A eighth publisher to merge. + public let h: H + + /// Creates a publisher created by applying the merge function to eight upstream publishers. + /// - Parameters: + /// - a: A publisher to merge + /// - b: A second publisher to merge. + /// - c: A third publisher to merge. + /// - d: A fourth publisher to merge. + /// - e: A fifth publisher to merge. + /// - f: A sixth publisher to merge. + /// - g: An seventh publisher to merge. + /// - h: An eighth publisher to merge. + public init(_ a: A, _ b: B, _ c: C, _ d: D, _ e: E, _ f: F, _ g: G, _ h: H) { + self.a = a + self.b = b + self.c = c + self.d = d + self.e = e + self.f = f + self.g = g + self.h = h + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, H.Failure == S.Failure, H.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: 8) + subscriber.receive(subscription: merger) + a.subscribe(Inner.Side(index: 0, merger: merger)) + b.subscribe(Inner.Side(index: 1, merger: merger)) + c.subscribe(Inner.Side(index: 2, merger: merger)) + d.subscribe(Inner.Side(index: 3, merger: merger)) + e.subscribe(Inner.Side(index: 4, merger: merger)) + f.subscribe(Inner.Side(index: 5, merger: merger)) + g.subscribe(Inner.Side(index: 6, merger: merger)) + h.subscribe(Inner.Side(index: 7, merger: merger)) + } + } + + /// A publisher created by applying the merge function to an arbitrary number of upstream publishers. + public struct MergeMany: Publisher where Upstream: Publisher { + /// The kind of values published by this publisher. + /// + /// This publisher uses its upstream publishers' common output type. + public typealias Output = Upstream.Output + + /// The kind of errors this publisher might publish. + /// + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = Upstream.Failure + + /// The array of upstream publishers that this publisher merges together. + public let publishers: [Upstream] + + /// Creates a publisher created by applying the merge function to an arbitrary number of upstream publishers. + /// - Parameter upstream: A variadic parameter containing zero or more publishers to merge with this publisher. + public init(_ upstream: Upstream...) { + publishers = upstream + } + + /// Creates a publisher created by applying the merge function to a sequence of upstream publishers. + /// - Parameter upstream: A sequence containing zero or more publishers to merge with this publisher. + public init(_ upstream: S) where Upstream == S.Element, S: Swift.Sequence { + publishers = Array(upstream) + } + + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input { + typealias Inner = _Merged + let merger = Inner(downstream: subscriber, count: publishers.count) + subscriber.receive(subscription: merger) + for (index, publisher) in publishers.enumerated() { + publisher.subscribe(Inner.Side(index: index, merger: merger)) + } + } + + public func merge(with other: Upstream) -> Publishers.MergeMany { + MergeMany(publishers + [other]) + } + } +} + +extension Publishers.Merge: Equatable where A: Equatable, B: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality.. + /// - Returns: `true` if the two merging - rhs: Another merging publisher to compare for equality. + public static func == (lhs: Publishers.Merge, rhs: Publishers.Merge) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b + } +} + +extension Publishers.Merge3: Equatable where A: Equatable, B: Equatable, C: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge3, rhs: Publishers.Merge3) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c + } +} + +extension Publishers.Merge4: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge4, rhs: Publishers.Merge4) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d + } +} + +extension Publishers.Merge5: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable, E: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge5, rhs: Publishers.Merge5) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d && lhs.e == rhs.e + } +} + +extension Publishers.Merge6: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable, E: Equatable, F: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge6, rhs: Publishers.Merge6) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d && lhs.e == rhs.e && lhs.f == rhs.f + } +} + +extension Publishers.Merge7: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable, E: Equatable, F: Equatable, G: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge7, rhs: Publishers.Merge7) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d && lhs.e == rhs.e && lhs.f == rhs.f && lhs.g == rhs.g + } +} + +extension Publishers.Merge8: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable, E: Equatable, F: Equatable, G: Equatable, H: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A merging publisher to compare for equality. + /// - rhs: Another merging publisher to compare for equality. + /// - Returns: `true` if the two merging publishers have equal source publishers; otherwise `false`. + public static func == (lhs: Publishers.Merge8, rhs: Publishers.Merge8) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d && lhs.e == rhs.e && lhs.f == rhs.f && lhs.g == rhs.g && lhs.h == rhs.h + } +} + +extension Publishers.MergeMany: Equatable where Upstream: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// - Parameters: + /// - lhs: A `MergeMany` publisher to compare for equality. + /// - rhs: Another `MergeMany` publisher to compare for equality. + /// - Returns: `true` if the publishers have equal `publishers` properties; otherwise `false`. + public static func == (lhs: Publishers.MergeMany, rhs: Publishers.MergeMany) -> Bool { + lhs.publishers == rhs.publishers + } +} + +// MARK: _Merge + +extension Publishers { + fileprivate class _Merged where Downstream: Subscriber, Input == Downstream.Input, Failure == Downstream.Failure { + let downstream: Downstream + var demand = Subscribers.Demand.none + var terminated = false + let count: Int + var upstreamFinished = 0 + var finished = false + var subscriptions: [Subscription?] + var buffers: [Input?] + let lock = UnfairLock.allocate() + let downstreamLock = UnfairRecursiveLock.allocate() + var recursive = false + var pending = Subscribers.Demand.none + + init(downstream: Downstream, count: Int) { + self.downstream = downstream + self.count = count + subscriptions = .init(repeating: nil, count: count) + buffers = .init(repeating: nil, count: count) + } + + deinit { + lock.deallocate() + downstreamLock.deallocate() + } + } +} + +extension Publishers._Merged: Subscription { + func request(_ demand: Subscribers.Demand) { + lock.lock() + guard !terminated, + !finished, + demand != .none, + self.demand != .unlimited else { + lock.unlock() + return + } + guard !recursive else { + pending = pending + demand + lock.unlock() + return + } + if demand == .unlimited { + self.demand = .unlimited + let buffers = buffers + self.buffers = Array(repeating: nil, count: buffers.count) + let subscriptions = subscriptions + let upstreamFinished = upstreamFinished + let count = count + lock.unlock() + buffers.forEach { input in + guard let input else { + return + } + guardedApplyDownstream { downstream in + _ = downstream.receive(input) + } + } + if upstreamFinished == count { + guardedBecomeTerminal() + guardedApplyDownstream { downstream in + downstream.receive(completion: .finished) + } + } else { + for subscription in subscriptions { + subscription?.request(.unlimited) + } + } + } else { + self.demand = self.demand + demand + var newBuffers: [Input] = [] + var newSubscrptions: [Subscription?] = [] + for (index, buffer) in buffers.enumerated() { + guard self.demand != .zero else { + break + } + guard let buffer else { + continue + } + buffers[index] = nil + if self.demand != .unlimited { + self.demand -= 1 + } + newBuffers.append(buffer) + newSubscrptions.append(subscriptions[index]) + } + let newFinished: Bool + if upstreamFinished == count { + if buffers.allSatisfy({ $0 == nil }) { + newFinished = true + finished = true + } else { + newFinished = false + } + } else { + newFinished = false + } + lock.unlock() + var newDemand = Subscribers.Demand.none + for buffer in newBuffers { + let demandResult = guardedApplyDownstream { downstream in + downstream.receive(buffer) + } + newDemand += demandResult + } + lock.lock() + newDemand = newDemand + pending + pending = .none + lock.unlock() + if newFinished { + guardedBecomeTerminal() + guardedApplyDownstream { downstream in + downstream.receive(completion: .finished) + } + } else { + if newDemand != .none { + lock.lock() + self.demand += newDemand + lock.unlock() + } + for subscrption in newSubscrptions { + subscrption?.request(.max(1)) + } + } + } + } + + func cancel() { + guardedBecomeTerminal() + } +} + +extension Publishers._Merged { + func receive(subscription: Subscription, _ index: Int) { + lock.lock() + guard !terminated, subscriptions[index] == nil else { + lock.unlock() + subscription.cancel() + return + } + subscriptions[index] = subscription + let demand = demand == .unlimited ? demand : .max(1) + lock.unlock() + subscription.request(demand) + } + + func receive(_ input: Input, _ index: Int) -> Subscribers.Demand { + lock.lock() + guard demand != .unlimited else { + lock.unlock() + return guardedApplyDownstream { downstream in + downstream.receive(input) + } + } + if demand == .none { + buffers[index] = input + lock.unlock() + return .none + } else { + lock.unlock() + let result = guardedApplyDownstream { downstream in + downstream.receive(input) + } + lock.lock() + demand = result + pending + demand - 1 + pending = .none + lock.unlock() + return .max(1) + } + } + + func receive(completion: Subscribers.Completion, _ index: Int) { + switch completion { + case .finished: + lock.lock() + upstreamFinished += 1 + subscriptions[index] = nil + if upstreamFinished == count, buffers.allSatisfy({ $0 == nil }) { + finished = true + lock.unlock() + guardedBecomeTerminal() + guardedApplyDownstream { downstream in + downstream.receive(completion: .finished) + } + } else { + lock.unlock() + } + case .failure: + lock.lock() + let terminated = terminated + lock.unlock() + if !terminated { + guardedBecomeTerminal() + guardedApplyDownstream { downstream in + downstream.receive(completion: completion) + } + } + } + } + + private func guardedApplyDownstream(_ block: (Downstream) -> Result) -> Result { + lock.lock() + recursive = true + lock.unlock() + downstreamLock.lock() + let result = block(downstream) + downstreamLock.unlock() + lock.lock() + recursive = false + lock.unlock() + return result + } + + private func guardedBecomeTerminal() { + lock.lock() + terminated = true + let subscriptions = subscriptions + self.subscriptions = Array(repeating: nil, count: subscriptions.count) + buffers = Array(repeating: nil, count: buffers.count) + lock.unlock() + for subscription in subscriptions { + subscription?.cancel() + } + } +} + +extension Publishers._Merged { + struct Side { + let index: Int + let merger: Publishers._Merged + let combineIdentifier: CombineIdentifier + + init(index: Int, merger: Publishers._Merged) { + self.index = index + self.merger = merger + combineIdentifier = CombineIdentifier() + } + } +} + +extension Publishers._Merged.Side: Subscriber { + func receive(subscription: Subscription) { + merger.receive(subscription: subscription, index) + } + + func receive(_ input: Input) -> Subscribers.Demand { + merger.receive(input, index) + } + + func receive(completion: Subscribers.Completion) { + merger.receive(completion: completion, index) + } +} + +extension Publishers._Merged: CustomStringConvertible { + var description: String { "Merge" } +} + +extension Publishers._Merged.Side: CustomStringConvertible { + var description: String { "Merge" } +} + +extension Publishers._Merged: CustomPlaygroundDisplayConvertible { + var playgroundDescription: Any { description } +} + +extension Publishers._Merged.Side: CustomPlaygroundDisplayConvertible { + var playgroundDescription: Any { description } +} + +extension Publishers._Merged: CustomReflectable { + var customMirror: Mirror { Mirror(self, children: [:]) } +} + +extension Publishers._Merged.Side: CustomReflectable { + var customMirror: Mirror { + Mirror(self, children: ["parentSubscription": merger.combineIdentifier]) + } +} diff --git a/Sources/OpenCombine/Publishers/Publishers.Zip.swift b/Sources/OpenCombine/Publishers/Publishers.Zip.swift index ba241eee..c87ffdd4 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Zip.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Zip.swift @@ -3,6 +3,7 @@ // // // Created by Kyle on 2023/7/25. +// Audited for Combine 2023 #if canImport(COpenCombineHelpers) import COpenCombineHelpers diff --git a/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift b/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift index 395c0c52..f7c28049 100644 --- a/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift +++ b/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift @@ -468,7 +468,7 @@ extension Subscribers { /// Creates a demand instance from a decoder. /// - /// - Parameter decoder: The decoder of a previously-encoded ``Subscribers.Demand`` + /// - Parameter decoder: The decoder of a previously-encoded ``Subscribers/Demand`` /// instance. public init(from decoder: Decoder) throws { try self.init(rawValue: decoder.singleValueContainer().decode(UInt.self)) diff --git a/Tests/OpenCombineTests/PublisherTests/MergeTests.swift b/Tests/OpenCombineTests/PublisherTests/MergeTests.swift new file mode 100644 index 00000000..252c3ac0 --- /dev/null +++ b/Tests/OpenCombineTests/PublisherTests/MergeTests.swift @@ -0,0 +1,898 @@ +// +// MergeTests.swift +// +// +// Created by Kyle on 2023/11/22. +// + +import XCTest + +#if OPENCOMBINE_COMPATIBILITY_TEST +import Combine +#else +import OpenCombine +#endif + +@available(macOS 10.15, iOS 13.0, *) +final class MergeTests: XCTestCase { + static let arities = (2 ... 10) + + struct ChildInfo { + let subscription: CustomSubscription + let publisher: CustomPublisher + } + + func testSendsExpectedValues() { + MergeTests.arities.forEach { arity in + let (children, merge) = getChildrenAndMergeForArity(arity) + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + $0.request(.unlimited) + }) + merge.subscribe(downstreamSubscriber) + (0 ..< arity).forEach { XCTAssertEqual(children[$0].publisher.send($0), .none) } + XCTAssertEqual( + downstreamSubscriber.history, + [.subscription("Merge")] + (0 ..< arity).map { .value($0) } + ) + } + } + + func testChildDemand() { + [Subscribers.Demand.unlimited, .max(1), .max(10)].forEach { initialDemand in + let (children, merge) = getChildrenAndMergeForArity(2) + + var downstreamSubscription: Subscription? + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { downstreamSubscription = $0 }) + + merge.subscribe(downstreamSubscriber) + + // Confirm initial demand + downstreamSubscription?.request(initialDemand) + (0 ..< 2).forEach { + if initialDemand == .unlimited { + XCTAssertEqual( + children[$0].subscription.history, + [ + .requested(.max(1)), + .requested(.unlimited), + ] + ) + } else { + XCTAssertEqual( + children[$0].subscription.history, + [.requested(.max(1))] + ) + } + } + (0 ..< 2).forEach { + if initialDemand == .unlimited { + XCTAssertEqual(children[$0].publisher.send(1), .max(0)) + } else if initialDemand == .max(1) { + switch $0 { + case 0: XCTAssertEqual(children[$0].publisher.send(1), .max(1)) + case 1: XCTAssertEqual(children[$0].publisher.send(1), .max(0)) + default: break + } + } else if initialDemand == .max(10) { + XCTAssertEqual(children[$0].publisher.send(1), .max(1)) + } + } + (0 ..< 2).forEach { + if initialDemand == .unlimited { + XCTAssertEqual( + children[$0].subscription.history, + [ + .requested(.max(1)), + .requested(.unlimited), + ] + ) + } else { + XCTAssertEqual( + children[$0].subscription.history, + [.requested(.max(1))] + ) + } + } + + if initialDemand == .max(1) { + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + ]) + } else { + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + .value(1), + ]) + } + + // Confirm subsequent demand + downstreamSubscription?.request(.max(2)) + (0 ..< 2).forEach { + if initialDemand == .unlimited { + XCTAssertEqual(children[$0].publisher.send(1), .max(0)) + } else if initialDemand == .max(1) { + switch $0 { + case 0: XCTAssertEqual(children[$0].publisher.send(1), .max(1)) + case 1: XCTAssertEqual(children[$0].publisher.send(1), .max(0)) + default: break + } + } else if initialDemand == .max(10) { + XCTAssertEqual(children[$0].publisher.send(1), .max(1)) + } + } + } + } + + func testDownstreamDemandRequestedWhileSendingValue() { + [Subscribers.Demand.unlimited, .max(1), .max(10)].forEach { initialDemand in + let (children, merge) = getChildrenAndMergeForArity(2) + var downstreamSubscription: Subscription? + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { + downstreamSubscription = $0 + $0.request(initialDemand) + }, + receiveValue: { _ in + downstreamSubscription?.request(.max(666)) + return Subscribers.Demand.none + } + ) + + merge.subscribe(downstreamSubscriber) + + if initialDemand == .unlimited { + XCTAssertEqual(children[0].publisher.send(1), .max(0)) + XCTAssertEqual(children[1].publisher.send(1), .max(0)) + } else { + XCTAssertEqual(children[0].publisher.send(1), .max(1)) + XCTAssertEqual(children[1].publisher.send(1), .max(1)) + } + + if initialDemand == .unlimited { + XCTAssertEqual(children[0].subscription.history, [.requested(.unlimited)]) + XCTAssertEqual(children[1].subscription.history, [.requested(.unlimited)]) + } else { + XCTAssertEqual(children[0].subscription.history, [.requested(.max(1))]) + XCTAssertEqual(children[1].subscription.history, [.requested(.max(1))]) + } + } + } + + func testUpstreamFinishReceivedWhileSendingValue() { + let (children, merge) = getChildrenAndMergeForArity(2) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }, + receiveValue: { _ in + children[0].publisher.send(completion: .finished) + return .none + } + ) + merge.subscribe(downstreamSubscriber) + XCTAssertEqual(children[0].publisher.send(1), .none) + XCTAssertEqual(children[0].publisher.send(1), .none) + XCTAssertEqual(children[1].publisher.send(1), .none) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Merge"), + .value(1), + .value(1), + .completion(.finished), + .value(1), + ] + ) + } + + func testMergeCompletesOnlyAfterAllChildrenComplete() { + let upstreamSubscription = CustomSubscription() + let child1Publisher = CustomPublisher(subscription: upstreamSubscription) + let child2Publisher = CustomPublisher(subscription: upstreamSubscription) + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + XCTAssertEqual(child1Publisher.send(100), .none) + XCTAssertEqual(child1Publisher.send(200), .none) + XCTAssertEqual(child1Publisher.send(300), .none) + XCTAssertEqual(child2Publisher.send(1), .none) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(200), + .value(300), + .value(1), + ]) + XCTAssertEqual(child2Publisher.send(2), .none) + XCTAssertEqual(child2Publisher.send(3), .none) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(200), + .value(300), + .value(1), + .value(2), + .value(3), + ]) + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(200), + .value(300), + .value(1), + .value(2), + .value(3), + .completion(.finished), + ]) + XCTAssertEqual(upstreamSubscription.history, [ + .requested(.unlimited), + .requested(.unlimited), + ]) + } + + func testUpstreamExceedsDemand() { + // Must use CustomPublisher if we want to force send a value beyond the demand + let child1Subscription = CustomSubscription() + let child1Publisher = CustomPublisher(subscription: child1Subscription) + let child2Subscription = CustomSubscription() + let child2Publisher = CustomPublisher(subscription: child2Subscription) + + let merge = child1Publisher.merge(with: child2Publisher) + + var downstreamSubscription: Subscription? + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + downstreamSubscription = $0 + $0.request(.max(1)) + }) + + merge.subscribe(downstreamSubscriber) + + XCTAssertEqual(child1Publisher.send(100), .max(1)) + XCTAssertEqual(child2Publisher.send(1), .none) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + XCTAssertEqual(child1Publisher.send(200), .none) + XCTAssertEqual(child1Publisher.send(300), .none) + XCTAssertEqual(child2Publisher.send(2), .none) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + XCTAssertEqual(child2Publisher.send(3), .none) + downstreamSubscription?.request(.max(1)) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(300), + ]) + } + + private func getChildrenAndMergeForArity(_ childCount: Int) + -> ([ChildInfo], AnyPublisher) { + var children = [ChildInfo]() + for _ in 0 ..< childCount { + let subscription = CustomSubscription() + let publisher = CustomPublisher(subscription: subscription) + children.append(ChildInfo(subscription: subscription, + publisher: publisher)) + } + + let merge: AnyPublisher + switch childCount { + case let childCount where childCount < 2: + fatalError("Unsupported child count") + case 2: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + ) + case 3: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + ) + case 4: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + .merge(with: children[3].publisher) + ) + case 5: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + .merge(with: children[3].publisher) + .merge(with: children[4].publisher) + ) + case 6: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + .merge(with: children[3].publisher) + .merge(with: children[4].publisher) + .merge(with: children[5].publisher) + ) + case 7: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + .merge(with: children[3].publisher) + .merge(with: children[4].publisher) + .merge(with: children[5].publisher) + .merge(with: children[6].publisher) + ) + case 8: + merge = AnyPublisher(children[0].publisher + .merge(with: children[1].publisher) + .merge(with: children[2].publisher) + .merge(with: children[3].publisher) + .merge(with: children[4].publisher) + .merge(with: children[5].publisher) + .merge(with: children[6].publisher) + .merge(with: children[7].publisher) + ) + default: + merge = AnyPublisher(Publishers.MergeMany(children.map(\.publisher))) + } + return (children, merge) + } + + func testImmediateFinishWhenOneChildFinishesWithNoSurplus() { + MergeTests.arities.forEach { arity in + for childToFinish in 0 ..< arity { + let description = "Merge\(arity) childToFinish=\(childToFinish)" + let (children, merge) = getChildrenAndMergeForArity(arity) + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + $0.request(.unlimited) + }) + merge.subscribe(downstreamSubscriber) + children[childToFinish].publisher.send(completion: .finished) + XCTAssertEqual( + downstreamSubscriber.history, + [.subscription("Merge")], + description + ) + + for child in 0 ..< arity { + XCTAssertEqual( + children[child].subscription.history, + [.requested(.unlimited)], + description + ) + } + } + } + } + + func testDelayedFinishWhenOneChildFinishesWithSurplus() { + MergeTests.arities.forEach { arity in + for childToSend in 0 ..< arity { + for childToFinish in 0 ..< arity { + let (children, merge) = getChildrenAndMergeForArity(arity) + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + $0.request(.unlimited) + }) + merge.subscribe(downstreamSubscriber) + _ = children[childToSend].publisher.send(666) + children[childToFinish].publisher.send(completion: .finished) + if childToSend == childToFinish { + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Merge"), + .value(666), + ] + ) + // Finish the others + (0 ..< arity) + .filter { $0 != childToFinish } + .forEach { + children[$0].publisher.send(completion: .finished) + } + + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Merge"), + .value(666), + .completion(.finished), + ] + ) + } else { + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Merge"), + .value(666), + ] + ) + } + } + } + } + } + + func testBCancelledAfterAFailed() { + let child1Subscription = CustomSubscription() + let child1Publisher = CustomPublisher(subscription: child1Subscription) + + let child2Subscription = CustomSubscription() + let child2Publisher = CustomPublisher(subscription: child2Subscription) + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + $0.request(.unlimited) + }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .failure(.oops)) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge"), + .completion(.failure(.oops))]) + + XCTAssertEqual(child1Subscription.history, [.requested(.unlimited), + .cancelled]) + + XCTAssertEqual(child2Subscription.history, [.requested(.unlimited), + .cancelled]) + } + + func testAValueAfterAChildFinishedWithoutSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + + child1Publisher.send(200) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + .completion(.finished), + ]) + } + + func testBValueAfterAChildFinishedWithoutSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(1), + .completion(.finished), + ]) + } + + func testAValueAfterAChildFinishedWithSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + child1Publisher.send(200) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(1), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(1), + .completion(.finished), + ]) + } + + func testBValueAfterAChildFinishedWithSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + ]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(1), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .value(1), + .completion(.finished), + ]) + } + + func testValueAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + child1Publisher.send(completion: .failure(.oops)) + child2Publisher.send(1) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .value(100), + .completion(.failure(.oops)), + ]) + } + + func testFinishAfterFinished() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .finished) + child2Publisher.send(completion: .finished) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .completion(.finished), + ]) + } + + func testFinishAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .failure(.oops)) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .completion(.failure(.oops)), + ]) + } + + func testFailedAfterFinished() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .finished) + child2Publisher.send(completion: .finished) + child1Publisher.send(completion: .failure(.oops)) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .completion(.finished), + ]) + } + + func testFailedAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let merge = child1Publisher.merge(with: child2Publisher) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + merge.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .failure(.oops)) + child1Publisher.send(completion: .failure(.oops)) + + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("Merge"), + .completion(.failure(.oops)), + ]) + } + + func testMerge2Lifecycle() throws { + let child2Publisher = PassthroughSubject() + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher) + } + } + + func testMerge3Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher) + } + } + + func testMerge4Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher, child4Publisher) + } + } + + func testMerge5Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + let child5Publisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher, child4Publisher, child5Publisher) + } + } + + func testMerge6Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + let child5Publisher = PassthroughSubject() + let child6Publisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher, child4Publisher, child5Publisher, child6Publisher) + } + } + + func testMerge7Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + let child5Publisher = PassthroughSubject() + let child6Publisher = PassthroughSubject() + let child7Publisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher, child4Publisher, child5Publisher, child6Publisher, child7Publisher) + } + } + + func testMerge8Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + let child5Publisher = PassthroughSubject() + let child6Publisher = PassthroughSubject() + let child7Publisher = PassthroughSubject() + let child8Publisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: child2Publisher, child3Publisher, child4Publisher, child5Publisher, child6Publisher, child7Publisher, child8Publisher) + } + } + + func testMergeManyLifecycle() throws { + let childPublisher = PassthroughSubject() + + try testLifecycle( + sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false + ) { + $0.merge(with: childPublisher) + } + } + + func testMergeReceiveSubscriptionTwice() throws { + let child2Publisher = PassthroughSubject() + + // Can't use `testReceiveSubscriptionTwice` helper here as `(Int, Int)` output + // can't be made `Equatable`. + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: nil, + receiveValueDemand: .none, + createSut: { $0.merge(with: child2Publisher) } + ) + + XCTAssertEqual(helper.subscription.history, [.requested(.max(1))]) + + let secondSubscription = CustomSubscription() + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: secondSubscription) + + XCTAssertEqual(secondSubscription.history, [.cancelled]) + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: helper.subscription) + + XCTAssertEqual(helper.subscription.history, [.requested(.max(1)), .cancelled]) + + try XCTUnwrap(helper.downstreamSubscription).cancel() + + XCTAssertEqual(helper.subscription.history, [.requested(.max(1)), .cancelled, .cancelled]) + + let thirdSubscription = CustomSubscription() + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: thirdSubscription) + } + + func testNoDemandOnSubscriptionNoCrashes() { + MergeTests.arities.forEach { arity in + let (_, merge) = getChildrenAndMergeForArity(arity) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { subscription in + subscription.request(.none) + } + ) + merge.subscribe(downstreamSubscriber) + } + } + + func testIncreasedDemand() throws { + MergeTests.arities.forEach { arity in + let (children, merge) = getChildrenAndMergeForArity(arity) + let downstreamSubscriber = TrackingSubscriber( + receiveValue: { _ in + .max(1) + } + ) + merge.subscribe(downstreamSubscriber) + + (0 ..< arity).forEach { + let demand = children[$0].publisher.send(1) + if $0 == arity - 1 { + XCTAssertEqual(demand, .max(0)) + } else { + XCTAssertEqual(demand, .none) + } + } + XCTAssertEqual(downstreamSubscriber.history, [.subscription("Merge")]) + } + } + + func testMergeCurrentValueSubject() throws { + let subject = CurrentValueSubject(0) + let merge = [42].publisher.merge(with: subject) + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { $0.request(.unlimited) }) + merge.subscribe(downstreamSubscriber) + let history = downstreamSubscriber.history + XCTAssertEqual(history, [.subscription("Merge"), .value(42), .value(0)]) + } +}