You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm using Prefect a workflow orchestration tool. I'm using PIConnect and I want to serialize the connection. I want to make the connection persistent so that I don't need to connect it every task run. The issue is that the prefect cache is not able to serialize the connection. The code is below:
from prefect import flow, task
from prefect.logging import get_run_logger
from datetime import datetime, timedelta
from prefect.cache_policies import TASK_SOURCE
def get_run_name():
return datetime.now().strftime('%d-%m-%Y %H:%M:%S')
@task(cache_policy=TASK_SOURCE)
def get_pi():
import PIconnect as PI
PI.PIConfig.DEFAULT_TIMEZONE = 'Asia/Kolkata'
DA = PI.PIServer()
return DA
@task(name='OSIPI_'+get_run_name(), description='Get data Task Run')
def get_data():
DA = get_pi()
start_dt = datetime.now() - timedelta(hours=10)
end_dt = datetime.now()
frequency = '1h'
point = DA.search('PI_TAG_GOES HERE')[0]
data = point.interpolated_values(start_dt, end_dt, frequency) if end_dt is not None else point.interpolated_value(start_dt)
return data
@task(name='process_'+get_run_name(), description='Get data Task Run')
def process_data(data):
print(data.dtypes)
@flow(name='OSIPI get data')
def osipi_get_data():
logger = get_run_logger()
logger.info('OSIPI get data Process Started')
df = get_data()
logger.info('Data Fetched')
process_data(df)
logger.info('Process Completed')
if __name__ == '__main__':
osipi_get_data.serve(name="OSIPI Test",
description="Test OSIPI",
tags=["onboarding"]
)
The code is working fine, and it can get the data from OSIPI but I'm getting below error in prefect consol:
12:50:56.609 | INFO | prefect.flow_runs.runner - Runner 'OSIPI Test' submitting flow run '4f2233d2-b843-482d-b73a-ee4ad6e117db'
12:50:56.734 | INFO | prefect.flow_runs.runner - Opening process...
12:50:56.769 | INFO | prefect.flow_runs.runner - Completed submission of flow run '4f2233d2-b843-482d-b73a-ee4ad6e117db'
12:51:00.905 | INFO | Flow run 'mini-boobook' - Downloading flow code from storage at '.'
12:51:01.756 | INFO | Flow run 'mini-boobook' - OSIPI get data Process Started
12:51:10.462 | INFO | Task run 'get_pi-fa9' - Finished in state Completed()
12:51:11.695 | WARNING | Task run 'get_pi-fa9' - Encountered an error while serializing result for transaction '50d02614c60b588431cacf863d7b7b88': **Failed to serialize object of type 'PIServer' with serializer 'pickle'.** You can try a different serializer (e.g. result_serializer="json") or disabling persistence (persist_result=False) for this flow or task. Code execution will continue, but the transaction will not be committed.
12:51:11.700 | INFO | Task run 'OSIPI_31-12-2024 12:51:00-ec2' - Finished in state Completed()
12:51:11.702 | INFO | Flow run 'mini-boobook' - Data Fetched
12:51:12.281 | ERROR | Task run 'process_31-12-2024 12:51:00-5fa' - Error encountered when computing cache key - result will not be persisted.
Traceback (most recent call last):
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 297, in compute_key
return hash_objects(hashed_inputs, raise_on_failure=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\utilities\hashing.py", line 89, in hash_objects
raise HashError(msg)
prefect.exceptions.HashError: Unable to create hash - objects could not be serialized.
JSON error: Unable to serialize unknown type: <class 'PIconnect.PIData.PISeries'>
Pickle error: cannot pickle 'AFEnumerationValue' object
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\task_engine.py", line 158, in compute_transaction_key
key = self.task.cache_policy.compute_key(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 169, in compute_key
policy_key = policy.compute_key(
^^^^^^^^^^^^^^^^^^^
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 169, in compute_key
policy_key = policy.compute_key(
^^^^^^^^^^^^^^^^^^^
File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 307, in compute_key
raise ValueError(msg) from exc
ValueError: Unable to create hash - objects could not be serialized.
JSON error: Unable to serialize unknown type: <class 'PIconnect.PIData.PISeries'>
Pickle error: cannot pickle 'AFEnumerationValue' object
This often occurs when task inputs contain objects that cannot be cached like locks, file handles, or other system resources.
To resolve this, you can:
1. Exclude these arguments by defining a custom `cache_key_fn`
2. Disable caching by passing `cache_policy=NONE`
12:51:12.294 | INFO | Task run 'process_31-12-2024 12:51:00-5fa' - Finished in state Completed()
12:51:12.295 | INFO | Flow run 'mini-boobook' - Process Completed
12:51:12.422 | INFO | Flow run 'mini-boobook' - Finished in state Completed()
OSIsoft(r) AF SDK Version: 2.10.9.593
object
12:51:17.817 | INFO | prefect.flow_runs.runner - Process for flow run 'mini-boobook' exited cleanly.
Is there a way to serialize the connection so that I don't have to reconnect it with each task run?
Version info
Python Version : 3.10.8
Prefect : 3.1.8
OS: Windows 11 Pro
Pydantic Version: 2.10.4
Database name : OSIPI historian
Python package to connect database : PIconnect
PIconnect Version : 0.12.1
The text was updated successfully, but these errors were encountered:
I think this might be due to the fact that the python connection object is just a wrapper around the .NET connection. There is some deep magic going on inside pythonnet to allow the use of .NET objects inside python, which makes it probably impossible for pickle to serialize it.
Bug summary
I'm using Prefect a workflow orchestration tool. I'm using PIConnect and I want to serialize the connection. I want to make the connection persistent so that I don't need to connect it every task run. The issue is that the prefect cache is not able to serialize the connection. The code is below:
The code is working fine, and it can get the data from OSIPI but I'm getting below error in prefect consol:
Is there a way to serialize the connection so that I don't have to reconnect it with each task run?
Version info
The text was updated successfully, but these errors were encountered: