From 235d96963d18080eed14237a198bcf6186616fdd Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Thu, 29 Jun 2023 14:53:29 +0330 Subject: [PATCH 01/13] libcurl multi-socket method bindings --- .../org/http4s/curl/unsafe/libcurl.scala | 110 ++++++++++++++++++ project/Versions.scala | 2 +- 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala b/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala index 0519a0c..fb9a698 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala @@ -27,6 +27,9 @@ private[curl] object libcurl_const { final val CURLOPTTYPE_LONG = 0 final val CURLOPTTYPE_OBJECTPOINT = 10000 final val CURLOPTTYPE_FUNCTIONPOINT = 20000 + final val CURLOPTTYPE_OFF_T = 30000 + final val CURLOPTTYPE_BLOB = 40000 + final val CURLOPTTYPE_STRINGPOINT = CURLOPTTYPE_OBJECTPOINT final val CURLOPTTYPE_SLISTPOINT = CURLOPTTYPE_OBJECTPOINT final val CURLOPT_CUSTOMREQUEST = CURLOPTTYPE_OBJECTPOINT + 36 @@ -44,6 +47,56 @@ private[curl] object libcurl_const { final val CURLOPT_UPLOAD = CURLOPTTYPE_LONG + 46 final val CURLOPT_WS_OPTIONS = CURLOPTTYPE_LONG + 320 + /* This is the socket callback function pointer */ + final val CURLMOPT_SOCKETFUNCTION = CURLOPTTYPE_FUNCTIONPOINT + 1 + + /* This is the argument passed to the socket callback */ + final val CURLMOPT_SOCKETDATA = CURLOPTTYPE_OBJECTPOINT + 2 + + /* set to 1 to enable pipelining for this multi handle */ + final val CURLMOPT_PIPELINING = CURLOPTTYPE_LONG + 3 + + /* This is the timer callback function pointer */ + final val CURLMOPT_TIMERFUNCTION = CURLOPTTYPE_FUNCTIONPOINT + 4 + + /* This is the argument passed to the timer callback */ + final val CURLMOPT_TIMERDATA = CURLOPTTYPE_OBJECTPOINT + 5 + + /* maximum number of entries in the connection cache */ + final val CURLMOPT_MAXCONNECTS = CURLOPTTYPE_LONG + 6 + + /* maximum number of (pipelining) connections to one host */ + final val CURLMOPT_MAX_HOST_CONNECTIONS = CURLOPTTYPE_LONG + 7 + + /* maximum number of requests in a pipeline */ + final val CURLMOPT_MAX_PIPELINE_LENGTH = CURLOPTTYPE_LONG + 8 + + /* a connection with a content-length longer than this + will not be considered for pipelining */ + final val CURLMOPT_CONTENT_LENGTH_PENALTY_SIZE = CURLOPTTYPE_OFF_T + 9 + + /* a connection with a chunk length longer than this + will not be considered for pipelining */ + final val CURLMOPT_CHUNK_LENGTH_PENALTY_SIZE = CURLOPTTYPE_OFF_T + 10 + + /* a list of site names(+port) that are blocked from pipelining */ + final val CURLMOPT_PIPELINING_SITE_BL = CURLOPTTYPE_OBJECTPOINT + 11 + + /* a list of server types that are blocked from pipelining */ + final val CURLMOPT_PIPELINING_SERVER_BL = CURLOPTTYPE_OBJECTPOINT + 12 + + /* maximum number of open connections in total */ + final val CURLMOPT_MAX_TOTAL_CONNECTIONS = CURLOPTTYPE_LONG + 13 + + /* This is the server push callback function pointer */ + final val CURLMOPT_PUSHFUNCTION = CURLOPTTYPE_FUNCTIONPOINT + 14 + + /* This is the argument passed to the server push callback */ + final val CURLMOPT_PUSHDATA = CURLOPTTYPE_OBJECTPOINT + 15 + + /* maximum number of concurrent streams to support on a connection */ + final val CURLMOPT_MAX_CONCURRENT_STREAMS = CURLOPTTYPE_LONG + 16 + final val CURL_HTTP_VERSION_NONE = 0L final val CURL_HTTP_VERSION_1_0 = 1L final val CURL_HTTP_VERSION_1_1 = 2L @@ -74,6 +127,16 @@ private[curl] object libcurl_const { // websocket options flags final val CURLWS_RAW_MODE = 1 << 0 + + final val CURL_CSELECT_IN = 0x01 + final val CURL_CSELECT_OUT = 0x02 + final val CURL_CSELECT_ERR = 0x04 + + final val CURL_POLL_NONE = 0 + final val CURL_POLL_IN = 1 + final val CURL_POLL_OUT = 2 + final val CURL_POLL_INOUT = 3 + final val CURL_POLL_REMOVE = 4 } final private[curl] case class CURLcode(value: CInt) extends AnyVal { @@ -102,6 +165,8 @@ private[curl] object libcurl { type CURLversion = CUnsignedInt + type curl_socket_t = CInt + type curl_slist type curl_version_info_data @@ -112,6 +177,10 @@ private[curl] object libcurl { type read_callback = CFuncPtr4[Ptr[CChar], CSize, CSize, Ptr[Byte], CSize] + type socket_callback = CFuncPtr5[Ptr[CURL], curl_socket_t, CInt, Ptr[Byte], Ptr[Byte], CInt] + + type timer_callback = CFuncPtr3[Ptr[CURLM], CLong, Ptr[Byte], CInt] + type curl_ws_frame = CStruct4[CInt, CInt, Long, Long] // age, flags, offset, bytesleft def curl_version(): Ptr[CChar] = extern @@ -273,6 +342,47 @@ private[curl] object libcurl { @name("curl_multi_strerror") def curl_multi_strerror(code: CURLMcode): Ptr[CChar] = extern + @name("curl_multi_setopt") + def curl_multi_setopt_socketfunction( + curl: Ptr[CURLM], + option: CURLMOPT_SOCKETFUNCTION.type, + callback: socket_callback, + ): CURLMcode = + extern + + @name("curl_multi_setopt") + def curl_multi_setopt_socketdata( + curl: Ptr[CURLM], + option: CURLMOPT_SOCKETDATA.type, + pointer: Ptr[Byte], + ): CURLMcode = + extern + + @name("curl_multi_setopt") + def curl_multi_setopt_timerfunction( + curl: Ptr[CURLM], + option: CURLMOPT_TIMERFUNCTION.type, + callback: timer_callback, + ): CURLMcode = + extern + + @name("curl_multi_setopt") + def curl_multi_setopt_timerdata( + curl: Ptr[CURLM], + option: CURLMOPT_TIMERFUNCTION.type, + pointer: Ptr[Byte], + ): CURLMcode = + extern + + @name("curl_multi_socket_action") + def curl_multi_socket_action( + curl: Ptr[CURLM], + socket: curl_socket_t, + evBitmask: CInt, + runningHandles: Ptr[Int], + ): CURLMcode = + extern + @name("curl_easy_setopt") def curl_easy_setopt_websocket( curl: Ptr[CURL], diff --git a/project/Versions.scala b/project/Versions.scala index a538f22..cc655be 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -1,7 +1,7 @@ object Versions { val scala3 = "3.3.0" val scala213 = "2.13.11" - val catsEffectVersion = "3.5.1" + val catsEffectVersion = "3.6-e9aeb8c" val http4sVersion = "0.23.22" val munitCEVersion = "2.0.0-M3" } From 829ba96923ae6a00e19e8a9569d8bbe80ceb82a2 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Mon, 3 Jul 2023 22:29:09 +0330 Subject: [PATCH 02/13] Implemented CurlMultiSocket This implementation uses curl multi socket drive, and uses FileDescriptorPoller from cats effect. It allows using this library alongside other cats effect libraries. --- .../org/http4s/curl/http/CurlClient.scala | 3 + .../org/http4s/curl/http/CurlRequest.scala | 65 ++++ .../org/http4s/curl/unsafe/CURLMcode.scala | 30 ++ .../org/http4s/curl/unsafe/CURLcode.scala | 30 ++ .../curl/unsafe/CurlExecutorScheduler.scala | 2 + .../curl/unsafe/CurlMultiSocketImpl.scala | 286 ++++++++++++++++++ .../org/http4s/curl/unsafe/libcurl.scala | 14 +- .../http4s/curl/websocket/Connection.scala | 37 +++ .../http4s/curl/websocket/CurlWSClient.scala | 49 +++ .../scala/CurlClientMultiSocketSuite.scala | 85 ++++++ .../scala/CurlWSClientMultiSocketSuite.scala | 90 ++++++ 11 files changed, 681 insertions(+), 10 deletions(-) create mode 100644 curl/src/main/scala/org/http4s/curl/unsafe/CURLMcode.scala create mode 100644 curl/src/main/scala/org/http4s/curl/unsafe/CURLcode.scala create mode 100644 curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala create mode 100644 tests/http/src/test/scala/CurlClientMultiSocketSuite.scala create mode 100644 tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala index 6412220..e3bd98c 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala @@ -19,10 +19,13 @@ package org.http4s.curl.http import cats.effect._ import org.http4s.client.Client import org.http4s.curl.unsafe.CurlExecutorScheduler +import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlClient { def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _)) + def multiSocket(ms: CurlMultiSocket): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) + def get: IO[Client[IO]] = IO.executionContext.flatMap { case ec: CurlExecutorScheduler => IO.pure(apply(ec)) case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler")) diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala index cd5c7a7..5480f64 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala @@ -22,6 +22,7 @@ import org.http4s.Response import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ import org.http4s.curl.unsafe.CurlExecutorScheduler +import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlRequest { private def setup( @@ -78,6 +79,57 @@ private[curl] object CurlRequest { ) ) + private def setup( + handle: CurlEasy, + send: RequestSend, + recv: RequestRecv, + req: Request[IO], + ): Resource[IO, Unit] = + Utils.newZone.flatMap(implicit zone => + CurlSList().evalMap(headers => + IO { + // TODO add in options + // handle.setVerbose(true) + + import org.http4s.curl.unsafe.libcurl_const + import scala.scalanative.unsafe._ + import org.http4s.Header + import org.http4s.HttpVersion + import org.typelevel.ci._ + + handle.setCustomRequest(toCString(req.method.renderString)) + + handle.setUpload(true) + + handle.setUrl(toCString(req.uri.renderString)) + + val httpVersion = req.httpVersion match { + case HttpVersion.`HTTP/1.0` => libcurl_const.CURL_HTTP_VERSION_1_0 + case HttpVersion.`HTTP/1.1` => libcurl_const.CURL_HTTP_VERSION_1_1 + case HttpVersion.`HTTP/2` => libcurl_const.CURL_HTTP_VERSION_2 + case HttpVersion.`HTTP/3` => libcurl_const.CURL_HTTP_VERSION_3 + case _ => libcurl_const.CURL_HTTP_VERSION_NONE + } + handle.setHttpVersion(httpVersion) + + req.headers // curl adds these headers automatically, so we explicitly disable them + .transform(Header.Raw(ci"Expect", "") :: Header.Raw(ci"Transfer-Encoding", "") :: _) + .foreach(header => headers.append(header.toString)) + + handle.setHttpHeader(headers.toPtr) + + handle.setReadData(Utils.toPtr(send)) + handle.setReadFunction(RequestSend.readCallback(_, _, _, _)) + + handle.setHeaderData(Utils.toPtr(recv)) + handle.setHeaderFunction(RequestRecv.headerCallback(_, _, _, _)) + + handle.setWriteData(Utils.toPtr(recv)) + handle.setWriteFunction(RequestRecv.writeCallback(_, _, _, _)) + } + ) + ) + def apply(ec: CurlExecutorScheduler, req: Request[IO]): Resource[IO, Response[IO]] = for { gc <- GCRoot() handle <- CurlEasy() @@ -89,4 +141,17 @@ private[curl] object CurlRequest { _ <- req.body.through(send.pipe).compile.drain.background resp <- recv.response() } yield resp + + def applyMultiSocket(ms: CurlMultiSocket, req: Request[IO]): Resource[IO, Response[IO]] = for { + gc <- GCRoot() + handle <- CurlEasy() + flow <- FlowControl(handle) + send <- RequestSend(flow) + recv <- RequestRecv(flow) + _ <- gc.add(send, recv) + _ <- setup(handle, send, recv, req) + _ <- ms.addHandlerTerminating(handle, recv.onTerminated).toResource + _ <- req.body.through(send.pipe).compile.drain.background + resp <- recv.response() + } yield resp } diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CURLMcode.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CURLMcode.scala new file mode 100644 index 0000000..cbff814 --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CURLMcode.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.unsafe + +import org.http4s.curl.CurlError + +import scala.scalanative.unsafe._ + +final private[curl] case class CURLMcode(value: CInt) extends AnyVal { + @inline def isOk: Boolean = value == 0 + @inline def isError: Boolean = value != 0 + @inline def throwOnError: Unit = + if (isError) { + throw CurlError.fromMCode(this) + } +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CURLcode.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CURLcode.scala new file mode 100644 index 0000000..b10fa65 --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CURLcode.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.unsafe + +import org.http4s.curl.CurlError + +import scala.scalanative.unsafe._ + +final private[curl] case class CURLcode(value: CInt) extends AnyVal { + @inline def isOk: Boolean = value == 0 + @inline def isError: Boolean = value != 0 + @inline def throwOnError: Unit = + if (isError) { + throw CurlError.fromCode(this) + } +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala index 27df139..02b4d65 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala @@ -21,11 +21,13 @@ import cats.effect.kernel.Resource import cats.effect.unsafe.PollingExecutorScheduler import org.http4s.curl.CurlError +import scala.annotation.nowarn import scala.collection.mutable import scala.concurrent.duration.Duration import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ +@nowarn final private[curl] class CurlExecutorScheduler(multiHandle: Ptr[libcurl.CURLM], pollEvery: Int) extends PollingExecutorScheduler(pollEvery) { diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala new file mode 100644 index 0000000..8e9293a --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala @@ -0,0 +1,286 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.unsafe + +import cats.effect.FiberIO +import cats.effect.FileDescriptorPollHandle +import cats.effect.FileDescriptorPoller +import cats.effect.IO +import cats.effect.kernel.Ref +import cats.effect.kernel.Resource +import cats.effect.std.AtomicCell +import cats.effect.std.Dispatcher +import cats.syntax.all._ +import org.http4s.curl.CurlError +import org.http4s.curl.internal._ + +import scala.concurrent.duration._ +import scala.scalanative.unsafe._ + +private[curl] trait CurlMultiSocket { + def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] + def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] +} + +private[curl] object CurlMultiSocket { + implicit private class OptFibOps(private val f: Option[FiberIO[?]]) extends AnyVal { + def cancel: IO[Unit] = f.fold(IO.unit)(_.cancel) + } + + private val getFDPoller = IO.pollers.flatMap( + _.collectFirst { case poller: FileDescriptorPoller => poller }.liftTo[IO]( + new RuntimeException("Installed PollingSystem does not provide a FileDescriptorPoller") + ) + ) + + private val newCurlMutli = Resource.make(IO { + val multiHandle = libcurl.curl_multi_init() + if (multiHandle == null) + throw new RuntimeException("curl_multi_init") + multiHandle + })(mhandle => + IO { + val code = libcurl.curl_multi_cleanup(mhandle) + if (code.isError) + throw CurlError.fromMCode(code) + } + ) + + private lazy val curlGlobalSetup = { + val initCode = libcurl.curl_global_init(2) + if (initCode.isError) + throw CurlError.fromCode(initCode) + } + + def apply(): Resource[IO, CurlMultiSocket] = for { + _ <- IO(curlGlobalSetup).toResource + handle <- newCurlMutli + fdPoller <- getFDPoller.toResource + disp <- Dispatcher.sequential[IO] + mapping <- AtomicCell[IO].of(Map.empty[libcurl.curl_socket_t, Monitoring]).toResource + timeout <- IO.ref[Option[FiberIO[Unit]]](None).toResource + cms = new CurlMultiSocketImpl(handle, fdPoller, mapping, disp, timeout) + _ <- setup(cms, handle).toResource + } yield cms + + private def setup(cms: CurlMultiSocketImpl, handle: Ptr[libcurl.CURLM]) = IO { + val data = Utils.toPtr(cms) + + libcurl + .curl_multi_setopt_timerdata( + handle, + libcurl_const.CURLMOPT_TIMERDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketdata( + handle, + libcurl_const.CURLMOPT_SOCKETDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_timerfunction( + handle, + libcurl_const.CURLMOPT_TIMERFUNCTION, + onTimeout(_, _, _), + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketfunction( + handle, + libcurl_const.CURLMOPT_SOCKETFUNCTION, + onSocket(_, _, _, _, _), + ) + .throwOnError + + } *> cms.notifyTimeout + + final private case class Monitoring( + read: Option[FiberIO[Nothing]], + write: Option[FiberIO[Nothing]], + handle: FileDescriptorPollHandle, + unregister: IO[Unit], + ) { + def clean: IO[Unit] = IO.uncancelable(_ => read.cancel !> write.cancel !> unregister) + } + + private def onTimeout( + mutli: Ptr[libcurl.CURLM], + timeoutMs: CLong, + userdata: Ptr[Byte], + ): CInt = { + val d = Utils.fromPtr[CurlMultiSocketImpl](userdata) + + if (timeoutMs == -1) { + d.removeTimeout + } else { + d.setTimeout(timeoutMs) + } + 0 + } + + private def onSocket( + easy: Ptr[libcurl.CURL], + fd: libcurl.curl_socket_t, + what: Int, + userdata: Ptr[Byte], + socketdata: Ptr[Byte], + ): CInt = { + val d = Utils.fromPtr[CurlMultiSocketImpl](userdata) + + what match { + case libcurl_const.CURL_POLL_IN => d.addFD(fd, true, false) + case libcurl_const.CURL_POLL_OUT => d.addFD(fd, false, true) + case libcurl_const.CURL_POLL_INOUT => d.addFD(fd, true, true) + case libcurl_const.CURL_POLL_REMOVE => d.remove(fd) + case other => throw new UnknownError(s"Received unknown socket request: $other!") + } + + 0 + } + + final private class CurlMultiSocketImpl( + multiHandle: Ptr[libcurl.CURLM], + fdpoller: FileDescriptorPoller, + mapping: AtomicCell[IO, Map[libcurl.curl_socket_t, Monitoring]], + disp: Dispatcher[IO], + timeout: Ref[IO, Option[FiberIO[Unit]]], + ) extends CurlMultiSocket { + + private val callbacks = + scala.collection.mutable.Map[Ptr[libcurl.CURL], Either[Throwable, Unit] => Unit]() + + override def addHandlerTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): IO[Unit] = IO { + libcurl.curl_multi_add_handle(multiHandle, easy.curl).throwOnError + callbacks(easy.curl) = cb + } + + override def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] = + Resource.make(addHandlerTerminating(easy, cb))(_ => + IO { + libcurl.curl_multi_remove_handle(multiHandle, easy.curl).throwOnError + callbacks.remove(easy.curl).foreach(_(Right(()))) + } + ) + + def addFD(fd: libcurl.curl_socket_t, read: Boolean, write: Boolean): Unit = + disp.unsafeRunAndForget { + + val newMonitor = fdpoller.registerFileDescriptor(fd, read, write).allocated.flatMap { + case (handle, unregister) => + ( + Option.when(read)(readLoop(fd, handle)).sequence, + Option.when(write)(writeLoop(fd, handle)).sequence, + ) + .mapN(Monitoring(_, _, handle, unregister)) + } + + IO.uncancelable(_ => + mapping.evalUpdate { m => + m.get(fd) match { + case None => + newMonitor.map(m.updated(fd, _)) + case Some(s: Monitoring) => + s.clean *> newMonitor.map(m.updated(fd, _)) + } + } + ) + } + + def remove(fd: libcurl.curl_socket_t): Unit = + disp.unsafeRunAndForget( + IO.uncancelable(_ => + mapping.evalUpdate { m => + m.get(fd) match { + case None => IO(m) + case Some(s) => s.clean.as(m - fd) + } + } + ) + ) + + def setTimeout(duration: Long): Unit = disp.unsafeRunAndForget( + (IO.sleep(duration.millis) *> notifyTimeout).start.flatMap(f => + timeout.getAndSet(Some(f)).flatMap(_.cancel) + ) + ) + + def removeTimeout: Unit = disp.unsafeRunAndForget( + timeout.getAndSet(None).flatMap(_.cancel) + ) + + def notifyTimeout: IO[Unit] = IO { + val running = stackalloc[Int]() + libcurl + .curl_multi_socket_action(multiHandle, libcurl_const.CURL_SOCKET_TIMEOUT, 0, running) + .throwOnError + + postAction + } + + private def postAction = while ({ + val msgsInQueue = stackalloc[CInt]() + val info = libcurl.curl_multi_info_read(multiHandle, msgsInQueue) + + if (info != null) { + val curMsg = libcurl.curl_CURLMsg_msg(info) + if (curMsg == libcurl_const.CURLMSG_DONE) { + val handle = libcurl.curl_CURLMsg_easy_handle(info) + callbacks.remove(handle).foreach { cb => + val result = libcurl.curl_CURLMsg_data_result(info) + cb( + if (result.isOk) Right(()) + else Left(CurlError.fromCode(result)) + ) + } + + val code = libcurl.curl_multi_remove_handle(multiHandle, handle) + if (code.isError) + throw CurlError.fromMCode(code) + } + true + } else false + }) {} + + private def action(fd: libcurl.curl_socket_t, ev: CInt) = IO { + val running = stackalloc[Int]() + libcurl.curl_multi_socket_action(multiHandle, fd, ev, running) + + postAction + + Left(()) + } + private def readLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = + p.pollReadRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_IN)).start + private def writeLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = + p.pollWriteRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_OUT)).start + } +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala b/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala index fb9a698..976b5ea 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/libcurl.scala @@ -128,6 +128,9 @@ private[curl] object libcurl_const { // websocket options flags final val CURLWS_RAW_MODE = 1 << 0 + final val CURL_SOCKET_BAD = -1 + final val CURL_SOCKET_TIMEOUT = CURL_SOCKET_BAD + final val CURL_CSELECT_IN = 0x01 final val CURL_CSELECT_OUT = 0x02 final val CURL_CSELECT_ERR = 0x04 @@ -139,15 +142,6 @@ private[curl] object libcurl_const { final val CURL_POLL_REMOVE = 4 } -final private[curl] case class CURLcode(value: CInt) extends AnyVal { - @inline def isOk: Boolean = value == 0 - @inline def isError: Boolean = value != 0 -} -final private[curl] case class CURLMcode(value: CInt) extends AnyVal { - @inline def isOk: Boolean = value == 0 - @inline def isError: Boolean = value != 0 -} - @link("curl") @extern private[curl] object libcurl { @@ -369,7 +363,7 @@ private[curl] object libcurl { @name("curl_multi_setopt") def curl_multi_setopt_timerdata( curl: Ptr[CURLM], - option: CURLMOPT_TIMERFUNCTION.type, + option: CURLMOPT_TIMERDATA.type, pointer: Ptr[Byte], ): CURLMcode = extern diff --git a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala index bd9a5d7..692e8b8 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala @@ -29,6 +29,7 @@ import org.http4s.client.websocket._ import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ import org.http4s.curl.unsafe.CurlExecutorScheduler +import org.http4s.curl.unsafe.CurlMultiSocket import org.http4s.curl.unsafe.libcurl import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector @@ -242,6 +243,42 @@ private object Connection { _ <- estab.get.flatMap(IO.fromEither).toResource } yield con + def apply( + req: WSRequest, + ms: CurlMultiSocket, + recvBufferSize: Int, + pauseOn: Int, + resumeOn: Int, + verbose: Boolean, + ): Resource[IO, Connection] = for { + gc <- GCRoot() + dispatcher <- Dispatcher.sequential[IO] + recvQ <- Queue.bounded[IO, Option[WSFrame]](recvBufferSize).toResource + recv <- Ref[SyncIO].of(Option.empty[Receiving]).to[IO].toResource + estab <- IO.deferred[Either[Throwable, Unit]].toResource + handler <- CurlEasy() + brk <- Breaker( + handler, + capacity = recvBufferSize, + close = resumeOn, + open = pauseOn, + verbose, + ).toResource + con = new Connection( + handler, + recvQ, + recv, + dispatcher, + estab, + brk, + ) + _ <- setup(req, verbose)(con) + _ <- gc.add(con) + _ <- ms.addHandlerNonTerminating(handler, con.onTerminated) + // Wait until established or throw error + _ <- estab.get.flatMap(IO.fromEither).toResource + } yield con + } sealed private trait ReceivingType extends Serializable with Product diff --git a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala index 276d977..b21089d 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala @@ -22,6 +22,7 @@ import cats.implicits._ import org.http4s.client.websocket.WSFrame._ import org.http4s.client.websocket._ import org.http4s.curl.unsafe.CurlExecutorScheduler +import org.http4s.curl.unsafe.CurlMultiSocket import org.http4s.curl.unsafe.CurlRuntime import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector @@ -89,4 +90,52 @@ private[curl] object CurlWSClient { ) } } + + def apply( + ms: CurlMultiSocket, + recvBufferSize: Int = 100, + pauseOn: Int = 10, + resumeOn: Int = 30, + verbose: Boolean = false, + ): Option[WSClient[IO]] = + Option.when(CurlRuntime.isWebsocketAvailable && CurlRuntime.curlVersionNumber >= 0x75700) { + WSClient(true) { req => + Connection(req, ms, recvBufferSize, pauseOn, resumeOn, verbose) + .map(con => + new WSConnection[IO] { + override def send(wsf: WSFrame): IO[Unit] = wsf match { + case Close(_, _) => + val flags = libcurl_const.CURLWS_CLOSE + con.send(flags, ByteVector.empty) + case Ping(data) => + val flags = libcurl_const.CURLWS_PING + con.send(flags, data) + case Pong(data) => + val flags = libcurl_const.CURLWS_PONG + con.send(flags, data) + case Text(data, true) => + val flags = libcurl_const.CURLWS_TEXT + val bv = + ByteVector.encodeUtf8(data).getOrElse(throw InvalidTextFrame) + con.send(flags, bv) + case Binary(data, true) => + val flags = libcurl_const.CURLWS_BINARY + con.send(flags, data) + case _ => + // NOTE curl needs to know total amount of fragment size in first send + // and it is not compatible with current websocket interface in http4s + IO.raiseError(PartialFragmentFrame) + } + + override def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]): IO[Unit] = + wsfs.traverse_(send) + + override def receive: IO[Option[WSFrame]] = con.receive + + override def subprotocol: Option[String] = None + + } + ) + } + } } diff --git a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala new file mode 100644 index 0000000..39f95ea --- /dev/null +++ b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl + +import cats.effect.IO +import cats.effect.SyncIO +import cats.effect.std.Random +import cats.syntax.all._ +import munit.CatsEffectSuite +import org.http4s.Method._ +import org.http4s.Request +import org.http4s.Status +import org.http4s.client.Client +import org.http4s.curl.unsafe.CurlMultiSocket +import org.http4s.syntax.all._ + +class CurlClientMultiSocketSuite extends CatsEffectSuite { + + val clientFixture: SyncIO[FunFixture[Client[IO]]] = ResourceFunFixture( + CurlMultiSocket().map(http.CurlClient.multiSocket(_)) + ) + + clientFixture.test("3 get echos") { client => + client + .expect[String]("http://localhost:8080/http") + .map(_.nonEmpty) + .assert + .parReplicateA_(3) + } + + clientFixture.test("500") { client => + client + .statusFromString("http://localhost:8080/http/500") + .assertEquals(Status.InternalServerError) + } + + clientFixture.test("unexpected") { client => + client + .expect[String]("http://localhost:8080/http/500") + .attempt + .map(_.isLeft) + .assert + } + + clientFixture.test("error") { client => + client.expect[String]("unsupported://server").intercept[CurlError] + } + + clientFixture.test("error") { client => + client.expect[String]("").intercept[CurlError] + } + + clientFixture.test("3 post echos") { client => + Random.scalaUtilRandom[IO].flatMap { random => + random + .nextString(8) + .flatMap { s => + val msg = s"hello postman $s" + client + .expect[String]( + Request[IO](POST, uri = uri"http://localhost:8080/http/echo").withEntity(msg) + ) + .flatTap(IO.println) + .map(_.contains(msg)) + .assert + } + .parReplicateA_(3) + } + } + +} diff --git a/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala b/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala new file mode 100644 index 0000000..8997f35 --- /dev/null +++ b/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl + +import cats.effect.IO +import cats.syntax.all._ +import munit.CatsEffectSuite +import org.http4s.client.websocket.WSFrame +import org.http4s.client.websocket.WSRequest +import org.http4s.curl.unsafe.CurlMultiSocket +import org.http4s.curl.websocket.CurlWSClient +import org.http4s.implicits._ + +class CurlWSClientMultiSocketSuite extends CatsEffectSuite { + + private val clientFixture = ResourceFunFixture( + CurlMultiSocket().evalMap( + CurlWSClient(_).liftTo[IO]( + new RuntimeException("websocket client is not supported in this environment") + ) + ) + ) + + clientFixture.test("websocket echo") { + val frames = List.range(1, 5).map(i => WSFrame.Text(s"text $i")) + + _.connectHighLevel(WSRequest(uri"ws://localhost:8080/ws/echo")) + .use(con => + con.receiveStream + .take(4) + .evalTap(IO.println) + .compile + .toList <& (frames.traverse(con.send(_))) + ) + .assertEquals(frames) + } + + clientFixture.test("websocket bounded") { + _.connectHighLevel(WSRequest(uri"ws://localhost:8080/ws/bounded")) + .use(con => + con.receiveStream + .evalTap(IO.println) + .compile + .toList + ) + .assertEquals(List(WSFrame.Text("everything"))) + } + + clientFixture.test("websocket closed") { + _.connectHighLevel(WSRequest(uri"ws://localhost:8080/ws/closed")) + .use(con => con.receiveStream.compile.toList) + .assertEquals(Nil) + .parReplicateA_(4) + } + + clientFixture.test("error") { client => + client + .connectHighLevel(WSRequest(uri"")) + .use_ + .intercept[CurlError] + } + + clientFixture.test("error") { client => + client + .connectHighLevel(WSRequest(uri"server")) + .use_ + .intercept[CurlError] + } + + clientFixture.test("invalid protocol") { client => + client + .connectHighLevel(WSRequest(uri"http://localhost:8080/http")) + .use_ + .intercept[IllegalArgumentException] + } +} From 349f22f3b4a9ef108f6daa02f6ceba6a57978083 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Tue, 4 Jul 2023 21:24:21 +0330 Subject: [PATCH 03/13] Proper clean up for CurlMultiSocket --- ...SocketImpl.scala => CurlMultiSocket.scala} | 169 +++++++++++------- 1 file changed, 106 insertions(+), 63 deletions(-) rename curl/src/main/scala/org/http4s/curl/unsafe/{CurlMultiSocketImpl.scala => CurlMultiSocket.scala} (67%) diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala similarity index 67% rename from curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala rename to curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala index 8e9293a..f9c4685 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocketImpl.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala @@ -31,14 +31,6 @@ import org.http4s.curl.internal._ import scala.concurrent.duration._ import scala.scalanative.unsafe._ -private[curl] trait CurlMultiSocket { - def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] - def addHandlerNonTerminating( - easy: CurlEasy, - cb: Either[Throwable, Unit] => Unit, - ): Resource[IO, Unit] -} - private[curl] object CurlMultiSocket { implicit private class OptFibOps(private val f: Option[FiberIO[?]]) extends AnyVal { def cancel: IO[Unit] = f.fold(IO.unit)(_.cancel) @@ -74,49 +66,12 @@ private[curl] object CurlMultiSocket { handle <- newCurlMutli fdPoller <- getFDPoller.toResource disp <- Dispatcher.sequential[IO] - mapping <- AtomicCell[IO].of(Map.empty[libcurl.curl_socket_t, Monitoring]).toResource + mapping <- AtomicCell[IO].of(State.empty).toResource timeout <- IO.ref[Option[FiberIO[Unit]]](None).toResource cms = new CurlMultiSocketImpl(handle, fdPoller, mapping, disp, timeout) - _ <- setup(cms, handle).toResource + _ <- cms.setup } yield cms - private def setup(cms: CurlMultiSocketImpl, handle: Ptr[libcurl.CURLM]) = IO { - val data = Utils.toPtr(cms) - - libcurl - .curl_multi_setopt_timerdata( - handle, - libcurl_const.CURLMOPT_TIMERDATA, - data, - ) - .throwOnError - - libcurl - .curl_multi_setopt_socketdata( - handle, - libcurl_const.CURLMOPT_SOCKETDATA, - data, - ) - .throwOnError - - libcurl - .curl_multi_setopt_timerfunction( - handle, - libcurl_const.CURLMOPT_TIMERFUNCTION, - onTimeout(_, _, _), - ) - .throwOnError - - libcurl - .curl_multi_setopt_socketfunction( - handle, - libcurl_const.CURLMOPT_SOCKETFUNCTION, - onSocket(_, _, _, _, _), - ) - .throwOnError - - } *> cms.notifyTimeout - final private case class Monitoring( read: Option[FiberIO[Nothing]], write: Option[FiberIO[Nothing]], @@ -126,6 +81,19 @@ private[curl] object CurlMultiSocket { def clean: IO[Unit] = IO.uncancelable(_ => read.cancel !> write.cancel !> unregister) } + sealed private trait State + private object State { + val empty: State = Active() + final case class Active(monitors: Map[libcurl.curl_socket_t, Monitoring] = Map.empty) + extends State { + def add(fd: libcurl.curl_socket_t, monitor: Monitoring): Active = copy( + monitors.updated(fd, monitor) + ) + def remove(fd: libcurl.curl_socket_t): Active = copy(monitors - fd) + } + case object Released extends State + } + private def onTimeout( mutli: Ptr[libcurl.CURLM], timeoutMs: CLong, @@ -164,11 +132,73 @@ private[curl] object CurlMultiSocket { final private class CurlMultiSocketImpl( multiHandle: Ptr[libcurl.CURLM], fdpoller: FileDescriptorPoller, - mapping: AtomicCell[IO, Map[libcurl.curl_socket_t, Monitoring]], + mapping: AtomicCell[IO, State], disp: Dispatcher[IO], timeout: Ref[IO, Option[FiberIO[Unit]]], ) extends CurlMultiSocket { + private def init = IO { + val data = Utils.toPtr(this) + + libcurl + .curl_multi_setopt_timerdata( + multiHandle, + libcurl_const.CURLMOPT_TIMERDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketdata( + multiHandle, + libcurl_const.CURLMOPT_SOCKETDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_timerfunction( + multiHandle, + libcurl_const.CURLMOPT_TIMERFUNCTION, + onTimeout(_, _, _), + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketfunction( + multiHandle, + libcurl_const.CURLMOPT_SOCKETFUNCTION, + onSocket(_, _, _, _, _), + ) + .throwOnError + + } *> notifyTimeout + + private def cleanup = + removeTimeoutIO !> mapping.evalUpdate { + case State.Active(monitors) => + // First clean all monitors, this ensures that we don't call any + // curl callbacks afterwards, and callback cleaning and notifications + // is deterministic. + monitors.values.toList.traverse(_.clean) !> IO { + val error = new InterruptedException("Runtime shutdown!") + + // Remove and notify all easy handles + // Note that we do this in mapping.evalUpdate in order to block + // other new usages while cleaning up + callbacks.foreach { case (easy, cb) => + libcurl.curl_multi_remove_handle(multiHandle, easy).throwOnError + cb(Left(error)) + } + callbacks.clear() + }.as(State.Released) + case State.Released => + // It must not happen, but we leave a clue here if it happened! + IO.raiseError(new IllegalStateException("Cannot clean a released resource!")) + } + + def setup: Resource[IO, Unit] = Resource.make(init)(_ => cleanup) + private val callbacks = scala.collection.mutable.Map[Ptr[libcurl.CURL], Either[Throwable, Unit] => Unit]() @@ -204,13 +234,16 @@ private[curl] object CurlMultiSocket { } IO.uncancelable(_ => - mapping.evalUpdate { m => - m.get(fd) match { - case None => - newMonitor.map(m.updated(fd, _)) - case Some(s: Monitoring) => - s.clean *> newMonitor.map(m.updated(fd, _)) - } + mapping.evalUpdate { + case state @ State.Active(monitors) => + monitors.get(fd) match { + case None => + newMonitor.map(state.add(fd, _)) + case Some(s: Monitoring) => + s.clean *> newMonitor.map(state.add(fd, _)) + } + case State.Released => + IO.raiseError(new IllegalStateException("Runtime is already closed!")) } ) } @@ -218,11 +251,14 @@ private[curl] object CurlMultiSocket { def remove(fd: libcurl.curl_socket_t): Unit = disp.unsafeRunAndForget( IO.uncancelable(_ => - mapping.evalUpdate { m => - m.get(fd) match { - case None => IO(m) - case Some(s) => s.clean.as(m - fd) - } + mapping.evalUpdate { + case state @ State.Active(monitors) => + monitors.get(fd) match { + case None => IO(state) + case Some(s) => s.clean.as(state.remove(fd)) + } + case State.Released => + IO.raiseError(new IllegalStateException("Runtime is already closed!")) } ) ) @@ -233,9 +269,8 @@ private[curl] object CurlMultiSocket { ) ) - def removeTimeout: Unit = disp.unsafeRunAndForget( - timeout.getAndSet(None).flatMap(_.cancel) - ) + private def removeTimeoutIO = timeout.getAndSet(None).flatMap(_.cancel) + def removeTimeout: Unit = disp.unsafeRunAndForget(removeTimeoutIO) def notifyTimeout: IO[Unit] = IO { val running = stackalloc[Int]() @@ -284,3 +319,11 @@ private[curl] object CurlMultiSocket { p.pollWriteRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_OUT)).start } } + +private[curl] trait CurlMultiSocket { + def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] + def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] +} From 5a2fa32e4498676920e8fd542e41732b297ef151 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 08:42:46 +0330 Subject: [PATCH 04/13] Extracted CurlMulti --- .../org/http4s/curl/http/CurlClient.scala | 4 +-- .../org/http4s/curl/http/CurlRequest.scala | 3 +- .../org/http4s/curl/internal/CurlMulti.scala | 28 +++++++++++++++++++ .../http4s/curl/unsafe/CurlMultiSocket.scala | 12 ++------ .../http4s/curl/websocket/Connection.scala | 3 +- .../http4s/curl/websocket/CurlWSClient.scala | 4 +-- 6 files changed, 36 insertions(+), 18 deletions(-) create mode 100644 curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala index e3bd98c..49258e8 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala @@ -18,13 +18,13 @@ package org.http4s.curl.http import cats.effect._ import org.http4s.client.Client +import org.http4s.curl.internal.CurlMulti import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlClient { def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _)) - def multiSocket(ms: CurlMultiSocket): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) + def multiSocket(ms: CurlMulti): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) def get: IO[Client[IO]] = IO.executionContext.flatMap { case ec: CurlExecutorScheduler => IO.pure(apply(ec)) diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala index 5480f64..baa1bc4 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala @@ -22,7 +22,6 @@ import org.http4s.Response import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlRequest { private def setup( @@ -142,7 +141,7 @@ private[curl] object CurlRequest { resp <- recv.response() } yield resp - def applyMultiSocket(ms: CurlMultiSocket, req: Request[IO]): Resource[IO, Response[IO]] = for { + def applyMultiSocket(ms: CurlMulti, req: Request[IO]): Resource[IO, Response[IO]] = for { gc <- GCRoot() handle <- CurlEasy() flow <- FlowControl(handle) diff --git a/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala b/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala new file mode 100644 index 0000000..ff31be4 --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.internal + +import cats.effect.IO +import cats.effect.kernel.Resource + +private[curl] trait CurlMulti { + def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] + def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala index f9c4685..3487d91 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala @@ -61,7 +61,7 @@ private[curl] object CurlMultiSocket { throw CurlError.fromCode(initCode) } - def apply(): Resource[IO, CurlMultiSocket] = for { + def apply(): Resource[IO, CurlMulti] = for { _ <- IO(curlGlobalSetup).toResource handle <- newCurlMutli fdPoller <- getFDPoller.toResource @@ -135,7 +135,7 @@ private[curl] object CurlMultiSocket { mapping: AtomicCell[IO, State], disp: Dispatcher[IO], timeout: Ref[IO, Option[FiberIO[Unit]]], - ) extends CurlMultiSocket { + ) extends CurlMulti { private def init = IO { val data = Utils.toPtr(this) @@ -319,11 +319,3 @@ private[curl] object CurlMultiSocket { p.pollWriteRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_OUT)).start } } - -private[curl] trait CurlMultiSocket { - def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] - def addHandlerNonTerminating( - easy: CurlEasy, - cb: Either[Throwable, Unit] => Unit, - ): Resource[IO, Unit] -} diff --git a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala index 692e8b8..7672c4f 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala @@ -29,7 +29,6 @@ import org.http4s.client.websocket._ import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlMultiSocket import org.http4s.curl.unsafe.libcurl import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector @@ -245,7 +244,7 @@ private object Connection { def apply( req: WSRequest, - ms: CurlMultiSocket, + ms: CurlMulti, recvBufferSize: Int, pauseOn: Int, resumeOn: Int, diff --git a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala index b21089d..81ce551 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala @@ -21,8 +21,8 @@ import cats.effect.IO import cats.implicits._ import org.http4s.client.websocket.WSFrame._ import org.http4s.client.websocket._ +import org.http4s.curl.internal.CurlMulti import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlMultiSocket import org.http4s.curl.unsafe.CurlRuntime import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector @@ -92,7 +92,7 @@ private[curl] object CurlWSClient { } def apply( - ms: CurlMultiSocket, + ms: CurlMulti, recvBufferSize: Int = 100, pauseOn: Int = 10, resumeOn: Int = 30, From 7eee341524063bbd3e45875c7a96ecd740d9884e Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 08:44:15 +0330 Subject: [PATCH 05/13] Runtimes use CurlMultiDriver --- .../org/http4s/curl/http/CurlClient.scala | 11 +- .../org/http4s/curl/http/CurlRequest.scala | 2 +- .../org/http4s/curl/internal/CurlMulti.scala | 99 ++++++++++++++++- .../curl/internal/CurlMultiDriver.scala | 48 ++++++++ .../curl/unsafe/CurlMultiPerformPoller.scala | 105 ++++++++++++++++++ .../http4s/curl/unsafe/CurlMultiSocket.scala | 99 ++++------------- .../http4s/curl/websocket/Connection.scala | 2 +- .../http4s/curl/websocket/CurlWSClient.scala | 4 +- .../http/src/test/scala/CurlClientSuite.scala | 10 +- 9 files changed, 284 insertions(+), 96 deletions(-) create mode 100644 curl/src/main/scala/org/http4s/curl/internal/CurlMultiDriver.scala create mode 100644 curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala index 49258e8..0f899e0 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala @@ -18,16 +18,23 @@ package org.http4s.curl.http import cats.effect._ import org.http4s.client.Client -import org.http4s.curl.internal.CurlMulti +import org.http4s.curl.internal.CurlMultiDriver import org.http4s.curl.unsafe.CurlExecutorScheduler +import org.http4s.curl.unsafe.CurlMultiPerformPoller +import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlClient { def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _)) - def multiSocket(ms: CurlMulti): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) + def multiSocket(ms: CurlMultiDriver): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) def get: IO[Client[IO]] = IO.executionContext.flatMap { case ec: CurlExecutorScheduler => IO.pure(apply(ec)) case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler")) } + + val default: Resource[IO, Client[IO]] = IO.pollers.toResource.flatMap { + _.collectFirst { case mp: CurlMultiPerformPoller => Resource.eval(IO(multiSocket(mp))) } + .getOrElse(CurlMultiSocket().map(multiSocket)) + } } diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala index baa1bc4..5eeebc3 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala @@ -141,7 +141,7 @@ private[curl] object CurlRequest { resp <- recv.response() } yield resp - def applyMultiSocket(ms: CurlMulti, req: Request[IO]): Resource[IO, Response[IO]] = for { + def applyMultiSocket(ms: CurlMultiDriver, req: Request[IO]): Resource[IO, Response[IO]] = for { gc <- GCRoot() handle <- CurlEasy() flow <- FlowControl(handle) diff --git a/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala b/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala index ff31be4..5ec3770 100644 --- a/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala +++ b/curl/src/main/scala/org/http4s/curl/internal/CurlMulti.scala @@ -18,11 +18,98 @@ package org.http4s.curl.internal import cats.effect.IO import cats.effect.kernel.Resource +import org.http4s.curl.CurlError +import org.http4s.curl.unsafe.libcurl +import org.http4s.curl.unsafe.libcurl_const -private[curl] trait CurlMulti { - def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] - def addHandlerNonTerminating( - easy: CurlEasy, - cb: Either[Throwable, Unit] => Unit, - ): Resource[IO, Unit] +import scala.collection.mutable +import scala.scalanative.unsafe.Ptr +import scala.scalanative.unsafe._ + +final private[curl] class CurlMulti private (val multiHandle: Ptr[libcurl.CURLM]) { + private val callbacks = mutable.Map[Ptr[libcurl.CURL], Either[Throwable, Unit] => Unit]() + + /** Adds a curl handler to the multi handle + * + * @param handle curl easy handle to add + * @param cb callback to run when this handler has finished its transfer + */ + def addHandle(handle: Ptr[libcurl.CURL], cb: Either[Throwable, Unit] => Unit): Unit = { + val code = libcurl.curl_multi_add_handle(multiHandle, handle) + if (code.isError) + throw CurlError.fromMCode(code) + callbacks(handle) = cb + } + + /** Removes a handle and notifies success + * + * @param handle curl easy handle to add + */ + def removeHandle( + handle: Ptr[libcurl.CURL] + ): Unit = callbacks.remove(handle).foreach(_(Right(()))) + + def noCallbacks: Boolean = callbacks.isEmpty + + /** Event handler */ + def onTick: Unit = + if (noCallbacks) () + else { + while ({ + val msgsInQueue = stackalloc[CInt]() + val info = libcurl.curl_multi_info_read(multiHandle, msgsInQueue) + + if (info != null) { + val curMsg = libcurl.curl_CURLMsg_msg(info) + if (curMsg == libcurl_const.CURLMSG_DONE) { + val handle = libcurl.curl_CURLMsg_easy_handle(info) + callbacks.remove(handle).foreach { cb => + val result = libcurl.curl_CURLMsg_data_result(info) + cb( + if (result.isOk) Right(()) + else Left(CurlError.fromCode(result)) + ) + } + + val code = libcurl.curl_multi_remove_handle(multiHandle, handle) + if (code.isError) + throw CurlError.fromMCode(code) + } + true + } else false + }) {} + } + + /** Clears callbacks and notifies all waiting fibers */ + def clearCallbacks: Unit = { + val error = new InterruptedException("Runtime shutdown!") + callbacks.foreach { case (easy, cb) => + libcurl.curl_multi_remove_handle(multiHandle, easy).throwOnError + cb(Left(error)) + } + callbacks.clear() + } + + def clean: Unit = { + clearCallbacks + + libcurl.curl_multi_cleanup(multiHandle).throwOnError + } +} + +object CurlMulti { + + /** global init must be called exactly once */ + private lazy val setup: Unit = libcurl.curl_global_init(2).throwOnError + + private def newCurlMutli: Ptr[libcurl.CURLM] = { + setup + val multiHandle = libcurl.curl_multi_init() + if (multiHandle == null) + throw new RuntimeException("curl_multi_init") + multiHandle + } + + def apply(): Resource[IO, CurlMulti] = Resource.make(IO(unsafe()))(m => IO(m.clean)) + def unsafe(): CurlMulti = new CurlMulti(newCurlMutli) } diff --git a/curl/src/main/scala/org/http4s/curl/internal/CurlMultiDriver.scala b/curl/src/main/scala/org/http4s/curl/internal/CurlMultiDriver.scala new file mode 100644 index 0000000..108adc3 --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/internal/CurlMultiDriver.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.internal + +import cats.effect.IO +import cats.effect.kernel.Resource + +private[curl] trait CurlMultiDriver { + + /** Adds a curl handler that is expected to terminate + * like a normal http request + * + * IMPORTANT NOTE: if you add a transfer that does not terminate (e.g. websocket) using this method, + * application might hang, because those transfer don't seem to change state, + * so it's not distinguishable whether they are finished or have other work to do + * + * @param handle curl easy handle to add + * @param cb callback to run when this handler has finished its transfer + */ + def addHandlerTerminating(easy: CurlEasy, cb: Either[Throwable, Unit] => Unit): IO[Unit] + + /** Add a curl handle for a transfer that doesn't finish e.g. a websocket transfer + * it adds a handle to multi handle, and removes it when it goes out of scope + * so no dangling handler will remain in multi handler + * callback is called when the transfer is terminated or goes out of scope + * + * @param handle curl easy handle to add + * @param cb callback to run if this handler is terminated unexpectedly + */ + def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala new file mode 100644 index 0000000..b73d21c --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl.unsafe + +import cats.effect.IO +import cats.effect.kernel.Resource +import cats.effect.unsafe.PollingSystem +import org.http4s.curl.internal.CurlEasy +import org.http4s.curl.internal.CurlMulti +import org.http4s.curl.internal.CurlMultiDriver + +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +final private[curl] class CurlMultiPerformPoller( + multiHandle: CurlMulti +) extends PollingSystem + with CurlMultiDriver { + + private[this] var needsPoll = true + + type Api = CurlMultiDriver + type Poller = CurlMultiPerformPoller + + override def addHandlerTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): IO[Unit] = IO(multiHandle.addHandle(easy.curl, cb)) + + override def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] = + Resource.make(addHandlerTerminating(easy, cb))(_ => IO(multiHandle.removeHandle(easy.curl))) + + override def close(): Unit = () + + override def makeApi(register: (Poller => Unit) => Unit): Api = this + + override def makePoller(): Poller = this + + override def closePoller(poller: Poller): Unit = { + multiHandle.clean + libcurl.curl_global_cleanup() + } + + override def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + val timeoutIsInf = nanos == -1 + val noCallbacks = multiHandle.noCallbacks + + if (timeoutIsInf && noCallbacks) false + else { + val timeoutMillis = + if (timeoutIsInf) Int.MaxValue else (nanos / 1e6).toInt + + if (nanos > 0) { + + libcurl + .curl_multi_poll( + multiHandle.multiHandle, + null, + 0.toUInt, + timeoutMillis, + null, + ) + .throwOnError + } + + if (noCallbacks) false + else { + val runningHandles = stackalloc[CInt]() + libcurl.curl_multi_perform(multiHandle.multiHandle, runningHandles).throwOnError + + multiHandle.onTick + + needsPoll = !runningHandles > 0 + true + } + } + + } + + override def needsPoll(poller: Poller): Boolean = needsPoll + + override def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () + +} + +private[curl] object CurlMultiPerformPoller { + def apply(): CurlMultiPerformPoller = new CurlMultiPerformPoller(CurlMulti.unsafe()) +} diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala index 3487d91..a310b6f 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala @@ -25,7 +25,6 @@ import cats.effect.kernel.Resource import cats.effect.std.AtomicCell import cats.effect.std.Dispatcher import cats.syntax.all._ -import org.http4s.curl.CurlError import org.http4s.curl.internal._ import scala.concurrent.duration._ @@ -42,33 +41,13 @@ private[curl] object CurlMultiSocket { ) ) - private val newCurlMutli = Resource.make(IO { - val multiHandle = libcurl.curl_multi_init() - if (multiHandle == null) - throw new RuntimeException("curl_multi_init") - multiHandle - })(mhandle => - IO { - val code = libcurl.curl_multi_cleanup(mhandle) - if (code.isError) - throw CurlError.fromMCode(code) - } - ) - - private lazy val curlGlobalSetup = { - val initCode = libcurl.curl_global_init(2) - if (initCode.isError) - throw CurlError.fromCode(initCode) - } - - def apply(): Resource[IO, CurlMulti] = for { - _ <- IO(curlGlobalSetup).toResource - handle <- newCurlMutli + def apply(): Resource[IO, CurlMultiDriver] = for { + multi <- CurlMulti() fdPoller <- getFDPoller.toResource disp <- Dispatcher.sequential[IO] mapping <- AtomicCell[IO].of(State.empty).toResource timeout <- IO.ref[Option[FiberIO[Unit]]](None).toResource - cms = new CurlMultiSocketImpl(handle, fdPoller, mapping, disp, timeout) + cms = new CurlMultiSocketImpl(multi, fdPoller, mapping, disp, timeout) _ <- cms.setup } yield cms @@ -130,19 +109,19 @@ private[curl] object CurlMultiSocket { } final private class CurlMultiSocketImpl( - multiHandle: Ptr[libcurl.CURLM], + multiHandle: CurlMulti, fdpoller: FileDescriptorPoller, mapping: AtomicCell[IO, State], disp: Dispatcher[IO], timeout: Ref[IO, Option[FiberIO[Unit]]], - ) extends CurlMulti { + ) extends CurlMultiDriver { private def init = IO { val data = Utils.toPtr(this) libcurl .curl_multi_setopt_timerdata( - multiHandle, + multiHandle.multiHandle, libcurl_const.CURLMOPT_TIMERDATA, data, ) @@ -150,7 +129,7 @@ private[curl] object CurlMultiSocket { libcurl .curl_multi_setopt_socketdata( - multiHandle, + multiHandle.multiHandle, libcurl_const.CURLMOPT_SOCKETDATA, data, ) @@ -158,7 +137,7 @@ private[curl] object CurlMultiSocket { libcurl .curl_multi_setopt_timerfunction( - multiHandle, + multiHandle.multiHandle, libcurl_const.CURLMOPT_TIMERFUNCTION, onTimeout(_, _, _), ) @@ -166,7 +145,7 @@ private[curl] object CurlMultiSocket { libcurl .curl_multi_setopt_socketfunction( - multiHandle, + multiHandle.multiHandle, libcurl_const.CURLMOPT_SOCKETFUNCTION, onSocket(_, _, _, _, _), ) @@ -181,16 +160,10 @@ private[curl] object CurlMultiSocket { // curl callbacks afterwards, and callback cleaning and notifications // is deterministic. monitors.values.toList.traverse(_.clean) !> IO { - val error = new InterruptedException("Runtime shutdown!") - // Remove and notify all easy handles // Note that we do this in mapping.evalUpdate in order to block // other new usages while cleaning up - callbacks.foreach { case (easy, cb) => - libcurl.curl_multi_remove_handle(multiHandle, easy).throwOnError - cb(Left(error)) - } - callbacks.clear() + multiHandle.clearCallbacks }.as(State.Released) case State.Released => // It must not happen, but we leave a clue here if it happened! @@ -199,27 +172,16 @@ private[curl] object CurlMultiSocket { def setup: Resource[IO, Unit] = Resource.make(init)(_ => cleanup) - private val callbacks = - scala.collection.mutable.Map[Ptr[libcurl.CURL], Either[Throwable, Unit] => Unit]() - override def addHandlerTerminating( easy: CurlEasy, cb: Either[Throwable, Unit] => Unit, - ): IO[Unit] = IO { - libcurl.curl_multi_add_handle(multiHandle, easy.curl).throwOnError - callbacks(easy.curl) = cb - } + ): IO[Unit] = IO(multiHandle.addHandle(easy.curl, cb)) override def addHandlerNonTerminating( easy: CurlEasy, cb: Either[Throwable, Unit] => Unit, ): Resource[IO, Unit] = - Resource.make(addHandlerTerminating(easy, cb))(_ => - IO { - libcurl.curl_multi_remove_handle(multiHandle, easy.curl).throwOnError - callbacks.remove(easy.curl).foreach(_(Right(()))) - } - ) + Resource.make(addHandlerTerminating(easy, cb))(_ => IO(multiHandle.removeHandle(easy.curl))) def addFD(fd: libcurl.curl_socket_t, read: Boolean, write: Boolean): Unit = disp.unsafeRunAndForget { @@ -275,41 +237,22 @@ private[curl] object CurlMultiSocket { def notifyTimeout: IO[Unit] = IO { val running = stackalloc[Int]() libcurl - .curl_multi_socket_action(multiHandle, libcurl_const.CURL_SOCKET_TIMEOUT, 0, running) + .curl_multi_socket_action( + multiHandle.multiHandle, + libcurl_const.CURL_SOCKET_TIMEOUT, + 0, + running, + ) .throwOnError - postAction + multiHandle.onTick } - private def postAction = while ({ - val msgsInQueue = stackalloc[CInt]() - val info = libcurl.curl_multi_info_read(multiHandle, msgsInQueue) - - if (info != null) { - val curMsg = libcurl.curl_CURLMsg_msg(info) - if (curMsg == libcurl_const.CURLMSG_DONE) { - val handle = libcurl.curl_CURLMsg_easy_handle(info) - callbacks.remove(handle).foreach { cb => - val result = libcurl.curl_CURLMsg_data_result(info) - cb( - if (result.isOk) Right(()) - else Left(CurlError.fromCode(result)) - ) - } - - val code = libcurl.curl_multi_remove_handle(multiHandle, handle) - if (code.isError) - throw CurlError.fromMCode(code) - } - true - } else false - }) {} - private def action(fd: libcurl.curl_socket_t, ev: CInt) = IO { val running = stackalloc[Int]() - libcurl.curl_multi_socket_action(multiHandle, fd, ev, running) + libcurl.curl_multi_socket_action(multiHandle.multiHandle, fd, ev, running) - postAction + multiHandle.onTick Left(()) } diff --git a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala index 7672c4f..16a7e9c 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala @@ -244,7 +244,7 @@ private object Connection { def apply( req: WSRequest, - ms: CurlMulti, + ms: CurlMultiDriver, recvBufferSize: Int, pauseOn: Int, resumeOn: Int, diff --git a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala index 81ce551..6cf3685 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala @@ -21,7 +21,7 @@ import cats.effect.IO import cats.implicits._ import org.http4s.client.websocket.WSFrame._ import org.http4s.client.websocket._ -import org.http4s.curl.internal.CurlMulti +import org.http4s.curl.internal.CurlMultiDriver import org.http4s.curl.unsafe.CurlExecutorScheduler import org.http4s.curl.unsafe.CurlRuntime import org.http4s.curl.unsafe.libcurl_const @@ -92,7 +92,7 @@ private[curl] object CurlWSClient { } def apply( - ms: CurlMulti, + ms: CurlMultiDriver, recvBufferSize: Int = 100, pauseOn: Int = 10, resumeOn: Int = 30, diff --git a/tests/http/src/test/scala/CurlClientSuite.scala b/tests/http/src/test/scala/CurlClientSuite.scala index d1bc0ed..40c3ce6 100644 --- a/tests/http/src/test/scala/CurlClientSuite.scala +++ b/tests/http/src/test/scala/CurlClientSuite.scala @@ -18,7 +18,6 @@ package org.http4s.curl import cats.effect.IO import cats.effect.SyncIO -import cats.effect.kernel.Resource import cats.effect.std.Random import cats.effect.unsafe.IORuntime import cats.syntax.all._ @@ -27,16 +26,15 @@ import org.http4s.Method._ import org.http4s.Request import org.http4s.Status import org.http4s.client.Client -import org.http4s.curl.unsafe.CurlRuntime +import org.http4s.curl.unsafe.CurlMultiPerformPoller import org.http4s.syntax.all._ class CurlClientSuite extends CatsEffectSuite { - override lazy val munitIORuntime: IORuntime = CurlRuntime.global + override lazy val munitIORuntime: IORuntime = + IORuntime.builder().setPollingSystem(CurlMultiPerformPoller()).build() - val clientFixture: SyncIO[FunFixture[Client[IO]]] = ResourceFunFixture( - Resource.eval(http.CurlClient.get) - ) + val clientFixture: SyncIO[FunFixture[Client[IO]]] = ResourceFunFixture(http.CurlClient.default) clientFixture.test("3 get echos") { client => client From cb1b4eff6eb7dc73c25f581c06e45fd1bc0f6282 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 10:17:42 +0330 Subject: [PATCH 06/13] Added CurlDriver and builders --- .../main/scala/org/http4s/curl/CurlApp.scala | 53 ++------------ .../org/http4s/curl/CurlClientBuilder.scala | 36 ++++++++++ .../scala/org/http4s/curl/CurlDriver.scala | 35 +++++++++ .../org/http4s/curl/CurlWSClientBuilder.scala | 72 +++++++++++++++++++ .../org/http4s/curl/http/CurlClient.scala | 12 ++-- .../org/http4s/curl/http/CurlRequest.scala | 12 ++-- example/src/main/scala/ExampleApp.scala | 13 ++-- example/src/main/scala/WSExample.scala | 11 +-- 8 files changed, 176 insertions(+), 68 deletions(-) create mode 100644 curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala create mode 100644 curl/src/main/scala/org/http4s/curl/CurlDriver.scala create mode 100644 curl/src/main/scala/org/http4s/curl/CurlWSClientBuilder.scala diff --git a/curl/src/main/scala/org/http4s/curl/CurlApp.scala b/curl/src/main/scala/org/http4s/curl/CurlApp.scala index c968206..c7d874c 100644 --- a/curl/src/main/scala/org/http4s/curl/CurlApp.scala +++ b/curl/src/main/scala/org/http4s/curl/CurlApp.scala @@ -16,59 +16,14 @@ package org.http4s.curl -import cats.effect.IO import cats.effect.IOApp -import cats.effect.unsafe.IORuntime -import org.http4s.client.Client -import org.http4s.client.websocket.WSClient -import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlRuntime -import org.http4s.curl.websocket.CurlWSClient +import cats.effect.unsafe.PollingSystem +import org.http4s.curl.unsafe.CurlMultiPerformPoller trait CurlApp extends IOApp { + private val multiPerform = CurlMultiPerformPoller() - final override lazy val runtime: IORuntime = { - val installed = CurlRuntime.installGlobal { - CurlRuntime(runtimeConfig) - } - - if (!installed) { - System.err - .println( - "WARNING: CurlRuntime global runtime already initialized; custom configurations will be ignored" - ) - } - - CurlRuntime.global - } - - private def scheduler = runtime.compute.asInstanceOf[CurlExecutorScheduler] - final lazy val curlClient: Client[IO] = http.CurlClient(scheduler) - - /** gets websocket client if current libcurl environment supports it */ - final def websocket( - recvBufferSize: Int = 100, - verbose: Boolean = false, - ): Option[WSClient[IO]] = - CurlWSClient( - scheduler, - recvBufferSize, - pauseOn = recvBufferSize / 10, - resumeOn = (recvBufferSize * 0.3).floor.toInt, - verbose = verbose, - ) - - /** gets websocket client if current libcurl environment supports it throws an error otherwise */ - final def websocketOrError(recvBufferSize: Int = 100, verbose: Boolean = false): WSClient[IO] = - websocket(recvBufferSize, verbose).getOrElse( - throw new RuntimeException( - """Websocket is not supported in this environment! -You need to have curl with version 7.87.0 or higher with websockets enabled. -Note that websocket support in curl is experimental and is not available by default, -so you need to either build it with websocket support or use an already built libcurl with websocket support.""" - ) - ) - + override protected def pollingSystem: PollingSystem = multiPerform } object CurlApp { diff --git a/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala b/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala new file mode 100644 index 0000000..8c09ea9 --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl + +import cats.effect.IO +import org.http4s.client.Client +import org.http4s.curl.http.CurlClient +import org.http4s.curl.internal.CurlMultiDriver + +final class CurlClientBuilder private[curl] ( + driver: CurlMultiDriver, + val isVerbose: Boolean = false, +) { + private def copy( + isVerbose: Boolean + ) = new CurlClientBuilder(driver, isVerbose = isVerbose) + + def setVerbose: CurlClientBuilder = copy(isVerbose = true) + def notVerbose: CurlClientBuilder = copy(isVerbose = false) + + def build: Client[IO] = CurlClient.multiSocket(driver, isVerbose = isVerbose) +} diff --git a/curl/src/main/scala/org/http4s/curl/CurlDriver.scala b/curl/src/main/scala/org/http4s/curl/CurlDriver.scala new file mode 100644 index 0000000..94ba85d --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/CurlDriver.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl + +import cats.effect.IO +import cats.effect.kernel.Resource +import org.http4s.curl.internal.CurlMultiDriver +import org.http4s.curl.unsafe.CurlMultiPerformPoller +import org.http4s.curl.unsafe.CurlMultiSocket + +final class CurlDriver private (driver: CurlMultiDriver) { + def http: CurlClientBuilder = new CurlClientBuilder(driver) + def websocket: CurlWSClientBuilder = new CurlWSClientBuilder(driver) +} + +object CurlDriver { + val default: Resource[IO, CurlDriver] = IO.pollers.toResource.flatMap { + _.collectFirst { case mp: CurlMultiPerformPoller => Resource.eval(IO(new CurlDriver(mp))) } + .getOrElse(CurlMultiSocket().map(new CurlDriver(_))) + } +} diff --git a/curl/src/main/scala/org/http4s/curl/CurlWSClientBuilder.scala b/curl/src/main/scala/org/http4s/curl/CurlWSClientBuilder.scala new file mode 100644 index 0000000..ff8bf1e --- /dev/null +++ b/curl/src/main/scala/org/http4s/curl/CurlWSClientBuilder.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2022 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.curl + +import cats.effect.IO +import org.http4s.client.websocket.WSClient +import org.http4s.curl.internal.CurlMultiDriver +import org.http4s.curl.websocket.CurlWSClient + +final class CurlWSClientBuilder private[curl] ( + driver: CurlMultiDriver, + val isVerbose: Boolean = false, + val recvBufferSize: Int = 100, + val pause: Int = 10, + val resume: Int = 30, +) { + private def copy( + isVerbose: Boolean = isVerbose, + recvBufferSize: Int = recvBufferSize, + pause: Int = pause, + resume: Int = resume, + ) = new CurlWSClientBuilder( + driver, + isVerbose = isVerbose, + recvBufferSize = recvBufferSize, + pause = pause, + resume = resume, + ) + + def setVerbose: CurlWSClientBuilder = copy(isVerbose = true) + def notVerbose: CurlWSClientBuilder = copy(isVerbose = false) + + def withRecvBufferSize(value: Int): CurlWSClientBuilder = { + assert(value > 0, "buffer size must be greater than zero!") + copy(recvBufferSize = value) + } + def withBackpressure(pause: Int, resume: Int): CurlWSClientBuilder = { + assert(pause >= 0 && pause < 100, "pause must be in [0, 100)") + assert(resume > 0 && resume <= 100, "resume must be in (0, 100]") + copy(pause = pause, resume = resume) + } + + def build: Either[RuntimeException, WSClient[IO]] = CurlWSClient( + driver, + recvBufferSize = recvBufferSize, + pauseOn = pause * recvBufferSize / 100, + resumeOn = resume * recvBufferSize / 100, + verbose = isVerbose, + ).toRight( + new RuntimeException( + """Websocket is not supported in this environment! +You need to have curl with version 7.87.0 or higher with websockets enabled. +Note that websocket support in curl is experimental and is not available by default, +so you need to either build it with websocket support or use an already built libcurl with websocket support.""" + ) + ) + def buildIO: IO[WSClient[IO]] = IO.fromEither(build) +} diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala index 0f899e0..9383057 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala @@ -18,23 +18,21 @@ package org.http4s.curl.http import cats.effect._ import org.http4s.client.Client +import org.http4s.curl.CurlDriver import org.http4s.curl.internal.CurlMultiDriver import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlMultiPerformPoller -import org.http4s.curl.unsafe.CurlMultiSocket private[curl] object CurlClient { def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _)) - def multiSocket(ms: CurlMultiDriver): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _)) + def multiSocket(ms: CurlMultiDriver, isVerbose: Boolean = false): Client[IO] = Client( + CurlRequest.applyMultiSocket(ms, _, isVerbose) + ) def get: IO[Client[IO]] = IO.executionContext.flatMap { case ec: CurlExecutorScheduler => IO.pure(apply(ec)) case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler")) } - val default: Resource[IO, Client[IO]] = IO.pollers.toResource.flatMap { - _.collectFirst { case mp: CurlMultiPerformPoller => Resource.eval(IO(multiSocket(mp))) } - .getOrElse(CurlMultiSocket().map(multiSocket)) - } + val default: Resource[IO, Client[IO]] = CurlDriver.default.map(_.http.build) } diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala index 5eeebc3..35e58c5 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala @@ -83,12 +83,12 @@ private[curl] object CurlRequest { send: RequestSend, recv: RequestRecv, req: Request[IO], + verbose: Boolean, ): Resource[IO, Unit] = Utils.newZone.flatMap(implicit zone => CurlSList().evalMap(headers => IO { - // TODO add in options - // handle.setVerbose(true) + if (verbose) handle.setVerbose(true) import org.http4s.curl.unsafe.libcurl_const import scala.scalanative.unsafe._ @@ -141,14 +141,18 @@ private[curl] object CurlRequest { resp <- recv.response() } yield resp - def applyMultiSocket(ms: CurlMultiDriver, req: Request[IO]): Resource[IO, Response[IO]] = for { + def applyMultiSocket( + ms: CurlMultiDriver, + req: Request[IO], + isVerbose: Boolean = false, + ): Resource[IO, Response[IO]] = for { gc <- GCRoot() handle <- CurlEasy() flow <- FlowControl(handle) send <- RequestSend(flow) recv <- RequestRecv(flow) _ <- gc.add(send, recv) - _ <- setup(handle, send, recv, req) + _ <- setup(handle, send, recv, req, isVerbose) _ <- ms.addHandlerTerminating(handle, recv.onTerminated).toResource _ <- req.body.through(send.pipe).compile.drain.background resp <- recv.response() diff --git a/example/src/main/scala/ExampleApp.scala b/example/src/main/scala/ExampleApp.scala index 3ac9ce1..1b9cee6 100644 --- a/example/src/main/scala/ExampleApp.scala +++ b/example/src/main/scala/ExampleApp.scala @@ -21,6 +21,7 @@ import cats.syntax.all._ import io.circe._ import org.http4s.circe.CirceEntityCodec._ import org.http4s.curl.CurlApp +import org.http4s.curl.CurlDriver object ExampleApp extends CurlApp.Simple { @@ -29,9 +30,13 @@ object ExampleApp extends CurlApp.Simple { implicit val decoder: Decoder[Joke] = Decoder.forProduct1("joke")(Joke(_)) } - def run: IO[Unit] = for { - responses <- curlClient.expect[Joke]("https://icanhazdadjoke.com/").parReplicateA(3) - _ <- responses.traverse_(r => IO.println(r.joke)) - } yield () + def run: IO[Unit] = CurlDriver.default + .map(_.http.build) + .use(client => + for { + responses <- client.expect[Joke]("https://icanhazdadjoke.com/").parReplicateA(3) + _ <- responses.traverse_(r => IO.println(r.joke)) + } yield () + ) } diff --git a/example/src/main/scala/WSExample.scala b/example/src/main/scala/WSExample.scala index 91ccd46..24e1a5a 100644 --- a/example/src/main/scala/WSExample.scala +++ b/example/src/main/scala/WSExample.scala @@ -20,6 +20,7 @@ import org.http4s.Uri import org.http4s.client.websocket.WSFrame import org.http4s.client.websocket.WSRequest import org.http4s.curl.CurlApp +import org.http4s.curl.CurlDriver import org.http4s.implicits._ import scala.concurrent.duration._ @@ -31,8 +32,9 @@ object WSExample extends CurlApp.Simple { private val local = uri"ws://localhost:8080/ws/large" private val websocket = local - def run: IO[Unit] = websocketOrError(verbose = true) - .connectHighLevel(WSRequest(websocket)) + def run: IO[Unit] = CurlDriver.default + .evalMap(_.websocket.setVerbose.buildIO) + .flatMap(_.connectHighLevel(WSRequest(websocket))) .use { client => IO.println("ready!") >> client.receiveStream.foreach(_ => IO.println("> frame").delayBy(50.millis)).compile.drain @@ -45,8 +47,9 @@ object WSEchoExample extends CurlApp.Simple { // private val echo = uri"wss://ws.postman-echo.com/raw" private val echo = uri"ws://localhost:8080/ws/echo" - def run: IO[Unit] = websocketOrError() - .connectHighLevel(WSRequest(echo)) + def run: IO[Unit] = CurlDriver.default + .evalMap(_.websocket.buildIO) + .flatMap(_.connectHighLevel(WSRequest(echo))) .use { client => val send: IO[Unit] = IO.println("sending ...") >> client.send(WSFrame.Text("hello")).parReplicateA_(4) From f269673c5d1beabcb875f14a0a9a25986b58f850 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 11:05:40 +0330 Subject: [PATCH 07/13] Removed replaced implementations --- .../org/http4s/curl/CurlClientBuilder.scala | 2 +- .../curl/{unsafe => }/CurlRuntime.scala | 41 +---- .../org/http4s/curl/http/CurlClient.scala | 12 +- .../org/http4s/curl/http/CurlRequest.scala | 69 +------- .../curl/unsafe/CurlExecutorScheduler.scala | 150 ------------------ .../http4s/curl/websocket/Connection.scala | 37 ----- .../http4s/curl/websocket/CurlWSClient.scala | 70 +------- .../src/test/scala/CurlRuntimeSuite.scala | 2 +- .../scala/CurlClientMultiSocketSuite.scala | 2 +- .../src/test/scala/CurlWSClientSuite.scala | 11 +- 10 files changed, 17 insertions(+), 379 deletions(-) rename curl/src/main/scala/org/http4s/curl/{unsafe => }/CurlRuntime.scala (63%) delete mode 100644 curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala diff --git a/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala b/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala index 8c09ea9..01becc5 100644 --- a/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala +++ b/curl/src/main/scala/org/http4s/curl/CurlClientBuilder.scala @@ -32,5 +32,5 @@ final class CurlClientBuilder private[curl] ( def setVerbose: CurlClientBuilder = copy(isVerbose = true) def notVerbose: CurlClientBuilder = copy(isVerbose = false) - def build: Client[IO] = CurlClient.multiSocket(driver, isVerbose = isVerbose) + def build: Client[IO] = CurlClient(driver, isVerbose = isVerbose) } diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlRuntime.scala b/curl/src/main/scala/org/http4s/curl/CurlRuntime.scala similarity index 63% rename from curl/src/main/scala/org/http4s/curl/unsafe/CurlRuntime.scala rename to curl/src/main/scala/org/http4s/curl/CurlRuntime.scala index 3bb2c11..9cb5120 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlRuntime.scala +++ b/curl/src/main/scala/org/http4s/curl/CurlRuntime.scala @@ -15,50 +15,13 @@ */ package org.http4s.curl -package unsafe - -import cats.effect.unsafe.IORuntime -import cats.effect.unsafe.IORuntimeConfig -import cats.effect.unsafe.Scheduler import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionContext import scala.scalanative.unsafe._ -object CurlRuntime { - - def apply(): IORuntime = apply(IORuntimeConfig()) - - def apply(config: IORuntimeConfig): IORuntime = { - val (ecScheduler, shutdown) = defaultExecutionContextScheduler() - IORuntime(ecScheduler, ecScheduler, ecScheduler, shutdown, config) - } - - def defaultExecutionContextScheduler(): (ExecutionContext with Scheduler, () => Unit) = { - val (ecScheduler, shutdown) = CurlExecutorScheduler(64) - (ecScheduler, shutdown) - } - - private[this] var _global: IORuntime = null - - private[curl] def installGlobal(global: => IORuntime): Boolean = - if (_global == null) { - _global = global - true - } else { - false - } - - lazy val global: IORuntime = { - if (_global == null) { - installGlobal { - CurlRuntime() - } - } - - _global - } +import unsafe.libcurl +object CurlRuntime { def curlVersion: String = fromCString(libcurl.curl_version()) private lazy val versionData = libcurl.curl_version_info(libcurl.CURLVERSION_NOW()) diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala index 9383057..33676c6 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlClient.scala @@ -20,19 +20,11 @@ import cats.effect._ import org.http4s.client.Client import org.http4s.curl.CurlDriver import org.http4s.curl.internal.CurlMultiDriver -import org.http4s.curl.unsafe.CurlExecutorScheduler private[curl] object CurlClient { - def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _)) - - def multiSocket(ms: CurlMultiDriver, isVerbose: Boolean = false): Client[IO] = Client( - CurlRequest.applyMultiSocket(ms, _, isVerbose) + def apply(ms: CurlMultiDriver, isVerbose: Boolean = false): Client[IO] = Client( + CurlRequest(ms, _, isVerbose) ) - def get: IO[Client[IO]] = IO.executionContext.flatMap { - case ec: CurlExecutorScheduler => IO.pure(apply(ec)) - case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler")) - } - val default: Resource[IO, Client[IO]] = CurlDriver.default.map(_.http.build) } diff --git a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala index 35e58c5..ee88aa2 100644 --- a/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala +++ b/curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala @@ -21,63 +21,8 @@ import org.http4s.Request import org.http4s.Response import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ -import org.http4s.curl.unsafe.CurlExecutorScheduler private[curl] object CurlRequest { - private def setup( - handle: CurlEasy, - ec: CurlExecutorScheduler, - send: RequestSend, - recv: RequestRecv, - req: Request[IO], - ): Resource[IO, Unit] = - Utils.newZone.flatMap(implicit zone => - CurlSList().evalMap(headers => - IO { - // TODO add in options - // handle.setVerbose(true) - - import org.http4s.curl.unsafe.libcurl_const - import scala.scalanative.unsafe._ - import org.http4s.Header - import org.http4s.HttpVersion - import org.typelevel.ci._ - - handle.setCustomRequest(toCString(req.method.renderString)) - - handle.setUpload(true) - - handle.setUrl(toCString(req.uri.renderString)) - - val httpVersion = req.httpVersion match { - case HttpVersion.`HTTP/1.0` => libcurl_const.CURL_HTTP_VERSION_1_0 - case HttpVersion.`HTTP/1.1` => libcurl_const.CURL_HTTP_VERSION_1_1 - case HttpVersion.`HTTP/2` => libcurl_const.CURL_HTTP_VERSION_2 - case HttpVersion.`HTTP/3` => libcurl_const.CURL_HTTP_VERSION_3 - case _ => libcurl_const.CURL_HTTP_VERSION_NONE - } - handle.setHttpVersion(httpVersion) - - req.headers // curl adds these headers automatically, so we explicitly disable them - .transform(Header.Raw(ci"Expect", "") :: Header.Raw(ci"Transfer-Encoding", "") :: _) - .foreach(header => headers.append(header.toString)) - - handle.setHttpHeader(headers.toPtr) - - handle.setReadData(Utils.toPtr(send)) - handle.setReadFunction(RequestSend.readCallback(_, _, _, _)) - - handle.setHeaderData(Utils.toPtr(recv)) - handle.setHeaderFunction(RequestRecv.headerCallback(_, _, _, _)) - - handle.setWriteData(Utils.toPtr(recv)) - handle.setWriteFunction(RequestRecv.writeCallback(_, _, _, _)) - - ec.addHandle(handle.curl, recv.onTerminated) - } - ) - ) - private def setup( handle: CurlEasy, send: RequestSend, @@ -129,19 +74,7 @@ private[curl] object CurlRequest { ) ) - def apply(ec: CurlExecutorScheduler, req: Request[IO]): Resource[IO, Response[IO]] = for { - gc <- GCRoot() - handle <- CurlEasy() - flow <- FlowControl(handle) - send <- RequestSend(flow) - recv <- RequestRecv(flow) - _ <- gc.add(send, recv) - _ <- setup(handle, ec, send, recv, req) - _ <- req.body.through(send.pipe).compile.drain.background - resp <- recv.response() - } yield resp - - def applyMultiSocket( + def apply( ms: CurlMultiDriver, req: Request[IO], isVerbose: Boolean = false, diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala deleted file mode 100644 index 02b4d65..0000000 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2022 http4s.org - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.http4s.curl.unsafe - -import cats.effect.IO -import cats.effect.kernel.Resource -import cats.effect.unsafe.PollingExecutorScheduler -import org.http4s.curl.CurlError - -import scala.annotation.nowarn -import scala.collection.mutable -import scala.concurrent.duration.Duration -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ - -@nowarn -final private[curl] class CurlExecutorScheduler(multiHandle: Ptr[libcurl.CURLM], pollEvery: Int) - extends PollingExecutorScheduler(pollEvery) { - - private val callbacks = mutable.Map[Ptr[libcurl.CURL], Either[Throwable, Unit] => Unit]() - - def poll(timeout: Duration): Boolean = { - val timeoutIsInf = timeout == Duration.Inf - val noCallbacks = callbacks.isEmpty - - if (timeoutIsInf && noCallbacks) false - else { - val timeoutMillis = - if (timeoutIsInf) Int.MaxValue else timeout.toMillis.min(Int.MaxValue).toInt - - if (timeout > Duration.Zero) { - - val pollCode = libcurl.curl_multi_poll( - multiHandle, - null, - 0.toUInt, - timeoutMillis, - null, - ) - - if (pollCode.isError) - throw CurlError.fromMCode(pollCode) - } - - if (noCallbacks) false - else { - val runningHandles = stackalloc[CInt]() - val performCode = libcurl.curl_multi_perform(multiHandle, runningHandles) - - if (performCode.isError) - throw CurlError.fromMCode(performCode) - - while ({ - val msgsInQueue = stackalloc[CInt]() - val info = libcurl.curl_multi_info_read(multiHandle, msgsInQueue) - - if (info != null) { - val curMsg = libcurl.curl_CURLMsg_msg(info) - if (curMsg == libcurl_const.CURLMSG_DONE) { - val handle = libcurl.curl_CURLMsg_easy_handle(info) - callbacks.remove(handle).foreach { cb => - val result = libcurl.curl_CURLMsg_data_result(info) - cb( - if (result.isOk) Right(()) - else Left(CurlError.fromCode(result)) - ) - } - - val code = libcurl.curl_multi_remove_handle(multiHandle, handle) - if (code.isError) - throw CurlError.fromMCode(code) - } - true - } else false - }) {} - - !runningHandles > 0 - } - } - } - - /** Adds a curl handler that is expected to terminate - * like a normal http request - * - * IMPORTANT NOTE: if you add a transfer that does not terminate (e.g. websocket) using this method, - * application might hang, because those transfer don't seem to change state, - * so it's not distinguishable whether they are finished or have other work to do - * - * @param handle curl easy handle to add - * @param cb callback to run when this handler has finished its transfer - */ - def addHandle(handle: Ptr[libcurl.CURL], cb: Either[Throwable, Unit] => Unit): Unit = { - val code = libcurl.curl_multi_add_handle(multiHandle, handle) - if (code.isError) - throw CurlError.fromMCode(code) - callbacks(handle) = cb - } - - /** Add a curl handle for a transfer that doesn't finish e.g. a websocket transfer - * it adds a handle to multi handle, and removes it when it goes out of scope - * so no dangling handler will remain in multi handler - * callback is called when the transfer is terminated or goes out of scope - * - * @param handle curl easy handle to add - * @param cb callback to run if this handler is terminated unexpectedly - */ - def addHandleR( - handle: Ptr[libcurl.CURL], - cb: Either[Throwable, Unit] => Unit, - ): Resource[IO, Unit] = Resource.make(IO(addHandle(handle, cb)))(_ => - IO(callbacks.remove(handle).foreach(_(Right(())))) - ) -} - -private[curl] object CurlExecutorScheduler { - - def apply(pollEvery: Int): (CurlExecutorScheduler, () => Unit) = { - val initCode = libcurl.curl_global_init(2) - if (initCode.isError) - throw CurlError.fromCode(initCode) - - val multiHandle = libcurl.curl_multi_init() - if (multiHandle == null) - throw new RuntimeException("curl_multi_init") - - val shutdown = () => { - val code = libcurl.curl_multi_cleanup(multiHandle) - libcurl.curl_global_cleanup() - if (code.isError) - throw CurlError.fromMCode(code) - } - - (new CurlExecutorScheduler(multiHandle, pollEvery), shutdown) - } - -} diff --git a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala index 16a7e9c..69f1efe 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/Connection.scala @@ -28,7 +28,6 @@ import org.http4s.Uri import org.http4s.client.websocket._ import org.http4s.curl.internal.Utils import org.http4s.curl.internal._ -import org.http4s.curl.unsafe.CurlExecutorScheduler import org.http4s.curl.unsafe.libcurl import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector @@ -206,42 +205,6 @@ private object Connection { size * nitems } - def apply( - req: WSRequest, - ec: CurlExecutorScheduler, - recvBufferSize: Int, - pauseOn: Int, - resumeOn: Int, - verbose: Boolean, - ): Resource[IO, Connection] = for { - gc <- GCRoot() - dispatcher <- Dispatcher.sequential[IO] - recvQ <- Queue.bounded[IO, Option[WSFrame]](recvBufferSize).toResource - recv <- Ref[SyncIO].of(Option.empty[Receiving]).to[IO].toResource - estab <- IO.deferred[Either[Throwable, Unit]].toResource - handler <- CurlEasy() - brk <- Breaker( - handler, - capacity = recvBufferSize, - close = resumeOn, - open = pauseOn, - verbose, - ).toResource - con = new Connection( - handler, - recvQ, - recv, - dispatcher, - estab, - brk, - ) - _ <- setup(req, verbose)(con) - _ <- gc.add(con) - _ <- ec.addHandleR(handler.curl, con.onTerminated) - // Wait until established or throw error - _ <- estab.get.flatMap(IO.fromEither).toResource - } yield con - def apply( req: WSRequest, ms: CurlMultiDriver, diff --git a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala index 6cf3685..68689f8 100644 --- a/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala +++ b/curl/src/main/scala/org/http4s/curl/websocket/CurlWSClient.scala @@ -18,79 +18,17 @@ package org.http4s.curl.websocket import cats.Foldable import cats.effect.IO +import cats.effect.kernel.Resource import cats.implicits._ import org.http4s.client.websocket.WSFrame._ import org.http4s.client.websocket._ +import org.http4s.curl.CurlDriver +import org.http4s.curl.CurlRuntime import org.http4s.curl.internal.CurlMultiDriver -import org.http4s.curl.unsafe.CurlExecutorScheduler -import org.http4s.curl.unsafe.CurlRuntime import org.http4s.curl.unsafe.libcurl_const import scodec.bits.ByteVector private[curl] object CurlWSClient { - - // TODO change to builder - def get( - recvBufferSize: Int = 100, - pauseOn: Int = 10, - resumeOn: Int = 30, - verbose: Boolean = false, - ): IO[WSClient[IO]] = IO.executionContext.flatMap { - case ec: CurlExecutorScheduler => - IO.fromOption(apply(ec, recvBufferSize, pauseOn, resumeOn, verbose))( - new RuntimeException("websocket client is not supported in this environment") - ) - case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler")) - } - - def apply( - ec: CurlExecutorScheduler, - recvBufferSize: Int, - pauseOn: Int, - resumeOn: Int, - verbose: Boolean, - ): Option[WSClient[IO]] = - Option.when(CurlRuntime.isWebsocketAvailable && CurlRuntime.curlVersionNumber >= 0x75700) { - WSClient(true) { req => - Connection(req, ec, recvBufferSize, pauseOn, resumeOn, verbose) - .map(con => - new WSConnection[IO] { - override def send(wsf: WSFrame): IO[Unit] = wsf match { - case Close(_, _) => - val flags = libcurl_const.CURLWS_CLOSE - con.send(flags, ByteVector.empty) - case Ping(data) => - val flags = libcurl_const.CURLWS_PING - con.send(flags, data) - case Pong(data) => - val flags = libcurl_const.CURLWS_PONG - con.send(flags, data) - case Text(data, true) => - val flags = libcurl_const.CURLWS_TEXT - val bv = - ByteVector.encodeUtf8(data).getOrElse(throw InvalidTextFrame) - con.send(flags, bv) - case Binary(data, true) => - val flags = libcurl_const.CURLWS_BINARY - con.send(flags, data) - case _ => - // NOTE curl needs to know total amount of fragment size in first send - // and it is not compatible with current websocket interface in http4s - IO.raiseError(PartialFragmentFrame) - } - - override def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]): IO[Unit] = - wsfs.traverse_(send) - - override def receive: IO[Option[WSFrame]] = con.receive - - override def subprotocol: Option[String] = None - - } - ) - } - } - def apply( ms: CurlMultiDriver, recvBufferSize: Int = 100, @@ -138,4 +76,6 @@ private[curl] object CurlWSClient { ) } } + + val default: Resource[IO, WSClient[IO]] = CurlDriver.default.evalMap(_.websocket.buildIO) } diff --git a/tests/common/src/test/scala/CurlRuntimeSuite.scala b/tests/common/src/test/scala/CurlRuntimeSuite.scala index 4317ec9..68cfe91 100644 --- a/tests/common/src/test/scala/CurlRuntimeSuite.scala +++ b/tests/common/src/test/scala/CurlRuntimeSuite.scala @@ -17,7 +17,7 @@ package org.http4s.curl import munit.FunSuite -import org.http4s.curl.unsafe.CurlRuntime +import org.http4s.curl.CurlRuntime class CurlRuntimeSuite extends FunSuite { diff --git a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala index 39f95ea..46d36e5 100644 --- a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala +++ b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala @@ -31,7 +31,7 @@ import org.http4s.syntax.all._ class CurlClientMultiSocketSuite extends CatsEffectSuite { val clientFixture: SyncIO[FunFixture[Client[IO]]] = ResourceFunFixture( - CurlMultiSocket().map(http.CurlClient.multiSocket(_)) + CurlMultiSocket().map(http.CurlClient(_)) ) clientFixture.test("3 get echos") { client => diff --git a/tests/websocket/src/test/scala/CurlWSClientSuite.scala b/tests/websocket/src/test/scala/CurlWSClientSuite.scala index a11986d..ac3c11b 100644 --- a/tests/websocket/src/test/scala/CurlWSClientSuite.scala +++ b/tests/websocket/src/test/scala/CurlWSClientSuite.scala @@ -17,23 +17,20 @@ package org.http4s.curl import cats.effect.IO -import cats.effect.kernel.Resource import cats.effect.unsafe.IORuntime import cats.syntax.all._ import munit.CatsEffectSuite import org.http4s.client.websocket.WSFrame import org.http4s.client.websocket.WSRequest -import org.http4s.curl.unsafe.CurlRuntime -import org.http4s.curl.websocket.CurlWSClient +import org.http4s.curl.unsafe.CurlMultiPerformPoller import org.http4s.implicits._ class CurlWSClientSuite extends CatsEffectSuite { - override lazy val munitIORuntime: IORuntime = CurlRuntime.global + override lazy val munitIORuntime: IORuntime = + IORuntime.builder().setPollingSystem(CurlMultiPerformPoller()).build() - private val clientFixture = ResourceFunFixture( - Resource.eval(CurlWSClient.get(verbose = true)) - ) + private val clientFixture = ResourceFunFixture(websocket.CurlWSClient.default) clientFixture.test("websocket echo") { val frames = List.range(1, 5).map(i => WSFrame.Text(s"text $i")) From 8d94df0a8419183335171281f0095de3574a5ae7 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 12:27:39 +0330 Subject: [PATCH 08/13] Fixed scala3 native link --- build.sbt | 2 + .../http4s/curl/unsafe/CurlMultiSocket.scala | 291 +++++++++--------- 2 files changed, 149 insertions(+), 144 deletions(-) diff --git a/build.sbt b/build.sbt index 53db56a..463c8b4 100644 --- a/build.sbt +++ b/build.sbt @@ -121,3 +121,5 @@ ThisBuild / stopTestServer := { } addCommandAlias("integrate", "startTestServer; test") + +addCommandAlias("precommit", "headerCreateAll;scalafmtAll;scalafmtSbt;scalafixAll;+Test/nativeLink") diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala index a310b6f..2b96097 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala @@ -30,8 +30,10 @@ import org.http4s.curl.internal._ import scala.concurrent.duration._ import scala.scalanative.unsafe._ +import CurlMultiSocket._ + private[curl] object CurlMultiSocket { - implicit private class OptFibOps(private val f: Option[FiberIO[?]]) extends AnyVal { + implicit class OptFibOps(private val f: Option[FiberIO[?]]) extends AnyVal { def cancel: IO[Unit] = f.fold(IO.unit)(_.cancel) } @@ -47,7 +49,7 @@ private[curl] object CurlMultiSocket { disp <- Dispatcher.sequential[IO] mapping <- AtomicCell[IO].of(State.empty).toResource timeout <- IO.ref[Option[FiberIO[Unit]]](None).toResource - cms = new CurlMultiSocketImpl(multi, fdPoller, mapping, disp, timeout) + cms = new CurlMultiSocket(multi, fdPoller, mapping, disp, timeout) _ <- cms.setup } yield cms @@ -78,7 +80,7 @@ private[curl] object CurlMultiSocket { timeoutMs: CLong, userdata: Ptr[Byte], ): CInt = { - val d = Utils.fromPtr[CurlMultiSocketImpl](userdata) + val d = Utils.fromPtr[CurlMultiSocket](userdata) if (timeoutMs == -1) { d.removeTimeout @@ -95,7 +97,7 @@ private[curl] object CurlMultiSocket { userdata: Ptr[Byte], socketdata: Ptr[Byte], ): CInt = { - val d = Utils.fromPtr[CurlMultiSocketImpl](userdata) + val d = Utils.fromPtr[CurlMultiSocket](userdata) what match { case libcurl_const.CURL_POLL_IN => d.addFD(fd, true, false) @@ -108,157 +110,158 @@ private[curl] object CurlMultiSocket { 0 } - final private class CurlMultiSocketImpl( - multiHandle: CurlMulti, - fdpoller: FileDescriptorPoller, - mapping: AtomicCell[IO, State], - disp: Dispatcher[IO], - timeout: Ref[IO, Option[FiberIO[Unit]]], - ) extends CurlMultiDriver { - - private def init = IO { - val data = Utils.toPtr(this) - - libcurl - .curl_multi_setopt_timerdata( - multiHandle.multiHandle, - libcurl_const.CURLMOPT_TIMERDATA, - data, - ) - .throwOnError - - libcurl - .curl_multi_setopt_socketdata( - multiHandle.multiHandle, - libcurl_const.CURLMOPT_SOCKETDATA, - data, - ) - .throwOnError - - libcurl - .curl_multi_setopt_timerfunction( - multiHandle.multiHandle, - libcurl_const.CURLMOPT_TIMERFUNCTION, - onTimeout(_, _, _), - ) - .throwOnError - - libcurl - .curl_multi_setopt_socketfunction( - multiHandle.multiHandle, - libcurl_const.CURLMOPT_SOCKETFUNCTION, - onSocket(_, _, _, _, _), - ) - .throwOnError - - } *> notifyTimeout - - private def cleanup = - removeTimeoutIO !> mapping.evalUpdate { - case State.Active(monitors) => - // First clean all monitors, this ensures that we don't call any - // curl callbacks afterwards, and callback cleaning and notifications - // is deterministic. - monitors.values.toList.traverse(_.clean) !> IO { - // Remove and notify all easy handles - // Note that we do this in mapping.evalUpdate in order to block - // other new usages while cleaning up - multiHandle.clearCallbacks - }.as(State.Released) - case State.Released => - // It must not happen, but we leave a clue here if it happened! - IO.raiseError(new IllegalStateException("Cannot clean a released resource!")) - } +} - def setup: Resource[IO, Unit] = Resource.make(init)(_ => cleanup) - - override def addHandlerTerminating( - easy: CurlEasy, - cb: Either[Throwable, Unit] => Unit, - ): IO[Unit] = IO(multiHandle.addHandle(easy.curl, cb)) - - override def addHandlerNonTerminating( - easy: CurlEasy, - cb: Either[Throwable, Unit] => Unit, - ): Resource[IO, Unit] = - Resource.make(addHandlerTerminating(easy, cb))(_ => IO(multiHandle.removeHandle(easy.curl))) - - def addFD(fd: libcurl.curl_socket_t, read: Boolean, write: Boolean): Unit = - disp.unsafeRunAndForget { - - val newMonitor = fdpoller.registerFileDescriptor(fd, read, write).allocated.flatMap { - case (handle, unregister) => - ( - Option.when(read)(readLoop(fd, handle)).sequence, - Option.when(write)(writeLoop(fd, handle)).sequence, - ) - .mapN(Monitoring(_, _, handle, unregister)) - } +final private class CurlMultiSocket private ( + multiHandle: CurlMulti, + fdpoller: FileDescriptorPoller, + mapping: AtomicCell[IO, State], + disp: Dispatcher[IO], + timeout: Ref[IO, Option[FiberIO[Unit]]], +) extends CurlMultiDriver { + + private def init = IO { + val data = Utils.toPtr(this) + + libcurl + .curl_multi_setopt_timerdata( + multiHandle.multiHandle, + libcurl_const.CURLMOPT_TIMERDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketdata( + multiHandle.multiHandle, + libcurl_const.CURLMOPT_SOCKETDATA, + data, + ) + .throwOnError + + libcurl + .curl_multi_setopt_timerfunction( + multiHandle.multiHandle, + libcurl_const.CURLMOPT_TIMERFUNCTION, + onTimeout(_, _, _), + ) + .throwOnError + + libcurl + .curl_multi_setopt_socketfunction( + multiHandle.multiHandle, + libcurl_const.CURLMOPT_SOCKETFUNCTION, + onSocket(_, _, _, _, _), + ) + .throwOnError + + } *> notifyTimeout + + private def cleanup = + removeTimeoutIO !> mapping.evalUpdate { + case State.Active(monitors) => + // First clean all monitors, this ensures that we don't call any + // curl callbacks afterwards, and callback cleaning and notifications + // is deterministic. + monitors.values.toList.traverse(_.clean) !> IO { + // Remove and notify all easy handles + // Note that we do this in mapping.evalUpdate in order to block + // other new usages while cleaning up + multiHandle.clearCallbacks + }.as(State.Released) + case State.Released => + // It must not happen, but we leave a clue here if it happened! + IO.raiseError(new IllegalStateException("Cannot clean a released resource!")) + } - IO.uncancelable(_ => - mapping.evalUpdate { - case state @ State.Active(monitors) => - monitors.get(fd) match { - case None => - newMonitor.map(state.add(fd, _)) - case Some(s: Monitoring) => - s.clean *> newMonitor.map(state.add(fd, _)) - } - case State.Released => - IO.raiseError(new IllegalStateException("Runtime is already closed!")) - } - ) + def setup: Resource[IO, Unit] = Resource.make(init)(_ => cleanup) + + override def addHandlerTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): IO[Unit] = IO(multiHandle.addHandle(easy.curl, cb)) + + override def addHandlerNonTerminating( + easy: CurlEasy, + cb: Either[Throwable, Unit] => Unit, + ): Resource[IO, Unit] = + Resource.make(addHandlerTerminating(easy, cb))(_ => IO(multiHandle.removeHandle(easy.curl))) + + def addFD(fd: libcurl.curl_socket_t, read: Boolean, write: Boolean): Unit = + disp.unsafeRunAndForget { + + val newMonitor = fdpoller.registerFileDescriptor(fd, read, write).allocated.flatMap { + case (handle, unregister) => + ( + Option.when(read)(readLoop(fd, handle)).sequence, + Option.when(write)(writeLoop(fd, handle)).sequence, + ) + .mapN(Monitoring(_, _, handle, unregister)) } - def remove(fd: libcurl.curl_socket_t): Unit = - disp.unsafeRunAndForget( - IO.uncancelable(_ => - mapping.evalUpdate { - case state @ State.Active(monitors) => - monitors.get(fd) match { - case None => IO(state) - case Some(s) => s.clean.as(state.remove(fd)) - } - case State.Released => - IO.raiseError(new IllegalStateException("Runtime is already closed!")) - } - ) + IO.uncancelable(_ => + mapping.evalUpdate { + case state @ State.Active(monitors) => + monitors.get(fd) match { + case None => + newMonitor.map(state.add(fd, _)) + case Some(s: Monitoring) => + s.clean *> newMonitor.map(state.add(fd, _)) + } + case State.Released => + IO.raiseError(new IllegalStateException("Runtime is already closed!")) + } ) + } - def setTimeout(duration: Long): Unit = disp.unsafeRunAndForget( - (IO.sleep(duration.millis) *> notifyTimeout).start.flatMap(f => - timeout.getAndSet(Some(f)).flatMap(_.cancel) + def remove(fd: libcurl.curl_socket_t): Unit = + disp.unsafeRunAndForget( + IO.uncancelable(_ => + mapping.evalUpdate { + case state @ State.Active(monitors) => + monitors.get(fd) match { + case None => IO(state) + case Some(s) => s.clean.as(state.remove(fd)) + } + case State.Released => + IO.raiseError(new IllegalStateException("Runtime is already closed!")) + } ) ) - private def removeTimeoutIO = timeout.getAndSet(None).flatMap(_.cancel) - def removeTimeout: Unit = disp.unsafeRunAndForget(removeTimeoutIO) - - def notifyTimeout: IO[Unit] = IO { - val running = stackalloc[Int]() - libcurl - .curl_multi_socket_action( - multiHandle.multiHandle, - libcurl_const.CURL_SOCKET_TIMEOUT, - 0, - running, - ) - .throwOnError - - multiHandle.onTick - } + def setTimeout(duration: Long): Unit = disp.unsafeRunAndForget( + (IO.sleep(duration.millis) *> notifyTimeout).start.flatMap(f => + timeout.getAndSet(Some(f)).flatMap(_.cancel) + ) + ) - private def action(fd: libcurl.curl_socket_t, ev: CInt) = IO { - val running = stackalloc[Int]() - libcurl.curl_multi_socket_action(multiHandle.multiHandle, fd, ev, running) + private def removeTimeoutIO = timeout.getAndSet(None).flatMap(_.cancel) + def removeTimeout: Unit = disp.unsafeRunAndForget(removeTimeoutIO) + + def notifyTimeout: IO[Unit] = IO { + val running = stackalloc[Int]() + libcurl + .curl_multi_socket_action( + multiHandle.multiHandle, + libcurl_const.CURL_SOCKET_TIMEOUT, + 0, + running, + ) + .throwOnError - multiHandle.onTick + multiHandle.onTick + } - Left(()) - } - private def readLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = - p.pollReadRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_IN)).start - private def writeLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = - p.pollWriteRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_OUT)).start + private def action(fd: libcurl.curl_socket_t, ev: CInt) = IO { + val running = stackalloc[Int]() + libcurl.curl_multi_socket_action(multiHandle.multiHandle, fd, ev, running) + + multiHandle.onTick + + Left(()) } + private def readLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = + p.pollReadRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_IN)).start + private def writeLoop(fd: libcurl.curl_socket_t, p: FileDescriptorPollHandle) = + p.pollWriteRec(())(_ => action(fd, libcurl_const.CURL_CSELECT_OUT)).start } From 64e7c482434a4f0a893328557593d30df930b6a4 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 12:32:54 +0330 Subject: [PATCH 09/13] Applied review suggestion https://github.com/http4s/http4s-curl/pull/124#discussion_r1252714755 --- .../curl/unsafe/CurlMultiPerformPoller.scala | 52 +++++++++---------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala index b73d21c..02d51dc 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiPerformPoller.scala @@ -62,36 +62,32 @@ final private[curl] class CurlMultiPerformPoller( val timeoutIsInf = nanos == -1 val noCallbacks = multiHandle.noCallbacks - if (timeoutIsInf && noCallbacks) false - else { - val timeoutMillis = - if (timeoutIsInf) Int.MaxValue else (nanos / 1e6).toInt - - if (nanos > 0) { - - libcurl - .curl_multi_poll( - multiHandle.multiHandle, - null, - 0.toUInt, - timeoutMillis, - null, - ) - .throwOnError - } - - if (noCallbacks) false - else { - val runningHandles = stackalloc[CInt]() - libcurl.curl_multi_perform(multiHandle.multiHandle, runningHandles).throwOnError - - multiHandle.onTick - - needsPoll = !runningHandles > 0 - true - } + val timeoutMillis = + if (timeoutIsInf) Int.MaxValue else (nanos / 1e6).toInt + + if (nanos > 0) { + + libcurl + .curl_multi_poll( + multiHandle.multiHandle, + null, + 0.toUInt, + timeoutMillis, + null, + ) + .throwOnError } + if (noCallbacks) false + else { + val runningHandles = stackalloc[CInt]() + libcurl.curl_multi_perform(multiHandle.multiHandle, runningHandles).throwOnError + + multiHandle.onTick + + needsPoll = !runningHandles > 0 + true + } } override def needsPoll(poller: Poller): Boolean = needsPoll From d7c5f68ce88abea6cdd4c7e635602112d47e73a2 Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 12:33:45 +0330 Subject: [PATCH 10/13] Ignored multi socket tests on windows --- tests/http/src/test/scala/CurlClientMultiSocketSuite.scala | 1 + .../websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala index 46d36e5..02584dc 100644 --- a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala +++ b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala @@ -29,6 +29,7 @@ import org.http4s.curl.unsafe.CurlMultiSocket import org.http4s.syntax.all._ class CurlClientMultiSocketSuite extends CatsEffectSuite { + override def munitIgnore: Boolean = scala.util.Properties.isWin val clientFixture: SyncIO[FunFixture[Client[IO]]] = ResourceFunFixture( CurlMultiSocket().map(http.CurlClient(_)) diff --git a/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala b/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala index 8997f35..fa61d4d 100644 --- a/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala +++ b/tests/websocket/src/test/scala/CurlWSClientMultiSocketSuite.scala @@ -26,6 +26,7 @@ import org.http4s.curl.websocket.CurlWSClient import org.http4s.implicits._ class CurlWSClientMultiSocketSuite extends CatsEffectSuite { + override def munitIgnore: Boolean = scala.util.Properties.isWin private val clientFixture = ResourceFunFixture( CurlMultiSocket().evalMap( From 767177aa06794ae02d81c726e8dbce1601fd128d Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Wed, 5 Jul 2023 12:43:16 +0330 Subject: [PATCH 11/13] Bumped up base version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 463c8b4..ce3f8fc 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import Versions._ -ThisBuild / tlBaseVersion := "0.2" +ThisBuild / tlBaseVersion := "0.3" ThisBuild / developers := List( tlGitHubDev("armanbilge", "Arman Bilge") From 18200750bd788381043675241ab39f0c3426df5c Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Sun, 9 Jul 2023 12:26:16 +0330 Subject: [PATCH 12/13] Ignore remove fd when CurlMultiSocket is closed --- .../main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala index 2b96097..1457ec2 100644 --- a/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala +++ b/curl/src/main/scala/org/http4s/curl/unsafe/CurlMultiSocket.scala @@ -170,7 +170,7 @@ final private class CurlMultiSocket private ( multiHandle.clearCallbacks }.as(State.Released) case State.Released => - // It must not happen, but we leave a clue here if it happened! + // It can not happen, but we leave a clue here if it happened! IO.raiseError(new IllegalStateException("Cannot clean a released resource!")) } @@ -223,8 +223,7 @@ final private class CurlMultiSocket private ( case None => IO(state) case Some(s) => s.clean.as(state.remove(fd)) } - case State.Released => - IO.raiseError(new IllegalStateException("Runtime is already closed!")) + case State.Released => IO(State.Released) // It's already removed! } ) ) From d5f7ca85bad248c2c16982cb4d6d4eca59068aea Mon Sep 17 00:00:00 2001 From: Hossein Naderi Date: Sun, 9 Jul 2023 12:32:59 +0330 Subject: [PATCH 13/13] Tests with more parallel requests --- .../src/test/scala/CurlClientMultiSocketSuite.scala | 12 ++++++------ tests/http/src/test/scala/CurlClientSuite.scala | 11 ++++++----- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala index 02584dc..37dff08 100644 --- a/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala +++ b/tests/http/src/test/scala/CurlClientMultiSocketSuite.scala @@ -65,21 +65,21 @@ class CurlClientMultiSocketSuite extends CatsEffectSuite { client.expect[String]("").intercept[CurlError] } - clientFixture.test("3 post echos") { client => + clientFixture.test("post echos") { client => Random.scalaUtilRandom[IO].flatMap { random => random .nextString(8) .flatMap { s => - val msg = s"hello postman $s" + val msg = s"hello $s" client .expect[String]( Request[IO](POST, uri = uri"http://localhost:8080/http/echo").withEntity(msg) ) - .flatTap(IO.println) - .map(_.contains(msg)) - .assert + .assertEquals(msg) + .attempt } - .parReplicateA_(3) + .parReplicateA(50) + .assertEquals(List.fill(50)(Right(()))) } } diff --git a/tests/http/src/test/scala/CurlClientSuite.scala b/tests/http/src/test/scala/CurlClientSuite.scala index 40c3ce6..fb8c8d1 100644 --- a/tests/http/src/test/scala/CurlClientSuite.scala +++ b/tests/http/src/test/scala/CurlClientSuite.scala @@ -66,20 +66,21 @@ class CurlClientSuite extends CatsEffectSuite { client.expect[String]("").intercept[CurlError] } - clientFixture.test("3 post echos") { client => + clientFixture.test("post echos") { client => Random.scalaUtilRandom[IO].flatMap { random => random .nextString(8) .flatMap { s => - val msg = s"hello postman $s" + val msg = s"hello $s" client .expect[String]( Request[IO](POST, uri = uri"http://localhost:8080/http/echo").withEntity(msg) ) - .map(_.contains(msg)) - .assert + .assertEquals(msg) + .attempt } - .parReplicateA_(3) + .parReplicateA(50) + .assertEquals(List.fill(50)(Right(()))) } }