Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work around open-telemetry/opentelemetry-swift#615 #667

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
21 changes: 3 additions & 18 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ let package = Package(
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.2"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "2.1.1"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0")
],
targets: [
.target(name: "OpenTelemetryApi",
dependencies: []),
.target(name: "OpenTelemetrySdk",
dependencies: ["OpenTelemetryApi"].withAtomicsIfNeeded()),
dependencies: ["OpenTelemetryApi",
.product(name: "Atomics", package: "swift-atomics", condition: .when(platforms: [.linux]))]),
.target(name: "OpenTelemetryConcurrency",
dependencies: ["OpenTelemetryApi"]),
.target(name: "OpenTelemetryTestUtils",
Expand Down Expand Up @@ -134,25 +136,8 @@ let package = Package(
]
).addPlatformSpecific()

extension [Target.Dependency] {
func withAtomicsIfNeeded() -> [Target.Dependency] {
#if canImport(Darwin)
return self
#else
var dependencies = self
dependencies.append(.product(name: "Atomics", package: "swift-atomics"))
return dependencies
#endif
}
}

extension Package {
func addPlatformSpecific() -> Self {
#if !canImport(Darwin)
dependencies.append(
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0"))
)
#endif
#if canImport(ObjectiveC)
dependencies.append(
.package(url: "https://github.com/undefinedlabs/opentracing-objc", from: "0.5.2")
Expand Down
6 changes: 5 additions & 1 deletion Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/OpenTracingShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
import Glibc
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/SwiftMetricsShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/OpenTelemetrySdk/Internal/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//
//

import Foundation
import OpenTelemetryApi
Expand Down Expand Up @@ -36,7 +36,8 @@ public class BatchLogRecordProcessor: LogRecordProcessor {
}
}

private class BatchWorker: Thread {
private class BatchWorker {
var thread: Thread!
let logRecordExporter: LogRecordExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
Expand Down Expand Up @@ -65,11 +66,22 @@ private class BatchWorker: Thread {
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
self.thread = Thread(block: { [weak self] in
self?.main()
})
}

func start() {
self.thread.start()
}

func cancel() {
self.thread.cancel()
}

func emit(logRecord: ReadableLogRecord) {
cond.lock()
defer { cond.unlock()}
defer { cond.unlock() }
if logRecordList.count == maxQueueSize {
// TODO: record a counter for dropped logs
return
Expand All @@ -82,22 +94,22 @@ private class BatchWorker: Thread {
}
}

override func main() {
func main() {
repeat {
autoreleasepool {
var logRecordsCopy: [ReadableLogRecord]
cond.lock()
if logRecordList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while logRecordList.isEmpty && !self.isCancelled
} while logRecordList.isEmpty && !thread.isCancelled
}
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
}
} while !self.isCancelled
} while !thread.isCancelled
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public struct BatchSpanProcessor: SpanProcessor {
/// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
private class BatchWorker {
var thread: Thread!
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
Expand Down Expand Up @@ -140,6 +141,10 @@ private class BatchWorker: Thread {
)
}
}

self.thread = Thread(block: { [weak self] in
self?.main()
})
}

deinit {
Expand All @@ -148,6 +153,14 @@ private class BatchWorker: Thread {
self.spanGaugeObserver?.close()
}

func start() {
self.thread.start()
}

func cancel() {
self.thread.cancel()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }
Expand All @@ -168,22 +181,22 @@ private class BatchWorker: Thread {
}
}

override func main() {
func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
} while spanList.isEmpty && !thread.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
}
} while !self.isCancelled
} while !thread.isCancelled
}

func shutdown() {
Expand Down
Loading