Skip to content

Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.

License

Notifications You must be signed in to change notification settings

gshahbazian/Asynchrone

 
 

Repository files navigation

Asynchone

Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.

Requirements

  • iOS 14.0+
  • macOS 12.0+

Installation

Swift Package Manager

In Xcode:

  1. Click Project.
  2. Click Package Dependencies.
  3. Click +.
  4. Enter package URL: https://github.com/reddavis/Asynchrone.
  5. Add Asynchone to your app target.

Documentation

Documentation can be found here.

Overview

AsyncSequence

Assign

class MyClass {
    var value: Int = 0 {
        didSet { print("Set to \(self.value)") }
    }
}


let sequence = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

let object = MyClass()
sequence.assign(to: \.value, on: object)

// Prints:
// Set to 1
// Set to 2
// Set to 3

First

let sequence = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

print(await sequence.first())

// Prints:
// 1

Collect

let sequence = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

print(await sequence.collect())

// Prints:
// [1, 2, 3]

Sink

let sequence = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

sequence.sink { print($0) }

// Prints:
// 1
// 2
// 3

Sink with completion

let sequence = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish(throwing: TestError())
}

sequence.sink(
    receiveValue: { print("Value: \($0)") },
    receiveCompletion: { print("Complete: \($0)") }
)

// Prints:
// Value: 1
// Value: 2
// Value: 3
// Complete: failure(TestError())
let sequence = Just(1)
    .map(String.init)
    .eraseToAnyAsyncSequenceable()
let stream = Fail<Int, TestError>(error: TestError.a)
    .eraseToAnyThrowingAsyncSequenceable()
let sequenceA = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

let sequenceB = AsyncStream<Int> { continuation in
    continuation.yield(4)
    continuation.yield(5)
    continuation.yield(6)
    continuation.finish()
}

let sequenceC = AsyncStream<Int> { continuation in
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in sequenceA.chain(with: sequenceB).chain(with: sequenceC) {
    print(value)
}

// Prints:
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.combineLatest(streamB) {
    print(value)
}

// Prints:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)
// (4, 9)
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(10)
    continuation.yield(11)
    continuation.finish()
}

for await value in streamA.combineLatest(streamB, streamC) {
    print(value)
}

// Prints:
// (1, 5, 10)
// (2, 6, 11)
// (3, 7, 11)
// (4, 8, 11)
// (4, 9, 11)
let sequence = CurrentElementAsyncSequence(0)
print(await sequence.element)

await stream.yield(1)
print(await sequence.element)

await stream.yield(2)
await stream.yield(3)
await stream.yield(4)
print(await sequence.element)

// Prints:
// 0
// 1
// 4
let stream = AsyncStream<Int> { continuation in
    continuation.yield(0)
    try? await Task.sleep(nanoseconds: 200_000_000)
    continuation.yield(1)
    try? await Task.sleep(nanoseconds: 200_000_000)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.yield(5)
    continuation.finish()
}

for element in try await self.stream.debounce(for: 0.1) {
    print(element)
}

// Prints:
// 0
// 1
// 5
let stream = AsyncStream<Int> { continuation in
    continuation.yield(0)
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}

let start = Date.now
for element in try await self.stream.delay(for: 0.5) {
    print("\(element) - \(Date.now.timeIntervalSince(start))")
}

// Prints:
// 0 - 0.5
// 1 - 1.0
// 2 - 1.5
>>>>>>> main
Empty<Int>().sink(
    receiveValue: { print($0) },
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Finished")
        case .failure:
            print("Failed")
        }
    }
)

// Prints:
// Finished
let stream = Fail<Int, TestError>(error: TestError())

do {
    for try await value in stream {
        print(value)
    }
} catch {
    print("Error!")
}

// Prints:
// Error!
let stream = Just(1)

for await value in stream {
    print(value)
}

// Prints:
// 1
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.merge(with: streamB) {
    print(value)
}

// Prints:
// 1
// 5
// 2
// 6
// 3
// 7
// 4
// 8
// 9
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(4)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(2)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(3)
    continuation.finish()
}

for await value in self.streamA.merge(with: self.streamB, self.streamC) {
    print(value)
}

// Prints:
// 1
// 2
// 3
// 4
let sequence = PassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish()

for await value in sequence {
    print(value)
}

// Prints:
// 0
// 1
// 2
let stream = .init { continuation in
    continuation.yield(1)
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()
}

for await value in stream.removeDuplicates() {
    print(value)
}

// Prints:
// 1
// 2
// 3
let sequence = Fail<Int, TestError>(
    error: TestError()
)
.replaceError(with: 0)

for await value in stream {
    print(value)
}

// Prints:
// 0
let sequence = [0, 1, 2, 3].async

for await value in sequence {
    print(value)
}

// Prints:
// 1
// 2
// 3
let values = [
    "a",
    "ab",
    "abc",
    "abcd"
]

let stream = AsyncStream { continuation in
    for value in values {
        continuation.yield(value)
    }
    continuation.finish()
}
.shared()

Task {
    let values = try await self.stream.collect()
    // ...
}

Task.detached {
    let values = try await self.stream.collect()
    // ...
}

let values = try await self.stream.collect()
// ...
let stream = AsyncStream<Int> { continuation in
    continuation.yield(0)
    try? await Task.sleep(nanoseconds: 100_000_000)
    continuation.yield(1)
    try? await Task.sleep(nanoseconds: 100_000_000)
    continuation.yield(2)
    continuation.yield(3)
    continuation.yield(4)
    continuation.yield(5)
    continuation.finish()
}

for element in try await self.stream.throttle(for: 0.05, latest: true) {
    print(element)
}

// Prints:
// 0
// 1
// 2
// 5
let sequence = ThrowingPassthroughAsyncSequence<Int>()
sequence.yield(0)
sequence.yield(1)
sequence.yield(2)
sequence.finish(throwing: TestError())

do {
    for try await value in sequence {
      print(value)
    }
} catch {
    print("Error!")
}

// Prints:
// 0
// 1
// 2
// Error!
let sequence = TimerAsyncSequence(interval: 1)

let start = Date.now
for element in await sequence {
    print(element)
}

// Prints:
// 2022-03-19 20:49:30 +0000
// 2022-03-19 20:49:31 +0000
// 2022-03-19 20:49:32 +0000
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.finish()
}

for await value in streamA.zip(streamB) {
    print(value)
}

// Prints:
// (1, 5)
// (2, 6)
let streamA = .init { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
}

let streamB = .init { continuation in
    continuation.yield(5)
    continuation.yield(6)
    continuation.yield(7)
    continuation.finish()
}

let streamC = .init { continuation in
    continuation.yield(8)
    continuation.yield(9)
    continuation.finish()
}

for await value in streamA.zip(streamB, streamC) {
    print(value)
}

// Prints:
// (1, 5, 8)
// (2, 6, 9)

AsyncStream

AsyncStream.Continuation

AsyncThrowingStream

AsyncThrowingStream.Continuation

About

Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Swift 98.9%
  • Other 1.1%