Skip to content

Commit

Permalink
Add callback to delete subscriptions (FreeOpcUa#1656)
Browse files Browse the repository at this point in the history
* Add callback to delete subscriptions

* Improve docstring

* Don't shadow CancelledError

* Simplify subscription storage

* Make it safe to iterate over keys

* Restore accidentally deleted line

* Remove task context

* Remove list

* Fix subscriptions for sessions
  • Loading branch information
chrisjbremner authored Jun 17, 2024
1 parent 6dce240 commit 176c077
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 10 deletions.
12 changes: 8 additions & 4 deletions asyncua/server/internal_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def __init__(self, internal_server: "InternalServer", aspace: AddressSpace, subm
self.state = SessionState.Created
self.session_id = ua.NodeId(self._counter)
InternalSession._counter += 1
self.subscriptions = []
self.auth_token = ua.NodeId(self._auth_counter)
InternalSession._auth_counter += 1
self.logger.info('Created internal session %s', self.name)
Expand Down Expand Up @@ -86,7 +85,13 @@ async def close_session(self, delete_subs=True):
if InternalSession._current_connections < 0:
InternalSession._current_connections = 0
self.state = SessionState.Closed
await self.delete_subscriptions(self.subscriptions)
await self.delete_subscriptions(
[
id
for id, sub in self.subscription_service.subscriptions.items()
if sub.session_id == self.session_id
]
)

def activate_session(self, params, peer_certificate):
self.logger.info('activate session')
Expand Down Expand Up @@ -196,8 +201,7 @@ async def call(self, params):
return await self.iserver.method_service.call(params)

async def create_subscription(self, params, callback, request_callback=None):
result = await self.subscription_service.create_subscription(params, callback, request_callback=request_callback)
self.subscriptions.append(result.SubscriptionId)
result = await self.subscription_service.create_subscription(params, callback, self.session_id, request_callback=request_callback)
return result

async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
Expand Down
31 changes: 27 additions & 4 deletions asyncua/server/internal_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,35 @@ class InternalSubscription:
Runs the publication loop and stores the Publication Results until they are acknowledged.
"""

def __init__(self, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
callback, request_callback=None):
def __init__(
self,
data: ua.CreateSubscriptionResult,
aspace: AddressSpace,
callback,
session_id,
request_callback=None,
delete_callback=None,
):
"""
:param loop: Event loop instance
:param data: Create Subscription Result
:param aspace: Server Address Space
:param callback: Callback for publishing
:param session_id: Id of the session that owns this subscription.
:param request_callback: Callback for getting queued publish requests.
If None, publishing will be done without waiting for a token and no
acknowledging will be expected (for server internal subscriptions)
:param delete_callback: Optional callback to call when the subscription
is stopped due to the publish count exceeding the
RevisedLifetimeCount.
"""
self.logger = logging.getLogger(__name__)
self.data: ua.CreateSubscriptionResult = data
self.pub_result_callback = callback
self.pub_request_callback = request_callback
self.monitored_item_srv = MonitoredItemService(self, aspace)
self.delete_callback = delete_callback
self.session_id = session_id
self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
self._triggered_statuschanges: list = []
Expand All @@ -44,18 +57,21 @@ def __init__(self, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._task = None
self._closing = False

def __str__(self):
return f"Subscription(id:{self.data.SubscriptionId})"

async def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
if self.data.RevisedPublishingInterval > 0.0:
self._closing = False
self._task = asyncio.create_task(self._subscription_loop())

async def stop(self):
if self._task:
self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
self._closing = True
self._task.cancel()
try:
await self._task
Expand All @@ -80,7 +96,7 @@ async def _subscription_loop(self):
period = self.data.RevisedPublishingInterval / 1000.0
try:
await self.publish_results()
while True:
while not self._closing:
next_ts = ts + period
sleep_time = next_ts - time.time()
ts = next_ts
Expand Down Expand Up @@ -116,7 +132,14 @@ async def publish_results(self, requestdata=None):
self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
await self.stop()
# Stop the subscription
if self._task:
self._closing = True
self._task = None
if self.delete_callback:
self.delete_callback()
self.monitored_item_srv.delete_all_monitored_items()
return False
if not self.has_published_results():
return False
# called from loop and external request
Expand Down
13 changes: 11 additions & 2 deletions asyncua/server/subscription_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,24 @@ def __init__(self, aspace: AddressSpace):
self.standard_events = {}
self._conditions = {}

async def create_subscription(self, params, callback, request_callback=None):
async def create_subscription(
self, params, callback, session_id, request_callback=None
):
self.logger.info("create subscription")
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
result.RevisedLifetimeCount = params.RequestedLifetimeCount
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
internal_sub = InternalSubscription(result, self.aspace, callback, request_callback=request_callback)
internal_sub = InternalSubscription(
result,
self.aspace,
callback,
session_id,
request_callback=request_callback,
delete_callback=lambda: self.subscriptions.pop(result.SubscriptionId, None),
)
await internal_sub.start()
self.subscriptions[result.SubscriptionId] = internal_sub
return result
Expand Down

0 comments on commit 176c077

Please sign in to comment.