diff --git a/.spi.yml b/.spi.yml index 38cb5aa..4ef114b 100644 --- a/.spi.yml +++ b/.spi.yml @@ -1,4 +1,4 @@ version: 1 builder: configs: - - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload] + - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload, NIOHTTPResponsiveness] diff --git a/Package.swift b/Package.swift index f2355d6..d1aa7ff 100644 --- a/Package.swift +++ b/Package.swift @@ -197,6 +197,31 @@ var targets: [PackageDescription.Target] = [ .product(name: "NIOEmbedded", package: "swift-nio"), ] ), + .target( + name: "NIOHTTPResponsiveness", + dependencies: [ + "NIOHTTPTypes", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "HTTPTypes", package: "swift-http-types"), + .product(name: "Algorithms", package: "swift-algorithms"), + ], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency") + ] + ), + .testTarget( + name: "NIOHTTPResponsivenessTests", + dependencies: [ + "NIOHTTPResponsiveness", + "NIOHTTPTypes", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOEmbedded", package: "swift-nio"), + .product(name: "HTTPTypes", package: "swift-http-types"), + ], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency") + ] + ), ] let package = Package( @@ -209,13 +234,16 @@ let package = Package( .library(name: "NIOHTTPTypesHTTP1", targets: ["NIOHTTPTypesHTTP1"]), .library(name: "NIOHTTPTypesHTTP2", targets: ["NIOHTTPTypesHTTP2"]), .library(name: "NIOResumableUpload", targets: ["NIOResumableUpload"]), + .library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.78.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.77.0"), .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"), + .package(url: "https://github.com/apple/swift-algorithms.git", from: "1.2.0"), + ], targets: targets ) diff --git a/README.md b/README.md index a400b17..9ea0ed5 100644 --- a/README.md +++ b/README.md @@ -59,3 +59,5 @@ On the [`nio-extras-0.1`](https://github.com/apple/swift-nio-extras/tree/nio-ext - [`HTTP2FramePayloadToHTTPClientCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the client side. - [`HTTP2FramePayloadToHTTPServerCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the server side. - [`HTTPResumableUploadHandler`](Sources/NIOResumableUpload/HTTPResumableUploadHandler.swift) A `ChannelHandler` that translates HTTP resumable uploads to regular uploads. +- [`HTTPDrippingDownloadHandler`](Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift) A `ChannelHandler` that sends a configurable stream of zeroes to a client. +- [`HTTPReceiveDiscardHandler`](Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift) A `ChannelHandler` that receives arbitrary bytes from a client and discards them. diff --git a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift new file mode 100644 index 0000000..76588e5 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift @@ -0,0 +1,255 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// HTTP request handler sending a configurable stream of zeroes. Uses HTTPTypes request/response parts. +public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + public typealias OutboundIn = Never + + // Predefine buffer to reuse over and over again when sending chunks to requester. NIO allows + // us to give it reference counted buffers. Reusing like this allows us to avoid allocations. + static let downloadBodyChunk = ByteBuffer(repeating: 0, count: 65536) + + private var frequency: TimeAmount + private var size: Int + private var count: Int + private var delay: TimeAmount + private var code: HTTPResponse.Status + + private enum Phase { + /// We haven't gotten the request head - nothing to respond to + case waitingOnHead + /// We got the request head and are delaying the response + case delayingResponse + /// We're dripping response chunks to the peer, tracking how many chunks we have left + case dripping(DrippingState) + /// We either sent everything to the client or the request ended short + case done + } + + private struct DrippingState { + var chunksLeft: Int + var currentChunkBytesLeft: Int + } + + private var phase = Phase.waitingOnHead + private var scheduled: Scheduled? + private var scheduledCallbackHandler: HTTPDrippingDownloadHandlerScheduledCallbackHandler? + private var pendingRead = false + private var pendingWrite = false + private var activelyWritingChunk = false + + /// Initializes an `HTTPDrippingDownloadHandler`. + /// - Parameters: + /// - count: How many chunks should be sent. Note that the underlying HTTP + /// stack may split or combine these chunks into data frames as + /// they see fit. + /// - size: How large each chunk should be + /// - frequency: How much time to wait between sending each chunk + /// - delay: How much time to wait before sending the first chunk + /// - code: What HTTP status code to send + public init( + count: Int = 0, + size: Int = 0, + frequency: TimeAmount = .zero, + delay: TimeAmount = .zero, + code: HTTPResponse.Status = .ok + ) { + self.frequency = frequency + self.size = size + self.count = count + self.delay = delay + self.code = code + } + + public convenience init?(queryArgsString: Substring.UTF8View) { + self.init() + + let pairs = queryArgsString.split(separator: UInt8(ascii: "&")) + for pair in pairs { + var pairParts = pair.split(separator: UInt8(ascii: "="), maxSplits: 1).makeIterator() + guard let first = pairParts.next(), let second = pairParts.next() else { + continue + } + + guard let secondNum = Int(Substring(second)) else { + return nil + } + + switch Substring(first) { + case "frequency": + self.frequency = .seconds(Int64(secondNum)) + case "size": + self.size = secondNum + case "count": + self.count = secondNum + case "delay": + self.delay = .seconds(Int64(secondNum)) + case "code": + self.code = HTTPResponse.Status(code: secondNum) + default: + continue + } + } + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + switch part { + case .head: + self.phase = .delayingResponse + + if self.delay == TimeAmount.zero { + // If no delay, we might as well start responding immediately + self.onResponseDelayCompleted(context: context) + } else { + let this = NIOLoopBound(self, eventLoop: context.eventLoop) + let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) + self.scheduled = context.eventLoop.scheduleTask(in: self.delay) { + this.value.onResponseDelayCompleted(context: loopBoundContext.value) + } + } + case .body, .end: + return + } + } + + private func onResponseDelayCompleted(context: ChannelHandlerContext) { + guard case .delayingResponse = self.phase else { + return + } + + var head = HTTPResponse(status: self.code) + + // If the length isn't too big, let's include a content length header + if case (let contentLength, false) = self.size.multipliedReportingOverflow(by: self.count) { + head.headerFields = HTTPFields(dictionaryLiteral: (.contentLength, "\(contentLength)")) + } + + context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil) + self.phase = .dripping( + DrippingState( + chunksLeft: self.count, + currentChunkBytesLeft: self.size + ) + ) + + self.writeChunk(context: context) + } + + public func channelInactive(context: ChannelHandlerContext) { + self.phase = .done + self.scheduled?.cancel() + context.fireChannelInactive() + } + + public func channelWritabilityChanged(context: ChannelHandlerContext) { + if case .dripping = self.phase, self.pendingWrite && context.channel.isWritable { + self.writeChunk(context: context) + } + } + + private func writeChunk(context: ChannelHandlerContext) { + // Make sure we don't accidentally reenter + if self.activelyWritingChunk { + return + } + self.activelyWritingChunk = true + defer { + self.activelyWritingChunk = false + } + + // If we're not dripping the response body, there's nothing to do + guard case .dripping(var drippingState) = self.phase else { + return + } + + // If we've sent all chunks, send end and be done + if drippingState.chunksLeft < 1 { + self.phase = .done + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + return + } + + var dataWritten = false + while drippingState.currentChunkBytesLeft > 0, context.channel.isWritable { + let toSend = min( + drippingState.currentChunkBytesLeft, + HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes + ) + let buffer = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice( + at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex, + length: toSend + )! + context.write(self.wrapOutboundOut(.body(buffer)), promise: nil) + drippingState.currentChunkBytesLeft -= toSend + dataWritten = true + } + + // If we weren't able to send the full chunk, it's because the channel isn't writable. We yield until it is + if drippingState.currentChunkBytesLeft > 0 { + self.pendingWrite = true + self.phase = .dripping(drippingState) + if dataWritten { + context.flush() + } + return + } + + // We sent the full chunk. If we have no more chunks to write, we're done! + drippingState.chunksLeft -= 1 + if drippingState.chunksLeft == 0 { + self.phase = .done + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + return + } + + if dataWritten { + context.flush() + } + + // More chunks to write.. Kick off timer + drippingState.currentChunkBytesLeft = self.size + self.phase = .dripping(drippingState) + if self.scheduledCallbackHandler == nil { + let this = NIOLoopBound(self, eventLoop: context.eventLoop) + let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) + self.scheduledCallbackHandler = HTTPDrippingDownloadHandlerScheduledCallbackHandler( + handler: this, + context: loopBoundContext + ) + } + // SAFTEY: scheduling the callback only potentially throws when invoked off eventloop + do { + try context.eventLoop.scheduleCallback(in: self.frequency, handler: self.scheduledCallbackHandler!) + } catch { + context.fireErrorCaught(error) + } + } + + private struct HTTPDrippingDownloadHandlerScheduledCallbackHandler: NIOScheduledCallbackHandler & Sendable { + var handler: NIOLoopBound + var context: NIOLoopBound + + func handleScheduledCallback(eventLoop: some EventLoop) { + self.handler.value.writeChunk(context: self.context.value) + } + } +} diff --git a/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift new file mode 100644 index 0000000..8fba40d --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift @@ -0,0 +1,90 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// HTTP request handler that receives arbitrary bytes and discards them +public final class HTTPReceiveDiscardHandler: ChannelInboundHandler { + + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + + private let expectation: Int? + private var expectationViolated = false + private var received = 0 + + /// Initializes `HTTPReceiveDiscardHandler` + /// - Parameter expectation: how many bytes should be expected. If more + /// bytes are received than expected, an error status code will + /// be sent to the client + public init(expectation: Int?) { + self.expectation = expectation + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + switch part { + case .head: + return + case .body(let buffer): + self.received += buffer.readableBytes + + // If the expectation is violated, send 4xx + if let expectation = self.expectation, self.received > expectation { + self.onExpectationViolated(context: context, expectation: expectation) + } + case .end: + if self.expectationViolated { + // Already flushed a response, nothing else to do + return + } + + if let expectation = self.expectation, self.received != expectation { + self.onExpectationViolated(context: context, expectation: expectation) + return + } + + let responseBody = ByteBuffer(string: "Received \(self.received) bytes") + self.writeSimpleResponse(context: context, status: .ok, body: responseBody) + } + } + + private func onExpectationViolated(context: ChannelHandlerContext, expectation: Int) { + self.expectationViolated = true + + let body = ByteBuffer( + string: + "Received in excess of expectation; expected(\(expectation)) received(\(self.received))" + ) + self.writeSimpleResponse(context: context, status: .badRequest, body: body) + } + + private func writeSimpleResponse( + context: ChannelHandlerContext, + status: HTTPResponse.Status, + body: ByteBuffer + ) { + let bodyLen = body.readableBytes + let responseHead = HTTPResponse( + status: status, + headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)")) + ) + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.write(self.wrapOutboundOut(.body(body)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } +} diff --git a/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift new file mode 100644 index 0000000..4eb4512 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public struct ResponsivenessConfig: Codable, Hashable, Sendable { + public var version: Int + public var urls: ResponsivenessConfigURLs + + public init(version: Int, urls: ResponsivenessConfigURLs) { + self.version = version + self.urls = urls + } +} + +public struct ResponsivenessConfigURLs: Codable, Hashable, Sendable { + public var largeDownloadURL: String + public var smallDownloadURL: String + public var uploadURL: String + + enum CodingKeys: String, CodingKey { + case largeDownloadURL = "large_download_url" + case smallDownloadURL = "small_download_url" + case uploadURL = "upload_url" + } + + static var largeDownloadSize: Int { 8 * 1_000_000_000 } // 8 * 10^9 + static var smallDownloadSize: Int { 1 } + + public init(scheme: String, authority: String) { + let base = "\(scheme)://\(authority)/responsiveness" + self.largeDownloadURL = "\(base)/download/\(ResponsivenessConfigURLs.largeDownloadSize)" + self.smallDownloadURL = "\(base)/download/\(ResponsivenessConfigURLs.smallDownloadSize)" + self.uploadURL = "\(base)/upload" + } +} diff --git a/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift new file mode 100644 index 0000000..6cc1ff2 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift @@ -0,0 +1,145 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Algorithms +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// Basic request multiplexer that identifies which request type (config, download, upload) is requested and adds the appropriate handler. +/// Once the handler has been added, all data is passed through to the newly added handler. +public final class SimpleResponsivenessRequestMux: ChannelInboundHandler { + + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + + // Predefine some common things we'll need in responses + private static let notFoundBody = ByteBuffer(string: "Not Found") + + // Whether or not we added a handler after us + private var handlerAdded = false + + // Config returned to user that lists responsiveness endpoints + private let responsivenessConfigBuffer: ByteBuffer + + public init(responsivenessConfigBuffer: ByteBuffer) { + self.responsivenessConfigBuffer = responsivenessConfigBuffer + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + if case let .head(request) = self.unwrapInboundIn(data) { + // TODO: get rid of this altogether and instead create an empty iterator below + guard let path = request.path else { + self.writeSimpleResponse( + context: context, + status: .notFound, + body: SimpleResponsivenessRequestMux.notFoundBody + ) + return + } + + var pathComponents = path.utf8.lazy.split(separator: UInt8(ascii: "?"), maxSplits: 1).makeIterator() + let firstPathComponent = pathComponents.next()! + let queryArgsString = pathComponents.next() + + // Split the path into components + var uriComponentIterator = firstPathComponent.split( + separator: UInt8(ascii: "/"), + maxSplits: 3, + omittingEmptySubsequences: false + ).lazy.map(Substring.init).makeIterator() + + // Handle possible path components + switch ( + request.method, uriComponentIterator.next(), uriComponentIterator.next(), + uriComponentIterator.next(), uriComponentIterator.next().flatMap { Int($0) } + ) { + case (.get, .some(""), .some("responsiveness"), .none, .none): + self.writeSimpleResponse( + context: context, + status: .ok, + body: self.responsivenessConfigBuffer + ) + case (.get, .some(""), .some("responsiveness"), .some("download"), .some(let size)): + self.addHandlerOrInternalError( + context: context, + handler: HTTPDrippingDownloadHandler(count: 1, size: size) + ) + case (.post, .some(""), .some("responsiveness"), .some("upload"), .none): + // Check if we should expect a certain count + var expectation: Int? + if let lengthHeaderValue = request.headerFields[.contentLength] { + if let expectedLength = Int(lengthHeaderValue) { + expectation = expectedLength + } + } + self.addHandlerOrInternalError( + context: context, + handler: HTTPReceiveDiscardHandler(expectation: expectation) + ) + case (_, .some(""), .some("drip"), .none, .none): + if let queryArgsString = queryArgsString { + guard let handler = HTTPDrippingDownloadHandler(queryArgsString: queryArgsString) else { + self.writeSimpleResponse(context: context, status: .badRequest, body: .init()) + return + } + self.addHandlerOrInternalError(context: context, handler: handler) + } else { + self.addHandlerOrInternalError(context: context, handler: HTTPDrippingDownloadHandler()) + } + default: + self.writeSimpleResponse( + context: context, + status: .notFound, + body: SimpleResponsivenessRequestMux.notFoundBody + ) + } + } + + // Only pass through data through if we've actually added a handler. If we didn't add a handler, it's because we + // directly responded. In this case, we don't care about the rest of the request. + if self.handlerAdded { + context.fireChannelRead(data) + } + } + + /// Adding handlers is fallible. If we fail to do it, we should return 500 to the user + private func addHandlerOrInternalError(context: ChannelHandlerContext, handler: ChannelHandler) { + do { + try context.pipeline.syncOperations.addHandler(handler) + self.handlerAdded = true + } catch { + self.writeSimpleResponse( + context: context, + status: .internalServerError, + body: ByteBuffer.init() + ) + } + } + + private func writeSimpleResponse( + context: ChannelHandlerContext, + status: HTTPResponse.Status, + body: ByteBuffer + ) { + let bodyLen = body.readableBytes + let responseHead = HTTPResponse( + status: status, + headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)")) + ) + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.write(self.wrapOutboundOut(.body(body)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } +} diff --git a/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift b/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift new file mode 100644 index 0000000..bff2386 --- /dev/null +++ b/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift @@ -0,0 +1,133 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOEmbedded +import NIOHTTPTypes +import XCTest + +@testable import NIOHTTPResponsiveness + +final class HTTPDrippingDownloadHandlerTests: XCTestCase { + + func testDefault() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel(handler: HTTPDrippingDownloadHandler(), loop: eventLoop) + + try channel.writeInbound( + HTTPRequestPart.head(HTTPRequest(method: .get, scheme: "http", authority: "whatever", path: "/drip")) + ) + + eventLoop.run() + + guard case let HTTPResponsePart.head(head) = (try channel.readOutbound())! else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, .ok) + + guard case HTTPResponsePart.end(nil) = (try channel.readOutbound())! else { + XCTFail("expected response end") + return + } + + let _ = try channel.finish() + } + + func testBasic() throws { + try dripTest(count: 2, size: 1024) + } + + func testZeroChunks() throws { + try dripTest(count: 0) + } + + func testNonZeroStatusCode() throws { + try dripTest(count: 1, code: .notAcceptable) + } + + func testZeroChunkSize() throws { + try dripTest(count: 1, size: 0) + } + + func dripTest( + count: Int, + size: Int = 1024, + frequency: TimeAmount = .seconds(1), + delay: TimeAmount = .seconds(5), + code: HTTPResponse.Status = .ok + ) throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel( + handler: HTTPDrippingDownloadHandler( + count: count, + size: size, + frequency: frequency, + delay: delay, + code: code + ), + loop: eventLoop + ) + + try channel.writeInbound( + HTTPRequestPart.head(HTTPRequest(method: .get, scheme: "http", authority: "whatever", path: nil)) + ) + + // Make sure delay is honored + eventLoop.run() + XCTAssert(try channel.readOutbound() == nil) + + eventLoop.advanceTime(by: delay + .milliseconds(100)) + + guard case let HTTPResponsePart.head(head) = (try channel.readOutbound())! else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, code) + + var chunksReceived = 0 + while chunksReceived < count { + + // Shouldn't need to wait for the first chunk + if chunksReceived > 0 { + eventLoop.advanceTime(by: frequency + .milliseconds(100)) + } + + var chunkBytesReceived = 0 + while chunkBytesReceived < size { + let next: HTTPResponsePart? = try channel.readOutbound() + guard case let .body(dataChunk) = next! else { + XCTFail("expected response data") + return + } + chunkBytesReceived += dataChunk.readableBytes + } + chunksReceived += 1 + + if chunksReceived < count { + let part: HTTPResponsePart? = try channel.readOutbound() + XCTAssert(part == nil) + } + } + + guard case HTTPResponsePart.end(nil) = (try channel.readOutbound())! else { + XCTFail("expected response end") + return + } + + let _ = try channel.finish() + } + +} diff --git a/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift b/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift new file mode 100644 index 0000000..151a23a --- /dev/null +++ b/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift @@ -0,0 +1,172 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOEmbedded +import NIOHTTPTypes +import XCTest + +@testable import NIOHTTPResponsiveness + +final class NIOHTTPResponsivenessTests: XCTestCase { + func download(channel: EmbeddedChannel, n: Int) throws { + // Recv head + try channel.writeInbound( + HTTPRequestPart.head( + HTTPRequest( + method: .get, + scheme: "http", + authority: "localhost:8888", + path: "/responsiveness/download/\(n)" + ) + ) + ) + + // Should get response head with content length + let out: HTTPResponsePart = (try channel.readOutbound())! + guard case let HTTPResponsePart.head(head) = out else { + XCTFail() + return + } + XCTAssertEqual(Int(head.headerFields[.contentLength]!)!, n) + + // Drain response body until completed + var received = 0 + loop: while true { + let out: HTTPResponsePart = (try channel.readOutbound())! + switch out { + case .head: + XCTFail("cannot get head twice") + case .body(let body): + received += body.readableBytes + case .end: + break loop + } + } + XCTAssertEqual(received, n) + + } + + func upload(channel: EmbeddedChannel, length: Int, includeContentLength: Bool) throws { + var head = HTTPRequest( + method: .post, + scheme: "http", + authority: "localhost:8888", + path: "/responsiveness/upload" + ) + if includeContentLength { + head.headerFields[.contentLength] = "\(length)" + } + + // Recv head + try channel.writeInbound(HTTPRequestPart.head(head)) + + // Shouldn't get any immediate response + let out: HTTPResponsePart? = try channel.readOutbound() + XCTAssertNil(out) + + // Send data + var sent = 0 + while sent < length { + let toWrite = min(length - sent, HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes) + let buf = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice( + at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex, + length: toWrite + )! + try channel.writeInbound(HTTPRequestPart.body(buf)) + sent += toWrite + } + + // Send fin + try channel.writeInbound(HTTPRequestPart.end(nil)) + + // Get response from server + var part: HTTPResponsePart = (try channel.readOutbound())! + guard case let HTTPResponsePart.head(head) = part else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, .ok) + + // Check response body to confirm server received everything we sent + part = (try channel.readOutbound())! + guard case let HTTPResponsePart.body(body) = part else { + XCTFail("expected response body") + return + } + XCTAssertEqual(String(buffer: body), "Received \(length) bytes") + + // Check server correctly closes the stream + part = (try channel.readOutbound())! + guard case HTTPResponsePart.end(nil) = part else { + XCTFail("expected end") + return + } + } + + private static let defaultValues = [0, 1, 2, 10, 1000, 20000] + + func testDownload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + let channel = EmbeddedChannel(handler: HTTPDrippingDownloadHandler(count: 1, size: val)) + try download(channel: channel, n: val) + let _ = try channel.finish() + } + } + + func testUpload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + var channel = EmbeddedChannel(handler: HTTPReceiveDiscardHandler(expectation: val)) + try upload(channel: channel, length: val, includeContentLength: true) + let _ = try channel.finish() + + channel = EmbeddedChannel(handler: HTTPReceiveDiscardHandler(expectation: nil)) + try upload(channel: channel, length: val, includeContentLength: false) + let _ = try channel.finish() + } + } + + func testMuxDownload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + let channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try download(channel: channel, n: val) + let _ = try channel.finish() + } + } + + func testMuxUpload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + var channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try upload(channel: channel, length: val, includeContentLength: true) + let _ = try channel.finish() + + channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try upload(channel: channel, length: val, includeContentLength: false) + let _ = try channel.finish() + } + } +}