From 93e1b9ab4828874bf221bf55eefb4c40c6d23f57 Mon Sep 17 00:00:00 2001 From: BryanFauble <17128019+BryanFauble@users.noreply.github.com> Date: Tue, 23 Jan 2024 15:32:27 -0700 Subject: [PATCH] Creating a decorator to be used in async class methods (#1049) * Creating a decorator to be used in async class methods to create a new OTEL span --- docs/reference/core.md | 3 + synapseclient/core/async_utils.py | 49 +++++ synapseclient/models/annotations.py | 31 +-- synapseclient/models/file.py | 135 ++++++------- synapseclient/models/folder.py | 221 +++++++++++----------- synapseclient/models/project.py | 210 +++++++++++---------- synapseclient/models/table.py | 282 ++++++++++++++-------------- 7 files changed, 502 insertions(+), 429 deletions(-) create mode 100644 synapseclient/core/async_utils.py diff --git a/docs/reference/core.md b/docs/reference/core.md index ad2ec3660..c2ab37db5 100644 --- a/docs/reference/core.md +++ b/docs/reference/core.md @@ -42,6 +42,9 @@ functions. Use at your own risk. ::: synapseclient.core.utils +## Async Utils + +::: synapseclient.core.async_utils ## Versions ::: synapseclient.core.version_check diff --git a/synapseclient/core/async_utils.py b/synapseclient/core/async_utils.py new file mode 100644 index 000000000..fac7a63e9 --- /dev/null +++ b/synapseclient/core/async_utils.py @@ -0,0 +1,49 @@ +"""This utility class is to hold any utilities that are needed for async operations.""" +from typing import Any, Callable, Coroutine, Union +from opentelemetry import trace + + +tracer = trace.get_tracer("synapseclient") + + +def otel_trace_method( + method_to_trace_name: Union[Callable[..., str], None] = None +) -> Callable[..., Callable[..., Coroutine[Any, Any, None]]]: + """ + Decorator to trace a method with OpenTelemetry in an async environment. This function + is specifically written to be used on a method within a class. + + This will pass the class instance as the first argument to the method. This allows + you to modify the name of the trace to include information about the class instance. + + Example: Decorating a method within a class that will be traced with OpenTelemetry. + Setting the trace name: + + @otel_trace_method(method_to_trace_name=lambda self, **kwargs: f"Project_Store: {self.name}") + async def store(self): + + Args: + method_to_trace_name: A callable that takes the class instance as the first argument + and returns a string to be used as the trace name. If this is not provided, + the trace name will be set to the method name. + + Returns: + A callable decorator that will trace the method with OpenTelemetry. + """ + + def decorator(f) -> Callable[..., Coroutine[Any, Any, None]]: + """Function decorator.""" + + async def wrapper(self, *arg, **kwargs) -> None: + """Wrapper for the function to be traced.""" + trace_name = ( + method_to_trace_name(self, *arg, **kwargs) + if method_to_trace_name + else None + ) + with tracer.start_as_current_span(trace_name or f"Synaspse::{f.__name__}"): + return await f(self, *arg, **kwargs) + + return wrapper + + return decorator diff --git a/synapseclient/models/annotations.py b/synapseclient/models/annotations.py index 7e5c9bc33..f311d28d6 100644 --- a/synapseclient/models/annotations.py +++ b/synapseclient/models/annotations.py @@ -8,6 +8,7 @@ from synapseclient import Synapse from synapseclient.annotations import ANNO_TYPE_TO_FUNC +from synapseclient.core.async_utils import otel_trace_method tracer = trace.get_tracer("synapseclient") @@ -40,6 +41,9 @@ class Annotations: is_loaded: bool = False + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Annotation_store: {self.id}" + ) async def store( self, synapse_client: Optional[Synapse] = None, @@ -48,20 +52,19 @@ async def store( # TODO: Validation that id and etag are present print(f"Storing annotations for id: {self.id}, etag: {self.etag}") - with tracer.start_as_current_span(f"Annotation_store: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - result = await loop.run_in_executor( - None, - lambda: set_annotations( - annotations=self, - synapse_client=synapse_client, - opentelemetry_context=current_context, - ), - ) - print(f"annotations store for {self.id} complete") - self.annotations = Annotations.from_dict(result) - # TODO: From the returned call do we need to update anything in the root object? + loop = asyncio.get_event_loop() + current_context = context.get_current() + result = await loop.run_in_executor( + None, + lambda: set_annotations( + annotations=self, + synapse_client=synapse_client, + opentelemetry_context=current_context, + ), + ) + print(f"annotations store for {self.id} complete") + self.annotations = Annotations.from_dict(result) + # TODO: From the returned call do we need to update anything in the root object? return self async def get(self): diff --git a/synapseclient/models/file.py b/synapseclient/models/file.py index df3f98392..60a088d56 100644 --- a/synapseclient/models/file.py +++ b/synapseclient/models/file.py @@ -7,6 +7,7 @@ from synapseclient.entity import File as Synapse_File from synapseclient import Synapse +from synapseclient.core.async_utils import otel_trace_method from typing import Optional, TYPE_CHECKING @@ -159,6 +160,9 @@ def fill_from_dict( ) return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"File_Store: {self.path if self.path else self.id}" + ) # TODO: How the parent is stored/referenced needs to be thought through async def store( self, @@ -174,47 +178,47 @@ async def store( Returns: The file object. """ - with tracer.start_as_current_span( - f"File_Store: {self.path if self.path else self.id}" - ): - # TODO - We need to add in some validation before the store to verify we have enough - # information to store the data - - # Call synapse - if self.path: - loop = asyncio.get_event_loop() - synapse_file = Synapse_File( - path=self.path, - name=self.name, - parent=parent.id if parent else self.parent_id, - ) - # TODO: Propogating OTEL context is not working in this case - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).store( - obj=synapse_file, opentelemetry_context=current_context - ), - ) - - self.fill_from_dict(synapse_file=entity, set_annotations=False) - - print(f"Stored file {self.name}, id: {self.id}: {self.path}") - elif self.id and not self.etag: - # This elif is to handle if only annotations are being stored without - # a file path. - annotations_to_persist = self.annotations - await self.get(synapse_client=synapse_client, download_file=False) - self.annotations = annotations_to_persist - - if self.annotations: - result = await Annotations( - id=self.id, etag=self.etag, annotations=self.annotations - ).store(synapse_client=synapse_client) - self.annotations = result.annotations - - return self + # TODO - We need to add in some validation before the store to verify we have enough + # information to store the data + # Call synapse + if self.path: + loop = asyncio.get_event_loop() + synapse_file = Synapse_File( + path=self.path, + name=self.name, + parent=parent.id if parent else self.parent_id, + ) + # TODO: Propogating OTEL context is not working in this case + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).store( + obj=synapse_file, opentelemetry_context=current_context + ), + ) + + self.fill_from_dict(synapse_file=entity, set_annotations=False) + + print(f"Stored file {self.name}, id: {self.id}: {self.path}") + elif self.id and not self.etag: + # This elif is to handle if only annotations are being stored without + # a file path. + annotations_to_persist = self.annotations + await self.get(synapse_client=synapse_client, download_file=False) + self.annotations = annotations_to_persist + + if self.annotations: + result = await Annotations( + id=self.id, etag=self.etag, annotations=self.annotations + ).store(synapse_client=synapse_client) + self.annotations = result.annotations + + return self + + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"File_Get: {self.id}" + ) # TODO: We need to provide all the other options that can be provided here, like collision, follow link ect... async def get( self, @@ -232,22 +236,24 @@ async def get( Returns: The file object. """ - with tracer.start_as_current_span(f"File_Get: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).get( - entity=self.id, - downloadFile=download_file, - downloadLocation=download_location, - opentelemetry_context=current_context, - ), - ) - - self.fill_from_dict(synapse_file=entity, set_annotations=True) - return self + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).get( + entity=self.id, + downloadFile=download_file, + downloadLocation=download_location, + opentelemetry_context=current_context, + ), + ) + + self.fill_from_dict(synapse_file=entity, set_annotations=True) + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"File_Get_Provenance: {self.id}" + ) async def delete(self, synapse_client: Optional[Synapse] = None) -> None: """Delete the file from Synapse. @@ -257,13 +263,12 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None: Returns: None """ - with tracer.start_as_current_span(f"File_Delete: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).delete( - obj=self.id, - opentelemetry_context=current_context, - ), - ) + loop = asyncio.get_event_loop() + current_context = context.get_current() + await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).delete( + obj=self.id, + opentelemetry_context=current_context, + ), + ) diff --git a/synapseclient/models/folder.py b/synapseclient/models/folder.py index 67b9a139a..8b2a39b4d 100644 --- a/synapseclient/models/folder.py +++ b/synapseclient/models/folder.py @@ -8,6 +8,7 @@ from synapseclient import Synapse from synapseclient.entity import Folder as Synapse_Folder from synapseclient.models import File, Annotations +from synapseclient.core.async_utils import otel_trace_method if TYPE_CHECKING: from synapseclient.models import Project @@ -130,6 +131,9 @@ def fill_from_dict( # TODO: Do I get information about the files/folders contained within this folder? return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Folder_Store: {self.name}" + ) async def store( self, parent: Optional[Union["Folder", "Project"]] = None, @@ -144,71 +148,71 @@ async def store( Returns: The folder object. """ - with tracer.start_as_current_span(f"Folder_Store: {self.name}"): - # TODO - We need to add in some validation before the store to verify we have enough - # information to store the data - - # Call synapse - loop = asyncio.get_event_loop() - synapse_folder = Synapse_Folder( - name=self.name, parent=parent.id if parent else self.parent_id + # TODO - We need to add in some validation before the store to verify we have enough + # information to store the data + + # Call synapse + loop = asyncio.get_event_loop() + synapse_folder = Synapse_Folder( + name=self.name, parent=parent.id if parent else self.parent_id + ) + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).store( + obj=synapse_folder, opentelemetry_context=current_context + ), + ) + + self.fill_from_dict(synapse_folder=entity, set_annotations=False) + + tasks = [] + if self.files: + tasks.extend( + file.store(parent=self, synapse_client=synapse_client) + for file in self.files ) - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).store( - obj=synapse_folder, opentelemetry_context=current_context - ), - ) - - self.fill_from_dict(synapse_folder=entity, set_annotations=False) - tasks = [] - if self.files: - tasks.extend( - file.store(parent=self, synapse_client=synapse_client) - for file in self.files - ) + if self.folders: + tasks.extend( + folder.store(parent=self, synapse_client=synapse_client) + for folder in self.folders + ) - if self.folders: - tasks.extend( - folder.store(parent=self, synapse_client=synapse_client) - for folder in self.folders + if self.annotations: + tasks.append( + asyncio.create_task( + Annotations( + id=self.id, etag=self.etag, annotations=self.annotations + ).store(synapse_client=synapse_client) ) + ) - if self.annotations: - tasks.append( - asyncio.create_task( - Annotations( - id=self.id, etag=self.etag, annotations=self.annotations - ).store(synapse_client=synapse_client) - ) - ) + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + + # TODO: Proper exception handling + for result in results: + if isinstance(result, Folder): + print(f"Stored {result.name}") + elif isinstance(result, File): + print(f"Stored {result.name} at: {result.path}") + elif isinstance(result, Annotations): + self.annotations = result.annotations + print(f"Stored annotations id: {result.id}, etag: {result.etag}") + else: + raise ValueError(f"Unknown type: {type(result)}") + except Exception as ex: + Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) + print("I hit an exception") + + print(f"Saved all files and folders in {self.name}") - try: - results = await asyncio.gather(*tasks, return_exceptions=True) - - # TODO: Proper exception handling - for result in results: - if isinstance(result, Folder): - print(f"Stored {result.name}") - elif isinstance(result, File): - print(f"Stored {result.name} at: {result.path}") - elif isinstance(result, Annotations): - self.annotations = result.annotations - print( - f"Stored annotations id: {result.id}, etag: {result.etag}" - ) - else: - raise ValueError(f"Unknown type: {type(result)}") - except Exception as ex: - Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) - print("I hit an exception") - - print(f"Saved all files and folders in {self.name}") - - return self + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Folder_Get: {self.id}" + ) async def get( self, include_children: Optional[bool] = False, @@ -223,54 +227,54 @@ async def get( Returns: The folder object. """ - with tracer.start_as_current_span(f"Folder_Get: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).get( + entity=self.id, + opentelemetry_context=current_context, + ), + ) + + self.fill_from_dict(synapse_folder=entity, set_annotations=True) + if include_children: + children_objects = await loop.run_in_executor( None, - lambda: Synapse.get_client(synapse_client=synapse_client).get( - entity=self.id, + lambda: Synapse.get_client(synapse_client=synapse_client).getChildren( + parent=self.id, + includeTypes=["folder", "file"], opentelemetry_context=current_context, ), ) - self.fill_from_dict(synapse_folder=entity, set_annotations=True) - if include_children: - children_objects = await loop.run_in_executor( - None, - lambda: Synapse.get_client( - synapse_client=synapse_client - ).getChildren( - parent=self.id, - includeTypes=["folder", "file"], - opentelemetry_context=current_context, - ), - ) + folders = [] + files = [] + for child in children_objects: + if ( + "type" in child + and child["type"] == "org.sagebionetworks.repo.model.Folder" + ): + folder = Folder().fill_from_dict(synapse_folder=child) + folder.parent_id = self.id + folders.append(folder) + + elif ( + "type" in child + and child["type"] == "org.sagebionetworks.repo.model.FileEntity" + ): + file = File().fill_from_dict(synapse_file=child) + file.parent_id = self.id + files.append(file) + + self.files.extend(files) + self.folders.extend(folders) - folders = [] - files = [] - for child in children_objects: - if ( - "type" in child - and child["type"] == "org.sagebionetworks.repo.model.Folder" - ): - folder = Folder().fill_from_dict(synapse_folder=child) - folder.parent_id = self.id - folders.append(folder) - - elif ( - "type" in child - and child["type"] == "org.sagebionetworks.repo.model.FileEntity" - ): - file = File().fill_from_dict(synapse_file=child) - file.parent_id = self.id - files.append(file) - - self.files.extend(files) - self.folders.extend(folders) - - return self + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Folder_Delete: {self.id}" + ) async def delete(self, synapse_client: Optional[Synapse] = None) -> None: """Delete the folder from Synapse. @@ -280,13 +284,12 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None: Returns: None """ - with tracer.start_as_current_span(f"Folder_Delete: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).delete( - obj=self.id, - opentelemetry_context=current_context, - ), - ) + loop = asyncio.get_event_loop() + current_context = context.get_current() + await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).delete( + obj=self.id, + opentelemetry_context=current_context, + ), + ) diff --git a/synapseclient/models/project.py b/synapseclient/models/project.py index 6d4a21dc3..a414b6598 100644 --- a/synapseclient/models/project.py +++ b/synapseclient/models/project.py @@ -10,6 +10,7 @@ from synapseclient.models import Folder, File, Annotations from synapseclient import Synapse +from synapseclient.core.async_utils import otel_trace_method tracer = trace.get_tracer("synapseclient") @@ -157,6 +158,9 @@ def fill_from_dict( ) return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Project_Store: {self.name}" + ) async def store(self, synapse_client: Optional[Synapse] = None) -> "Project": """Store project, files, and folders to synapse. @@ -166,66 +170,65 @@ async def store(self, synapse_client: Optional[Synapse] = None) -> "Project": Returns: The project object. """ - - with tracer.start_as_current_span(f"Project_Store: {self.name}"): - # Call synapse - loop = asyncio.get_event_loop() - synapse_project = Synapse_Project(self.name) - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).store( - obj=synapse_project, opentelemetry_context=current_context - ), + # Call synapse + loop = asyncio.get_event_loop() + synapse_project = Synapse_Project(self.name) + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).store( + obj=synapse_project, opentelemetry_context=current_context + ), + ) + self.fill_from_dict(synapse_project=entity, set_annotations=False) + + tasks = [] + if self.files: + tasks.extend( + file.store(parent=self, synapse_client=synapse_client) + for file in self.files ) - self.fill_from_dict(synapse_project=entity, set_annotations=False) - tasks = [] - if self.files: - tasks.extend( - file.store(parent=self, synapse_client=synapse_client) - for file in self.files - ) + if self.folders: + tasks.extend( + folder.store(parent=self, synapse_client=synapse_client) + for folder in self.folders + ) - if self.folders: - tasks.extend( - folder.store(parent=self, synapse_client=synapse_client) - for folder in self.folders + if self.annotations: + tasks.append( + asyncio.create_task( + Annotations( + id=self.id, etag=self.etag, annotations=self.annotations + ).store(synapse_client=synapse_client) ) + ) - if self.annotations: - tasks.append( - asyncio.create_task( - Annotations( - id=self.id, etag=self.etag, annotations=self.annotations - ).store(synapse_client=synapse_client) - ) - ) + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + # TODO: Proper exception handling + + for result in results: + if isinstance(result, Folder): + print(f"Stored {result.name}") + elif isinstance(result, File): + print(f"Stored {result.name} at: {result.path}") + elif isinstance(result, Annotations): + self.annotations = result.annotations + print(f"Stored annotations id: {result.id}, etag: {result.etag}") + else: + raise ValueError(f"Unknown type: {type(result)}", result) + except Exception as ex: + Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) + print("I hit an exception") + + print(f"Saved all files and folders in {self.name}") - try: - results = await asyncio.gather(*tasks, return_exceptions=True) - # TODO: Proper exception handling - - for result in results: - if isinstance(result, Folder): - print(f"Stored {result.name}") - elif isinstance(result, File): - print(f"Stored {result.name} at: {result.path}") - elif isinstance(result, Annotations): - self.annotations = result.annotations - print( - f"Stored annotations id: {result.id}, etag: {result.etag}" - ) - else: - raise ValueError(f"Unknown type: {type(result)}") - except Exception as ex: - Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) - print("I hit an exception") - - print(f"Saved all files and folders in {self.name}") - - return self + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Project_Get: ID: {self.id}, Name: {self.name}" + ) async def get( self, include_children: Optional[bool] = False, @@ -240,54 +243,54 @@ async def get( Returns: The project object. """ - with tracer.start_as_current_span(f"Project_Get: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).get( + entity=self.id, + opentelemetry_context=current_context, + ), + ) + + self.fill_from_dict(synapse_project=entity, set_annotations=True) + if include_children: + children_objects = await loop.run_in_executor( None, - lambda: Synapse.get_client(synapse_client=synapse_client).get( - entity=self.id, + lambda: Synapse.get_client(synapse_client=synapse_client).getChildren( + parent=self.id, + includeTypes=["folder", "file"], opentelemetry_context=current_context, ), ) - self.fill_from_dict(synapse_project=entity, set_annotations=True) - if include_children: - children_objects = await loop.run_in_executor( - None, - lambda: Synapse.get_client( - synapse_client=synapse_client - ).getChildren( - parent=self.id, - includeTypes=["folder", "file"], - opentelemetry_context=current_context, - ), - ) + folders = [] + files = [] + for child in children_objects: + if ( + "type" in child + and child["type"] == "org.sagebionetworks.repo.model.Folder" + ): + folder = Folder().fill_from_dict(synapse_folder=child) + folder.parent_id = self.id + folders.append(folder) + + elif ( + "type" in child + and child["type"] == "org.sagebionetworks.repo.model.FileEntity" + ): + file = File().fill_from_dict(synapse_file=child) + file.parent_id = self.id + files.append(file) + + self.files.extend(files) + self.folders.extend(folders) - folders = [] - files = [] - for child in children_objects: - if ( - "type" in child - and child["type"] == "org.sagebionetworks.repo.model.Folder" - ): - folder = Folder().fill_from_dict(synapse_folder=child) - folder.parent_id = self.id - folders.append(folder) - - elif ( - "type" in child - and child["type"] == "org.sagebionetworks.repo.model.FileEntity" - ): - file = File().fill_from_dict(synapse_file=child) - file.parent_id = self.id - files.append(file) - - self.files.extend(files) - self.folders.extend(folders) - - return self + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Project_Delete: {self.id}" + ) async def delete(self, synapse_client: Optional[Synapse] = None) -> None: """Delete the project from Synapse. @@ -297,13 +300,12 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None: Returns: None """ - with tracer.start_as_current_span(f"Project_Delete: {self.id}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).delete( - obj=self.id, - opentelemetry_context=current_context, - ), - ) + loop = asyncio.get_event_loop() + current_context = context.get_current() + await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).delete( + obj=self.id, + opentelemetry_context=current_context, + ), + ) diff --git a/synapseclient/models/table.py b/synapseclient/models/table.py index 63cee9c5b..db40d17ca 100644 --- a/synapseclient/models/table.py +++ b/synapseclient/models/table.py @@ -16,6 +16,7 @@ delete_rows, ) from synapseclient.models import Annotations +from synapseclient.core.async_utils import otel_trace_method from opentelemetry import trace, context @@ -301,33 +302,35 @@ def fill_from_dict(self, synapse_column: Synapse_Column) -> "Column": self.json_sub_columns = synapse_column.get("jsonSubColumns", None) return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Column_Store: {self.name}" + ) async def store(self, synapse_client: Optional[Synapse] = None): """Persist the column to Synapse. :param synapse_client: If not passed in or None this will use the last client from the `.login()` method. :return: Column """ - with tracer.start_as_current_span(f"Column_Store: {self.name}"): - # TODO - We need to add in some validation before the store to verify we have enough - # information to store the data - - # Call synapse - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).createColumn( - name=self.name, - columnType=self.column_type, - opentelemetry_context=current_context, - ), - ) - print(entity) - self.fill_from_dict(entity) + # TODO - We need to add in some validation before the store to verify we have enough + # information to store the data + + # Call synapse + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).createColumn( + name=self.name, + columnType=self.column_type, + opentelemetry_context=current_context, + ), + ) + print(entity) + self.fill_from_dict(entity) - print(f"Stored column {self.name}, id: {self.id}") + print(f"Stored column {self.name}, id: {self.id}") - return self + return self @dataclass() @@ -466,6 +469,9 @@ def fill_from_dict( ) return self + @otel_trace_method( + method_to_trace_name=lambda _, **kwargs: f"Store_rows_by_csv: {kwargs.get('csv_path', None)}" + ) async def store_rows_from_csv( self, csv_path: str, synapse_client: Optional[Synapse] = None ) -> str: @@ -478,20 +484,22 @@ async def store_rows_from_csv( Returns: The path to the CSV that was stored. """ - with tracer.start_as_current_span(f"Store_rows_by_csv: {csv_path}"): - synapse_table = Synapse_Table(schema=self.id, values=csv_path) - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).store( - obj=synapse_table, opentelemetry_context=current_context - ), - ) - print(entity) - # TODO: What should this return? + synapse_table = Synapse_Table(schema=self.id, values=csv_path) + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).store( + obj=synapse_table, opentelemetry_context=current_context + ), + ) + print(entity) + # TODO: What should this return? return csv_path + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Delete_rows: {self.name}" + ) async def delete_rows( self, rows: List[Row], synapse_client: Optional[Synapse] = None ) -> None: @@ -504,22 +512,24 @@ async def delete_rows( Returns: None """ - with tracer.start_as_current_span(f"Delete_rows: {self.name}"): - rows_to_delete = [] - for row in rows: - rows_to_delete.append([row.row_id, row.version_number]) - loop = asyncio.get_event_loop() - current_context = context.get_current() - await loop.run_in_executor( - None, - lambda: delete_rows( - syn=Synapse.get_client(synapse_client=synapse_client), - table_id=self.id, - row_id_vers_list=rows_to_delete, - opentelemetry_context=current_context, - ), - ) - + rows_to_delete = [] + for row in rows: + rows_to_delete.append([row.row_id, row.version_number]) + loop = asyncio.get_event_loop() + current_context = context.get_current() + await loop.run_in_executor( + None, + lambda: delete_rows( + syn=Synapse.get_client(synapse_client=synapse_client), + table_id=self.id, + row_id_vers_list=rows_to_delete, + opentelemetry_context=current_context, + ), + ) + + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Table_Schema_Store: {self.name}" + ) async def store_schema(self, synapse_client: Optional[Synapse] = None) -> "Table": """Store non-row information about a table including the columns and annotations. @@ -529,79 +539,76 @@ async def store_schema(self, synapse_client: Optional[Synapse] = None) -> "Table Returns: The Table instance stored in synapse. """ - with tracer.start_as_current_span(f"Table_Schema_Store: {self.name}"): - tasks = [] - if self.columns: - # TODO: When a table is retrieved via `.get()` we create Column objects but - # TODO: We only have the ID attribute. THis is causing this if check to eval - # TODO: To True, however, we aren't actually modifying the column. - # TODO: Perhaps we should have a `has_changed` boolean on all dataclasses - # TODO: That we can check to see if we need to store the data. - tasks.extend( - column.store(synapse_client=synapse_client) - for column in self.columns - ) - try: - results = await asyncio.gather(*tasks, return_exceptions=True) - - # TODO: Proper exception handling - for result in results: - if isinstance(result, Column): - print(f"Stored {result.name}") - else: - raise ValueError(f"Unknown type: {type(result)}") - except Exception as ex: - Synapse.get_client(synapse_client=synapse_client).logger.exception( - ex - ) - print("I hit an exception") - - synapse_schema = Synapse_Schema( - name=self.name, - columns=self.columns, - parent=self.parent_id, + tasks = [] + if self.columns: + # TODO: When a table is retrieved via `.get()` we create Column objects but + # TODO: We only have the ID attribute. THis is causing this if check to eval + # TODO: To True, however, we aren't actually modifying the column. + # TODO: Perhaps we should have a `has_changed` boolean on all dataclasses + # TODO: That we can check to see if we need to store the data. + tasks.extend( + column.store(synapse_client=synapse_client) for column in self.columns ) - - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).store( - obj=synapse_schema, opentelemetry_context=current_context - ), - ) - - self.fill_from_dict(synapse_table=entity, set_annotations=False) - - tasks = [] - if self.annotations: - tasks.append( - asyncio.create_task( - Annotations( - id=self.id, etag=self.etag, annotations=self.annotations - ).store(synapse_client=synapse_client) - ) + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + + # TODO: Proper exception handling + for result in results: + if isinstance(result, Column): + print(f"Stored {result.name}") + else: + raise ValueError(f"Unknown type: {type(result)}", result) + except Exception as ex: + Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) + print("I hit an exception") + + synapse_schema = Synapse_Schema( + name=self.name, + columns=self.columns, + parent=self.parent_id, + ) + + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).store( + obj=synapse_schema, opentelemetry_context=current_context + ), + ) + + self.fill_from_dict(synapse_table=entity, set_annotations=False) + + tasks = [] + if self.annotations: + tasks.append( + asyncio.create_task( + Annotations( + id=self.id, etag=self.etag, annotations=self.annotations + ).store(synapse_client=synapse_client) ) + ) - try: - results = await asyncio.gather(*tasks, return_exceptions=True) - - # TODO: Proper exception handling - for result in results: - if isinstance(result, Annotations): - self.annotations = result.annotations - print( - f"Stored annotations id: {result.id}, etag: {result.etag}" - ) - else: - raise ValueError(f"Unknown type: {type(result)}") - except Exception as ex: - Synapse.get_client(synapse_client=synapse_client).logger.exception( - ex - ) - print("I hit an exception") - return self + try: + results = await asyncio.gather(*tasks, return_exceptions=True) + + # TODO: Proper exception handling + for result in results: + if isinstance(result, Annotations): + self.annotations = result.annotations + print( + f"Stored annotations id: {result.id}, etag: {result.etag}" + ) + else: + raise ValueError(f"Unknown type: {type(result)}", result) + except Exception as ex: + Synapse.get_client(synapse_client=synapse_client).logger.exception(ex) + print("I hit an exception") + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Table_Get: {self.name}" + ) async def get(self, synapse_client: Optional[Synapse] = None) -> "Table": """Get the metadata about the table from synapse. @@ -612,18 +619,20 @@ async def get(self, synapse_client: Optional[Synapse] = None) -> "Table": The Table instance stored in synapse. """ # TODO: How do we want to support retriving the table? Do we want to support by name, and parent? - with tracer.start_as_current_span(f"Table_Get: {self.name}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - entity = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).get( - entity=self.id, opentelemetry_context=current_context - ), - ) - self.fill_from_dict(synapse_table=entity, set_annotations=True) - return self + loop = asyncio.get_event_loop() + current_context = context.get_current() + entity = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).get( + entity=self.id, opentelemetry_context=current_context + ), + ) + self.fill_from_dict(synapse_table=entity, set_annotations=True) + return self + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Table_Delete: {self.name}" + ) # TODO: Synapse allows immediate deletion of entities, but the Synapse Client does not # TODO: Should we support immediate deletion? async def delete(self, synapse_client: Optional[Synapse] = None) -> None: @@ -635,19 +644,18 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None: Returns: None """ - with tracer.start_as_current_span(f"Table_Delete: {self.name}"): - loop = asyncio.get_event_loop() - current_context = context.get_current() - await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).delete( - obj=self.id, opentelemetry_context=current_context - ), - ) + loop = asyncio.get_event_loop() + current_context = context.get_current() + await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).delete( + obj=self.id, opentelemetry_context=current_context + ), + ) @classmethod async def query( - self, + cls, query: str, result_format: Union[CsvResultFormat, RowsetResultFormat] = CsvResultFormat(), synapse_client: Optional[Synapse] = None,