diff --git a/client/qiskit_serverless/core/clients/serverless_client.py b/client/qiskit_serverless/core/clients/serverless_client.py index 0456008be..79e335c9b 100644 --- a/client/qiskit_serverless/core/clients/serverless_client.py +++ b/client/qiskit_serverless/core/clients/serverless_client.py @@ -49,6 +49,7 @@ MAX_ARTIFACT_FILE_SIZE_MB, ) from qiskit_serverless.core.client import BaseClient +from qiskit_serverless.core.decorators import trace_decorator_factory from qiskit_serverless.core.files import GatewayFilesClient from qiskit_serverless.core.job import ( Job, @@ -72,6 +73,9 @@ QiskitObjectsDecoder, ) +_trace_job = trace_decorator_factory("job") +_trace_functions = trace_decorator_factory("function") + class ServerlessClient(BaseClient): """ @@ -146,47 +150,45 @@ def _verify_token(self, token: str): ####### JOBS ####### #################### + @_trace_job("list") def jobs(self, **kwargs) -> List[Job]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.list"): - limit = kwargs.get("limit", 10) - kwargs["limit"] = limit - offset = kwargs.get("offset", 0) - kwargs["offset"] = offset - - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs", - params=kwargs, - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - ) + limit = kwargs.get("limit", 10) + kwargs["limit"] = limit + offset = kwargs.get("offset", 0) + kwargs["offset"] = offset + + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs", + params=kwargs, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return [ Job(job.get("id"), job_service=self, raw_data=job) for job in response_data.get("results", []) ] + @_trace_job("get") def job(self, job_id: str) -> Optional[Job]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.get"): - url = f"{self.host}/api/{self.version}/jobs/{job_id}/" - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - url, - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - ) + url = f"{self.host}/api/{self.version}/jobs/{job_id}/" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + url, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) - job = None - job_id = response_data.get("id") - if job_id is not None: - job = Job( - job_id=job_id, - job_service=self, - ) + job = None + job_id = response_data.get("id") + if job_id is not None: + job = Job( + job_id=job_id, + job_service=self, + ) return job @@ -205,7 +207,7 @@ def run( tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", title) + span.set_attribute("function", title) span.set_attribute("provider", provider) span.set_attribute("arguments", str(arguments)) @@ -234,66 +236,62 @@ def run( return Job(job_id, job_service=self) + @_trace_job def status(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.status"): - default_status = "Unknown" - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - ) + default_status = "Unknown" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("status", default_status) + @_trace_job def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.stop"): - if service: - data = { - "service": json.dumps(service, cls=QiskitObjectsEncoder), - } - else: - data = { - "service": None, - } - response_data = safe_json_request_as_dict( - request=lambda: requests.post( - f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - json=data, - ) + if service: + data = { + "service": json.dumps(service, cls=QiskitObjectsEncoder), + } + else: + data = { + "service": None, + } + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + json=data, ) + ) return response_data.get("message") + @_trace_job def result(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.result"): - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return json.loads( response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder ) + @_trace_job def logs(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.logs"): - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", - headers={"Authorization": f"Bearer {self.token}"}, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("logs") def filtered_logs(self, job_id: str, **kwargs): @@ -323,8 +321,8 @@ def filtered_logs(self, job_id: str, **kwargs): def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", program.title) + with tracer.start_as_current_span("function.upload") as span: + span.set_attribute("function", program.title) url = f"{self.host}/api/{self.version}/programs/upload/" if program.image is not None: @@ -344,18 +342,17 @@ def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: return function_uploaded + @_trace_functions("list") def functions(self, **kwargs) -> List[RunnableQiskitFunction]: - """Returns list of available programs.""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.list"): - response_data = safe_json_request_as_list( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs", - headers={"Authorization": f"Bearer {self.token}"}, - params=kwargs, - timeout=REQUESTS_TIMEOUT, - ) + """Returns list of available functions.""" + response_data = safe_json_request_as_list( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs", + headers={"Authorization": f"Bearer {self.token}"}, + params=kwargs, + timeout=REQUESTS_TIMEOUT, ) + ) return [ RunnableQiskitFunction( @@ -368,6 +365,7 @@ def functions(self, **kwargs) -> List[RunnableQiskitFunction]: for program in response_data ] + @_trace_functions("get_by_title") def function( self, title: str, provider: Optional[str] = None ) -> Optional[RunnableQiskitFunction]: @@ -376,22 +374,21 @@ def function( request_provider=provider, title=title ) - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.get_by_title"): - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/get_by_title/{title}", - headers={"Authorization": f"Bearer {self.token}"}, - params={"provider": provider}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return RunnableQiskitFunction( - client=self, - title=response_data.get("title"), - provider=response_data.get("provider", None), - raw_data=response_data, + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs/get_by_title/{title}", + headers={"Authorization": f"Bearer {self.token}"}, + params={"provider": provider}, + timeout=REQUESTS_TIMEOUT, ) + ) + + return RunnableQiskitFunction( + client=self, + title=response_data.get("title"), + provider=response_data.get("provider", None), + raw_data=response_data, + ) ##################### ####### FILES ####### @@ -548,8 +545,8 @@ def _upload_with_docker_image( ) program_title = response_data.get("title", "na") program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) + span.set_attribute("function.title", program_title) + span.set_attribute("function.provider", program_provider) response_data["client"] = client return RunnableQiskitFunction.from_json(response_data) @@ -616,8 +613,8 @@ def _upload_with_artifact( timeout=REQUESTS_TIMEOUT, ) ) - span.set_attribute("program.title", response_data.get("title", "na")) - span.set_attribute("program.provider", response_data.get("provider", "na")) + span.set_attribute("function.title", response_data.get("title", "na")) + span.set_attribute("function.provider", response_data.get("provider", "na")) response_data["client"] = client response_function = RunnableQiskitFunction.from_json(response_data) except Exception as error: # pylint: disable=broad-exception-caught diff --git a/client/qiskit_serverless/core/decorators.py b/client/qiskit_serverless/core/decorators.py index ffccc9305..07ddf8947 100644 --- a/client/qiskit_serverless/core/decorators.py +++ b/client/qiskit_serverless/core/decorators.py @@ -34,6 +34,7 @@ import inspect import os import shutil +from types import FunctionType import warnings from dataclasses import dataclass from typing import Optional, Dict, Any, Union, List, Callable, Sequence @@ -451,3 +452,37 @@ def distribute_program( "Please, use `distribute_qiskit_function` instead." ) return distribute_qiskit_function(provider, dependencies, working_dir) + + +def trace_decorator_factory(traced_feature: str): + """Factory for generate decorators for classes or features.""" + + def generated_decorator(traced_function: Union[FunctionType, str]): + """ + The decorator wrapper to generate optional arguments + if traced_function is string it will be used in the span, + the function.__name__ attribute will be used otherwise + """ + + def decorator_trace(func: FunctionType): + """The decorator that python call""" + + def wrapper(*args, **kwargs): + """The wrapper""" + tracer = trace.get_tracer("client.tracer") + function_name = ( + traced_function + if isinstance(traced_function, str) + else func.__name__ + ) + with tracer.start_as_current_span(f"{traced_feature}.${function_name}"): + result = func(*args, **kwargs) + return result + + return wrapper + + if callable(traced_function): + return decorator_trace(traced_function) + return decorator_trace + + return generated_decorator diff --git a/client/qiskit_serverless/core/files.py b/client/qiskit_serverless/core/files.py index 0a0d7a6a3..ef5a42522 100644 --- a/client/qiskit_serverless/core/files.py +++ b/client/qiskit_serverless/core/files.py @@ -30,18 +30,21 @@ from typing import List, Optional import requests -from opentelemetry import trace from tqdm import tqdm from qiskit_serverless.core.constants import ( REQUESTS_STREAMING_TIMEOUT, REQUESTS_TIMEOUT, ) +from qiskit_serverless.core.decorators import trace_decorator_factory from qiskit_serverless.core.function import QiskitFunction from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.utils.json import safe_json_request_as_dict +_trace = trace_decorator_factory("files") + + class GatewayFilesClient: """GatewayFilesClient.""" @@ -91,6 +94,7 @@ def _download_with_url( # pylint: disable=too-many-positional-arguments progress_bar.close() return file_name + @_trace def download( self, file: str, @@ -99,16 +103,15 @@ def download( target_name: Optional[str] = None, ) -> Optional[str]: """Downloads user file.""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.download"): - return self._download_with_url( - file, - download_location, - function, - os.path.join(self._files_url, "download"), - target_name, - ) - + return self._download_with_url( + file, + download_location, + function, + os.path.join(self._files_url, "download"), + target_name, + ) + + @_trace def provider_download( self, file: str, @@ -120,129 +123,121 @@ def provider_download( if not function.provider: raise QiskitServerlessException("`function` doesn't have a provider.") - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.provider_download"): - return self._download_with_url( - file, - download_location, - function, - os.path.join(self._files_url, "provider", "download"), - target_name, - ) + return self._download_with_url( + file, + download_location, + function, + os.path.join(self._files_url, "provider", "download"), + target_name, + ) + @_trace def upload( self, file: str, function: QiskitFunction, provider: Optional[str] = None ) -> Optional[str]: """Uploads file.""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.upload"): - with open(file, "rb") as f: - with requests.post( - os.path.join(self._files_url, "upload/"), - files={"file": f}, - params={"provider": provider, "function": function.title}, - stream=True, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_STREAMING_TIMEOUT, - ) as req: - if req.ok: - return req.text - return "Upload failed" - return "Can not open file" - + with open(file, "rb") as f: + with requests.post( + os.path.join(self._files_url, "upload/"), + files={"file": f}, + params={"provider": provider, "function": function.title}, + stream=True, + headers={"Authorization": f"Bearer {self._token}"}, + timeout=REQUESTS_STREAMING_TIMEOUT, + ) as req: + if req.ok: + return req.text + return "Upload failed" + return "Can not open file" + + @_trace def provider_upload( self, file: str, function: QiskitFunction, provider: str ) -> Optional[str]: """Uploads file to provider/function file storage.""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.upload"): - with open(file, "rb") as f: - with requests.post( - os.path.join(self._files_url, "upload/"), - files={"file": f}, - params={"provider": provider, "function": function.title}, - stream=True, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_STREAMING_TIMEOUT, - ) as req: - if req.ok: - return req.text - return "Upload failed" - return "Can not open file" - + with open(file, "rb") as f: + with requests.post( + os.path.join(self._files_url, "upload/"), + files={"file": f}, + params={"provider": provider, "function": function.title}, + stream=True, + headers={"Authorization": f"Bearer {self._token}"}, + timeout=REQUESTS_STREAMING_TIMEOUT, + ) as req: + if req.ok: + return req.text + return "Upload failed" + return "Can not open file" + + @_trace def list(self, function: QiskitFunction) -> List[str]: """Returns list of available files to download produced by programs,""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.list"): - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - self._files_url, - params={"function": function.title}, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + self._files_url, + params={"function": function.title}, + headers={"Authorization": f"Bearer {self._token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("results", []) + @_trace def provider_list(self, function: QiskitFunction) -> List[str]: """Returns list of available files to download produced by programs,""" if not function.provider: raise QiskitServerlessException("`function` doesn't have a provider.") - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.provider_list"): - response_data = safe_json_request_as_dict( - request=lambda: requests.get( - os.path.join(self._files_url, "provider"), - params={"provider": function.provider, "function": function.title}, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + os.path.join(self._files_url, "provider"), + params={"provider": function.provider, "function": function.title}, + headers={"Authorization": f"Bearer {self._token}"}, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("results", []) + @_trace def delete( self, file: str, function: QiskitFunction, provider: Optional[str] = None ) -> Optional[str]: """Deletes file uploaded or produced by the programs,""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.delete"): - response_data = safe_json_request_as_dict( - request=lambda: requests.delete( - os.path.join(self._files_url, "delete"), - params={ - "file": file, - "function": function.title, - "provider": provider, - }, - headers={ - "Authorization": f"Bearer {self._token}", - "format": "json", - }, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.delete( + os.path.join(self._files_url, "delete"), + params={ + "file": file, + "function": function.title, + "provider": provider, + }, + headers={ + "Authorization": f"Bearer {self._token}", + "format": "json", + }, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("message", "") + @_trace def provider_delete( self, file: str, function: QiskitFunction, provider: str ) -> Optional[str]: """Deletes file uploaded or produced by the programs,""" - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("files.provider_delete"): - response_data = safe_json_request_as_dict( - request=lambda: requests.delete( - os.path.join(self._files_url, "provider", "delete"), - params={ - "file": file, - "function": function.title, - "provider": provider, - }, - headers={ - "Authorization": f"Bearer {self._token}", - "format": "json", - }, - timeout=REQUESTS_TIMEOUT, - ) + response_data = safe_json_request_as_dict( + request=lambda: requests.delete( + os.path.join(self._files_url, "provider", "delete"), + params={ + "file": file, + "function": function.title, + "provider": provider, + }, + headers={ + "Authorization": f"Bearer {self._token}", + "format": "json", + }, + timeout=REQUESTS_TIMEOUT, ) + ) return response_data.get("message", "")