Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added tryReceiveAsync, receiveAllAsync #50

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions examples/ex11_async_router_dealer.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import std/asyncdispatch
import std/sequtils
import std/strutils
import ../zmq

const N_WORKER = 3
const host = "tcp://localhost:5572"

proc worker(): Future[void] {.async.} =
let socket = zmq.connect(host, DEALER)
socket.sendAll("READY")
var isRunning = true
while isRunning:
let multiparts = await socket.receiveAllAsync()
echo "worker receive ", multiparts
let command = multiparts[0]
case command:
of "JOB":
let x = multiparts[1]
let y = parseInt(x)*2
socket.sendAll("DONE", $x, $y)
of "KILL":
isRunning = false
socket.sendAll("END")

proc router(): Future[void] {.async.} =
let socket = zmq.listen(host, ROUTER)
var jobs = toSeq(1..10)
var nWorker = 0
var isRunning = true
while isRunning:
let multiparts = await socket.receiveAllAsync()
echo "router receive ", multiparts
let workerId = multiparts[0]
let command = multiparts[1]
case command:
of "READY":
nWorker += 1
if jobs.len > 0:
socket.sendAll(workerId, "JOB", $jobs.pop())
else:
socket.sendAll(workerId, "KILL")
of "END":
nWorker -= 1
if nWorker == 0:
# stop router if no workers
isRunning = false
of "DONE":
let x = multiparts[2]
let y = multiparts[3]
assert parseInt(x)*2 == parseInt(y)
if jobs.len > 0:
socket.sendAll(workerId, "JOB", $jobs.pop())
else:
socket.sendAll(workerId, "KILL")
else:
raise newException(CatchableError, "unknown command")

when isMainModule:
echo "ex11_async_router_dealer.nim"
asyncCheck router()
for i in 1..N_WORKER:
asyncCheck worker()
while hasPendingOperations():
poll()
46 changes: 33 additions & 13 deletions zmq/asynczmq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =

result.complete(r)

proc receiveAsync*(conn: ZConnection): Future[string] =
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
## `receiveAsync()` allows other async tasks to run in those cases.
##
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
##
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[string]("receiveAsync")
result = fut
let sock = conn.socket

template receiveAsyncCallbackTemplate(fut: Future, sock: ZSocket, recv, cb) =
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
# the cb should work on the low level socket and not the ZConnection object
result = true
Expand All @@ -116,15 +106,45 @@ proc receiveAsync*(conn: ZConnection): Future[string] =
else:
# ready to read
unregister(fd)
fut.complete sock.receive(DONTWAIT)
fut.complete recv(sock, DONTWAIT)
except:
unregister(fd)
fut.fail getCurrentException()

proc receiveAsync*(conn: ZConnection): Future[string] =
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
## `receiveAsync()` allows other async tasks to run in those cases.
##
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
##
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[string]("receiveAsync")
result = fut
receiveAsyncCallbackTemplate(fut, conn.socket, receive, cb)
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
register(fd)
discard cb(fd)

proc tryReceiveAsync*(conn: ZConnection): Future[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]] =
## Async version of `tryReceive()`
let fut = newFuture[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]]("tryReceiveAsync")
result = fut
receiveAsyncCallbackTemplate(fut, conn.socket, tryReceive, cb)
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
register(fd)
discard cb(fd)

proc receiveAllAsync*(conn: ZConnection): Future[seq[string]] {.async.} =
## async version for `receiveAll()`
var expectMessage = true
while expectMessage:
let (msgAvailable, moreAvailable, msg) = await tryReceiveAsync(conn)
if msgAvailable:
result.add msg
expectMessage = moreAvailable
else:
expectMessage = false

proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWAIT): Future[void] =
## `send()` is blocking for some connection types (e.g. PUSH, DEALER).
## `sendAsync()` allows other async tasks to run in those cases.
Expand All @@ -134,11 +154,11 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
let fut = newFuture[void]("sendAsync")
result = fut
let sock = conn.socket

let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
if (status and ZMQ_POLLOUT) == 0:
# wait until queue available
let sock = conn.socket
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
result = true

Expand Down
Loading