Skip to content

Commit

Permalink
Creating a decorator to be used in async class methods (#1049)
Browse files Browse the repository at this point in the history
* Creating a decorator to be used in async class methods to create a new OTEL span
  • Loading branch information
BryanFauble authored Jan 23, 2024
1 parent 5f2e2b7 commit 93e1b9a
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 429 deletions.
3 changes: 3 additions & 0 deletions docs/reference/core.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ functions. Use at your own risk.

::: synapseclient.core.utils

## Async Utils

::: synapseclient.core.async_utils

## Versions
::: synapseclient.core.version_check
Expand Down
49 changes: 49 additions & 0 deletions synapseclient/core/async_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""This utility class is to hold any utilities that are needed for async operations."""
from typing import Any, Callable, Coroutine, Union
from opentelemetry import trace


tracer = trace.get_tracer("synapseclient")


def otel_trace_method(
method_to_trace_name: Union[Callable[..., str], None] = None
) -> Callable[..., Callable[..., Coroutine[Any, Any, None]]]:
"""
Decorator to trace a method with OpenTelemetry in an async environment. This function
is specifically written to be used on a method within a class.
This will pass the class instance as the first argument to the method. This allows
you to modify the name of the trace to include information about the class instance.
Example: Decorating a method within a class that will be traced with OpenTelemetry.
Setting the trace name:
@otel_trace_method(method_to_trace_name=lambda self, **kwargs: f"Project_Store: {self.name}")
async def store(self):
Args:
method_to_trace_name: A callable that takes the class instance as the first argument
and returns a string to be used as the trace name. If this is not provided,
the trace name will be set to the method name.
Returns:
A callable decorator that will trace the method with OpenTelemetry.
"""

def decorator(f) -> Callable[..., Coroutine[Any, Any, None]]:
"""Function decorator."""

async def wrapper(self, *arg, **kwargs) -> None:
"""Wrapper for the function to be traced."""
trace_name = (
method_to_trace_name(self, *arg, **kwargs)
if method_to_trace_name
else None
)
with tracer.start_as_current_span(trace_name or f"Synaspse::{f.__name__}"):
return await f(self, *arg, **kwargs)

return wrapper

return decorator
31 changes: 17 additions & 14 deletions synapseclient/models/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from synapseclient import Synapse
from synapseclient.annotations import ANNO_TYPE_TO_FUNC
from synapseclient.core.async_utils import otel_trace_method


tracer = trace.get_tracer("synapseclient")
Expand Down Expand Up @@ -40,6 +41,9 @@ class Annotations:

is_loaded: bool = False

@otel_trace_method(
method_to_trace_name=lambda self, **kwargs: f"Annotation_store: {self.id}"
)
async def store(
self,
synapse_client: Optional[Synapse] = None,
Expand All @@ -48,20 +52,19 @@ async def store(
# TODO: Validation that id and etag are present

print(f"Storing annotations for id: {self.id}, etag: {self.etag}")
with tracer.start_as_current_span(f"Annotation_store: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
result = await loop.run_in_executor(
None,
lambda: set_annotations(
annotations=self,
synapse_client=synapse_client,
opentelemetry_context=current_context,
),
)
print(f"annotations store for {self.id} complete")
self.annotations = Annotations.from_dict(result)
# TODO: From the returned call do we need to update anything in the root object?
loop = asyncio.get_event_loop()
current_context = context.get_current()
result = await loop.run_in_executor(
None,
lambda: set_annotations(
annotations=self,
synapse_client=synapse_client,
opentelemetry_context=current_context,
),
)
print(f"annotations store for {self.id} complete")
self.annotations = Annotations.from_dict(result)
# TODO: From the returned call do we need to update anything in the root object?
return self

async def get(self):
Expand Down
135 changes: 70 additions & 65 deletions synapseclient/models/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from synapseclient.entity import File as Synapse_File
from synapseclient import Synapse
from synapseclient.core.async_utils import otel_trace_method

from typing import Optional, TYPE_CHECKING

Expand Down Expand Up @@ -159,6 +160,9 @@ def fill_from_dict(
)
return self

@otel_trace_method(
method_to_trace_name=lambda self, **kwargs: f"File_Store: {self.path if self.path else self.id}"
)
# TODO: How the parent is stored/referenced needs to be thought through
async def store(
self,
Expand All @@ -174,47 +178,47 @@ async def store(
Returns:
The file object.
"""
with tracer.start_as_current_span(
f"File_Store: {self.path if self.path else self.id}"
):
# TODO - We need to add in some validation before the store to verify we have enough
# information to store the data

# Call synapse
if self.path:
loop = asyncio.get_event_loop()
synapse_file = Synapse_File(
path=self.path,
name=self.name,
parent=parent.id if parent else self.parent_id,
)
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_file, opentelemetry_context=current_context
),
)

self.fill_from_dict(synapse_file=entity, set_annotations=False)

print(f"Stored file {self.name}, id: {self.id}: {self.path}")
elif self.id and not self.etag:
# This elif is to handle if only annotations are being stored without
# a file path.
annotations_to_persist = self.annotations
await self.get(synapse_client=synapse_client, download_file=False)
self.annotations = annotations_to_persist

if self.annotations:
result = await Annotations(
id=self.id, etag=self.etag, annotations=self.annotations
).store(synapse_client=synapse_client)
self.annotations = result.annotations

return self
# TODO - We need to add in some validation before the store to verify we have enough
# information to store the data

# Call synapse
if self.path:
loop = asyncio.get_event_loop()
synapse_file = Synapse_File(
path=self.path,
name=self.name,
parent=parent.id if parent else self.parent_id,
)
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_file, opentelemetry_context=current_context
),
)

self.fill_from_dict(synapse_file=entity, set_annotations=False)

print(f"Stored file {self.name}, id: {self.id}: {self.path}")
elif self.id and not self.etag:
# This elif is to handle if only annotations are being stored without
# a file path.
annotations_to_persist = self.annotations
await self.get(synapse_client=synapse_client, download_file=False)
self.annotations = annotations_to_persist

if self.annotations:
result = await Annotations(
id=self.id, etag=self.etag, annotations=self.annotations
).store(synapse_client=synapse_client)
self.annotations = result.annotations

return self

@otel_trace_method(
method_to_trace_name=lambda self, **kwargs: f"File_Get: {self.id}"
)
# TODO: We need to provide all the other options that can be provided here, like collision, follow link ect...
async def get(
self,
Expand All @@ -232,22 +236,24 @@ async def get(
Returns:
The file object.
"""
with tracer.start_as_current_span(f"File_Get: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id,
downloadFile=download_file,
downloadLocation=download_location,
opentelemetry_context=current_context,
),
)

self.fill_from_dict(synapse_file=entity, set_annotations=True)
return self
loop = asyncio.get_event_loop()
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id,
downloadFile=download_file,
downloadLocation=download_location,
opentelemetry_context=current_context,
),
)

self.fill_from_dict(synapse_file=entity, set_annotations=True)
return self

@otel_trace_method(
method_to_trace_name=lambda self, **kwargs: f"File_Get_Provenance: {self.id}"
)
async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
"""Delete the file from Synapse.
Expand All @@ -257,13 +263,12 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
Returns:
None
"""
with tracer.start_as_current_span(f"File_Delete: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id,
opentelemetry_context=current_context,
),
)
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id,
opentelemetry_context=current_context,
),
)
Loading

0 comments on commit 93e1b9a

Please sign in to comment.