diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index eefd2b61..f9484f82 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -901,52 +901,3 @@ extension Publishers.MergeMany : Equatable where Upstream : Equatable { /// - rhs: Another value to compare. public static func == (lhs: Publishers.MergeMany, rhs: Publishers.MergeMany) -> Bool } - -extension Publishers.Zip : Equatable where A : Equatable, B : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A zip publisher to compare for equality. - /// - rhs: Another zip publisher to compare for equality. - /// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal, `false` otherwise. - public static func == (lhs: Publishers.Zip, rhs: Publishers.Zip) -> Bool -} - -/// Returns a Boolean value that indicates whether two publishers are equivalent. -/// -/// - Parameters: -/// - lhs: A zip publisher to compare for equality. -/// - rhs: Another zip publisher to compare for equality. -/// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal, `false` otherwise. -extension Publishers.Zip3 : Equatable where A : Equatable, B : Equatable, C : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.Zip3, rhs: Publishers.Zip3) -> Bool -} - -/// Returns a Boolean value that indicates whether two publishers are equivalent. -/// -/// - Parameters: -/// - lhs: A zip publisher to compare for equality. -/// - rhs: Another zip publisher to compare for equality. -/// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal, `false` otherwise. -extension Publishers.Zip4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.Zip4, rhs: Publishers.Zip4) -> Bool -} diff --git a/Sources/OpenCombine/Helpers/Violations.swift b/Sources/OpenCombine/Helpers/Violations.swift index 78eb999d..c9f35f80 100644 --- a/Sources/OpenCombine/Helpers/Violations.swift +++ b/Sources/OpenCombine/Helpers/Violations.swift @@ -30,4 +30,10 @@ extension Subscribers.Demand { precondition(rawValue <= Subscribers.Demand.unlimited.rawValue) precondition(rawValue > Subscribers.Demand.none.rawValue) } + + @_transparent + @inline(__always) + internal func assertValid() { + precondition(rawValue <= Subscribers.Demand.unlimited.rawValue) + } } diff --git a/Sources/OpenCombine/Publishers/Publishers.Throttle.swift b/Sources/OpenCombine/Publishers/Publishers.Throttle.swift index e0f8ec24..f3d9ec95 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Throttle.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Throttle.swift @@ -103,6 +103,15 @@ extension Publishers { self.latest = latest } + /// Attaches the specified subscriber to this publisher. + /// + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls + /// this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, + /// after which it can receive values. // swiftlint:disable generic_type_name public func receive(subscriber: S) where S: Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input diff --git a/Sources/OpenCombine/Publishers/Publishers.Zip.swift b/Sources/OpenCombine/Publishers/Publishers.Zip.swift index a72a26d8..7c078944 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Zip.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Zip.swift @@ -1,720 +1,723 @@ // // Publishers.Zip.swift // -// Created by Eric Patey on 29.08.2019. // - -// swiftlint:disable large_tuple +// Created by Kyle on 2023/7/25. #if canImport(COpenCombineHelpers) import COpenCombineHelpers #endif -extension Publishers { +extension Publisher { + /// Combines elements from another publisher and deliver pairs of elements as tuples. + /// + /// Use ``Publisher/zip(_:)`` to combine the latest elements from two publishers and emit a tuple to the downstream. The returned publisher waits until both publishers have emitted an event, then delivers the oldest unconsumed event from each publisher together as a tuple to the subscriber. + /// + /// Much like a zipper or zip fastener on a piece of clothing pulls together rows of teeth to link the two sides, ``Publisher/zip(_:)`` combines streams from two different publishers by linking pairs of elements from each side. + /// + /// In this example, `numbers` and `letters` are ``PassthroughSubject``s that emit values; once ``Publisher/zip(_:)`` receives one value from each, it publishes the pair as a tuple to the downstream subscriber. It then waits for the next pair of values. + /// + /// let numbersPub = PassthroughSubject() + /// let lettersPub = PassthroughSubject() + /// + /// cancellable = numbersPub + /// .zip(lettersPub) + /// .sink { print("\($0)") } + /// numbersPub.send(1) // numbersPub: 1 lettersPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: zip output: + /// lettersPub.send("A") // numbers: 1,2 letters:"A" zip output: + /// numbersPub.send(3) // numbers: 1,2,3 letters: zip output: (1,"A") + /// lettersPub.send("B") // numbers: 1,2,3 letters: "B" zip output: (2,"B") + /// + /// // Prints: + /// // (1, "A") + /// // (2, "B") + /// + /// If either upstream publisher finishes successfully or fails with an error, the zipped publisher does the same. + /// + /// - Parameter other: Another publisher. + /// - Returns: A publisher that emits pairs of elements from the upstream publishers as tuples. + public func zip

(_ other: P) -> Publishers.Zip where P: Publisher, Self.Failure == P.Failure { + Publishers.Zip(self, other) + } - /// A publisher created by applying the zip function to two upstream publishers. - public struct Zip: Publisher - where UpstreamA.Failure == UpstreamB.Failure - { + /// Combines elements from another publisher and delivers a transformed output. + /// + /// Use ``Publisher/zip(_:_:)-7ve7u`` to return a new publisher that combines the elements from two publishers using a transformation you specify to publish a new value to the downstream. The returned publisher waits until both publishers have emitted an event, then delivers the oldest unconsumed event from each publisher together that the operator uses in the transformation. + /// + /// In this example, ``PassthroughSubject`` instances `numbersPub` and `lettersPub` emit values; ``Publisher/zip(_:_:)-7ve7u`` receives the oldest value from each publisher, uses the `Int` from `numbersPub` and publishes a string that repeats the [String](https://developer.apple.com/documentation/swift/string) from `lettersPub` that many times. + /// + /// let numbersPub = PassthroughSubject() + /// let lettersPub = PassthroughSubject() + /// cancellable = numbersPub + /// .zip(lettersPub) { anInt, aLetter in + /// String(repeating: aLetter, count: anInt) + /// } + /// .sink { print("\($0)") } + /// numbersPub.send(1) // numbersPub: 1 lettersPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: zip output: + /// numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: zip output: + /// lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" zip output: "A" + /// lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" zip output: "BB" + /// // Prints: + /// // A + /// // BB + /// + /// If either upstream publisher finishes successfully or fails with an error, the zipped publisher does the same. + /// + /// - Parameters: + /// - other: Another publisher. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from two upstream publishers. + public func zip(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map, T> where P: Publisher, Self.Failure == P.Failure { + zip(other).map(transform) + } + + /// Combines elements from two other publishers and delivers groups of elements as tuples. + /// + /// Use ``Publisher/zip(_:_:)-2p498`` to return a new publisher that combines the elements from two additional publishers to publish a tuple to the downstream. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber. + /// + /// In this example, `numbersPub`, `lettersPub` and `emojiPub` are each a ``PassthroughSubject``; + /// ``Publisher/zip(_:_:)-2p498`` receives the oldest unconsumed value from each publisher and combines them into a tuple that it republishes to the downstream: + /// + /// let numbersPub = PassthroughSubject() + /// let lettersPub = PassthroughSubject() + /// let emojiPub = PassthroughSubject() + /// + /// cancellable = numbersPub + /// .zip(lettersPub, emojiPub) + /// .sink { print("\($0)") } + /// numbersPub.send(1) // numbersPub: 1 lettersPub: emojiPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: emojiPub: zip output: + /// numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: emojiPub: zip output: + /// lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + /// emojiPub.send("πŸ˜€") // numbersPub: 2,3 lettersPub: "A" emojiPub: "πŸ˜€" zip output: (1, "A", "πŸ˜€") + /// lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" emojiPub: zip output: + /// emojiPub.send("πŸ₯°") // numbersPub: 3 lettersPub: emojiPub: zip output: (2, "B", "πŸ₯°") + /// + /// // Prints: + /// // (1, "A", "πŸ˜€") + /// // (2, "B", "πŸ₯°") + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + /// + /// - Parameters: + /// - publisher1: A second publisher. + /// - publisher2: A third publisher. + /// - Returns: A publisher that emits groups of elements from the upstream publishers as tuples. + public func zip(_ publisher1: P, _ publisher2: Q) -> Publishers.Zip3 where P: Publisher, Q: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure { + Publishers.Zip3(self, publisher1, publisher2) + } + + /// Combines elements from two other publishers and delivers a transformed output. + /// + /// Use ``Publisher/zip(_:_:_:)-19jxo`` to return a new publisher that combines the elements from two other publishers using a transformation you specify to publish a new value to the downstream subscriber. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher together that the operator uses in the transformation. + /// + /// In this example, `numbersPub`, `lettersPub` and `emojiPub` are each a ``PassthroughSubject`` that emit values; ``Publisher/zip(_:_:_:)-19jxo`` receives the oldest value from each publisher and uses the `Int` from `numbersPub` and publishes a string that repeats the [String](https://developer.apple.com/documentation/swift/string) from `lettersPub` and `emojiPub` that many times. + /// + /// let numbersPub = PassthroughSubject() + /// let lettersPub = PassthroughSubject() + /// let emojiPub = PassthroughSubject() + /// + /// cancellable = numbersPub + /// .zip(letters, emoji) { anInt, aLetter, anEmoji in + /// ("\(String(repeating: anEmoji, count: anInt)) \(String(repeating: aLetter, count: anInt))") + /// } + /// .sink { print("\($0)") } + /// + /// numbersPub.send(1) // numbersPub: 1 lettersPub: emojiPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: emojiPub: zip output: + /// numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: emojiPub: zip output: + /// lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + /// emojiPub.send("πŸ˜€") // numbersPub: 2,3 lettersPub: "A" emojiPub:"πŸ˜€" zip output: "πŸ˜€ A" + /// lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" emojiPub: zip output: + /// emojiPub.send("πŸ₯°") // numbersPub: 3 lettersPub: emojiPub:"πŸ˜€", "πŸ₯°" zip output: "πŸ₯°πŸ₯° BB" + /// + /// // Prints: + /// // πŸ˜€ A + /// // πŸ₯°πŸ₯° BB + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + /// + /// - Parameters: + /// - publisher1: A second publisher. + /// - publisher2: A third publisher. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from three upstream publishers. + public func zip(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure { + zip(publisher1, publisher2).map(transform) + } + + /// Combines elements from three other publishers and delivers groups of elements as tuples. + /// + /// Use ``Publisher/zip(_:_:_:)-67czn`` to return a new publisher that combines the elements from three other publishers to publish a tuple to the downstream subscriber. The returned publisher waits until all four publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber. + /// + /// In this example, several ``PassthroughSubject`` instances emit values; ``Publisher/zip(_:_:_:)-67czn`` receives the oldest unconsumed value from each publisher and combines them into a tuple that it republishes to the downstream: + /// + /// let numbersPub = PassthroughSubject() + /// let lettersPub = PassthroughSubject() + /// let emojiPub = PassthroughSubject() + /// let fractionsPub = PassthroughSubject() + /// + /// cancellable = numbersPub + /// .zip(lettersPub, emojiPub, fractionsPub) + /// .sink { print("\($0)") } + /// numbersPub.send(1) // numbersPub: 1 lettersPub: emojiPub: fractionsPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: emojiPub: fractionsPub: zip output: + /// numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: emojiPub: fractionsPub: zip output: + /// fractionsPub.send(0.1) // numbersPub: 1,2,3 lettersPub: "A" emojiPub: fractionsPub: 0.1 zip output: + /// lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" emojiPub: fractionsPub: 0.1 zip output: + /// emojiPub.send("πŸ˜€") // numbersPub: 2,3 lettersPub: "A" emojiPub: "πŸ˜€" fractionsPub: 0.1 zip output: (1, "A", "πŸ˜€", 0.1) + /// lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" emojiPub: fractionsPub: zip output: + /// fractionsPub.send(0.8) // numbersPub: 2,3 lettersPub: "B" emojiPub: fractionsPub: 0.8 zip output: + /// emojiPub.send("πŸ₯°") // numbersPub: 3 lettersPub: "B" emojiPub: fractionsPub: 0.8 zip output: (2, "B", "πŸ₯°", 0.8) + /// // Prints: + /// // (1, "A", "πŸ˜€", 0.1) + /// // (2, "B", "πŸ₯°", 0.8) + /// + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + /// + /// - Parameters: + /// - publisher1: A second publisher. + /// - publisher2: A third publisher. + /// - publisher3: A fourth publisher. + /// - Returns: A publisher that emits groups of elements from the upstream publishers as tuples. + public func zip(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.Zip4 where P: Publisher, Q: Publisher, R: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure { + Publishers.Zip4(self, publisher1, publisher2, publisher3) + } + + /// Combines elements from three other publishers and delivers a transformed output. + /// + /// Use ``Publisher/zip(_:_:_:_:)`` to return a new publisher that combines the elements from three other publishers using a transformation you specify to publish a new value to the downstream subscriber. The returned publisher waits until all four publishers have emitted an event, then delivers the oldest unconsumed event from each publisher together that the operator uses in the transformation. + /// + /// In this example, the ``PassthroughSubject`` publishers, `numbersPub`, + /// `fractionsPub`, `lettersPub`, and `emojiPub` emit values. The ``Publisher/zip(_:_:_:_:)`` operator receives the oldest value from each publisher and uses the `Int` from `numbersPub` and publishes a string that repeats the [String](https://developer.apple.com/documentation/swift/string) from `lettersPub` and `emojiPub` that many times and prints out the value in `fractionsPub`. + /// + /// let numbersPub = PassthroughSubject() // first publisher + /// let lettersPub = PassthroughSubject() // second + /// let emojiPub = PassthroughSubject() // third + /// let fractionsPub = PassthroughSubject()// fourth + /// + /// cancellable = numbersPub + /// .zip(lettersPub, emojiPub, fractionsPub) { anInt, aLetter, anEmoji, aFraction in + /// ("\(String(repeating: anEmoji, count: anInt)) \(String(repeating: aLetter, count: anInt)) \(aFraction)") + /// } + /// .sink { print("\($0)") } + /// + /// numbersPub.send(1) // numbersPub: 1 lettersPub: emojiPub: zip output: + /// numbersPub.send(2) // numbersPub: 1,2 lettersPub: emojiPub: zip output: + /// numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: emojiPub: zip output: + /// fractionsPub.send(0.1) // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + /// lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + /// emojiPub.send("πŸ˜€") // numbersPub: 1,2,3 lettersPub: "A" emojiPub:"πŸ˜€" zip output: "πŸ˜€ A" + /// lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" emojiPub: zip output: + /// fractionsPub.send(0.8) // numbersPub: 2,3 lettersPub: "A" emojiPub: zip output: + /// emojiPub.send("πŸ₯°") // numbersPub: 3 lettersPub: "B" emojiPub: zip output: "πŸ₯°πŸ₯° BB" + /// // Prints: + /// //1 πŸ˜€ A 0.1 + /// //2 πŸ₯°πŸ₯° BB 0.8 + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + /// + /// - Parameters: + /// - publisher1: A second publisher. + /// - publisher2: A third publisher. + /// - publisher3: A fourth publisher. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from four upstream publishers. + public func zip(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, R: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure { + zip(publisher1, publisher2, publisher3).map(transform) + } +} +extension Publishers { + /// A publisher created by applying the zip function to two upstream publishers. + /// + /// Use `Publishers.Zip` to combine the latest elements from two publishers and emit a tuple to the downstream. The returned publisher waits until both publishers have emitted an event, then delivers the oldest unconsumed event from each publisher together as a tuple to the subscriber. + /// + /// Much like a zipper or zip fastener on a piece of clothing pulls together rows of teeth to link the two sides, `Publishers.Zip` combines streams from two different publishers by linking pairs of elements from each side. + /// + /// If either upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + public struct Zip: Publisher where A: Publisher, B: Publisher, A.Failure == B.Failure { /// The kind of values published by this publisher. - public typealias Output = (UpstreamA.Output, UpstreamB.Output) + /// + /// This publisher produces two-element tuples, whose members' types correspond to the types produced by the upstream publishers. + public typealias Output = (A.Output, B.Output) /// The kind of errors this publisher might publish. /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = UpstreamA.Failure + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure - public let a: UpstreamA + /// A publisher to zip. + public let a: A - public let b: UpstreamB + /// Another publisher to zip. + public let b: B - public init(_ a: UpstreamA, _ b: UpstreamB) { + /// Creates a publisher that applies the zip function to two upstream publishers. + /// - Parameters: + /// - a: A publisher to zip. + /// - b: Another publisher to zip. + public init(_ a: A, _ b: B) { self.a = a self.b = b } - /// This function is called to attach the specified `Subscriber` to this - /// `Publisher` by `subscribe(_:)` + /// Attaches the specified subscriber to this publisher. /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: Downstream) where - UpstreamB.Failure == Downstream.Failure, - Downstream.Input == (UpstreamA.Output, UpstreamB.Output) - { - _ = Inner(downstream: subscriber, a, b) + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output) { + typealias Inner = Zip2Inner + let zip = Inner(downstream: subscriber, upstreamCount: 2) + a.subscribe(Inner.Side(index: 0, zip: zip)) + b.subscribe(Inner.Side(index: 1, zip: zip)) } } /// A publisher created by applying the zip function to three upstream publishers. - public struct Zip3 - : Publisher - where UpstreamA.Failure == UpstreamB.Failure, - UpstreamB.Failure == UpstreamC.Failure - { + /// + /// Use a `Publishers.Zip3` to combine the latest elements from three publishers and emit a tuple to the downstream. The returned publisher waits until all three publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber. + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + public struct Zip3: Publisher where A: Publisher, B: Publisher, C: Publisher, A.Failure == B.Failure, B.Failure == C.Failure { /// The kind of values published by this publisher. - public typealias Output = (UpstreamA.Output, UpstreamB.Output, UpstreamC.Output) + /// + /// This publisher produces three-element tuples, whose members' types correspond to the types produced by the upstream publishers. + public typealias Output = (A.Output, B.Output, C.Output) /// The kind of errors this publisher might publish. /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = UpstreamA.Failure + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure - public let a: UpstreamA + /// A publisher to zip. + public let a: A - public let b: UpstreamB + /// A second publisher to zip. + public let b: B - public let c: UpstreamC + /// A third publisher to zip. + public let c: C - public init(_ a: UpstreamA, _ b: UpstreamB, _ c: UpstreamC) { + /// Creates a publisher that applies the zip function to three upstream publishers. + /// - Parameters: + /// - a: A publisher to zip. + /// - b: A second publisher to zip. + /// - c: A third publisher to zip. + public init(_ a: A, _ b: B, _ c: C) { self.a = a self.b = b self.c = c } - /// This function is called to attach the specified `Subscriber` to this - /// `Publisher` by `subscribe(_:)` + /// Attaches the specified subscriber to this publisher. /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: Downstream) - where Downstream: Subscriber, - UpstreamC.Failure == Downstream.Failure, - Downstream.Input == (UpstreamA.Output, UpstreamB.Output, UpstreamC.Output) - { - _ = Inner(downstream: subscriber, a, b, c) + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output) { + typealias Inner = Zip3Inner + let zip = Inner(downstream: subscriber, upstreamCount: 3) + a.subscribe(Inner.Side(index: 0, zip: zip)) + b.subscribe(Inner.Side(index: 1, zip: zip)) + c.subscribe(Inner.Side(index: 2, zip: zip)) } } /// A publisher created by applying the zip function to four upstream publishers. - public struct Zip4< - UpstreamA: Publisher, - UpstreamB: Publisher, - UpstreamC: Publisher, - UpstreamD: Publisher - >: Publisher where - UpstreamA.Failure == UpstreamB.Failure, - UpstreamB.Failure == UpstreamC.Failure, - UpstreamC.Failure == UpstreamD.Failure - { - + /// + /// Use a `Publishers.Zip4` to combine the latest elements from four publishers and emit a tuple to the downstream. The returned publisher waits until all four publishers have emitted an event, then delivers the oldest unconsumed event from each publisher as a tuple to the subscriber. + /// + /// If any upstream publisher finishes successfully or fails with an error, so too does the zipped publisher. + public struct Zip4: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure { /// The kind of values published by this publisher. - public typealias Output = ( - UpstreamA.Output, - UpstreamB.Output, - UpstreamC.Output, - UpstreamD.Output) + /// + /// This publisher produces four-element tuples, whose members' types correspond to the types produced by the upstream publishers. + public typealias Output = (A.Output, B.Output, C.Output, D.Output) /// The kind of errors this publisher might publish. /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = UpstreamA.Failure + /// This publisher uses its upstream publishers' common failure type. + public typealias Failure = A.Failure - public let a: UpstreamA + /// A publisher to zip. + public let a: A - public let b: UpstreamB + /// A second publisher to zip. + public let b: B - public let c: UpstreamC + /// A third publisher to zip. + public let c: C - public let d: UpstreamD + /// A fourth publisher to zip. + public let d: D - public init(_ a: UpstreamA, _ b: UpstreamB, _ c: UpstreamC, _ d: UpstreamD) { + /// Creates a publisher created by applying the zip function to four upstream publishers. + /// - Parameters: + /// - a: A publisher to zip. + /// - b: A second publisher to zip. + /// - c: A third publisher to zip. + /// - d: A fourth publisher to zip. + public init(_ a: A, _ b: B, _ c: C, _ d: D) { self.a = a self.b = b self.c = c self.d = d } - /// This function is called to attach the specified `Subscriber` to this - /// `Publisher` by `subscribe(_:)` + /// Attaches the specified subscriber to this publisher. /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: Downstream) - where UpstreamD.Failure == Downstream.Failure, - Downstream.Input == ( - UpstreamA.Output, - UpstreamB.Output, - UpstreamC.Output, - UpstreamD.Output) - { - _ = Inner(downstream: subscriber, a, b, c, d) + /// Implementations of ``Publisher`` must implement this method. + /// + /// The provided implementation of ``Publisher/subscribe(_:)-199o9``calls this method. + /// + /// - Parameter subscriber: The subscriber to attach to this ``Publisher``, after which it can receive values. + public func receive(subscriber: S) where S: Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output) { + typealias Inner = Zip4Inner + let zip = Inner(downstream: subscriber, upstreamCount: 4) + a.subscribe(Inner.Side(index: 0, zip: zip)) + b.subscribe(Inner.Side(index: 1, zip: zip)) + c.subscribe(Inner.Side(index: 2, zip: zip)) + d.subscribe(Inner.Side(index: 3, zip: zip)) } } } -extension Publisher { - - /// Combine elements from another publisher and deliver pairs of elements as tuples. - /// - /// The returned publisher waits until both publishers have emitted an event, then - /// delivers the oldest unconsumed event from each publisher together as a tuple to - /// the subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits event `c`, the zip publisher emits the tuple `(a, c)`. It won’t emit a - /// tuple with event `b` until `P2` emits another event. - /// If either upstream publisher finishes successfuly or fails with an error, the - /// zipped publisher does the same. - /// - /// - Parameter other: Another publisher. - /// - Returns: A publisher that emits pairs of elements from the upstream publishers - /// as tuples. - public func zip(_ other: Other) -> Publishers.Zip - where Other: Publisher, Self.Failure == Other.Failure - { - return Publishers.Zip(self, other) - } - - /// Combine elements from another publisher and deliver a transformed output. - /// - /// The returned publisher waits until both publishers have emitted an event, then - /// delivers the oldest unconsumed event from each publisher together as a tuple to - /// the subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits event `c`, the zip publisher emits the tuple `(a, c)`. It won’t emit a tuple - /// with event `b` until `P2` emits another event. - /// If either upstream publisher finishes successfuly or fails with an error, the - /// zipped publisher does the same. - /// - /// - Parameter other: Another publisher. - /// - transform: A closure that receives the most recent value from each publisher - /// and returns a new value to publish. - /// - Returns: A publisher that emits pairs of elements from the upstream publishers - /// as tuples. - public func zip( - _ other: Other, - _ transform: @escaping (Self.Output, Other.Output) -> Result) - -> Publishers.Map, Result> - where Other: Publisher, Self.Failure == Other.Failure - { - return Publishers.Map(upstream: Publishers.Zip(self, other), transform: transform) - } - - /// Combine elements from two other publishers and deliver groups of elements as - /// tuples. - /// - /// The returned publisher waits until all three publishers have emitted an event, - /// then delivers the oldest unconsumed event from each publisher as a tuple to the - /// subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits elements `c` and `d`, and publisher `P3` emits the event `e`, the zip - /// publisher emits the tuple `(a, c, e)`. It won’t emit a tuple with elements `b` or - /// `d` until `P3` emits another event. - /// If any upstream publisher finishes successfuly or fails with an error, the zipped - /// publisher does the same. - /// - /// - Parameters: - /// - publisher1: A second publisher. - /// - publisher2: A third publisher. - /// - Returns: A publisher that emits groups of elements from the upstream publishers - /// as tuples. - public func zip(_ publisher1: Other1, _ publisher2: Other2) - -> Publishers.Zip3 - where Other1: Publisher, - Other2: Publisher, - Self.Failure == Other1.Failure, - Other1.Failure == Other2.Failure - { - return Publishers.Zip3(self, publisher1, publisher2) - } - - /// Combine elements from two other publishers and deliver a transformed output. - /// - /// The returned publisher waits until all three publishers have emitted an event, - /// then delivers the oldest unconsumed event from each publisher as a tuple to the - /// subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits elements `c` and `d`, and publisher `P3` emits the event `e`, the zip - /// publisher emits the tuple `(a, c, e)`. It won’t emit a tuple with elements `b` or - /// `d` until `P3` emits another event. - /// If any upstream publisher finishes successfuly or fails with an error, the zipped - /// publisher does the same. +extension Publishers.Zip: Equatable where A: Equatable, B: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. /// /// - Parameters: - /// - publisher1: A second publisher. - /// - publisher2: A third publisher. - /// - transform: A closure that receives the most recent value from each publisher - /// and returns a new value to publish. - /// - Returns: A publisher that emits groups of elements from the upstream publishers - /// as tuples. - public func zip( - _ publisher1: Other1, - _ publisher2: Other2, - _ transform: @escaping (Self.Output, Other1.Output, Other2.Output) -> Result) - -> Publishers.Map, Result> - where Other1: Publisher, - Other2: Publisher, - Self.Failure == Other1.Failure, - Other1.Failure == Other2.Failure - { - return Publishers.Map(upstream: Publishers.Zip3(self, publisher1, publisher2), - transform: transform) + /// - lhs: A zip publisher to compare for equality. + /// - rhs: Another zip publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.Zip, rhs: Publishers.Zip) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b } +} - /// Combine elements from three other publishers and deliver groups of elements as - /// tuples. - /// - /// The returned publisher waits until all four publishers have emitted an event, then - /// delivers the oldest unconsumed event from each publisher as a tuple to the - /// subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits elements `c` and `d`, and publisher `P3` emits the elements `e` and `f`, and - /// publisher `P4` emits the event `g`, the zip publisher emits the tuple - /// `(a, c, e, g)`. It won’t emit a tuple with elements `b`, `d`, or `f` until `P4` - /// emits another event. - /// If any upstream publisher finishes successfuly or fails with an error, the zipped - /// publisher does the same. +extension Publishers.Zip3: Equatable where A: Equatable, B: Equatable, C: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. /// /// - Parameters: - /// - publisher1: A second publisher. - /// - publisher2: A third publisher. - /// - publisher3: A fourth publisher. - /// - Returns: A publisher that emits groups of elements from the upstream publishers - /// as tuples. - public func zip(_ publisher1: Other1, - _ publisher2: Other2, - _ publisher3: Other3) - -> Publishers.Zip4 - where Other1: Publisher, - Other2: Publisher, - Other3: Publisher, - Self.Failure == Other1.Failure, - Other1.Failure == Other2.Failure, - Other2.Failure == Other3.Failure - { - return Publishers.Zip4(self, publisher1, publisher2, publisher3) + /// - lhs: A zip publisher to compare for equality. + /// - rhs: Another zip publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.Zip3, rhs: Publishers.Zip3) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c } +} - /// Combine elements from three other publishers and deliver a transformed output. - /// - /// The returned publisher waits until all four publishers have emitted an event, then - /// delivers the oldest unconsumed event from each publisher as a tuple to the - /// subscriber. - /// For example, if publisher `P1` emits elements `a` and `b`, and publisher `P2` - /// emits elements `c` and `d`, and publisher `P3` emits the elements `e` and `f`, and - /// publisher `P4` emits the event `g`, the zip publisher emits the tuple - /// `(a, c, e, g)`. It won’t emit a tuple with elements `b`, `d`, or `f` until `P4` - /// emits another event. - /// If any upstream publisher finishes successfuly or fails with an error, the zipped - /// publisher does the same. +extension Publishers.Zip4: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. /// /// - Parameters: - /// - publisher1: A second publisher. - /// - publisher2: A third publisher. - /// - publisher3: A fourth publisher. - /// - transform: A closure that receives the most recent value from each publisher - /// and returns a new value to publish. - /// - Returns: A publisher that emits groups of elements from the upstream publishers - /// as tuples. - public func zip( - _ publisher1: Other1, - _ publisher2: Other2, - _ publisher3: Other3, - _ transform: @escaping (Self.Output, Other1.Output, Other2.Output, Other3.Output) - -> Result) - -> Publishers.Map, Result> - where Other1: Publisher, - Other2: Publisher, - Other3: Publisher, - Self.Failure == Other1.Failure, - Other1.Failure == Other2.Failure, - Other2.Failure == Other3.Failure - { - return Publishers.Map(upstream: Publishers.Zip4(self, - publisher1, - publisher2, - publisher3), - transform: transform) - } -} - -extension Publishers.Zip { - private class Inner: InnerBase - where Downstream.Failure == Failure, - Downstream.Input == (UpstreamA.Output, UpstreamB.Output) - { - private lazy var aSubscriber = ChildSubscriber(self, 0) - private lazy var bSubscriber = ChildSubscriber(self, 1) - - init(downstream: Downstream, _ a: UpstreamA, _ b: UpstreamB) { - super.init(downstream: downstream) - - a.subscribe(aSubscriber) - b.subscribe(bSubscriber) - } - - override fileprivate var upstreamSubscriptions: [ChildSubscription] { - return [aSubscriber, bSubscriber] - } - - override fileprivate func dequeueValue() -> Downstream.Input { - return (aSubscriber.dequeueValue(), bSubscriber.dequeueValue()) - } - } -} - -extension Publishers.Zip3 { - private class Inner: InnerBase - where Downstream.Failure == Failure, - Downstream.Input == (UpstreamA.Output, UpstreamB.Output, UpstreamC.Output) - { - private lazy var aSubscriber = ChildSubscriber(self, 0) - private lazy var bSubscriber = ChildSubscriber(self, 1) - private lazy var cSubscriber = ChildSubscriber(self, 2) - - init(downstream: Downstream, _ a: UpstreamA, _ b: UpstreamB, _ c: UpstreamC) { - super.init(downstream: downstream) - - a.subscribe(aSubscriber) - b.subscribe(bSubscriber) - c.subscribe(cSubscriber) - } - - override fileprivate var upstreamSubscriptions: [ChildSubscription] { - return [aSubscriber, bSubscriber, cSubscriber] - } - - override fileprivate func dequeueValue() -> Downstream.Input { - return (aSubscriber.dequeueValue(), - bSubscriber.dequeueValue(), - cSubscriber.dequeueValue()) - } + /// - lhs: A zip publisher to compare for equality. + /// - rhs: Another zip publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each zip publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.Zip4, rhs: Publishers.Zip4) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d } } -extension Publishers.Zip4 { - private class Inner: InnerBase - where Downstream.Failure == Failure, - Downstream.Input == ( - UpstreamA.Output, - UpstreamB.Output, - UpstreamC.Output, - UpstreamD.Output) - { - private lazy var aSubscriber = ChildSubscriber(self, 0) - private lazy var bSubscriber = ChildSubscriber(self, 1) - private lazy var cSubscriber = ChildSubscriber(self, 2) - private lazy var dSubscriber = ChildSubscriber(self, 3) - - init(downstream: Downstream, - _ a: UpstreamA, - _ b: UpstreamB, - _ c: UpstreamC, - _ d: UpstreamD) - { - super.init(downstream: downstream) - - a.subscribe(aSubscriber) - b.subscribe(bSubscriber) - c.subscribe(cSubscriber) - d.subscribe(dSubscriber) - } - - override fileprivate var upstreamSubscriptions: [ChildSubscription] { - return [aSubscriber, bSubscriber, cSubscriber, dSubscriber] - } - - override fileprivate func dequeueValue() -> Downstream.Input { - return (aSubscriber.dequeueValue(), - bSubscriber.dequeueValue(), - cSubscriber.dequeueValue(), - dSubscriber.dequeueValue()) - } - } -} - -private class InnerBase: CustomStringConvertible { - let description = "Zip" - - private let lock = UnfairRecursiveLock.allocate() - - private let downstream: Downstream - private var downstreamDemand = Subscribers.Demand.none - private var valueIsBeingProcessed = false - private var value: Downstream.Input? - private var isFinished = false - - // The following two pieces of state are a hacky implementation of subtle Apple - // concurrency behaviors. Specifically, when Zip is processing an upstream child value - // and sending a resulting value downstream, multiple behaviors are changed. - // 1. If a downstream demand request comes in during this period, the demand request - // for that specific triggering upstream child will be communiated via the result - // of `.receive(_ input:)` INSTEAD of a later `.request(_ demand:)` call. - // (AppleRef: 001) - // 2. If an upstream `.finished` comes in during this time period, the "finished - // asssessment check" (AppleRef: 002) is skipped. - // If an upstream value is being processed when a downstream demand request comes in, - // the demand for that specfic upstream child will be communiated via the result - // of `.receive(_ input:)` INSTEAD of a later `.request(_ demand:)` call. - private final var processingValueForChild: ChildSubscription? - private final var demandReceivedWhileProcessing: Subscribers.Demand? - - init(downstream: Downstream) { +// MARK: - AbstractZip + +private class AbstractZip where Downstream: Subscriber, Input == Downstream.Input, Failure == Downstream.Failure { + let downstream: Downstream + var buffers: [[Any]] + var subscriptions: [Subscription?] + var cancelled: Bool + var errored: Bool + var finished: Bool + var upstreamFinished: [Bool] + let upstreamCount: Int + var lock: UnfairLock + let downstreamLock: UnfairRecursiveLock + var recursive: Bool + var pendingDemand: Subscribers.Demand + var pendingCompletion: Subscribers.Completion? + + init(downstream: Downstream, upstreamCount: Int) { self.downstream = downstream + self.buffers = Array(repeating: [], count: upstreamCount) + self.subscriptions = Array(repeating: nil, count: upstreamCount) + self.cancelled = false + self.errored = false + self.finished = false + self.upstreamFinished = Array(repeating: false, count: upstreamCount) + self.upstreamCount = upstreamCount + self.lock = UnfairLock.allocate() + self.downstreamLock = UnfairRecursiveLock.allocate() + self.recursive = false + self.pendingDemand = .none + self.pendingCompletion = nil } deinit { lock.deallocate() + downstreamLock.deallocate() } - fileprivate var upstreamSubscriptions: [ChildSubscription] { - abstractMethod() + func convert(values _: [Any]) -> Input { + fatalError("Abstract method") } - fileprivate func dequeueValue() -> Downstream.Input { - abstractMethod() - } - - fileprivate final func receivedSubscription(for child: ChildSubscription) { + func receive(subscription: Subscription, index: Int) { + precondition(upstreamCount > index) lock.lock() - child.state = .active - let sendSubscriptionDownstream = upstreamSubscriptions - .filter { $0.state == .waitingForSubscription } - .isEmpty + guard !cancelled, subscriptions[index] == nil else { + lock.unlock() + subscription.cancel() + return + } + subscriptions[index] = subscription + let containsNil = subscriptions.contains { $0 == nil } + recursive = !containsNil lock.unlock() - - if sendSubscriptionDownstream { - self.sendSubscriptionDownstream() + if !containsNil { + downstreamLock.lock() + downstream.receive(subscription: self) + downstreamLock.unlock() + lock.lock() + recursive = false + if let pendingCompletion { + subscription.cancel() + lockedSendCompletion(completion: pendingCompletion) + } else { + resolvePendingDemandAndUnlock() + } } } - fileprivate final func receivedChildValue( - child: ChildSubscription, - _ lockedStoreValue: () -> Void - ) -> Subscribers.Demand { + func receive(_ value: Any, index: Int) -> Subscribers.Demand { + precondition(upstreamCount > index) lock.lock() - lockedStoreValue() - defer { - checkShouldFinish() + guard !cancelled, !errored, !finished else { lock.unlock() + return .none } - if let dequeuedValue = maybeDequeueValue() { - value = dequeuedValue - assert(processingValueForChild == nil) - processingValueForChild = child - valueIsBeingProcessed = true - return processValue() ?? .none - } else { + buffers[index].append(value) + if buffers.contains(where: { $0.isEmpty }) { + lock.unlock() return .none } - } - - fileprivate final func receivedCompletion( - _ completion: Subscribers.Completion, - forChild child: ChildSubscription) - { - switch completion { - case .failure: - downstream.receive(completion: completion) - lock.lock() - child.state = .failed - let subscriptionsToCancel = upstreamSubscriptions + var newBuffers: [[Any]] = [] + var values: [Any] = [] + var newFinished = false + for i in buffers.indices { + var buffer = buffers[i] + values.append(buffer.remove(at: 0)) + newBuffers.append(buffer) + if upstreamFinished[i] { + newFinished = newFinished || buffer.isEmpty + } + } + buffers = newBuffers + recursive = true + lock.unlock() + let input = convert(values: values) + downstreamLock.lock() + let demand = downstream.receive(input) + downstreamLock.unlock() + lock.lock() + recursive = false + if newFinished { + finished = true + lockedSendCompletion(completion: .finished) + return .none + } else { + let newDemand = demand + pendingDemand + pendingDemand = .none + if newDemand == .none { + lock.unlock() + return .none + } + let subscriptions = self.subscriptions lock.unlock() - subscriptionsToCancel.forEach { $0.cancel() } - case .finished: - lock.lock() - child.state = .finished - if !valueIsBeingProcessed { - valueIsBeingProcessed = true - if processingValueForChild == nil && - !areMoreValuesPossible && - !isFinished { - sendFinishDownstream() - } else { - processValue() + for (i, subscription) in subscriptions.enumerated() { + if let subscription, i != index { + subscription.request(newDemand) } } - lock.unlock() + return newDemand } } - private func checkShouldFinish() { - if processingValueForChild == nil && upstreamSubscriptions.shouldFinish() { - sendFinishDownstream() - isFinished = true - } - } - - private func maybeDequeueValue() -> Downstream.Input? { - return hasCompleteValueAvailable ? dequeueValue() : nil - } - - private func sendSubscriptionDownstream() { - downstream.receive(subscription: self) - } - - private var hasCompleteValueAvailable: Bool { - return upstreamSubscriptions.allSatisfy { $0.hasValue } - } - - private var areMoreValuesPossible: Bool { - // More values are possible if all children are (active || have surplus) - return upstreamSubscriptions - .allSatisfy { $0.state == .active || $0.hasValue } - } - - @discardableResult - private func processValue() -> Subscribers.Demand? { - assert(valueIsBeingProcessed) - + func receive(completion: Subscribers.Completion, index: Int) { lock.lock() - defer { - valueIsBeingProcessed = false - processingValueForChild = nil - demandReceivedWhileProcessing = nil + guard !cancelled, !errored, !finished else { lock.unlock() + return } - - if let value = self.value { - if downstreamDemand != .none { - downstreamDemand -= 1 - } - let newDemand = downstream.receive(value) - if newDemand != .none { - downstreamDemand += newDemand - demandReceivedWhileProcessing = newDemand + switch completion { + case .finished: + upstreamFinished[index] = true + if buffers[index].isEmpty { + finished = true + lockedSendCompletion(completion: completion) + } else { + lock.unlock() } - self.value = nil + case .failure(_): + errored = true + lockedSendCompletion(completion: completion) } - - return demandReceivedWhileProcessing } - private func sendRequestUpstream(demand: Subscribers.Demand) { + private func lockedSendCompletion(completion: Subscribers.Completion) { + buffers = Array(repeating: [], count: upstreamCount) + let subscriptions = self.subscriptions + recursive = true + pendingCompletion = completion + lock.unlock() + for (index, subscription) in subscriptions.enumerated() where !upstreamFinished[index] { + guard let subscription else { + return + } + subscription.cancel() + } + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() lock.lock() - let subscriptionsToRequest = upstreamSubscriptions - .filter { $0.childIndex != processingValueForChild?.childIndex } + self.subscriptions = Array(repeating: nil, count: upstreamCount) + pendingCompletion = nil + recursive = false + pendingDemand = .none lock.unlock() - subscriptionsToRequest.forEach { $0.request(demand) } } - private func sendFinishDownstream() { - downstream.receive(completion: .finished) - lock.lock() - let activeChildren = upstreamSubscriptions.filter { $0.state == .active } + private func resolvePendingDemandAndUnlock() { + let subscriptions = self.subscriptions + let demand = pendingDemand + pendingDemand = .none lock.unlock() - activeChildren.forEach { $0.cancel() } + demand.assertValid() + guard demand != .none, !subscriptions.isEmpty else { + return + } + for subscription in subscriptions { + subscription?.request(demand) + } } } -extension InnerBase: Subscription { - fileprivate final func request(_ demand: Subscribers.Demand) { - guard demand != .none else { - fatalError() - } +extension AbstractZip: Subscription { + func request(_ demand: Subscribers.Demand) { + demand.assertNonZero() lock.lock() - downstreamDemand += demand - sendRequestUpstream(demand: demand) - if valueIsBeingProcessed { - demandReceivedWhileProcessing = demand - } else { - valueIsBeingProcessed = true - processValue() + guard !recursive else { + pendingDemand += demand + lock.unlock() + return + } + guard !cancelled, !errored, !finished else { + lock.unlock() + return } + let subscriptions = self.subscriptions lock.unlock() + for subscription in subscriptions { + subscription?.request(demand) + } } - fileprivate final func cancel() { + func cancel() { lock.lock() - let subscriptionsToCancel = upstreamSubscriptions + guard !cancelled else { + lock.unlock() + return + } + let subscriptions = self.subscriptions + cancelled = true + self.subscriptions = Array(repeating: nil, count: upstreamCount) + self.buffers = Array(repeating: [], count: upstreamCount) lock.unlock() - subscriptionsToCancel.forEach { $0.cancel() } + for subscription in subscriptions { + subscription?.cancel() + } } } -extension Array where Element == ChildSubscription { - func shouldFinish() -> Bool { - for subscription in self - where subscription.state == .finished && !subscription.hasValue{ - return true +extension AbstractZip { + struct Side { + let index: Int + let zip: AbstractZip + let combineIdentifier: CombineIdentifier + + init(index: Int, zip: AbstractZip) { + self.index = index + self.zip = zip + self.combineIdentifier = CombineIdentifier() } - return false } } -private enum ChildState { - case waitingForSubscription - case active - case finished - case failed - case canceled -} +extension AbstractZip.Side: Subscriber { + typealias Input = SideInput -// Note that it's critical that this protocol not have any associated types - specifically -// note that it does not refer to `Upstream`. -// This allows `InnerBase` to do most of the heavy lifting without regard to the -// upstream publisher's value type. -private protocol ChildSubscription: AnyObject, Subscription { - var state: ChildState { get set } - var childIndex: Int { get } - var hasValue: Bool { get } -} + func receive(subscription: Subscription) { + zip.receive(subscription: subscription, index: index) + } -private final class ChildSubscriber - where Upstream.Failure == Downstream.Failure -{ - typealias Input = Upstream.Output - typealias Failure = Upstream.Failure - - fileprivate final var state: ChildState = .waitingForSubscription - fileprivate final var upstreamSubscription: Subscription? - private var values = [Upstream.Output]() - private unowned let parent: InnerBase - fileprivate let childIndex: Int - - init(_ parent: InnerBase, _ childIndex: Int) { - self.parent = parent - self.childIndex = childIndex + func receive(_ input: SideInput) -> Subscribers.Demand { + zip.receive(input, index: index) } - fileprivate final func dequeueValue() -> Upstream.Output { - return values.remove(at: 0) + func receive(completion: Subscribers.Completion) { + zip.receive(completion: completion, index: index) } } -extension ChildSubscriber: ChildSubscription { - fileprivate final var hasValue: Bool { - return !values.isEmpty - } +extension AbstractZip: CustomStringConvertible { + var description: String { "Zip" } } -extension ChildSubscriber: Subscription { - fileprivate final func request(_ demand: Subscribers.Demand) { - upstreamSubscription?.request(demand) - } +extension AbstractZip.Side: CustomStringConvertible { + var description: String { "Zip" } +} + +extension AbstractZip: CustomPlaygroundDisplayConvertible { + var playgroundDescription: Any { description } +} + +extension AbstractZip.Side: CustomPlaygroundDisplayConvertible { + var playgroundDescription: Any { description } +} + +extension AbstractZip: CustomReflectable { + var customMirror: Mirror { Mirror(self, children: [:]) } } -extension ChildSubscriber: Cancellable { - fileprivate final func cancel() { - upstreamSubscription?.cancel() - upstreamSubscription = nil +extension AbstractZip.Side: CustomReflectable { + var customMirror: Mirror { + Mirror(self, children: ["parentSubscription" : zip.combineIdentifier]) } } -extension ChildSubscriber: Subscriber { - fileprivate final func receive(subscription: Subscription) { - if upstreamSubscription == nil { - upstreamSubscription = subscription - parent.receivedSubscription(for: self) - } else { - subscription.cancel() - } +// MARK: ZipInner + +private final class Zip2Inner: AbstractZip<(Input1, Input2), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2) == Downstream.Input, Failure == Downstream.Failure { + override func convert(values: [Any]) -> (Input1, Input2) { + (values[0] as! Input1, values[1] as! Input2) } +} - fileprivate final func receive(_ input: Input) -> Subscribers.Demand { - return parent.receivedChildValue(child: self) { values.append(input) } +private final class Zip3Inner: AbstractZip<(Input1, Input2, Input3), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2, Input3) == Downstream.Input, Failure == Downstream.Failure { + override func convert(values: [Any]) -> (Input1, Input2, Input3) { + (values[0] as! Input1, values[1] as! Input2, values[2] as! Input3) } +} - fileprivate final func receive(completion: Subscribers.Completion) { - parent.receivedCompletion(completion, forChild: self) +private final class Zip4Inner: AbstractZip<(Input1, Input2, Input3, Input4), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2, Input3, Input4) == Downstream.Input, Failure == Downstream.Failure { + override func convert(values: [Any]) -> (Input1, Input2, Input3, Input4) { + (values[0] as! Input1, values[1] as! Input2, values[2] as! Input3, values[3] as! Input4) } } diff --git a/Tests/OpenCombineTests/PublisherTests/ZipTests.swift b/Tests/OpenCombineTests/PublisherTests/ZipTests.swift index 7b20c35b..c6809573 100644 --- a/Tests/OpenCombineTests/PublisherTests/ZipTests.swift +++ b/Tests/OpenCombineTests/PublisherTests/ZipTests.swift @@ -740,4 +740,82 @@ final class ZipTests: XCTestCase { XCTFail("Failed to match the completion event in \(#function)") } } + + #if !WASI + // FIXME: swift-testing macro for specifying the relationship between a bug and a test case + // Uncomment the following line when we migrate to swift-testing + // @Test("Zip reference issue", .bug("#241", relationship: .verifiesFix)) + func testZipReferenceIssue() throws { + var subscriptions: Set = [] + #if OPENCOMBINE_COMPATIBILITY_TEST + let scheduler = DispatchQueue.main + #else + let scheduler = DispatchQueue.OCombine(DispatchQueue.main) + #endif + + let expectation = self.expectation(description: #function) + var result: (Int, Int)? + + let firstPublisher = Just(1) + .delay(for: .milliseconds(600), scheduler: scheduler) + let secondPublisher = Just(2) + .delay(for: .milliseconds(600), scheduler: scheduler) + Publishers.Zip(firstPublisher, secondPublisher) + .sink(receiveValue: { + result = ($0.0, $0.1) + expectation.fulfill() + }) + .store(in: &subscriptions) + + wait(for: [expectation], timeout: 5) + + XCTAssertEqual(result?.0, 1) + XCTAssertEqual(result?.1, 2) + } + #endif + + func testZipDocumentationDemo() { + let numbersPub = PassthroughSubject() + let lettersPub = PassthroughSubject() + let emojiPub = PassthroughSubject() + let fractionsPub = PassthroughSubject() + let zip = numbersPub + .zip(lettersPub, emojiPub, fractionsPub) { number, letter, emoji, fraction in + "\(String(repeating: emoji, count: number)) \(String(repeating: letter, count: number)) \(fraction)" + } + + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { $0.request(.unlimited) }) + zip.subscribe(downstreamSubscriber) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Zip"), + ] + ) + numbersPub.send(1) // numbersPub: 1 lettersPub: emojiPub: zip output: + numbersPub.send(2) // numbersPub: 1,2 lettersPub: emojiPub: zip output: + numbersPub.send(3) // numbersPub: 1,2,3 lettersPub: emojiPub: zip output: + fractionsPub.send(0.1) // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + lettersPub.send("A") // numbersPub: 1,2,3 lettersPub: "A" emojiPub: zip output: + emojiPub.send("πŸ˜€") // numbersPub: 1,2,3 lettersPub: "A" emojiPub:"πŸ˜€" zip output: "πŸ˜€ A" + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Zip"), + .value("πŸ˜€ A 0.1"), + ] + ) + lettersPub.send("B") // numbersPub: 2,3 lettersPub: "B" emojiPub: zip output: + fractionsPub.send(0.8) // numbersPub: 2,3 lettersPub: "A" emojiPub: zip output: + emojiPub.send("πŸ₯°") // numbersPub: 3 lettersPub: "B" emojiPub: zip output: "πŸ₯°πŸ₯° BB" + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("Zip"), + .value("πŸ˜€ A 0.1"), + .value("πŸ₯°πŸ₯° BB 0.8") + ] + ) + } }