diff --git a/asyncua/server/internal_session.py b/asyncua/server/internal_session.py index a78034dfa..1545bb3d3 100644 --- a/asyncua/server/internal_session.py +++ b/asyncua/server/internal_session.py @@ -44,7 +44,6 @@ def __init__(self, internal_server: "InternalServer", aspace: AddressSpace, subm self.state = SessionState.Created self.session_id = ua.NodeId(self._counter) InternalSession._counter += 1 - self.subscriptions = [] self.auth_token = ua.NodeId(self._auth_counter) InternalSession._auth_counter += 1 self.logger.info('Created internal session %s', self.name) @@ -86,7 +85,13 @@ async def close_session(self, delete_subs=True): if InternalSession._current_connections < 0: InternalSession._current_connections = 0 self.state = SessionState.Closed - await self.delete_subscriptions(self.subscriptions) + await self.delete_subscriptions( + [ + id + for id, sub in self.subscription_service.subscriptions.items() + if sub.session_id == self.session_id + ] + ) def activate_session(self, params, peer_certificate): self.logger.info('activate session') @@ -196,8 +201,7 @@ async def call(self, params): return await self.iserver.method_service.call(params) async def create_subscription(self, params, callback, request_callback=None): - result = await self.subscription_service.create_subscription(params, callback, request_callback=request_callback) - self.subscriptions.append(result.SubscriptionId) + result = await self.subscription_service.create_subscription(params, callback, self.session_id, request_callback=request_callback) return result async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters): diff --git a/asyncua/server/internal_subscription.py b/asyncua/server/internal_subscription.py index 8fecb5c59..7a55ea066 100644 --- a/asyncua/server/internal_subscription.py +++ b/asyncua/server/internal_subscription.py @@ -18,22 +18,35 @@ class InternalSubscription: Runs the publication loop and stores the Publication Results until they are acknowledged. """ - def __init__(self, data: ua.CreateSubscriptionResult, aspace: AddressSpace, - callback, request_callback=None): + def __init__( + self, + data: ua.CreateSubscriptionResult, + aspace: AddressSpace, + callback, + session_id, + request_callback=None, + delete_callback=None, + ): """ :param loop: Event loop instance :param data: Create Subscription Result :param aspace: Server Address Space :param callback: Callback for publishing + :param session_id: Id of the session that owns this subscription. :param request_callback: Callback for getting queued publish requests. If None, publishing will be done without waiting for a token and no acknowledging will be expected (for server internal subscriptions) + :param delete_callback: Optional callback to call when the subscription + is stopped due to the publish count exceeding the + RevisedLifetimeCount. """ self.logger = logging.getLogger(__name__) self.data: ua.CreateSubscriptionResult = data self.pub_result_callback = callback self.pub_request_callback = request_callback self.monitored_item_srv = MonitoredItemService(self, aspace) + self.delete_callback = delete_callback + self.session_id = session_id self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {} self._triggered_events: Dict[int, List[ua.EventFieldList]] = {} self._triggered_statuschanges: list = [] @@ -44,6 +57,7 @@ def __init__(self, data: ua.CreateSubscriptionResult, aspace: AddressSpace, self._keep_alive_count = 0 self._publish_cycles_count = 0 self._task = None + self._closing = False def __str__(self): return f"Subscription(id:{self.data.SubscriptionId})" @@ -51,11 +65,13 @@ def __str__(self): async def start(self): self.logger.debug("starting subscription %s", self.data.SubscriptionId) if self.data.RevisedPublishingInterval > 0.0: + self._closing = False self._task = asyncio.create_task(self._subscription_loop()) async def stop(self): if self._task: self.logger.info("stopping internal subscription %s", self.data.SubscriptionId) + self._closing = True self._task.cancel() try: await self._task @@ -80,7 +96,7 @@ async def _subscription_loop(self): period = self.data.RevisedPublishingInterval / 1000.0 try: await self.publish_results() - while True: + while not self._closing: next_ts = ts + period sleep_time = next_ts - time.time() ts = next_ts @@ -116,7 +132,14 @@ async def publish_results(self, requestdata=None): self._publish_cycles_count, self.data.RevisedLifetimeCount) # FIXME this will never be send since we do not have publish request anyway await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout)) - await self.stop() + # Stop the subscription + if self._task: + self._closing = True + self._task = None + if self.delete_callback: + self.delete_callback() + self.monitored_item_srv.delete_all_monitored_items() + return False if not self.has_published_results(): return False # called from loop and external request diff --git a/asyncua/server/subscription_service.py b/asyncua/server/subscription_service.py index 80ec34cfd..a53a1affb 100644 --- a/asyncua/server/subscription_service.py +++ b/asyncua/server/subscription_service.py @@ -26,7 +26,9 @@ def __init__(self, aspace: AddressSpace): self.standard_events = {} self._conditions = {} - async def create_subscription(self, params, callback, request_callback=None): + async def create_subscription( + self, params, callback, session_id, request_callback=None + ): self.logger.info("create subscription") result = ua.CreateSubscriptionResult() result.RevisedPublishingInterval = params.RequestedPublishingInterval @@ -34,7 +36,14 @@ async def create_subscription(self, params, callback, request_callback=None): result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount self._sub_id_counter += 1 result.SubscriptionId = self._sub_id_counter - internal_sub = InternalSubscription(result, self.aspace, callback, request_callback=request_callback) + internal_sub = InternalSubscription( + result, + self.aspace, + callback, + session_id, + request_callback=request_callback, + delete_callback=lambda: self.subscriptions.pop(result.SubscriptionId, None), + ) await internal_sub.start() self.subscriptions[result.SubscriptionId] = internal_sub return result