diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index 86cfae9f..f06982f2 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -2,165 +2,6 @@ // Please remove the corresponding piece from this file if you implement something, // and complement this file as features are added in Apple's Combine -extension Publishers { - - /// A publisher that receives and combines the latest elements from two publishers. - public struct CombineLatest : Publisher where A : Publisher, B : Publisher, A.Failure == B.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public init(_ a: A, _ b: B) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output) - } - - /// A publisher that receives and combines the latest elements from three publishers. - public struct CombineLatest3 : Publisher where A : Publisher, B : Publisher, C : Publisher, A.Failure == B.Failure, B.Failure == C.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output, C.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public init(_ a: A, _ b: B, _ c: C) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output) - } - - /// A publisher that receives and combines the latest elements from four publishers. - public struct CombineLatest4 : Publisher where A : Publisher, B : Publisher, C : Publisher, D : Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure { - - /// The kind of values published by this publisher. - public typealias Output = (A.Output, B.Output, C.Output, D.Output) - - /// The kind of errors this publisher might publish. - /// - /// Use `Never` if this `Publisher` does not publish errors. - public typealias Failure = A.Failure - - public let a: A - - public let b: B - - public let c: C - - public let d: D - - public init(_ a: A, _ b: B, _ c: C, _ d: D) - - /// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)` - /// - /// - SeeAlso: `subscribe(_:)` - /// - Parameters: - /// - subscriber: The subscriber to attach to this `Publisher`. - /// once attached it can begin to receive values. - public func receive(subscriber: S) where S : Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output) - } -} - -extension Publisher { - - /// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - other: Another publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this and another publisher. - public func combineLatest

(_ other: P) -> Publishers.CombineLatest where P : Publisher, Self.Failure == P.Failure - - /// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finsh. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - other: Another publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this and another publisher. - public func combineLatest(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map, T> where P : Publisher, Self.Failure == P.Failure - - /// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3 where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure - - /// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map, T> where P : Publisher, Q : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure - - /// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - publisher3: A fourth publisher to combine with this one. - /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4 where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure - - /// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers. - /// - /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most recent value in each buffer. - /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. - /// If any of the combined publishers terminates with a failure, this publisher also fails. - /// - Parameters: - /// - publisher1: A second publisher to combine with this one. - /// - publisher2: A third publisher to combine with this one. - /// - publisher3: A fourth publisher to combine with this one. - /// - transform: A closure that receives the most recent value from each publisher and returns a new value to publish. - /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. - public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P : Publisher, Q : Publisher, R : Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure -} - extension Publishers { /// A strategy for collecting received elements. @@ -243,40 +84,3 @@ extension Publisher { /// a single array of the collection. public func collect(_ strategy: Publishers.TimeGroupingStrategy, options: S.SchedulerOptions? = nil) -> Publishers.CollectByTime where S : Scheduler } - -extension Publishers.CombineLatest : Equatable where A : Equatable, B : Equatable { - - /// Returns a Boolean value that indicates whether two publishers are equivalent. - /// - /// - Parameters: - /// - lhs: A combineLatest publisher to compare for equality. - /// - rhs: Another combineLatest publisher to compare for equality. - /// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal, `false` otherwise. - public static func == (lhs: Publishers.CombineLatest, rhs: Publishers.CombineLatest) -> Bool -} - -extension Publishers.CombineLatest3 : Equatable where A : Equatable, B : Equatable, C : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.CombineLatest3, rhs: Publishers.CombineLatest3) -> Bool -} - -extension Publishers.CombineLatest4 : Equatable where A : Equatable, B : Equatable, C : Equatable, D : Equatable { - - /// Returns a Boolean value indicating whether two values are equal. - /// - /// Equality is the inverse of inequality. For any values `a` and `b`, - /// `a == b` implies that `a != b` is `false`. - /// - /// - Parameters: - /// - lhs: A value to compare. - /// - rhs: Another value to compare. - public static func == (lhs: Publishers.CombineLatest4, rhs: Publishers.CombineLatest4) -> Bool -} diff --git a/Sources/OpenCombine/AnyPublisher.swift b/Sources/OpenCombine/AnyPublisher.swift index ca2d4366..fb25a03d 100644 --- a/Sources/OpenCombine/AnyPublisher.swift +++ b/Sources/OpenCombine/AnyPublisher.swift @@ -3,7 +3,7 @@ // OpenCombine // // Created by Sergej Jaskiewicz on 10.06.2019. -// Audited for Combine 2023 +// Audited for 2023 Release extension Publisher { /// Wraps this publisher with a type eraser. diff --git a/Sources/OpenCombine/Helpers/Violations.swift b/Sources/OpenCombine/Helpers/Violations.swift index 65712e8a..e840e7c4 100644 --- a/Sources/OpenCombine/Helpers/Violations.swift +++ b/Sources/OpenCombine/Helpers/Violations.swift @@ -19,14 +19,16 @@ internal func APIViolationUnexpectedCompletion(file: StaticString = #file, fatalError("API Violation: received an unexpected completion", file: file, line: line) } -internal func abstractMethod(file: StaticString = #file, line: UInt = #line) -> Never { - fatalError("Abstract method call", file: file, line: line) +@_transparent +@inline(__always) +func abstractMethod(file: StaticString = #file, line: UInt = #line) -> Never { + fatalError("Abstract method", file: file, line: line) } extension Subscribers.Demand { @_transparent @inline(__always) - internal func assertNonZero() { - precondition(rawValue != .zero) + func assertNonZero() { + precondition(self != 0) } } diff --git a/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift b/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift new file mode 100644 index 00000000..f8082ad2 --- /dev/null +++ b/Sources/OpenCombine/Publishers/Publishers.CombineLatest.swift @@ -0,0 +1,746 @@ +// +// Publishers.CombineLatest.swift +// OpenCombine +// +// Created by Kyle on 2023/12/27. +// Audited for 2023 Release + +// MARK: - combineLatest methods on Publisher + +extension Publisher { + /// Subscribes to an additional publisher and publishes a tuple upon receiving output from either publisher. + /// + /// Use ``Publisher/combineLatest(_:)`` when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To pair elements from multiple publishers, use ``Publisher/zip(_:)`` instead. To receive just the most-recent element from multiple publishers rather than tuples, use ``Publisher/merge(with:)-394v9``. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t ``Subscribers/Demand/unlimited``, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// + /// In this example, ``PassthroughSubject`` `pub1` and also `pub2` emit values; as ``Publisher/combineLatest(_:)`` receives input from either upstream publisher, it combines the latest value from each publisher into a tuple and publishes it. + /// + /// let pub1 = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// + /// cancellable = pub1 + /// .combineLatest(pub2) + /// .sink { print("Result: \($0).") } + /// + /// pub1.send(1) + /// pub1.send(2) + /// pub2.send(2) + /// pub1.send(3) + /// pub1.send(45) + /// pub2.send(22) + /// + /// // Prints: + /// // Result: (2, 2). // pub1 latest = 2, pub2 latest = 2 + /// // Result: (3, 2). // pub1 latest = 3, pub2 latest = 2 + /// // Result: (45, 2). // pub1 latest = 45, pub2 latest = 2 + /// // Result: (45, 22). // pub1 latest = 45, pub2 latest = 22 + /// + /// When all upstream publishers finish, this publisher finishes. If an upstream publisher never publishes a value, this publisher never finishes. + /// + /// - Parameter other: Another publisher to combine with this one. + /// - Returns: A publisher that receives and combines elements from this and another publisher. + public func combineLatest

