diff --git a/examples/ex11_async_router_dealer.nim b/examples/ex11_async_router_dealer.nim new file mode 100644 index 0000000..1fc4ee0 --- /dev/null +++ b/examples/ex11_async_router_dealer.nim @@ -0,0 +1,65 @@ +import std/asyncdispatch +import std/sequtils +import std/strutils +import ../zmq + +const N_WORKER = 3 +const host = "tcp://localhost:5572" + +proc worker(): Future[void] {.async.} = + let socket = zmq.connect(host, DEALER) + socket.sendAll("READY") + var isRunning = true + while isRunning: + let multiparts = await socket.receiveAllAsync() + echo "worker receive ", multiparts + let command = multiparts[0] + case command: + of "JOB": + let x = multiparts[1] + let y = parseInt(x)*2 + socket.sendAll("DONE", $x, $y) + of "KILL": + isRunning = false + socket.sendAll("END") + +proc router(): Future[void] {.async.} = + let socket = zmq.listen(host, ROUTER) + var jobs = toSeq(1..10) + var nWorker = 0 + var isRunning = true + while isRunning: + let multiparts = await socket.receiveAllAsync() + echo "router receive ", multiparts + let workerId = multiparts[0] + let command = multiparts[1] + case command: + of "READY": + nWorker += 1 + if jobs.len > 0: + socket.sendAll(workerId, "JOB", $jobs.pop()) + else: + socket.sendAll(workerId, "KILL") + of "END": + nWorker -= 1 + if nWorker == 0: + # stop router if no workers + isRunning = false + of "DONE": + let x = multiparts[2] + let y = multiparts[3] + assert parseInt(x)*2 == parseInt(y) + if jobs.len > 0: + socket.sendAll(workerId, "JOB", $jobs.pop()) + else: + socket.sendAll(workerId, "KILL") + else: + raise newException(CatchableError, "unknown command") + +when isMainModule: + echo "ex11_async_router_dealer.nim" + asyncCheck router() + for i in 1..N_WORKER: + asyncCheck worker() + while hasPendingOperations(): + poll() diff --git a/zmq/asynczmq.nim b/zmq/asynczmq.nim index e73eb4b..c1d93d5 100644 --- a/zmq/asynczmq.nim +++ b/zmq/asynczmq.nim @@ -90,17 +90,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] = result.complete(r) -proc receiveAsync*(conn: ZConnection): Future[string] = - ## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run. - ## `receiveAsync()` allows other async tasks to run in those cases. - ## - ## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket - ## - ## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411 - let fut = newFuture[string]("receiveAsync") - result = fut - let sock = conn.socket - +template receiveAsyncCallbackTemplate(fut: Future, sock: ZSocket, recv, cb) = proc cb(fd: AsyncFD): bool {.closure, gcsafe.} = # the cb should work on the low level socket and not the ZConnection object result = true @@ -116,15 +106,45 @@ proc receiveAsync*(conn: ZConnection): Future[string] = else: # ready to read unregister(fd) - fut.complete sock.receive(DONTWAIT) + fut.complete recv(sock, DONTWAIT) except: unregister(fd) fut.fail getCurrentException() +proc receiveAsync*(conn: ZConnection): Future[string] = + ## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run. + ## `receiveAsync()` allows other async tasks to run in those cases. + ## + ## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket + ## + ## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411 + let fut = newFuture[string]("receiveAsync") + result = fut + receiveAsyncCallbackTemplate(fut, conn.socket, receive, cb) let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD register(fd) discard cb(fd) +proc tryReceiveAsync*(conn: ZConnection): Future[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]] = + ## Async version of `tryReceive()` + let fut = newFuture[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]]("tryReceiveAsync") + result = fut + receiveAsyncCallbackTemplate(fut, conn.socket, tryReceive, cb) + let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD + register(fd) + discard cb(fd) + +proc receiveAllAsync*(conn: ZConnection): Future[seq[string]] {.async.} = + ## async version for `receiveAll()` + var expectMessage = true + while expectMessage: + let (msgAvailable, moreAvailable, msg) = await tryReceiveAsync(conn) + if msgAvailable: + result.add msg + expectMessage = moreAvailable + else: + expectMessage = false + proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWAIT): Future[void] = ## `send()` is blocking for some connection types (e.g. PUSH, DEALER). ## `sendAsync()` allows other async tasks to run in those cases. @@ -134,11 +154,11 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA ## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411 let fut = newFuture[void]("sendAsync") result = fut - let sock = conn.socket let status = getsockopt[cint](conn, ZSockOptions.EVENTS) if (status and ZMQ_POLLOUT) == 0: # wait until queue available + let sock = conn.socket proc cb(fd: AsyncFD): bool {.closure, gcsafe.} = result = true