Feature request: Subscribe for updates from Data Pipe #795

Open
aabrodskiy opened this issue Oct 11, 2024 · 6 comments

@aabrodskiy

Feature request

Abstract

The PI AF SDK supports subscription-based reading of data instead of polling. Subscriptions are the most efficient way to stream data continuously from PI/AF to consumers.

Motivation and summary

Polling data for a large number of tags and/or at a high frequency puts significant load on both the PI Server and the polling client. It would be great if Python supported callbacks on new messages coming from the Data Pipe the same way the AF SDK does, ideally for all three Data Pipe types in the AF SDK.

Suggested solution

Implement a messaging system with subscription maintenance and async callbacks. Ping the connection to ensure it is still active; if the server drops the connection, reconnect and resubscribe to the previously subscribed tags / AF Attributes.
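
The reconnect-and-resubscribe part of this suggestion could look roughly like the sketch below. It is only an illustration under stated assumptions: `FakePipe` and `SubscriptionManager` are hypothetical names, not part of PIconnect or the AF SDK, and `ping` stands in for whatever liveness check a real connection offers.

```python
from typing import Callable, Set


class FakePipe:
    """Hypothetical stand-in for a data pipe connection (not an AF SDK class)."""

    def __init__(self) -> None:
        self.alive = True
        self.signups: Set[str] = set()

    def ping(self) -> bool:
        return self.alive

    def add_signups(self, tags: Set[str]) -> None:
        self.signups |= tags


class SubscriptionManager:
    """Remembers what was subscribed so it can be replayed after a reconnect."""

    def __init__(self, connect: Callable[[], FakePipe]) -> None:
        self._connect = connect
        self._wanted: Set[str] = set()
        self._pipe = connect()
        self.reconnects = 0

    def subscribe(self, *tags: str) -> None:
        self._wanted.update(tags)
        self._pipe.add_signups(set(tags))

    def check_connection(self) -> None:
        """Ping the pipe; on failure open a new one and resubscribe everything."""
        if self._pipe.ping():
            return
        self._pipe = self._connect()
        self._pipe.add_signups(set(self._wanted))
        self.reconnects += 1

    @property
    def active_signups(self) -> Set[str]:
        return set(self._pipe.signups)


manager = SubscriptionManager(FakePipe)
manager.subscribe("tag_a", "tag_b")
manager._pipe.alive = False   # simulate the server dropping the connection
manager.check_connection()    # reconnects and replays both signups
print(sorted(manager.active_signups))
```

Keeping the set of wanted tags separate from the pipe object is the design choice that matters here: the pipe can be thrown away and rebuilt without the caller having to subscribe again.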

Rejected options

No alternatives are available for Linux-based Python, as far as I'm aware.

@aabrodskiy aabrodskiy changed the title Subscribe for updates from Data Pipe Feature request: Subscribe for updates from Data Pipe Oct 11, 2024
@miguetronic

Please, make it real!

@Hugovdberg
Owner

First of all, I'm surprised by the number of upvotes in this short timespan. Not that it's a bad request, but apparently people are actively monitoring this repo :-)

Regarding the feature request, it would indeed be nice to have this. However, it might be tricky to get it to work nicely. The problem is that async is just syntactic sugar, not only in Python but in .NET as well. That means that async interaction between Python and .NET is tricky because each side runs its own event loop, which might cause deadlocks, and I'm not well versed enough in the async world to determine whether this is a serious risk in this case.
That said, I did experiment with this some time ago without causing any deadlocks. I think that in the PIconnect/SDK interactions there is little reason for the SDK to ever wait on feedback from the Python side, which probably helps.

Is there any example code of how the .NET subscription model works?
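
One common way to sidestep the two-event-loops concern on the Python side is to keep the SDK work on an ordinary thread and hand results to asyncio with `loop.call_soon_threadsafe`, so the producing thread never waits on Python coroutines. The sketch below shows only that hand-off; `sdk_thread` is a hypothetical stand-in that emits three integers and does not touch .NET at all.

```python
import asyncio
import threading


def sdk_thread(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    """Stands in for a non-asyncio thread (e.g. one driving the SDK)."""
    for value in (1, 2, 3):
        # call_soon_threadsafe is the supported way to schedule work on the
        # loop from another thread; the producer never blocks on the consumer.
        loop.call_soon_threadsafe(queue.put_nowait, value)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more events


async def consume() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    worker = threading.Thread(target=sdk_thread, args=(loop, queue), daemon=True)
    worker.start()
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    worker.join()  # the worker has already finished once the sentinel arrives
    return received


received = asyncio.run(consume())
print(received)
```

Because the producer only schedules callbacks and never awaits a result, there is no point at which it can wait on the Python event loop, which matches the observation that the SDK rarely needs feedback from the Python side.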

@aabrodskiy
Author

Thank you for the quick reply and attention. Wow, surprisingly, this seems to be a hot topic for many.
The most basic example implementation is described by Rick Davin here: AVEVA github
I'll try to carve out a more elaborate example from our subscriber/receiver and post here.

@aabrodskiy
Author

This is an example of an AF subscriber:

using Newtonsoft.Json;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;


    public sealed class AFDataSubscriber : IDisposable
    {
        #region Private Variables
        private static AFDataSubscriber _instance = null;
        private static readonly object _mutex = new Object();
        private Dictionary<Guid, SubscribedAFAttribute> _subscribedAttributes = new Dictionary<Guid, SubscribedAFAttribute>();
        private readonly AFDataPipe _afDataPipe = new AFDataPipe();
        private Timer _timer;
        private CancellationToken _ct;
        #endregion

        #region Constructor
        private AFDataSubscriber(CancellationToken cancellationToken)
        {
            _ct = cancellationToken;
            // Private Singleton constructor
            _afDataPipe.Subscribe(new AFDataReceiver(_ct));
        }
        #endregion

        #region Private Methods
        private void CheckForData(object o)
        {
            bool hasMoreEvents;
            do
            {
                _afDataPipe.GetObserverEvents(out hasMoreEvents);
            }
            while (hasMoreEvents);
        }

        private void AddSignup(IList<AFAttribute> attributes)
        {
            List<AFAttribute> attributesToSignup = new List<AFAttribute>();
            foreach (AFAttribute attribute in attributes)
            {
                if (_subscribedAttributes.ContainsKey(attribute.ID))
                {
                    SubscribedAFAttribute subscribedAttribute = _subscribedAttributes[attribute.ID];
                    if (!subscribedAttribute.IsSubscribed)
                    {
                        subscribedAttribute.IsSubscribed = true;
                        attributesToSignup.Add(subscribedAttribute.AttributeLookup.Attribute);
                    }
                }
            }
            if (attributesToSignup.Count > 0)
            {
                _afDataPipe.AddSignups(attributesToSignup);
            }
        }

        #endregion

        #region Public Methods
        // Singleton entry
        public static AFDataSubscriber GetInstance(CancellationToken cancellationToken)
        {
            if (_instance == null)
            {
                lock (_mutex)
                {
                    if (_instance == null)
                    {
                        _instance = new AFDataSubscriber(cancellationToken);
                    }
                }
            }
            return _instance;
        }


        public void StartListening(TimeSpan checkInterval)
        {
            // Start the repeating timer only once; each tick drains the pipe via CheckForData.
            if (_timer == null)
                _timer = new Timer(CheckForData, null, 0, (int)checkInterval.TotalMilliseconds);
        }

        public void StopListening()
        {
            if (_timer != null)
            {
                _timer.Dispose();
                // Reset the field so StartListening can create a fresh timer later.
                _timer = null;
            }
        }

        public void Listen()
        {
            if (_subscribedAttributes.Count > 0)
            {
                StartListening(TimeSpan.FromSeconds(5));
            }
        }

        public void Dispose()
        {
            StopListening();
            _afDataPipe.Dispose();
        }

        public static void CloseInstance()
        {
            if (_instance != null)
            {
                _instance.Dispose();
                _instance = null;
                Log.Information("Monitoring AF Events Stopped");
            }
        }
        #endregion
    }

@aabrodskiy
Author

And here is the receiver of the events from the pipe:

using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using Serilog;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

    public class AFDataReceiver : IObserver<AFDataPipeEvent>
    {
        private ConcurrentDictionary<string, ConcurrentDictionary<string, TValue>> _batchDataDict = new ConcurrentDictionary<string, ConcurrentDictionary<string, TValue>>();
        private readonly Timer _batchTimer;
        private readonly object _timerLock = new object(); // Create a lock object for the timer
        private readonly object _dictLockObj = new object();
        private readonly CancellationToken _cancellationToken; // Add a field to store the cancellation token

        public AFDataReceiver(CancellationToken cancellationToken)
        {
            _cancellationToken = cancellationToken; // Store the cancellation token
        }


        /// <summary>  
        /// Provides the observer with new data.  
        /// </summary>  
        /// <param name="value"></param>  
        public void OnNext(AFDataPipeEvent value)
        {
            // Logic to process data goes here; we put it in a dictionary to batch our output.
        }


        /// <summary>  
        /// An error has occurred
        /// </summary>  
        /// <param name="error"></param>  
        public void OnError(Exception error)
        {
            Log.Error("Provider (AF) has sent an error");
            Log.Error(error.Message);
            Log.Error(error.StackTrace);
        }

        /// <summary>  
        /// Notifies the observer that the provider has finished sending push-based notifications.  
        /// </summary>  
        public void OnCompleted()
        {
            Log.Information("Provider (AF) has terminated sending data");
        }
    }
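
For readers on the Python side, the same observer shape with the batching idea mentioned in `OnNext` might be sketched as below. The class and method names are hypothetical, events are reduced to plain strings and floats, and the sketch assumes `on_next` and `flush` are called from the same thread (otherwise a lock would be needed).

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class BatchingReceiver:
    """Observer-style receiver that buffers values per attribute until flushed."""

    def __init__(self) -> None:
        self._batch: Dict[str, List[Tuple[str, float]]] = defaultdict(list)
        self.errors: List[str] = []
        self.completed = False

    def on_next(self, attribute: str, timestamp: str, value: float) -> None:
        self._batch[attribute].append((timestamp, value))

    def on_error(self, error: Exception) -> None:
        self.errors.append(str(error))

    def on_completed(self) -> None:
        self.completed = True

    def flush(self) -> Dict[str, List[Tuple[str, float]]]:
        """Return everything buffered so far and start a new empty batch."""
        batch, self._batch = dict(self._batch), defaultdict(list)
        return batch


receiver = BatchingReceiver()
receiver.on_next("Pump1|Flow", "2024-10-11T00:00:00Z", 1.5)
receiver.on_next("Pump1|Flow", "2024-10-11T00:00:05Z", 1.7)
receiver.on_next("Pump2|Flow", "2024-10-11T00:00:05Z", 0.4)
first = receiver.flush()
second = receiver.flush()
print(first)
print(second)
```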

@wesnm

wesnm commented Dec 3, 2024

Just FYI for casual readers, neither AFDataPipe nor PIDataPipe supports a true "push" mode. Even after events are subscribed to, the GetObserverEvents() method must be called regularly to pull events from the server, which are then passed on to subscribers. The class above uses a repeating timer, and you'll have to do something similar in your code as well.
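
A minimal Python sketch of that "call it regularly and keep going while there are more events" pattern is shown below. `StubPipe` and its `get_observer_events` method are hypothetical stand-ins that return a boolean directly; they only mimic the `hasMoreEvents` flag from the C# example and are not the real AF SDK call.

```python
import threading
from collections import deque
from typing import Deque, List


class StubPipe:
    """Hypothetical pipe that hands out at most `batch` buffered events per call."""

    def __init__(self, events: List[str], batch: int = 2) -> None:
        self._pending: Deque[str] = deque(events)
        self._batch = batch
        self.delivered: List[str] = []

    def get_observer_events(self) -> bool:
        """Deliver one batch and report whether more events are waiting."""
        for _ in range(min(self._batch, len(self._pending))):
            self.delivered.append(self._pending.popleft())
        return bool(self._pending)


def drain(pipe: StubPipe) -> int:
    """Call the pipe until it reports no more events; return the number of calls."""
    calls = 0
    while True:
        calls += 1
        if not pipe.get_observer_events():
            return calls


def poll_until_stopped(pipe: StubPipe, interval: float, stop: threading.Event) -> None:
    """Drain the pipe, then sleep `interval` seconds; repeat until `stop` is set."""
    while True:
        drain(pipe)
        if stop.wait(interval):  # wait() returns True as soon as stop is set
            return


pipe = StubPipe(["e1", "e2", "e3", "e4", "e5"])
stop = threading.Event()
poller = threading.Thread(target=poll_until_stopped, args=(pipe, 0.05, stop), daemon=True)
poller.start()
stop.set()     # request shutdown; the loop still completes its current pass
poller.join()
print(pipe.delivered)
```

Using `Event.wait` as the sleep means a stop request interrupts the pause immediately instead of waiting out the full interval.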

For Python, I found it easier to use a wrapper class around the datapipe and register "callbacks" (using queues, threads, or whatever you want to avoid blocking the main loop) to that class rather than the datapipe. This lets you pre-process the events so you can deliver a Python object to your listeners instead of a .NET object. This is handy if your wrapper library is meant to fully abstract the AFSDK objects away from applications.
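
The wrapper idea might be sketched like this. Everything here is an assumption for illustration: `PipeEvent`, `DataPipeWrapper`, and the dict used as the "raw" event are hypothetical, and a real wrapper would read the fields from the .NET event object instead.

```python
import queue
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, List


@dataclass(frozen=True)
class PipeEvent:
    """Plain-Python view of one update; field names are illustrative."""
    name: str
    value: float
    timestamp: datetime


class DataPipeWrapper:
    """Listeners register here, not on the underlying pipe object."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[PipeEvent], None]] = []

    def register(self, listener: Callable[[PipeEvent], None]) -> None:
        self._listeners.append(listener)

    def on_raw_event(self, raw: Any) -> None:
        """Convert a raw SDK-style event, then fan it out to all listeners."""
        event = PipeEvent(
            name=str(raw["name"]),
            value=float(raw["value"]),
            timestamp=datetime.fromtimestamp(raw["epoch"], tz=timezone.utc),
        )
        for listener in self._listeners:
            listener(event)


inbox = queue.Queue()
wrapper = DataPipeWrapper()
wrapper.register(inbox.put)  # a queue keeps slow consumers from blocking delivery
wrapper.on_raw_event({"name": "sinusoid", "value": "42.5", "epoch": 0})
event = inbox.get_nowait()
print(event.name, event.value, event.timestamp.isoformat())
```

Registering `inbox.put` as the listener is one way to get the non-blocking behaviour described above: the wrapper only enqueues, and the consumer reads from the queue on its own thread or loop.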