(_ other: P) -> Publishers.CombineLatest where P: Publisher, Self.Failure == P.Failure { + Publishers.CombineLatest(self, other) + } + + /// Subscribes to an additional publisher and invokes a closure upon receiving output from either publisher. + /// + /// Use `combineLatest(_:)` to combine the current and one additional publisher and transform them using a closure you specify to publish a new value to the downstream. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// + /// In the example below, `combineLatest()` receives the most-recent values published by the two publishers, it multiplies them together, and republishes the result: + /// + /// let pub1 = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// + /// cancellable = pub1 + /// .combineLatest(pub2) { (first, second) in + /// return first * second + /// } + /// .sink { print("Result: \($0).") } + /// + /// pub1.send(1) + /// pub1.send(2) + /// pub2.send(2) + /// pub1.send(9) + /// pub1.send(3) + /// pub2.send(12) + /// pub1.send(13) + /// // + /// // Prints: + /// //Result: 4. (pub1 latest = 2, pub2 latest = 2) + /// //Result: 18. (pub1 latest = 9, pub2 latest = 2) + /// //Result: 6. (pub1 latest = 3, pub2 latest = 2) + /// //Result: 36. (pub1 latest = 3, pub2 latest = 12) + /// //Result: 156. (pub1 latest = 13, pub2 latest = 12) + /// + /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also fails. + /// + /// - Parameters: + /// - other: Another publisher to combine with this one. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this and another publisher. + public func combineLatest(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map, T> where P: Publisher, Self.Failure == P.Failure { + Publishers.CombineLatest(self, other).map(transform) + } + + /// Subscribes to two additional publishers and publishes a tuple upon receiving output from any of the publishers. + /// + /// Use ``Publisher/combineLatest(_:_:)-81vgd`` when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To combine elements from multiple publishers, use ``Publisher/zip(_:_:)-2p498`` instead. To receive just the most-recent element from multiple publishers rather than tuples, use ``Publisher/merge(with:_:)``. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t ``Subscribers/Demand/unlimited``, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// + /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. + /// + /// In this example, three instances of ``PassthroughSubject`` emit values; as ``Publisher/combineLatest(_:_:)-81vgd`` receives input from any of the upstream publishers, it combines the latest value from each publisher into a tuple and publishes it: + /// + /// let pub = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// let pub3 = PassthroughSubject() + /// + /// cancellable = pub + /// .combineLatest(pub2, pub3) + /// .sink { print("Result: \($0).") } + /// + /// pub.send(1) + /// pub.send(2) + /// pub2.send(2) + /// pub3.send(9) + /// + /// pub.send(3) + /// pub2.send(12) + /// pub.send(13) + /// pub3.send(19) + /// + /// // Prints: + /// // Result: (2, 2, 9). + /// // Result: (3, 2, 9). + /// // Result: (3, 12, 9). + /// // Result: (13, 12, 9). + /// // Result: (13, 12, 19). + /// + /// If any of the combined publishers terminates with a failure, this publisher also fails. + /// - Parameters: + /// - publisher1: A second publisher to combine with the first publisher. + /// - publisher2: A third publisher to combine with the first publisher. + /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. + public func combineLatest(_ publisher1: P, _ publisher2: Q) -> Publishers.CombineLatest3 where P: Publisher, Q: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure { + Publishers.CombineLatest3(self, publisher1, publisher2) + } + + /// Subscribes to two additional publishers and invokes a closure upon receiving output from any of the publishers. + /// + /// Use `combineLatest(_:,_:)` to combine the current and two additional publishers and transform them using a closure you specify to publish a new value to the downstream. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t `.unlimited`, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. + /// If any of the combined publishers terminates with a failure, this publisher also fails. + /// + /// In the example below, `combineLatest()` receives the most-recent values published by three publishers, multiplies them together, and republishes the result: + /// + /// let pub = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// let pub3 = PassthroughSubject() + /// + /// cancellable = pub + /// .combineLatest(pub2, pub3) { firstValue, secondValue, thirdValue in + /// return firstValue * secondValue * thirdValue + /// } + /// .sink { print("Result: \($0).") } + /// + /// pub.send(1) + /// pub.send(2) + /// pub2.send(2) + /// pub3.send(10) + /// + /// pub.send(9) + /// pub3.send(4) + /// pub2.send(12) + /// + /// // Prints: + /// // Result: 40. // pub = 2, pub2 = 2, pub3 = 10 + /// // Result: 180. // pub = 9, pub2 = 2, pub3 = 10 + /// // Result: 72. // pub = 9, pub2 = 2, pub3 = 4 + /// // Result: 432. // pub = 9, pub2 = 12, pub3 = 4 + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with the first publisher. + /// - publisher2: A third publisher to combine with the first publisher. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this publisher and two other publishers. + public func combineLatest(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure { + Publishers.CombineLatest3(self, publisher1, publisher2).map(transform) + } + + /// Subscribes to three additional publishers and publishes a tuple upon receiving output from any of the publishers. + /// + /// Use ``Publisher/combineLatest(_:_:_:)-7mt86`` when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To combine elements from multiple publishers, use ``Publisher/zip(_:_:_:)-67czn`` instead. To receive just the most-recent element from multiple publishers rather than tuples, use ``Publisher/merge(with:_:_:)``. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t ``Subscribers/Demand/unlimited``, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// + /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. + /// + /// In the example below, ``Publisher/combineLatest(_:_:_:)-7mt86`` receives input from any of the publishers, combines the latest value from each publisher into a tuple and publishes it: + /// + /// let pub = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// let pub3 = PassthroughSubject() + /// let pub4 = PassthroughSubject() + /// + /// cancellable = pub + /// .combineLatest(pub2, pub3, pub4) + /// .sink { print("Result: \($0).") } + /// + /// pub.send(1) + /// pub.send(2) + /// pub2.send(2) + /// pub3.send(9) + /// pub4.send(1) + /// + /// pub.send(3) + /// pub2.send(12) + /// pub.send(13) + /// pub3.send(19) + /// // + /// // Prints: + /// // Result: (2, 2, 9, 1). + /// // Result: (3, 2, 9, 1). + /// // Result: (3, 12, 9, 1). + /// // Result: (13, 12, 9, 1). + /// // Result: (13, 12, 19, 1). + /// + /// If any individual publisher of the combined set terminates with a failure, this publisher also fails. + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with the first publisher. + /// - publisher2: A third publisher to combine with the first publisher. + /// - publisher3: A fourth publisher to combine with the first publisher. + /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. + public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R) -> Publishers.CombineLatest4 where P: Publisher, Q: Publisher, R: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure { + Publishers.CombineLatest4(self, publisher1, publisher2, publisher3) + } + + /// Subscribes to three additional publishers and invokes a closure upon receiving output from any of the publishers. + /// + /// Use ``Publisher/combineLatest(_:_:_:_:)`` when you need to combine the current and 3 additional publishers and transform the values using a closure in which you specify the published elements, to publish a new element. + /// + /// > Tip: The combined publisher doesn't produce elements until each of its upstream publishers publishes at least one element. + /// + /// The combined publisher passes through any requests to *all* upstream publishers. However, it still obeys the demand-fulfilling rule of only sending the request amount downstream. If the demand isn’t ``Subscribers/Demand/unlimited``, it drops values from upstream publishers. It implements this by using a buffer size of 1 for each upstream, and holds the most-recent value in each buffer. + /// + /// All upstream publishers need to finish for this publisher to finish. If an upstream publisher never publishes a value, this publisher never finishes. + /// + /// In the example below, as ``Publisher/combineLatest(_:_:_:_:)`` receives the most-recent values published by four publishers, multiplies them together, and republishes the result: + /// + /// let pub = PassthroughSubject() + /// let pub2 = PassthroughSubject() + /// let pub3 = PassthroughSubject() + /// let pub4 = PassthroughSubject() + /// + /// cancellable = pub + /// .combineLatest(pub2, pub3, pub4) { firstValue, secondValue, thirdValue, fourthValue in + /// return firstValue * secondValue * thirdValue * fourthValue + /// } + /// .sink { print("Result: \($0).") } + /// + /// pub.send(1) + /// pub.send(2) + /// pub2.send(2) + /// pub3.send(9) + /// pub4.send(1) + /// + /// pub.send(3) + /// pub2.send(12) + /// pub.send(13) + /// pub3.send(19) + /// + /// // Prints: + /// // Result: 36. // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + /// // Result: 54. // pub = 3, pub2 = 2, pub3 = 9, pub4 = 1 + /// // Result: 324. // pub = 3, pub2 = 12, pub3 = 9, pub4 = 1 + /// // Result: 1404. // pub = 13, pub2 = 12, pub3 = 9, pub4 = 1 + /// // Result: 2964. // pub = 13, pub2 = 12, pub3 = 19, pub4 = 1 + /// + /// - Parameters: + /// - publisher1: A second publisher to combine with the first publisher. + /// - publisher2: A third publisher to combine with the first publisher. + /// - publisher3: A fourth publisher to combine with the first publisher. + /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. + /// - Returns: A publisher that receives and combines elements from this publisher and three other publishers. + public func combineLatest(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, R: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure { + Publishers.CombineLatest4(self, publisher1, publisher2, publisher3).map(transform) + } +} + +// MARK: - CombineLatest Publishers + +extension Publishers { + /// A publisher that receives and combines the latest elements from two publishers. + public struct CombineLatest: Publisher where A: Publisher, B: Publisher, A.Failure == B.Failure { + /// The kind of values published by this publisher. + /// + /// This publisher produces two-element tuples of the upstream publishers' output types. + public typealias Output = (A.Output, B.Output) + + /// The kind of errors this publisher might publish. + /// + /// This publisher produces the failure type shared by its upstream publishers. + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + /// Creates a publisher that receives and combines the latest elements from two publishers. + /// - Parameters: + /// - a: The first upstream publisher. + /// - b: The second upstream publisher. + public init(_ a: A, _ b: B) { + self.a = a + self.b = b + } + + public func receive(subscriber: S) where S: Subscriber, B.Failure == S.Failure, S.Input == (A.Output, B.Output) { + typealias Inner = CombineLatest2Inner + let inner = Inner(downstream: subscriber, upstreamCount: 2) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + inner.subscribe() + } + } + + /// A publisher that receives and combines the latest elements from three publishers. + public struct CombineLatest3: Publisher where A: Publisher, B: Publisher, C: Publisher, A.Failure == B.Failure, B.Failure == C.Failure { + /// The kind of values published by this publisher. + /// + /// This publisher produces three-element tuples of the upstream publishers' output types. + public typealias Output = (A.Output, B.Output, C.Output) + + /// The kind of errors this publisher might publish. + /// + /// This publisher produces the failure type shared by its upstream publishers. + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + public let c: C + + public init(_ a: A, _ b: B, _ c: C) { + self.a = a + self.b = b + self.c = c + } + + public func receive(subscriber: S) where S: Subscriber, C.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output) { + typealias Inner = CombineLatest3Inner + let inner = Inner(downstream: subscriber, upstreamCount: 3) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + c.subscribe(Inner.Side(index: 2, combiner: inner)) + inner.subscribe() + } + } + + /// A publisher that receives and combines the latest elements from four publishers. + public struct CombineLatest4: Publisher where A: Publisher, B: Publisher, C: Publisher, D: Publisher, A.Failure == B.Failure, B.Failure == C.Failure, C.Failure == D.Failure { + /// The kind of values published by this publisher. + /// + /// This publisher produces four-element tuples of the upstream publishers' output types. + public typealias Output = (A.Output, B.Output, C.Output, D.Output) + + /// The kind of errors this publisher might publish. + /// + /// This publisher produces the failure type shared by its upstream publishers. + public typealias Failure = A.Failure + + public let a: A + + public let b: B + + public let c: C + + public let d: D + + public init(_ a: A, _ b: B, _ c: C, _ d: D) { + self.a = a + self.b = b + self.c = c + self.d = d + } + + public func receive(subscriber: S) where S: Subscriber, D.Failure == S.Failure, S.Input == (A.Output, B.Output, C.Output, D.Output) { + typealias Inner = CombineLatest4Inner + let inner = Inner(downstream: subscriber, upstreamCount: 4) + a.subscribe(Inner.Side(index: 0, combiner: inner)) + b.subscribe(Inner.Side(index: 1, combiner: inner)) + c.subscribe(Inner.Side(index: 2, combiner: inner)) + d.subscribe(Inner.Side(index: 3, combiner: inner)) + inner.subscribe() + } + } +} + +// MARK: - Equatable conformances + +extension Publishers.CombineLatest: Equatable where A: Equatable, B: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A combineLatest publisher to compare for equality. + /// - rhs: Another combineLatest publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.CombineLatest, rhs: Publishers.CombineLatest) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b + } +} + +extension Publishers.CombineLatest3: Equatable where A: Equatable, B: Equatable, C: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A combineLatest publisher to compare for equality. + /// - rhs: Another combineLatest publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.CombineLatest3, rhs: Publishers.CombineLatest3) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c + } +} + +extension Publishers.CombineLatest4: Equatable where A: Equatable, B: Equatable, C: Equatable, D: Equatable { + /// Returns a Boolean value that indicates whether two publishers are equivalent. + /// + /// - Parameters: + /// - lhs: A combineLatest publisher to compare for equality. + /// - rhs: Another combineLatest publisher to compare for equality. + /// - Returns: `true` if the corresponding upstream publishers of each combineLatest publisher are equal; otherwise `false`. + public static func == (lhs: Publishers.CombineLatest4, rhs: Publishers.CombineLatest4) -> Bool { + lhs.a == rhs.a && lhs.b == rhs.b && lhs.c == rhs.c && lhs.d == rhs.d + } +} + +// MARK: - AbstractCombineLatest + +private class AbstractCombineLatest< + Output, + Failure, + Downstream +> + where Downstream: Subscriber, + Downstream.Input == Output, + Downstream.Failure == Failure { + let downstream: Downstream + var buffers: [Any?] + var subscriptions: [Subscription?] + var demand = Subscribers.Demand.none + var recursion = false + var finished = false + var cancelled = false + let upstreamCount: Int + var finishCount = 0 + let lock = UnfairLock.allocate() + let downstreamLock = UnfairRecursiveLock.allocate() + var established = false + var pendingCompletion: Subscribers.Completion? + + init(downstream: Downstream, upstreamCount: Int) { + self.downstream = downstream + self.buffers = Array(repeating: nil, count: upstreamCount) + self.subscriptions = Array(repeating: nil, count: upstreamCount) + self.upstreamCount = upstreamCount + } + + deinit { + lock.deallocate() + downstreamLock.deallocate() + } + + final func subscribe() { + downstream.receive(subscription: self) + lock.lock() + established = true + let completion = pendingCompletion + pendingCompletion = nil + lock.unlock() + if let completion { + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } + } + + func convert(values _: [Any?]) -> Output { + abstractMethod() + } + + final func receive(subscription: Subscription, index: Int) { + precondition(upstreamCount > index) + lock.lock() + guard !cancelled, + subscriptions[index] == nil else { + lock.unlock() + subscription.cancel() + return + } + subscriptions[index] = subscription + lock.unlock() + } + + // FIXME: To be audited + final func receive(_ input: Any, index: Int) -> Subscribers.Demand { + precondition(upstreamCount > index) + lock.lock() + guard !cancelled,!finished else { + lock.unlock() + return .none + } + buffers[index] = input + let buffers = buffers + guard !recursion, demand != 0, buffers.allSatisfy({ $0 != nil }) else { + lock.unlock() + return .none + } + demand -= 1 + lock.unlock() + let input = convert(values: buffers) + lock.lock() + recursion = true + lock.unlock() + downstreamLock.lock() + let newDemand = downstream.receive(input) + downstreamLock.unlock() + lock.lock() + recursion = false + demand += newDemand + lock.unlock() + return newDemand + } + + // FIXME: To be audited + final func receive(completion: Subscribers.Completion, index: Int) { + switch completion { + case .finished: + lock.lock() + guard !finished else { + lock.unlock() + return + } + finishCount += 1 + subscriptions[index] = nil + if finishCount == upstreamCount { + finished = true + buffers = Array(repeating: nil, count: upstreamCount) + if established { + lock.unlock() + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } else { + pendingCompletion = completion + lock.unlock() + } + } else { + lock.unlock() + } + case .failure: + lock.lock() + if finished { + let subscriptions = subscriptions + lock.unlock() + for (i, subscription) in subscriptions.enumerated() where i != index { + subscription?.cancel() + } + } else { + finished = true + let subscriptions = subscriptions + self.subscriptions = Array(repeating: nil, count: upstreamCount) + buffers = Array(repeating: nil, count: upstreamCount) + let established = established + if !established { + pendingCompletion = completion + } + lock.unlock() + for (i, subscription) in subscriptions.enumerated() where i != index { + subscription?.cancel() + } + if established { + downstreamLock.lock() + downstream.receive(completion: completion) + downstreamLock.unlock() + } + } + } + } +} + +extension AbstractCombineLatest: Subscription { + final func request(_ demand: Subscribers.Demand) { + demand.assertNonZero() + lock.lock() + guard !cancelled, !finished else { + lock.unlock() + return + } + let subscriptions = subscriptions + self.demand += demand + lock.unlock() + for subscription in subscriptions { + subscription?.request(demand) + } + } + + final func cancel() { + lock.lock() + let subscriptions = self.subscriptions + cancelled = true + self.subscriptions = Array(repeating: nil, count: upstreamCount) + buffers = Array(repeating: nil, count: upstreamCount) + lock.unlock() + for subscription in subscriptions { + subscription?.cancel() + } + } +} + +extension AbstractCombineLatest: CustomStringConvertible { + final var description: String { "CombineLatest" } +} + +extension AbstractCombineLatest: CustomPlaygroundDisplayConvertible { + final var playgroundDescription: Any { description } +} + +extension AbstractCombineLatest: CustomReflectable { + var customMirror: Mirror { + lock.lock() + defer { lock.unlock() } + return Mirror(self, children: [ + "downstream": downstream, + "upstreamSubscriptions": subscriptions, + "demand": demand, + "buffers": buffers, + ]) + } +} + +// MARK: - AbstractCombineLatest.Side + +extension AbstractCombineLatest { + struct Side { + let index: Int + let combiner: AbstractCombineLatest + + init(index: Int, combiner: AbstractCombineLatest) { + self.index = index + self.combiner = combiner + } + } +} + +extension AbstractCombineLatest.Side: Subscriber { + // NOTE: Audited with Combine 2023 release. + // A better implementation is `let combineIdentifier = CombineIdentifier()` IMO. + var combineIdentifier: CombineIdentifier { + // `CombineIdentifier(AbstractCombineLatest.self) will cause build fail on non-Darwin platform + // Tracked by https://github.com/apple/swift/issues/70645 + CombineIdentifier(AbstractCombineLatest.self as AnyObject) + } + + func receive(subscription: Subscription) { + combiner.receive(subscription: subscription, index: index) + } + + func receive(_ input: Input) -> Subscribers.Demand { + combiner.receive(input, index: index) + } + + func receive(completion: Subscribers.Completion) { + combiner.receive(completion: completion, index: index) + } +} + +extension AbstractCombineLatest.Side: CustomStringConvertible { + var description: String { "CombineLatest" } +} + +// MARK: - CombineLatest Inners + +private final class CombineLatest2Inner< + Output0, + Output1, + Failure, + Downstream +>: AbstractCombineLatest< + (Output0, Output1), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1), + Downstream.Failure == Failure { + override func convert(values: [Any?]) -> (Output0, Output1) { + ( + values[0] as! Output0, + values[1] as! Output1 + ) + } +} + +private final class CombineLatest3Inner< + Output0, + Output1, + Output2, + Failure, + Downstream +>: AbstractCombineLatest< + (Output0, Output1, Output2), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1, Output2), + Downstream.Failure == Failure { + override func convert(values: [Any?]) -> (Output0, Output1, Output2) { + ( + values[0] as! Output0, + values[1] as! Output1, + values[2] as! Output2 + ) + } +} + +private final class CombineLatest4Inner< + Output0, + Output1, + Output2, + Output3, + Failure, + Downstream: Subscriber +>: AbstractCombineLatest< + (Output0, Output1, Output2, Output3), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1, Output2, Output3), + Downstream.Failure == Failure { + override func convert(values: [Any?]) -> (Output0, Output1, Output2, Output3) { + ( + values[0] as! Output0, + values[1] as! Output1, + values[2] as! Output2, + values[3] as! Output3 + ) + } +} diff --git a/Sources/OpenCombine/Publishers/Publishers.Merge.swift b/Sources/OpenCombine/Publishers/Publishers.Merge.swift index e9544d27..0a9567e3 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Merge.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Merge.swift @@ -3,12 +3,14 @@ // OpenCombine // // Created by Kyle on 2023/11/21. -// Audited for Combine 2023 +// Audited for 2023 Release #if canImport(COpenCombineHelpers) @_implementationOnly import COpenCombineHelpers #endif +// MARK: - merge methods on Publisher + extension Publisher { /// Combines elements from this publisher with those from another publisher, delivering an interleaved sequence of elements. /// @@ -45,7 +47,7 @@ extension Publisher { /// Combines elements from this publisher with those from two other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:)-5crqg``. + /// Use ``Publisher/merge(with:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:)-81vgd``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:)-2p498``. /// /// In this example, as ``Publisher/merge(with:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: @@ -80,7 +82,7 @@ extension Publisher { /// Combines elements from this publisher with those from three other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// Use ``Publisher/merge(with:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-7mt86``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. /// /// In this example, as ``Publisher/merge(with:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: @@ -119,7 +121,7 @@ extension Publisher { /// Combines elements from this publisher with those from four other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// Use ``Publisher/merge(with:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-7mt86``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. /// /// In this example, as ``Publisher/merge(with:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: @@ -163,7 +165,7 @@ extension Publisher { /// Combines elements from this publisher with those from five other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-7mt86``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. /// /// In this example, as ``Publisher/merge(with:_:_:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: @@ -211,7 +213,7 @@ extension Publisher { /// Combines elements from this publisher with those from six other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// Use ``Publisher/merge(with:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-7mt86``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. /// /// In this example, as ``Publisher/merge(with:_:_:_:_:_:)`` receives input from the upstream publishers; it republishes the interleaved elements to the downstream: @@ -264,7 +266,7 @@ extension Publisher { /// Combines elements from this publisher with those from seven other publishers, delivering an interleaved sequence of elements. /// - /// Use ``Publisher/merge(with:_:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-48buc``. + /// Use ``Publisher/merge(with:_:_:_:_:_:_:)`` when you want to receive a new element whenever any of the upstream publishers emits an element. To receive tuples of the most-recent value from all the upstream publishers whenever any of them emit a value, use ``Publisher/combineLatest(_:_:_:)-7mt86``. /// To combine elements from multiple upstream publishers, use ``Publisher/zip(_:_:_:)-67czn``. /// /// In this example, as ``Publisher/merge(with:_:_:_:_:_:_:)`` receives input from the upstream publishers, it republishes the interleaved elements to the downstream: @@ -327,6 +329,8 @@ extension Publisher { } } +// MARK: - Merge Publishers + extension Publishers { /// A publisher created by applying the merge function to two upstream publishers. public struct Merge: Publisher where A: Publisher, B: Publisher, A.Failure == B.Failure, A.Output == B.Output { @@ -832,6 +836,8 @@ extension Publishers { } } +// MARK: - Equatable conformances + extension Publishers.Merge: Equatable where A: Equatable, B: Equatable { /// Returns a Boolean value that indicates whether two publishers are equivalent. /// @@ -927,7 +933,7 @@ extension Publishers.MergeMany: Equatable where Upstream: Equatable { } } -// MARK: _Merge +// MARK: - _Merge extension Publishers { fileprivate class _Merged where Downstream: Subscriber, Input == Downstream.Input, Failure == Downstream.Failure { diff --git a/Sources/OpenCombine/Publishers/Publishers.Zip.swift b/Sources/OpenCombine/Publishers/Publishers.Zip.swift index 9b5c0060..2e3f5a7f 100644 --- a/Sources/OpenCombine/Publishers/Publishers.Zip.swift +++ b/Sources/OpenCombine/Publishers/Publishers.Zip.swift @@ -3,12 +3,14 @@ // OpenCombine // // Created by Kyle on 2023/7/25. -// Audited for Combine 2023 +// Audited for 2023 Release #if canImport(COpenCombineHelpers) @_implementationOnly import COpenCombineHelpers #endif +// MARK: - zip methods on Publisher + extension Publisher { /// Combines elements from another publisher and deliver pairs of elements as tuples. /// @@ -71,7 +73,7 @@ extension Publisher { /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from two upstream publishers. public func zip(_ other: P, _ transform: @escaping (Self.Output, P.Output) -> T) -> Publishers.Map, T> where P: Publisher, Self.Failure == P.Failure { - zip(other).map(transform) + Publishers.Zip(self, other).map(transform) } /// Combines elements from two other publishers and delivers groups of elements as tuples. @@ -146,7 +148,7 @@ extension Publisher { /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from three upstream publishers. public func zip(_ publisher1: P, _ publisher2: Q, _ transform: @escaping (Self.Output, P.Output, Q.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure { - zip(publisher1, publisher2).map(transform) + Publishers.Zip3(self, publisher1, publisher2).map(transform) } /// Combines elements from three other publishers and delivers groups of elements as tuples. @@ -228,10 +230,12 @@ extension Publisher { /// - transform: A closure that receives the most-recent value from each publisher and returns a new value to publish. /// - Returns: A publisher that uses the `transform` closure to emit new elements, produced by combining the most recent value from four upstream publishers. public func zip(_ publisher1: P, _ publisher2: Q, _ publisher3: R, _ transform: @escaping (Self.Output, P.Output, Q.Output, R.Output) -> T) -> Publishers.Map, T> where P: Publisher, Q: Publisher, R: Publisher, Self.Failure == P.Failure, P.Failure == Q.Failure, Q.Failure == R.Failure { - zip(publisher1, publisher2, publisher3).map(transform) + Publishers.Zip4(self, publisher1, publisher2, publisher3).map(transform) } } +// MARK: - Zip Publishers + extension Publishers { /// A publisher created by applying the zip function to two upstream publishers. /// @@ -371,6 +375,8 @@ extension Publishers { } } +// MARK: - Equatable conformances + extension Publishers.Zip: Equatable where A: Equatable, B: Equatable { /// Returns a Boolean value that indicates whether two publishers are equivalent. /// @@ -409,35 +415,33 @@ extension Publishers.Zip4: Equatable where A: Equatable, B: Equatable, C: Equata // MARK: - AbstractZip -private class AbstractZip where Downstream: Subscriber, Input == Downstream.Input, Failure == Downstream.Failure { +private class AbstractZip< + Output, + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == Output, + Downstream.Failure == Failure { let downstream: Downstream var buffers: [[Any]] var subscriptions: [Subscription?] - var cancelled: Bool - var errored: Bool - var finished: Bool + var cancelled = false + var errored = false + var finished = false var upstreamFinished: [Bool] let upstreamCount: Int - var lock: UnfairLock - let downstreamLock: UnfairRecursiveLock - var recursive: Bool - var pendingDemand: Subscribers.Demand + var lock = UnfairLock.allocate() + let downstreamLock = UnfairRecursiveLock.allocate() + var recursive = false + var pendingDemand = Subscribers.Demand.none var pendingCompletion: Subscribers.Completion? init(downstream: Downstream, upstreamCount: Int) { self.downstream = downstream - self.buffers = Array(repeating: [], count: upstreamCount) - self.subscriptions = Array(repeating: nil, count: upstreamCount) - self.cancelled = false - self.errored = false - self.finished = false - self.upstreamFinished = Array(repeating: false, count: upstreamCount) + buffers = Array(repeating: [], count: upstreamCount) + subscriptions = Array(repeating: nil, count: upstreamCount) + upstreamFinished = Array(repeating: false, count: upstreamCount) self.upstreamCount = upstreamCount - self.lock = UnfairLock.allocate() - self.downstreamLock = UnfairRecursiveLock.allocate() - self.recursive = false - self.pendingDemand = .none - self.pendingCompletion = nil } deinit { @@ -445,8 +449,8 @@ private class AbstractZip where Downstream: Subscrib downstreamLock.deallocate() } - func convert(values _: [Any]) -> Input { - fatalError("Abstract method") + func convert(values _: [Any]) -> Output { + abstractMethod() } func receive(subscription: Subscription, index: Int) { @@ -476,6 +480,7 @@ private class AbstractZip where Downstream: Subscrib } } + // FIXME: To be audited func receive(_ value: Any, index: Int) -> Subscribers.Demand { precondition(upstreamCount > index) lock.lock() @@ -484,7 +489,7 @@ private class AbstractZip where Downstream: Subscrib return .none } buffers[index].append(value) - if buffers.contains(where: { $0.isEmpty }) { + if buffers.contains(where: \.isEmpty) { lock.unlock() return .none } @@ -545,7 +550,7 @@ private class AbstractZip where Downstream: Subscrib } else { lock.unlock() } - case .failure(_): + case .failure: errored = true lockedSendCompletion(completion: completion) } @@ -617,7 +622,7 @@ extension AbstractZip: Subscription { let subscriptions = self.subscriptions cancelled = true self.subscriptions = Array(repeating: nil, count: upstreamCount) - self.buffers = Array(repeating: [], count: upstreamCount) + buffers = Array(repeating: [], count: upstreamCount) lock.unlock() for subscription in subscriptions { subscription?.cancel() @@ -625,8 +630,22 @@ extension AbstractZip: Subscription { } } +extension AbstractZip: CustomStringConvertible { + var description: String { "Zip" } +} + +extension AbstractZip: CustomPlaygroundDisplayConvertible { + var playgroundDescription: Any { description } +} + +extension AbstractZip: CustomReflectable { + var customMirror: Mirror { Mirror(self, children: [:]) } +} + +// MARK: - AbstractZip.Side + extension AbstractZip { - struct Side { + struct Side { let index: Int let zip: AbstractZip let combineIdentifier: CombineIdentifier @@ -640,13 +659,11 @@ extension AbstractZip { } extension AbstractZip.Side: Subscriber { - typealias Input = SideInput - func receive(subscription: Subscription) { zip.receive(subscription: subscription, index: index) } - func receive(_ input: SideInput) -> Subscribers.Demand { + func receive(_ input: Input) -> Subscribers.Demand { zip.receive(input, index: index) } @@ -655,48 +672,84 @@ extension AbstractZip.Side: Subscriber { } } -extension AbstractZip: CustomStringConvertible { - var description: String { "Zip" } -} - extension AbstractZip.Side: CustomStringConvertible { var description: String { "Zip" } } -extension AbstractZip: CustomPlaygroundDisplayConvertible { - var playgroundDescription: Any { description } -} - extension AbstractZip.Side: CustomPlaygroundDisplayConvertible { var playgroundDescription: Any { description } } -extension AbstractZip: CustomReflectable { - var customMirror: Mirror { Mirror(self, children: [:]) } -} - extension AbstractZip.Side: CustomReflectable { var customMirror: Mirror { - Mirror(self, children: ["parentSubscription" : zip.combineIdentifier]) + Mirror(self, children: ["parentSubscription": zip.combineIdentifier]) } } -// MARK: ZipInner - -private final class Zip2Inner: AbstractZip<(Input1, Input2), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2) == Downstream.Input, Failure == Downstream.Failure { - override func convert(values: [Any]) -> (Input1, Input2) { - (values[0] as! Input1, values[1] as! Input2) +// MARK: Zip Inners + +private final class Zip2Inner< + Output0, + Output1, + Failure, + Downstream +>: AbstractZip< + (Output0, Output1), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1), + Downstream.Failure == Failure { + override func convert(values: [Any]) -> (Output0, Output1) { + ( + values[0] as! Output0, + values[1] as! Output1 + ) } } -private final class Zip3Inner: AbstractZip<(Input1, Input2, Input3), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2, Input3) == Downstream.Input, Failure == Downstream.Failure { - override func convert(values: [Any]) -> (Input1, Input2, Input3) { - (values[0] as! Input1, values[1] as! Input2, values[2] as! Input3) +private final class Zip3Inner< + Output0, + Output1, + Output2, + Failure, + Downstream +>: AbstractZip< + (Output0, Output1, Output2), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1, Output2), + Downstream.Failure == Failure { + override func convert(values: [Any]) -> (Output0, Output1, Output2) { + ( + values[0] as! Output0, + values[1] as! Output1, + values[2] as! Output2 + ) } } -private final class Zip4Inner: AbstractZip<(Input1, Input2, Input3, Input4), Failure, Downstream> where Downstream: Subscriber, (Input1, Input2, Input3, Input4) == Downstream.Input, Failure == Downstream.Failure { - override func convert(values: [Any]) -> (Input1, Input2, Input3, Input4) { - (values[0] as! Input1, values[1] as! Input2, values[2] as! Input3, values[3] as! Input4) +private final class Zip4Inner< + Output0, + Output1, + Output2, + Output3, + Failure, + Downstream: Subscriber +>: AbstractZip< + (Output0, Output1, Output2, Output3), + Failure, + Downstream +> where Downstream: Subscriber, + Downstream.Input == (Output0, Output1, Output2, Output3), + Downstream.Failure == Failure { + override func convert(values: [Any]) -> (Output0, Output1, Output2, Output3) { + ( + values[0] as! Output0, + values[1] as! Output1, + values[2] as! Output2, + values[3] as! Output3 + ) } } diff --git a/Sources/OpenCombine/RootProtocols.swift b/Sources/OpenCombine/RootProtocols.swift index d08c36a1..0a1eb3d5 100644 --- a/Sources/OpenCombine/RootProtocols.swift +++ b/Sources/OpenCombine/RootProtocols.swift @@ -33,10 +33,8 @@ /// by mapping or filtering elements, while only OpenCombine provides time-based /// operations like /// ``Publisher/debounce(for:scheduler:options:)`` and -/// ``Publisher/throttle(for:scheduler:latest:)``. -/// -/// -/// +/// ``Publisher/throttle(for:scheduler:latest:)``, and combining operations like +/// ``Publisher/merge(with:)-9qb5x`` and ``Publisher/combineLatest(_:_:)-9ip85``. /// To bridge the two approaches, the property ``Publisher/values-32o4h`` exposes /// a publisher's elements as an `AsyncSequence`, allowing you to iterate over /// them with `for`-`await`-`in` rather than attaching a ``Subscriber``. diff --git a/Tests/OpenCombineTests/PublisherTests/CombineLatestTests.swift b/Tests/OpenCombineTests/PublisherTests/CombineLatestTests.swift new file mode 100644 index 00000000..897ebb4e --- /dev/null +++ b/Tests/OpenCombineTests/PublisherTests/CombineLatestTests.swift @@ -0,0 +1,797 @@ +// +// CombineLatestTests.swift +// +// +// Created by Kyle on 2023/12/28. +// + +import XCTest + +#if OPENCOMBINE_COMPATIBILITY_TEST +import Combine +#else +import OpenCombine +#endif + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class CombineLatestTests: XCTestCase { + static let arities = (2...4) + + struct ChildInfo { + let subscription: CustomSubscription + let publisher: CustomPublisher + } + + func testSendsExpectedValues() { + CombineLatestTests.arities.forEach { arity in + let (children, combineLatest) = getChildrenAndCombineLatestForArity(arity) + + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + $0.request(.unlimited) + }) + + combineLatest.subscribe(downstreamSubscriber) + + (0..( + receiveSubscription: { downstreamSubscription = $0 }) + + combineLatest.subscribe(downstreamSubscriber) + + // Confirm initial demand + downstreamSubscription?.request(initialDemand) + (0..<2).forEach { XCTAssertEqual(children[$0].subscription.history, + [.requested(initialDemand)]) + } + + // Confirm no incremental demand + (0..<2).forEach { XCTAssertEqual(children[$0].publisher.send(1), .max(0)) } + + // Confirm no additional subscription demand + (0..<2).forEach { XCTAssertEqual(children[$0].subscription.history, + [.requested(initialDemand)]) + } + + // Confirm value was sent + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(2)]) + + // Confirm subsequent demand + downstreamSubscription?.request(.max(2)) + (0..<2).forEach { XCTAssertEqual(children[$0].subscription.history, + [.requested(initialDemand), + .requested(.max(2))]) + } + } + } + + func testDownstreamDemandRequestedWhileSendingValue() { + [Subscribers.Demand.unlimited, .max(10)].forEach { initialDemand in + let (children, combineLatest) = getChildrenAndCombineLatestForArity(2) + var downstreamSubscription: Subscription? + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { + downstreamSubscription = $0 + $0.request(initialDemand) + }, + receiveValue: { _ in + downstreamSubscription?.request(.max(666)) + return Subscribers.Demand.none + } + ) + + combineLatest.subscribe(downstreamSubscriber) + + XCTAssertEqual(children[0].publisher.send(1), .none) + // Apple will use the result of .receive(_ input:) INSTEAD of sending + // .request to the subscription if a request is received WHILE processing + // the .receive. + // AppleRef: 001 + XCTAssertEqual(children[1].publisher.send(1), .max(0)) + + XCTAssertEqual(children[0].subscription.history, + [.requested(initialDemand), + .requested(.max(666))]) + XCTAssertEqual(children[1].subscription.history, + [.requested(initialDemand), + .requested(.max(666))]) + } + } + + func testUpstreamValueReceivedWhileSendingValue() { + let (children, combineLatest) = getChildrenAndCombineLatestForArity(2) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }, + receiveValue: { _ in + XCTAssertEqual(children[0].publisher.send(1), .none) + return Subscribers.Demand.none + } + ) + + combineLatest.subscribe(downstreamSubscriber) + + XCTAssertEqual(children[0].publisher.send(1), .none) + XCTAssertEqual(children[1].publisher.send(1), .none) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(2)]) + } + + func testUpstreamFinishReceivedWhileSendingValue() { + let (children, combineLatest) = getChildrenAndCombineLatestForArity(2) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }, + receiveValue: { _ in + children[0].publisher.send(completion: .finished) + return Subscribers.Demand.none + } + ) + + combineLatest.subscribe(downstreamSubscriber) + + XCTAssertEqual(children[0].publisher.send(1), .none) + XCTAssertEqual(children[0].publisher.send(1), .none) + XCTAssertEqual(children[1].publisher.send(1), .none) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(2)]) + } + + func testCombineLatestCompletesOnlyAfterAllChildrenComplete() { + let upstreamSubscription = CustomSubscription() + let child1Publisher = CustomPublisher(subscription: upstreamSubscription) + let child2Publisher = CustomPublisher(subscription: upstreamSubscription) + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + XCTAssertEqual(child1Publisher.send(100), .none) + XCTAssertEqual(child1Publisher.send(200), .none) + XCTAssertEqual(child1Publisher.send(300), .none) + XCTAssertEqual(child2Publisher.send(1), .none) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(301)]) + + XCTAssertEqual(child2Publisher.send(2), .none) + XCTAssertEqual(child2Publisher.send(3), .none) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(301), + .value(302), + .value(303)]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(301), + .value(302), + .value(303), + .completion(.finished)]) + + XCTAssertEqual( + upstreamSubscription.history, + [.requested(.unlimited), .requested(.unlimited)] + ) + } + + func testUpstreamExceedsDemand() { + // Must use CustomPublisher if we want to force send a value beyond the demand + let child1Subscription = CustomSubscription() + let child1Publisher = CustomPublisher(subscription: child1Subscription) + let child2Subscription = CustomSubscription() + let child2Publisher = CustomPublisher(subscription: child2Subscription) + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + var downstreamSubscription: Subscription? + let downstreamSubscriber = TrackingSubscriber(receiveSubscription: { + downstreamSubscription = $0 + $0.request(.max(1)) + }) + + combineLatest.subscribe(downstreamSubscriber) + + XCTAssertEqual(child1Publisher.send(100), .none) + XCTAssertEqual(child2Publisher.send(1), .none) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(101)]) + + XCTAssertEqual(child1Publisher.send(200), .none) + XCTAssertEqual(child1Publisher.send(300), .none) + XCTAssertEqual(child2Publisher.send(2), .none) + // Surplus is sent downstream despite demand of zero + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(101)]) + + XCTAssertEqual(child2Publisher.send(3), .none) + downstreamSubscription?.request(.max(1)) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(101)]) + } + + private func getChildrenAndCombineLatestForArity(_ childCount: Int) + -> ([ChildInfo], AnyPublisher) + { + var children = [ChildInfo]() + for _ in (0.. + + switch childCount { + case 2: + combineLatest = AnyPublisher(children[0].publisher.combineLatest(children[1].publisher) + { $0 + $1 }) + case 3: + combineLatest = AnyPublisher(children[0].publisher + .combineLatest(children[1].publisher, + children[2].publisher) { $0 + $1 + $2 }) + case 4: + combineLatest = AnyPublisher(children[0].publisher + .combineLatest(children[1].publisher, + children[2].publisher, + children[3].publisher) { $0 + $1 + $2 + $3 }) + default: + fatalError() + } + + return (children, combineLatest) + } + + func testImmediateFinishWhenOneChildFinishesWithNoSurplus() { + CombineLatestTests.arities.forEach { arity in + for childToFinish in (0..() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(200) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.finished)]) + } + + func testBValueAfterAChildFinishedWithoutSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.finished)]) + } + + func testAValueAfterAChildFinishedWithSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(200) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("CombineLatest"), + .value(101), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(101), + .completion(.finished)]) + } + + func testBValueAfterAChildFinishedWithSurplus() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child1Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest")]) + + child2Publisher.send(1) + XCTAssertEqual(downstreamSubscriber.history, [ + .subscription("CombineLatest"), + .value(101), + ]) + + child2Publisher.send(completion: .finished) + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .value(101), + .completion(.finished)]) + } + + func testValueAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(100) + child1Publisher.send(completion: .failure(.oops)) + child2Publisher.send(1) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.failure(.oops))]) + } + + func testFinishAfterFinished() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .finished) + child2Publisher.send(completion: .finished) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.finished)]) + } + + func testFinishAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .failure(.oops)) + child1Publisher.send(completion: .finished) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.failure(.oops))]) + } + + func testFailedAfterFinished() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .finished) + child2Publisher.send(completion: .finished) + child1Publisher.send(completion: .failure(.oops)) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.finished)]) + } + + func testFailedAfterFailed() { + let child1Publisher = PassthroughSubject() + let child2Publisher = PassthroughSubject() + + let combineLatest = child1Publisher.combineLatest(child2Publisher) { $0 + $1 } + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + child1Publisher.send(completion: .failure(.oops)) + child1Publisher.send(completion: .failure(.oops)) + + XCTAssertEqual(downstreamSubscriber.history, [.subscription("CombineLatest"), + .completion(.failure(.oops))]) + } + + func testCombineLatest2Lifecycle() throws { + let child2Publisher = PassthroughSubject() + try testLifecycle(sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false, + { $0.combineLatest(child2Publisher) }) + } + + func testCombineLatest3Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + try testLifecycle(sendValue: 42, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false, + { $0.combineLatest(child2Publisher, child3Publisher) }) + } + + func testCombineLatest4Lifecycle() throws { + let child2Publisher = PassthroughSubject() + let child3Publisher = PassthroughSubject() + let child4Publisher = PassthroughSubject() + try testLifecycle(sendValue: 31, + cancellingSubscriptionReleasesSubscriber: false, + finishingIsPassedThrough: false, + { $0.combineLatest(child2Publisher, child3Publisher, child4Publisher) }) + } + + func testCombineLatestReceiveSubscriptionTwice() throws { + let child2Publisher = PassthroughSubject() + + // Can't use `testReceiveSubscriptionTwice` helper here as `(Int, Int)` output + // can't be made `Equatable`. + let helper = OperatorTestHelper( + publisherType: CustomPublisher.self, + initialDemand: nil, + receiveValueDemand: .none, + createSut: { $0.combineLatest(child2Publisher) } + ) + + XCTAssertEqual(helper.subscription.history, []) + + let secondSubscription = CustomSubscription() + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: secondSubscription) + + XCTAssertEqual(secondSubscription.history, [.cancelled]) + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: helper.subscription) + + XCTAssertEqual(helper.subscription.history, [.cancelled]) + + try XCTUnwrap(helper.downstreamSubscription).cancel() + + XCTAssertEqual(helper.subscription.history, [.cancelled, .cancelled]) + + let thirdSubscription = CustomSubscription() + + try XCTUnwrap(helper.publisher.subscriber) + .receive(subscription: thirdSubscription) + } + + func testNoDemandOnSubscriptionCrashes() { + CombineLatestTests.arities.forEach { arity in + let (_, combineLatest) = getChildrenAndCombineLatestForArity(arity) + + let downstreamSubscriber = TrackingSubscriber( + receiveSubscription: { subscription in + self.assertCrashes { subscription.request(.none) } + } + ) + + combineLatest.subscribe(downstreamSubscriber) + } + } + + func testCombineLatestCurrentValueSubject() throws { + let subject = CurrentValueSubject(()) + + let combineLatest = [42].publisher.combineLatest(subject) + + let downstreamSubscriber = TrackingSubscriberBase<(Int, ()), Never>( + receiveSubscription: { $0.request(.unlimited) }) + + combineLatest.subscribe(downstreamSubscriber) + + let history = downstreamSubscriber.history + XCTAssertEqual(history.count, 2) + + // tuples aren't Equatable, so matching the elements one by one + switch history[0] { + case .subscription("CombineLatest"): + break + default: + XCTFail("Failed to match the first subscription event in \(#function)") + } + + switch history[1] { + case .value(let v): + if v.0 != 42 || v.1 != () { + XCTFail("Failed to match the value event in \(#function)") + } + default: + XCTFail("Failed to match the value event in \(#function)") + } + } + + #if !os(WASI) + func testCombineLatestReferenceIssue() throws { + var subscriptions: Set = [] + #if OPENCOMBINE_COMPATIBILITY_TEST + let scheduler = DispatchQueue.main + #else + let scheduler = DispatchQueue.OCombine(DispatchQueue.main) + #endif + + let expectation = self.expectation(description: #function) + var result: (Int, Int)? + + let firstPublisher = Just(1) + .delay(for: .milliseconds(600), scheduler: scheduler) + let secondPublisher = Just(2) + .delay(for: .milliseconds(600), scheduler: scheduler) + Publishers.CombineLatest(firstPublisher, secondPublisher) + .sink(receiveValue: { + result = ($0.0, $0.1) + expectation.fulfill() + }) + .store(in: &subscriptions) + + wait(for: [expectation], timeout: 5) + + XCTAssertEqual(result?.0, 1) + XCTAssertEqual(result?.1, 2) + } + #endif + + func testCombineLatestDocumentationDemo() { + let pub = PassthroughSubject() + let pub2 = PassthroughSubject() + let pub3 = PassthroughSubject() + let pub4 = PassthroughSubject() + let combineLatest = pub + .combineLatest(pub2, pub3, pub4) { firstValue, secondValue, thirdValue, fourthValue in + return firstValue * secondValue * thirdValue * fourthValue + } + + let downstreamSubscriber = TrackingSubscriberBase( + receiveSubscription: { $0.request(.unlimited) }) + combineLatest.subscribe(downstreamSubscriber) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + ] + ) + pub.send(1) + pub.send(2) + pub2.send(2) + pub3.send(9) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + ] + ) + pub4.send(1) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + .value(36), // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + ] + ) + pub.send(3) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + .value(36), // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + .value(54), // pub = 3, pub2 = 2, pub3 = 9, pub4 = 1 + ] + ) + pub2.send(12) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + .value(36), // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + .value(54), // pub = 3, pub2 = 2, pub3 = 9, pub4 = 1 + .value(324), // pub = 3, pub2 = 12, pub3 = 9, pub4 = 1 + ] + ) + pub.send(13) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + .value(36), // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + .value(54), // pub = 3, pub2 = 2, pub3 = 9, pub4 = 1 + .value(324), // pub = 3, pub2 = 12, pub3 = 9, pub4 = 1 + .value(1404), // pub = 13, pub2 = 12, pub3 = 9, pub4 = 1 + ] + ) + pub3.send(19) + XCTAssertEqual( + downstreamSubscriber.history, + [ + .subscription("CombineLatest"), + .value(36), // pub = 2, pub2 = 2, pub3 = 9, pub4 = 1 + .value(54), // pub = 3, pub2 = 2, pub3 = 9, pub4 = 1 + .value(324), // pub = 3, pub2 = 12, pub3 = 9, pub4 = 1 + .value(1404), // pub = 13, pub2 = 12, pub3 = 9, pub4 = 1 + .value(2964), // pub = 13, pub2 = 12, pub3 = 19, pub4 = 1 + ] + ) + } + + func testEquatable() { + enum E: Equatable { + case a, b + } + let numbersPub = Just(1) + let lettersPub = Just("A") + let enumPub = Just(E.a) + let fractionsPub = Just(1.0) + + let combineLatestNumberLetter = numbersPub.combineLatest(lettersPub) + XCTAssertEqual(combineLatestNumberLetter, Publishers.CombineLatest(numbersPub, lettersPub)) + XCTAssertNotEqual(combineLatestNumberLetter, Publishers.CombineLatest(numbersPub, Just("B"))) + + let combineLatestNumberLetterEnum = numbersPub.combineLatest(lettersPub, enumPub) + XCTAssertEqual(combineLatestNumberLetterEnum, Publishers.CombineLatest3(numbersPub, lettersPub, enumPub)) + XCTAssertNotEqual(combineLatestNumberLetterEnum, Publishers.CombineLatest3(numbersPub, lettersPub, Just(E.b))) + + let combineLatestNumberLetterEnumFraction = numbersPub.combineLatest(lettersPub, enumPub, fractionsPub) + XCTAssertEqual(combineLatestNumberLetterEnumFraction, Publishers.CombineLatest4(numbersPub, lettersPub, enumPub, fractionsPub)) + XCTAssertNotEqual(combineLatestNumberLetterEnumFraction, Publishers.CombineLatest4(numbersPub, lettersPub, enumPub, Just(1.5))) + } + + // TODO: The above test case is mostly copied from ZipTests. Optimized for CombineLatest later - +} diff --git a/Tests/OpenCombineTests/PublisherTests/MergeTests.swift b/Tests/OpenCombineTests/PublisherTests/MergeTests.swift index 0b2b2ca4..8704cdf7 100644 --- a/Tests/OpenCombineTests/PublisherTests/MergeTests.swift +++ b/Tests/OpenCombineTests/PublisherTests/MergeTests.swift @@ -895,4 +895,6 @@ final class MergeTests: XCTestCase { let history = downstreamSubscriber.history XCTAssertEqual(history, [.subscription("Merge"), .value(42), .value(0)]) } + + // TODO: The above test case is mostly copied from ZipTests. Optimized for MergeTests later - }