Skip to content

Commit

Permalink
Support MainActor (#294)
Browse files Browse the repository at this point in the history
* ⚡ WIP

* 🌲 Update

* 🌲 Update
  • Loading branch information
muukii authored Aug 27, 2022
1 parent 020abb5 commit e066eb7
Show file tree
Hide file tree
Showing 14 changed files with 512 additions and 132 deletions.
8 changes: 4 additions & 4 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import PackageDescription
let package = Package(
name: "Verge",
platforms: [
.macOS(.v10_12),
.iOS(.v11),
.tvOS(.v10),
.watchOS(.v3),
.macOS(.v10_15),
.iOS(.v13),
.tvOS(.v13),
.watchOS(.v6),
],
products: [
.library(name: "Verge", targets: ["Verge"]),
Expand Down
1 change: 1 addition & 0 deletions Sources/Verge/Derived/Derived+Assign.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extension Derived {
public func assign(
to binder: @escaping (Changes<Value>) -> Void
) -> VergeAnyCancellable {

sinkValue(queue: .passthrough) { c in
binder(c)
}
Expand Down
105 changes: 90 additions & 15 deletions Sources/Verge/Derived/Derived.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
/// Returns new Derived object that provides only changed value
///
/// - Parameter predicate: Return true, removes value
public func makeRemovingDuplicates(by predicate: @escaping (Changes<Value>) -> Bool) -> Derived<Value> {
public func makeRemovingDuplicates(by predicate: @escaping @Sendable (Changes<Value>) -> Bool) -> Derived<Value> {
guard !attributes.contains(.dropsDuplicatedOutput) else {
assertionFailure("\(self) has already applied removeDuplicates")
return self
Expand All @@ -223,6 +223,19 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
return chained
}

private func _sinkValue(
dropsFirst: Bool = false,
queue: TargetQueueType,
receive: @escaping (Changes<Value>) -> Void
) -> VergeAnyCancellable {
innerStore._sinkState(
dropsFirst: dropsFirst,
queue: queue,
receive: receive
)
.associate(self)
}

/// Subscribe the state changes
///
/// First object always returns true from ifChanged / hasChanges / noChanges unless dropsFirst is true.
Expand All @@ -233,9 +246,30 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkValue(
dropsFirst: Bool = false,
queue: TargetQueue = .mainIsolated(),
queue: TargetQueue,
receive: @escaping (Changes<Value>) -> Void
) -> VergeAnyCancellable {
) -> VergeAnyCancellable {
innerStore.sinkState(
dropsFirst: dropsFirst,
queue: queue,
receive: receive
)
.associate(self)
}

/// Subscribe the state changes
///
/// First object always returns true from ifChanged / hasChanges / noChanges unless dropsFirst is true.
///
/// - Parameters:
/// - dropsFirst: Drops the latest value on start. if true, receive closure will be called next time state is updated.
/// - queue: Specify a queue to receive changes object.
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkValue(
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Changes<Value>) -> Void
) -> VergeAnyCancellable {
innerStore.sinkState(
dropsFirst: dropsFirst,
queue: queue,
Expand All @@ -256,7 +290,7 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
public func sinkValue<Accumulate>(
scan: Scan<Changes<Value>, Accumulate>,
dropsFirst: Bool = false,
queue: TargetQueue = .mainIsolated(),
queue: TargetQueue,
receive: @escaping (Changes<Value>, Accumulate) -> Void
) -> VergeAnyCancellable {
innerStore.sinkState(
Expand All @@ -267,10 +301,34 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
)
.associate(self)
}

/// Subscribe the state changes
///
/// First object always returns true from ifChanged / hasChanges / noChanges unless dropsFirst is true.
///
/// - Parameters:
/// - scan: Accumulates a specified type of value over receiving updates.
/// - dropsFirst: Drops the latest value on started. if true, receive closure will call from next state updated.
/// - queue: Specify a queue to receive changes object.
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkValue<Accumulate>(
scan: Scan<Changes<Value>, Accumulate>,
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Changes<Value>, Accumulate) -> Void
) -> VergeAnyCancellable {
innerStore.sinkState(
scan: scan,
dropsFirst: dropsFirst,
queue: queue,
receive: receive
)
.associate(self)
}

fileprivate func _makeChain<NewState>(
_ map: Pipeline<Changes<Value>, NewState>,
queue: TargetQueue
queue: TargetQueueType
) -> Derived<NewState> {

vergeSignpostEvent("Derived.chain.new", label: "\(type(of: Value.self)) -> \(type(of: NewState.self))")
Expand All @@ -287,7 +345,7 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
set: { _ in },
initialUpstreamState: value,
subscribeUpstreamState: { callback in
self.innerStore.sinkState(
self.innerStore._sinkState(
dropsFirst: true,
queue: queue,
receive: callback
Expand All @@ -314,7 +372,7 @@ public class Derived<Value>: _VergeObservableObjectBase, DerivedType, @unchecked
public func chain<NewState>(
_ pipeline: Pipeline<Changes<Value>, NewState>,
dropsOutput: ((Changes<NewState>) -> Bool)? = nil,
queue: TargetQueue = .passthrough
queue: TargetQueueType = TargetQueue.passthrough
) -> Derived<NewState> {

let derived = chainCahce2.withValue { cache -> Derived<NewState> in
Expand Down Expand Up @@ -415,7 +473,7 @@ extension Derived where Value : Equatable {
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkChangedPrimitiveValue(
dropsFirst: Bool = false,
queue: TargetQueue = .mainIsolated(),
queue: TargetQueue,
receive: @escaping (Value) -> Void
) -> VergeAnyCancellable {
sinkValue(dropsFirst: dropsFirst, queue: queue) { (changes) in
Expand All @@ -425,6 +483,23 @@ extension Derived where Value : Equatable {
}
}

/// Subscribe the state changes
///
/// Receives a value only changed
///
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkChangedPrimitiveValue(
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Value) -> Void
) -> VergeAnyCancellable {
sinkValue(dropsFirst: dropsFirst, queue: queue) { (changes) in
changes.ifChanged { value in
receive(value)
}
}
}

}

// `Value == Never` eliminates specializing requirements.
Expand All @@ -438,7 +513,7 @@ extension Derived where Value == Never {
/// - s0:
/// - s1:
/// - Returns:
public static func combined<S0, S1>(_ s0: Derived<S0>, _ s1: Derived<S1>, queue: TargetQueue = .passthrough) -> Derived<(S0, S1)> {
public static func combined<S0, S1>(_ s0: Derived<S0>, _ s1: Derived<S1>, queue: TargetQueueType = .passthrough) -> Derived<(S0, S1)> {

typealias Shape = (S0, S1)

Expand All @@ -452,14 +527,14 @@ extension Derived where Value == Never {
initialUpstreamState: initial,
subscribeUpstreamState: { callback in

let _s0 = s0.sinkValue(dropsFirst: true, queue: queue) { (s0) in
let _s0 = s0._sinkValue(dropsFirst: true, queue: queue) { (s0) in
buffer.modify { value in
value.0 = s0.primitive
callback(value)
}
}

let _s1 = s1.sinkValue(dropsFirst: true, queue: queue) { (s1) in
let _s1 = s1._sinkValue(dropsFirst: true, queue: queue) { (s1) in
buffer.modify { value in
value.1 = s1.primitive
callback(value)
Expand All @@ -486,7 +561,7 @@ extension Derived where Value == Never {
/// - s1:
/// - s2:
/// - Returns:
public static func combined<S0, S1, S2>(_ s0: Derived<S0>, _ s1: Derived<S1>, _ s2: Derived<S2>, queue: TargetQueue = .passthrough) -> Derived<(S0, S1, S2)> {
public static func combined<S0, S1, S2>(_ s0: Derived<S0>, _ s1: Derived<S1>, _ s2: Derived<S2>, queue: TargetQueueType = .passthrough) -> Derived<(S0, S1, S2)> {

typealias Shape = (S0, S1, S2)

Expand All @@ -500,21 +575,21 @@ extension Derived where Value == Never {
initialUpstreamState: initial,
subscribeUpstreamState: { callback in

let _s0 = s0.sinkValue(dropsFirst: true, queue: queue) { (s0) in
let _s0 = s0._sinkValue(dropsFirst: true, queue: queue) { (s0) in
buffer.modify { value in
value.0 = s0.primitive
callback(value)
}
}

let _s1 = s1.sinkValue(dropsFirst: true, queue: queue) { (s1) in
let _s1 = s1._sinkValue(dropsFirst: true, queue: queue) { (s1) in
buffer.modify { value in
value.1 = s1.primitive
callback(value)
}
}

let _s2 = s2.sinkValue(dropsFirst: true, queue: queue) { (s2) in
let _s2 = s2._sinkValue(dropsFirst: true, queue: queue) { (s2) in
buffer.modify { value in
value.2 = s2.primitive
callback(value)
Expand Down
48 changes: 46 additions & 2 deletions Sources/Verge/Store/DispatcherType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ extension DispatcherType {
*/
public func sinkState(
dropsFirst: Bool = false,
queue: TargetQueue = .mainIsolated(),
queue: TargetQueue,
receive: @escaping (Changes<Scope>) -> Void
) -> VergeAnyCancellable {
let _scope = scope
Expand All @@ -58,6 +58,28 @@ extension DispatcherType {
receive(state.map { $0[keyPath: _scope] })
}
}

/**
Subscribe the state that scoped

First object always returns true from ifChanged / hasChanges / noChanges unless dropsFirst is true.

- Parameters:
- dropsFirst: Drops the latest value on started. if true, receive closure will call from next state updated.
- queue: Specify a queue to receive changes object.
- Returns: A subscriber that performs the provided closure upon receiving values.
*/
public func sinkState(
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Changes<Scope>) -> Void
) -> VergeAnyCancellable {
let _scope = scope

return store.asStore().sinkState(dropsFirst: dropsFirst, queue: queue) { state in
receive(state.map { $0[keyPath: _scope] })
}
}

/// Subscribe the state changes
///
Expand All @@ -71,14 +93,36 @@ extension DispatcherType {
public func sinkState<Accumulate>(
scan: Scan<Changes<Scope>, Accumulate>,
dropsFirst: Bool = false,
queue: TargetQueue = .mainIsolated(),
queue: TargetQueue,
receive: @escaping (Changes<Scope>, Accumulate) -> Void
) -> VergeAnyCancellable {
sinkState(dropsFirst: dropsFirst, queue: queue) { (changes) in
let accumulate = scan.accumulate(changes)
receive(changes, accumulate)
}
}

/// Subscribe the state changes
///
/// First object always returns true from ifChanged / hasChanges / noChanges unless dropsFirst is true.
///
/// - Parameters:
/// - scan: Accumulates a specified type of value over receiving updates.
/// - dropsFirst: Drops the latest value on started. if true, receive closure will call from next state updated.
/// - queue: Specify a queue to receive changes object.
/// - Returns: A subscriber that performs the provided closure upon receiving values.
public func sinkState<Accumulate>(
scan: Scan<Changes<Scope>, Accumulate>,
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Changes<Scope>, Accumulate) -> Void
) -> VergeAnyCancellable {
sinkState(dropsFirst: dropsFirst, queue: queue) { (changes) in
let accumulate = scan.accumulate(changes)
receive(changes, accumulate)
}
}


/// Send activity
/// - Parameter activity:
Expand Down
Loading

0 comments on commit e066eb7

Please sign in to comment.