Skip to content

Commit

Permalink
Fix closing error
Browse files Browse the repository at this point in the history
  • Loading branch information
codingl2k1 committed Dec 5, 2024
1 parent 4126e1e commit 2d1e9e9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 24 deletions.
7 changes: 0 additions & 7 deletions python/xoscar/backends/communication/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,5 @@ def _discard(t):
async def close(self):
await super().close()
if self._task is not None:
task_loop = self._task.get_loop()
if task_loop is not None:
if not task_loop.is_running():
logger.warning(
"Dummy channel cancel task on a stopped loop, dest address: %s.",
self.dest_address,
)
self._task.cancel()
self._task = None
18 changes: 12 additions & 6 deletions python/xoscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ...utils import classproperty, implements, is_py_312, is_v6_ip
from .base import Channel, ChannelType, Client, Server
from .core import register_client, register_server
from .errors import ChannelClosed
from .utils import read_buffers, write_buffers

_is_windows: bool = sys.platform.startswith("win")
Expand Down Expand Up @@ -80,12 +81,17 @@ async def send(self, message: Any):
serializer = AioSerializer(message, compress=compress)
buffers = await serializer.run()

# write buffers
write_buffers(self.writer, buffers)
async with self._send_lock:
# add lock, or when parallel send,
# assertion error may be raised
await self.writer.drain()
try:
# write buffers
write_buffers(self.writer, buffers)
async with self._send_lock:
# add lock, or when parallel send,
# assertion error may be raised
await self.writer.drain()
except RuntimeError as e:
if self.writer.is_closing():
raise ChannelClosed("Channel already closed, cannot write message") from e
raise e

@implements(Channel.recv)
async def recv(self):
Expand Down
21 changes: 10 additions & 11 deletions python/xoscar/backends/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,14 @@ async def call(
return await self.call_with_client(client, message, wait)

async def stop(self):
logger.debug("Actor caller stop.")
try:
await asyncio.gather(*[client.close() for client in self._clients])
except (ConnectionError, ServerClosed):
pass
self.cancel_tasks()
try:
self.cancel_tasks()
except:
pass

def cancel_tasks(self):
# cancel listening for all clients
Expand Down Expand Up @@ -232,14 +234,11 @@ def _cancel_all_tasks(loop):


def _safe_run_forever(loop):
loop.run_forever()
_cancel_all_tasks(loop)


def _safe_exit_thread(loop, thread):
# To avoid _enter_buffered_busy: could not acquire lock
loop.call_soon_threadsafe(loop.stop)
thread.join()
try:
loop.run_forever()
finally:
_cancel_all_tasks(loop)
loop.stop()


class ActorCaller:
Expand All @@ -253,7 +252,7 @@ class _RefHolder:
target=_safe_run_forever, args=(_close_loop,), daemon=True
)
_close_thread.start()
atexit.register(_safe_exit_thread, _close_loop, _close_thread)
atexit.register(_close_loop.call_soon_threadsafe, _close_loop.stop)

def __init__(self):
self._thread_local = threading.local()
Expand Down

0 comments on commit 2d1e9e9

Please sign in to comment.