Skip to content

Commit

Permalink
Trace decorator (#1553)
Browse files Browse the repository at this point in the history
* create a trace decorator

* fix decorator

* change documentation

---------

Co-authored-by: David <[email protected]>
  • Loading branch information
korgan00 and Tansito authored Dec 20, 2024
1 parent 587ea86 commit f3b8de9
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 206 deletions.
205 changes: 101 additions & 104 deletions client/qiskit_serverless/core/clients/serverless_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
MAX_ARTIFACT_FILE_SIZE_MB,
)
from qiskit_serverless.core.client import BaseClient
from qiskit_serverless.core.decorators import trace_decorator_factory
from qiskit_serverless.core.files import GatewayFilesClient
from qiskit_serverless.core.job import (
Job,
Expand All @@ -72,6 +73,9 @@
QiskitObjectsDecoder,
)

_trace_job = trace_decorator_factory("job")
_trace_functions = trace_decorator_factory("function")


class ServerlessClient(BaseClient):
"""
Expand Down Expand Up @@ -146,47 +150,45 @@ def _verify_token(self, token: str):
####### JOBS #######
####################

@_trace_job("list")
def jobs(self, **kwargs) -> List[Job]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.list"):
limit = kwargs.get("limit", 10)
kwargs["limit"] = limit
offset = kwargs.get("offset", 0)
kwargs["offset"] = offset

response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs",
params=kwargs,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
limit = kwargs.get("limit", 10)
kwargs["limit"] = limit
offset = kwargs.get("offset", 0)
kwargs["offset"] = offset

response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs",
params=kwargs,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

return [
Job(job.get("id"), job_service=self, raw_data=job)
for job in response_data.get("results", [])
]

@_trace_job("get")
def job(self, job_id: str) -> Optional[Job]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.get"):
url = f"{self.host}/api/{self.version}/jobs/{job_id}/"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
url,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
url = f"{self.host}/api/{self.version}/jobs/{job_id}/"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
url,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

job = None
job_id = response_data.get("id")
if job_id is not None:
job = Job(
job_id=job_id,
job_service=self,
)
job = None
job_id = response_data.get("id")
if job_id is not None:
job = Job(
job_id=job_id,
job_service=self,
)

return job

Expand All @@ -205,7 +207,7 @@ def run(

tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", title)
span.set_attribute("function", title)
span.set_attribute("provider", provider)
span.set_attribute("arguments", str(arguments))

Expand Down Expand Up @@ -234,66 +236,62 @@ def run(

return Job(job_id, job_service=self)

@_trace_job
def status(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.status"):
default_status = "Unknown"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
default_status = "Unknown"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

return response_data.get("status", default_status)

@_trace_job
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.stop"):
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request_as_dict(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request_as_dict(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
)

return response_data.get("message")

@_trace_job
def result(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.result"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)
return json.loads(
response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder
)

@_trace_job
def logs(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.logs"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/logs/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/logs/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)
return response_data.get("logs")

def filtered_logs(self, job_id: str, **kwargs):
Expand Down Expand Up @@ -323,8 +321,8 @@ def filtered_logs(self, job_id: str, **kwargs):

def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", program.title)
with tracer.start_as_current_span("function.upload") as span:
span.set_attribute("function", program.title)
url = f"{self.host}/api/{self.version}/programs/upload/"

if program.image is not None:
Expand All @@ -344,18 +342,17 @@ def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:

return function_uploaded

@_trace_functions("list")
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.list"):
response_data = safe_json_request_as_list(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs",
headers={"Authorization": f"Bearer {self.token}"},
params=kwargs,
timeout=REQUESTS_TIMEOUT,
)
"""Returns list of available functions."""
response_data = safe_json_request_as_list(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs",
headers={"Authorization": f"Bearer {self.token}"},
params=kwargs,
timeout=REQUESTS_TIMEOUT,
)
)

return [
RunnableQiskitFunction(
Expand All @@ -368,6 +365,7 @@ def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
for program in response_data
]

@_trace_functions("get_by_title")
def function(
self, title: str, provider: Optional[str] = None
) -> Optional[RunnableQiskitFunction]:
Expand All @@ -376,22 +374,21 @@ def function(
request_provider=provider, title=title
)

tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.get_by_title"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs/get_by_title/{title}",
headers={"Authorization": f"Bearer {self.token}"},
params={"provider": provider},
timeout=REQUESTS_TIMEOUT,
)
)
return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs/get_by_title/{title}",
headers={"Authorization": f"Bearer {self.token}"},
params={"provider": provider},
timeout=REQUESTS_TIMEOUT,
)
)

return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
)

#####################
####### FILES #######
Expand Down Expand Up @@ -548,8 +545,8 @@ def _upload_with_docker_image(
)
program_title = response_data.get("title", "na")
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
span.set_attribute("function.title", program_title)
span.set_attribute("function.provider", program_provider)
response_data["client"] = client
return RunnableQiskitFunction.from_json(response_data)

Expand Down Expand Up @@ -616,8 +613,8 @@ def _upload_with_artifact(
timeout=REQUESTS_TIMEOUT,
)
)
span.set_attribute("program.title", response_data.get("title", "na"))
span.set_attribute("program.provider", response_data.get("provider", "na"))
span.set_attribute("function.title", response_data.get("title", "na"))
span.set_attribute("function.provider", response_data.get("provider", "na"))
response_data["client"] = client
response_function = RunnableQiskitFunction.from_json(response_data)
except Exception as error: # pylint: disable=broad-exception-caught
Expand Down
35 changes: 35 additions & 0 deletions client/qiskit_serverless/core/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import inspect
import os
import shutil
from types import FunctionType
import warnings
from dataclasses import dataclass
from typing import Optional, Dict, Any, Union, List, Callable, Sequence
Expand Down Expand Up @@ -451,3 +452,37 @@ def distribute_program(
"Please, use `distribute_qiskit_function` instead."
)
return distribute_qiskit_function(provider, dependencies, working_dir)


def trace_decorator_factory(traced_feature: str):
"""Factory for generate decorators for classes or features."""

def generated_decorator(traced_function: Union[FunctionType, str]):
"""
The decorator wrapper to generate optional arguments
if traced_function is string it will be used in the span,
the function.__name__ attribute will be used otherwise
"""

def decorator_trace(func: FunctionType):
"""The decorator that python call"""

def wrapper(*args, **kwargs):
"""The wrapper"""
tracer = trace.get_tracer("client.tracer")
function_name = (
traced_function
if isinstance(traced_function, str)
else func.__name__
)
with tracer.start_as_current_span(f"{traced_feature}.${function_name}"):
result = func(*args, **kwargs)
return result

return wrapper

if callable(traced_function):
return decorator_trace(traced_function)
return decorator_trace

return generated_decorator
Loading

0 comments on commit f3b8de9

Please sign in to comment.