From 4e352c420de4e11806956840c53e1197aedcf39e Mon Sep 17 00:00:00 2001 From: Clonkk Date: Wed, 6 Dec 2023 12:38:30 +0100 Subject: [PATCH 1/8] move tryReceive to be truly non blocking for coherence with channels, introduce waitForReceive to wait a message based on RCVTIMEO in a blocking way instead; add TODO comment about exception in =destroy introduced by Nim 2.0 --- tests/tzmq.nim | 67 +++++++++++++++++++++++++++++++++++----- zmq/asynczmq.nim | 1 + zmq/bindings.nim | 1 - zmq/connections.nim | 75 +++++++++++++++++++++++++++++++++++++++------ 4 files changed, 125 insertions(+), 19 deletions(-) diff --git a/tests/tzmq.nim b/tests/tzmq.nim index 84944dd..2e03ad3 100644 --- a/tests/tzmq.nim +++ b/tests/tzmq.nim @@ -1,5 +1,5 @@ import ../zmq -import std/[unittest, os] +import std/[unittest, os, times, monotimes] import std/[asyncdispatch, asyncfutures] proc reqrep() = @@ -88,7 +88,8 @@ proc routerdealer() = test "routerdealer": const sockaddr = "tcp://127.0.0.1:55001" var router = listen(sockaddr, mode = ROUTER) - router.setsockopt(RCVTIMEO, 350.cint) + router.setsockopt(RCVTIMEO, 500.cint) + defer: router.close() var dealer = connect(sockaddr, mode = DEALER) defer: dealer.close() @@ -106,12 +107,23 @@ proc routerdealer() = check dealer.receive() == payload # Let receive timeout block: + let start = getMonoTime() # On receive return empty message - let recv = router.receive() + let + recv = router.receive() + stop = getMonoTime() + elapsed = stop - start + check (elapsed - initDuration(milliseconds=500)) < initDuration(milliseconds=1) check recv == "" + block: # On try receive, check flag is flase - let recv = router.tryReceive() + let + start = getMonoTime() + recv = router.waitForReceive(350) + stop = getMonoTime() + elapsed = stop - start + check (elapsed - initDuration(milliseconds=350)) < initDuration(milliseconds=1) check recv.msgAvailable == false proc inproc_sharectx() = @@ -146,31 +158,32 @@ proc pairpair() = var pairs = @[listen(sockaddr, PAIR), connect(sockaddr, PAIR)] pairs[1].setsockopt(RCVTIMEO, 500.cint) + block: pairs[0].send(ping, SNDMORE) pairs[0].send(ping, SNDMORE) pairs[0].send(ping) block: - let content = pairs[1].tryReceive() + let content = pairs[1].waitForReceive() check content.msgAvailable check content.moreAvailable check content.msg == ping block: - let content = pairs[1].tryReceive() + let content = pairs[1].waitForReceive() check content.msgAvailable check content.moreAvailable check content.msg == ping block: - let content = pairs[1].tryReceive() + let content = pairs[1].waitForReceive() check content.msgAvailable check (not content.moreAvailable) check content.msg == ping block: - let content = pairs[1].tryReceive() + let content = pairs[1].waitForReceive() check (not content.msgAvailable) block: @@ -300,6 +313,43 @@ proc async_pub_sub() = test "async pub_sub": check count == N_mSGS +proc non_blocking_recv() = + const sockaddr = "tcp://127.0.0.1:55001" + test "non-blocking receive": + var router = listen(sockaddr, mode = ROUTER) + let res = router.tryReceive() + check res == (false, false, "") + + var dealer = connect(sockaddr, mode = DEALER) + let payload = "payload" + block: + # Dealer send a message to router + dealer.send(payload) + + block: + # Remove "envelope" of router / dealer + let dealerSocketId = router.receive() + let res = router.waitForReceive(250) + check res.msgAvailable + check not res.moreAvailable + check res.msg == payload + + block: + # Remove "envelope" of router / dealer + let + start = getMonoTime() + res = router.waitForReceive(250) + stop = getMonoTime() + elapsed = stop - start + check (elapsed - initDuration(milliseconds=250)) < initDuration(milliseconds=1) + + check not res.msgAvailable + check not res.moreAvailable + check res.msg == "" + + router.close(250) + dealer.close(250) + when isMainModule: reqrep() pubsub() @@ -308,3 +358,4 @@ when isMainModule: pairpair() async_pub_sub() asyncpoll() + non_blocking_recv() diff --git a/zmq/asynczmq.nim b/zmq/asynczmq.nim index b2ddd66..4d3ab67 100644 --- a/zmq/asynczmq.nim +++ b/zmq/asynczmq.nim @@ -15,6 +15,7 @@ proc len*(poller: AsyncZPoller): int = result = poller.zpoll.len() proc `=destroy`*(obj: var AsyncZPoller) = + # TODO Handle exception in =destroy if hasPendingOperations(): drain(500) diff --git a/zmq/bindings.nim b/zmq/bindings.nim index 2c7eaad..1916461 100644 --- a/zmq/bindings.nim +++ b/zmq/bindings.nim @@ -1,4 +1,3 @@ -{.deadCodeElim: on.} when defined(windows): const zmqdll* = "(lib|)zmq.dll" diff --git a/zmq/connections.nim b/zmq/connections.nim index 17c2c03..83fd175 100644 --- a/zmq/connections.nim +++ b/zmq/connections.nim @@ -142,6 +142,9 @@ when defined(gcDestructors): proc close*(c: var ZConnectionImpl, linger: int = 500) proc `=destroy`(x: var ZConnectionImpl) = if x.alive: + # TODO + # Handle exception in =destroy hook or use private close without possible exception ? + # How to remove "var T" in =destroy when the ojbect needs to be mutated ? x.close() #[ @@ -302,12 +305,7 @@ proc sendAll*(c: ZConnection, msg: varargs[string]) = sendAll(c.socket, msg) # receive with ZSocket type -proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = - ## Receives a message from a socket. - ## - ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` - ## - ## Indicate if more parts are needed to be received by ``moreAvailable`` +proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = result.moreAvailable = false result.msgAvailable = false @@ -321,6 +319,8 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila result.msg = newString(msg_size(m)) if result.msg.len > 0: copyMem(addr(result.msg[0]), msg_data(m), result.msg.len) + + # Check if more part follows result.moreAvailable = msg_more(m).bool else: # Either an error or EAGAIN @@ -330,11 +330,53 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila if msg_close(m) != 0: zmqError() +proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Set RCVTIMEO for the socket and wait until a message is available. + ## This function is blocking. + ## + ## timeout: + ## -1 means infinite wait + ## positive value is in milliseconds + ## negative value strictly below -1 are ignored and the wait time will default to RCVTIMEO set for the socket (which by default is -1). + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## Indicate if more parts are needed to be received by ``moreAvailable`` + result.moreAvailable = false + result.msgAvailable = false + + let curtimeout : cint = getsockopt[cint](s, RCVTIMEO) + + # If rcvtimeout is set and not timeout argument is passed (or -1), use the existing timeout + # Otherwise update the rcvtimeout + let shouldUpdateTimeout = (timeout >= -1) and ((curtimeout > 0 and timeout > 0) or (curtimeout < 0)) + + if shouldUpdateTimeout: + s.setsockopt(RCVTIMEO, timeout.cint) + + result = receiveImpl(s, flags) + + if shouldUpdateTimeout: + s.setsockopt(RCVTIMEO, curtimeout.cint) + +proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Receives a message from a socket in a non-blocking way. + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## + ## Indicate if more parts are needed to be received by ``moreAvailable`` + result.moreAvailable = false + result.msgAvailable = false + + let status = getsockopt[cint](s, ZSockOptions.EVENTS).int() + # Check if socket has an incoming message + if (status and ZMQ_POLLIN) != 0: + result = receiveImpl(s, flags) + proc receive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): string = ## Receive a message on socket. - ## + # ## Return an empty string on EAGAIN - tryReceive(s, flags).msg + receiveImpl(s, flags).msg proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] = ## Receive all parts of a message @@ -342,15 +384,28 @@ proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] = ## If EAGAIN occurs without any data being received, it will be an empty seq var expectMessage = true while expectMessage: - let (msgAvailable, moreAvailable, msg) = tryReceive(s, flags) + let (msgAvailable, moreAvailable, msg) = receiveImpl(s, flags) if msgAvailable: result.add msg expectMessage = moreAvailable else: expectMessage = false +proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Set RCVTIMEO for the socket and wait until a message is available. + ## This function is blocking. + ## + ## timeout: + ## -1 means infinite wait + ## positive value is in milliseconds + ## negative value strictly below -1 are ignored and the wait time will default to RCVTIMEO set for the socket (which by default is -1). + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## Indicate if more parts are needed to be received by ``moreAvailable`` + waitForReceive(c.socket, timeout, flags) + proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = - ## Receives a message from a connection. + ## Receives a message from a socket in a non-blocking way. ## ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` ## From 0f2b7a58a4195c097f2e5ce22784f519760e63df Mon Sep 17 00:00:00 2001 From: Clonkk Date: Wed, 6 Dec 2023 14:48:15 +0100 Subject: [PATCH 2/8] update nimble version --- zmq.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmq.nimble b/zmq.nimble index 3fbfdf0..85bf94f 100644 --- a/zmq.nimble +++ b/zmq.nimble @@ -1,6 +1,6 @@ # Package -version = "1.4.0" +version = "1.5.0" author = "Andreas Rumpf" description = "ZeroMQ wrapper" license = "MIT" From 2dc38c2ca9820bbf32b54535f9cdcd205b5080b1 Mon Sep 17 00:00:00 2001 From: Clonkk Date: Wed, 13 Dec 2023 10:11:06 +0100 Subject: [PATCH 3/8] implement hooks --- tests/tasync_only.nim | 85 +++++++++++++++++++++++++++++++++++++++++++ tests/tdestruc.nim | 49 +++++++++++++++++++++++++ zmq/asynczmq.nim | 29 ++++++++++++--- zmq/connections.nim | 48 ++++++++++++++++++++---- 4 files changed, 199 insertions(+), 12 deletions(-) create mode 100644 tests/tasync_only.nim create mode 100644 tests/tdestruc.nim diff --git a/tests/tasync_only.nim b/tests/tasync_only.nim new file mode 100644 index 0000000..b2aa2e4 --- /dev/null +++ b/tests/tasync_only.nim @@ -0,0 +1,85 @@ +import ../zmq +import std/[asyncdispatch, asyncfutures, unittest] + +proc asyncpoll() = + test "asyncZPoller": + const zaddr = "tcp://127.0.0.1:15571" + const zaddr2 = "tcp://127.0.0.1:15572" + var pusher = listen(zaddr, PUSH) + var puller = connect(zaddr, PULL) + + var pusher2 = listen(zaddr2, PUSH) + var puller2 = connect(zaddr2, PULL) + var poller: AsyncZPoller + + var i = 0 + # Register the callback + # Check message received are correct (should be even integer in string format) + var msglist = @["0", "2", "4", "6", "8"] + var msgCount = 0 + poller.register( + puller2, + ZMQ_POLLIN, + proc(x: ZSocket) = + let msg = x.receive() + inc(msgCount) + if msglist.contains(msg): + msglist.delete(0) + check true + else: + check false + ) + # Check message received are correct (should be even integer in string format) + var msglist2 = @["0", "2", "4", "6", "8"] + var msgCount2 = 0 + poller.register( + puller, + ZMQ_POLLIN, + proc(x: ZSocket) = + let msg = x.receive() + inc(msgCount2) + if msglist2.contains(msg): + msglist2.delete(0) + check true + else: + check false + ) + + let + N = 10 + N_MAX_TIMEOUT = 5 + + var sndCount = 0 + # A client send some message + for i in 0.. Date: Wed, 13 Dec 2023 11:20:06 +0100 Subject: [PATCH 4/8] improve destructor poller + check valgrind --- tests/{tasync_only.nim => async_demo.nim} | 21 +++++++++++---------- tests/tdestruc.nim | 13 +++---------- zmq/asynczmq.nim | 4 +++- zmq/connections.nim | 4 ++-- zmq/poller.nim | 3 +++ 5 files changed, 22 insertions(+), 23 deletions(-) rename tests/{tasync_only.nim => async_demo.nim} (81%) diff --git a/tests/tasync_only.nim b/tests/async_demo.nim similarity index 81% rename from tests/tasync_only.nim rename to tests/async_demo.nim index b2aa2e4..c74d52a 100644 --- a/tests/tasync_only.nim +++ b/tests/async_demo.nim @@ -1,8 +1,9 @@ import ../zmq -import std/[asyncdispatch, asyncfutures, unittest] +import std/[asyncdispatch] proc asyncpoll() = - test "asyncZPoller": + # test "asyncZPoller": + block: const zaddr = "tcp://127.0.0.1:15571" const zaddr2 = "tcp://127.0.0.1:15572" var pusher = listen(zaddr, PUSH) @@ -14,7 +15,7 @@ proc asyncpoll() = var i = 0 # Register the callback - # Check message received are correct (should be even integer in string format) + # assert message received are correct (should be even integer in string format) var msglist = @["0", "2", "4", "6", "8"] var msgCount = 0 poller.register( @@ -25,11 +26,11 @@ proc asyncpoll() = inc(msgCount) if msglist.contains(msg): msglist.delete(0) - check true + assert true else: - check false + assert false ) - # Check message received are correct (should be even integer in string format) + # assert message received are correct (should be even integer in string format) var msglist2 = @["0", "2", "4", "6", "8"] var msgCount2 = 0 poller.register( @@ -40,9 +41,9 @@ proc asyncpoll() = inc(msgCount2) if msglist2.contains(msg): msglist2.delete(0) - check true + assert true else: - check false + assert false ) let @@ -73,8 +74,8 @@ proc asyncpoll() = while hasPendingOperations(): drain() - check msgCount == msgCount2 - check msgCount == sndCount + assert msgCount == msgCount2 + assert msgCount == sndCount pusher.close() puller.close() diff --git a/tests/tdestruc.nim b/tests/tdestruc.nim index b956536..0f46f28 100644 --- a/tests/tdestruc.nim +++ b/tests/tdestruc.nim @@ -5,12 +5,10 @@ proc sockHandler(req, rep: ZConnection, pong: string) = req.send(pong) let r = rep.receive() check r == pong - debugEcho "EndCB" + # debugEcho "EndCB" proc testDestroy() = const sockaddr = "tcp://127.0.0.1:55001" - # var unsafeReqPtr : ptr ZConnection - # var unsafeRepPtr : ptr ZConnection test "Destroy & Copy": let @@ -30,20 +28,15 @@ proc testDestroy() = req2.send(ping) let r = rep.receive() check r == ping - debugEcho "end block scope" + # debugEcho "end block scope" rep.send(pong) block: var req2 = req let r = req2.receive() check r == pong - debugEcho "end test scope" + # debugEcho "end test scope" - # test "ZConnection isNil": - # # Won't work - # check unsafeReqPtr[].isNil() - # check unsafeRepPtr[].isNil() - # when isMainModule: testDestroy() diff --git a/zmq/asynczmq.nim b/zmq/asynczmq.nim index 68336ca..40b99b2 100644 --- a/zmq/asynczmq.nim +++ b/zmq/asynczmq.nim @@ -35,8 +35,10 @@ proc waitAll(obj: AsyncZPoller) {.raises: [].} = except Exception: discard -proc `=destroy`*(obj: AsyncZPoller) {.raises: [].} = +proc `=destroy`*(obj: AsyncZPoller) = obj.waitAll() + `=destroy`(obj.cb) + `=destroy`(obj.zpoll) proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) = ## Register ZSocket function diff --git a/zmq/connections.nim b/zmq/connections.nim index ecbb581..06a534e 100644 --- a/zmq/connections.nim +++ b/zmq/connections.nim @@ -142,7 +142,7 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T when defined(gcDestructors): proc `=destroy`(x: ZConnectionImpl) = # Handle exception in =destroy hook or use private close without possible exception ? - if x.alive: + if x.alive and not isNil(x.socket): var linger = 500.cint # Use low level primitive to avoid throwing if setsockopt(x.socket, LINGER, addr(linger), sizeof(linger)) != 0: @@ -153,7 +153,7 @@ when defined(gcDestructors): # Handle error in closure ? echo("Error in closing ZMQ-socket") - if x.ownctx: + if x.ownctx and not isNil(x.context): if ctx_term(x.context) != 0: echo("Error in closing ZMQ-context") diff --git a/zmq/poller.nim b/zmq/poller.nim index 1984cc0..ef9ec06 100644 --- a/zmq/poller.nim +++ b/zmq/poller.nim @@ -12,6 +12,9 @@ type ## It is mandatory to manage the lifetimes of the polled sockets independently of the ``ZPoller`` - either manually or by using a ``ZConnection``. items*: seq[ZPollItem] +proc `=destroy`*(poll: Zpoller) = + `=destroy`(poll.items) + proc `[]`*(poller: ZPoller, idx: int): lent ZPollItem = ## Access registered element by index poller.items[idx] From 98cd0351ae77117bde927be41720aaa4e4c003df Mon Sep 17 00:00:00 2001 From: Clonkk Date: Wed, 13 Dec 2023 12:45:30 +0100 Subject: [PATCH 5/8] add --mm:orc to doc gen --- zmq.nimble | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zmq.nimble b/zmq.nimble index 85bf94f..4f1d9e2 100644 --- a/zmq.nimble +++ b/zmq.nimble @@ -6,7 +6,7 @@ description = "ZeroMQ wrapper" license = "MIT" # Dependencies -requires "nim >= 0.18.0" +requires "nim >= 1.4.0" task buildexamples, "Compile all examples": withDir "examples": @@ -17,5 +17,5 @@ task buildexamples, "Compile all examples": selfExec("cpp --mm:orc -d:release " & fstr) task gendoc, "Generate documentation": - exec("nimble doc --project zmq.nim --out:docs/") + exec("nim doc --mm:orc --project --out:docs/ zmq.nim") From 949f952937f24b7556fe4435de208220988fe158 Mon Sep 17 00:00:00 2001 From: Clonkk Date: Wed, 13 Dec 2023 14:01:16 +0100 Subject: [PATCH 6/8] remove commented code --- tests/tdestruc.nim | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/tdestruc.nim b/tests/tdestruc.nim index 0f46f28..24b43c6 100644 --- a/tests/tdestruc.nim +++ b/tests/tdestruc.nim @@ -5,7 +5,6 @@ proc sockHandler(req, rep: ZConnection, pong: string) = req.send(pong) let r = rep.receive() check r == pong - # debugEcho "EndCB" proc testDestroy() = const sockaddr = "tcp://127.0.0.1:55001" @@ -17,8 +16,6 @@ proc testDestroy() = var rep = listen(sockaddr, REP) var req = connect(sockaddr, REQ) - # unsafeRepPtr = addr(rep) - # unsafeReqPtr = addr(req) sockHandler(req, rep, ping) sockHandler(rep, req, pong) @@ -28,14 +25,12 @@ proc testDestroy() = req2.send(ping) let r = rep.receive() check r == ping - # debugEcho "end block scope" rep.send(pong) block: var req2 = req let r = req2.receive() check r == pong - # debugEcho "end test scope" when isMainModule: testDestroy() From 0c1906cd4d077467ad40b0db52efc1cfb546aeb0 Mon Sep 17 00:00:00 2001 From: Regis Caillaud <35006197+Clonkk@users.noreply.github.com> Date: Fri, 15 Dec 2023 11:42:47 +0100 Subject: [PATCH 7/8] don't export alive field --- zmq/connections.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmq/connections.nim b/zmq/connections.nim index 06a534e..789414c 100644 --- a/zmq/connections.nim +++ b/zmq/connections.nim @@ -16,7 +16,7 @@ type context*: ZContext ## Zmq context. Can be 'owned' by another connection (useful for inproc protocol). socket*: ZSocket ## Embedded socket. ownctx: bool ## Boolean indicating if the connection owns the Zmq context - alive*: bool ## Boolean indicating if the connection has been closed + alive: bool ## Boolean indicating if the connection has been closed sockaddr: string ## Address of the embedded socket ZConnection * = ref ZConnectionImpl From 073831a384f036fa8d9973873e36326254a54b6e Mon Sep 17 00:00:00 2001 From: Regis Caillaud <35006197+Clonkk@users.noreply.github.com> Date: Fri, 15 Dec 2023 11:57:16 +0100 Subject: [PATCH 8/8] Update comments on connections.nim --- zmq/connections.nim | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/zmq/connections.nim b/zmq/connections.nim index 789414c..d986241 100644 --- a/zmq/connections.nim +++ b/zmq/connections.nim @@ -13,11 +13,11 @@ type ZConnectionImpl* {.pure, final.} = object ## A Zmq connection. Since ``ZContext`` and ``ZSocket`` are pointers, it is highly recommended to **not** copy ``ZConnection``. - context*: ZContext ## Zmq context. Can be 'owned' by another connection (useful for inproc protocol). - socket*: ZSocket ## Embedded socket. - ownctx: bool ## Boolean indicating if the connection owns the Zmq context - alive: bool ## Boolean indicating if the connection has been closed - sockaddr: string ## Address of the embedded socket + context*: ZContext ## Zmq context from C-bindings. + socket*: ZSocket ## Zmq socket from C-bindings. + ownctx: bool # Boolean indicating if the connection owns the Zmq context + alive: bool # Boolean indicating if the connection has been closed + sockaddr: string # Address of the embedded socket ZConnection * = ref ZConnectionImpl @@ -213,7 +213,7 @@ proc bindAddr*(conn: var ZConnection, address: string) = conn.sockaddr = address proc connect*(address: string, mode: ZSocketType, context: ZContext): ZConnection = - ## Open a new connection on an external ``ZContext`` and connect the socket + ## Open a new connection on an external ``ZContext`` and connect the socket. External context are useful for inproc connections. result = new(ZConnection) result.context = context result.ownctx = false @@ -248,7 +248,7 @@ proc connect*(address: string, mode: ZSocketType): ZConnection = result.ownctx = true proc listen*(address: string, mode: ZSocketType, context: ZContext): ZConnection = - ## Open a new connection on an external ``ZContext`` and binds on the socket + ## Open a new connection on an external ``ZContext`` and binds on the socket. External context are useful for inproc connections. runnableExamples: import zmq var monoserver = listen("tcp://127.0.0.1:34444", PAIR)