diff --git a/setup.cfg b/setup.cfg index 58d1ff2..2347e8d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,8 +29,8 @@ classifiers = [options] install_requires = - # use local path util fixes on namespace package management are made on an official release pds.api-client~=1.6.1 + pandas~=2.2.3 # Change this to False if you use things like __file__ or __path__—which you # shouldn't use anyway, because that's what ``pkg_resources`` is for 🙂 @@ -62,6 +62,7 @@ dev = tox~=4.11.0 types-setuptools>=68.1.0,<74.1.1 Jinja2<3.1 + pandas-stubs~=2.2.3.241009 [options.entry_points] # Put your entry point scripts here diff --git a/src/pds/peppi/__init__.py b/src/pds/peppi/__init__.py index b8fe587..1c8f20d 100644 --- a/src/pds/peppi/__init__.py +++ b/src/pds/peppi/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """PDS peppi.""" from .client import PDSRegistryClient # noqa -from .client import Products # noqa +from .products import Products # noqa # For future consideration: # diff --git a/src/pds/peppi/client.py b/src/pds/peppi/client.py index 19d2638..9ccede1 100644 --- a/src/pds/peppi/client.py +++ b/src/pds/peppi/client.py @@ -1,21 +1,15 @@ """PDS Registry Client related classes.""" import logging -from datetime import datetime -from typing import Literal -from typing import Optional from pds.api_client import ApiClient from pds.api_client import Configuration -from pds.api_client.api.all_products_api import AllProductsApi + logger = logging.getLogger(__name__) DEFAULT_API_BASE_URL = "https://pds.nasa.gov/api/search/1" """Default URL used when querying PDS API""" -PROCESSING_LEVELS = Literal["telemetry", "raw", "partially-processed", "calibrated", "derived"] -"""Processing level values that can be used with has_processing_level()""" - class PDSRegistryClient: """Used to connect and interface with the PDS Registry. 
@@ -40,382 +34,3 @@ def __init__(self, base_url=DEFAULT_API_BASE_URL): configuration = Configuration() configuration.host = base_url self.api_client = ApiClient(configuration) - - -class Products: - """Use to access any class of planetary products via the PDS Registry API.""" - - SORT_PROPERTY = "ops:Harvest_Info.ops:harvest_date_time" - """Default property to sort results of a query by.""" - - PAGE_SIZE = 100 - """Default number of results returned in each page fetch from the PDS API.""" - - def __init__(self, client: PDSRegistryClient): - """Creates a new instance of the Products class. - - Parameters - ---------- - client: PDSRegistryClient - The client object used to interact with the PDS Registry API. - - """ - self._products = AllProductsApi(client.api_client) - self._q_string = "" - self._latest_harvest_time = None - self._page_counter = None - self._expected_pages = None - - def __add_clause(self, clause): - """Adds the provided clause to the query string to use on the next fetch of products from the Registry API. - - Repeated calls to this method results in a joining with any previously - added clauses via Logical AND. - - Lazy evaluation is used to only apply the filter when one iterates on this - Products instance. This way, multiple filters can be combined before the - request is actually sent. - - Notes - ----- - This method should not be called while there are still results to - iterate over from a previous query, as this could affect the results - of the next page fetch. The `reset()` method may be used to abandon - a query in progress so that this method may be called safely again. - - Parameters - ---------- - clause : str - The query clause to append. Clause should match the domain language - expected by the PDS Registry API - - Raises - ------ - RuntimeError - If this method is called while there are still results to be iterated - over from a previous query. 
- - """ - if self._page_counter or self._expected_pages: - raise RuntimeError( - "Cannot modify query while paginating over previous query results.\n" - "Use the reset() method on this Products instance or exhaust all returned " - "results before assigning new query clauses." - ) - - clause = f"({clause})" - if self._q_string: - self._q_string += f" and {clause}" - else: - self._q_string = clause - - def has_target(self, identifier: str): - """Adds a query clause selecting products having a given target identifier. - - Parameters - ---------- - identifier : str - Identifier (LIDVID) of the target. - - Returns - ------- - This Products instance with the "has target" query filter applied. - - """ - clause = f'ref_lid_target eq "{identifier}"' - self.__add_clause(clause) - return self - - def has_investigation(self, identifier: str): - """Adds a query clause selecting products having a given investigation identifier. - - Parameters - ---------- - identifier : str - Identifier (LIDVID) of the target. - - Returns - ------- - This Products instance with the "has investigation" query filter applied. - - """ - clause = f'ref_lid_investigation eq "{identifier}"' - self.__add_clause(clause) - return self - - def before(self, dt: datetime): - """Adds a query clause selecting products with a start date before the given datetime. - - Parameters - ---------- - dt : datetime.datetime - Datetime object containing the desired time. - - Returns - ------- - This Products instance with the "before" filter applied. - - """ - iso8601_datetime = dt.isoformat().replace("+00:00", "Z") - clause = f'pds:Time_Coordinates.pds:start_date_time le "{iso8601_datetime}"' - self.__add_clause(clause) - return self - - def after(self, dt: datetime): - """Adds a query clause selecting products with an end date after the given datetime. - - Parameters - ---------- - dt : datetime.datetime - Datetime object containing the desired time. 
- - Returns - ------- - This Products instance with the "before" filter applied. - - """ - iso8601_datetime = dt.isoformat().replace("+00:00", "Z") - clause = f'pds:Time_Coordinates.pds:stop_date_time ge "{iso8601_datetime}"' - self.__add_clause(clause) - return self - - def of_collection(self, identifier: str): - """Adds a query clause selecting products belonging to the given Parent Collection identifier. - - Parameters - ---------- - identifier : str - Identifier (LIDVID) of the Collection. - - Returns - ------- - This Products instance with the "Parent Collection" filter applied. - - """ - clause = f'ops:Provenance.ops:parent_collection_identifier eq "{identifier}"' - self.__add_clause(clause) - return self - - def observationals(self): - """Adds a query clause selecting only "Product Observational" type products on the current filter. - - Returns - ------- - This Products instance with the "Observational Product" filter applied. - - """ - clause = 'product_class eq "Product_Observational"' - self.__add_clause(clause) - return self - - def collections(self, collection_type: Optional[str] = None): - """Adds a query clause selecting only "Product Collection" type products on the current filter. - - Parameters - ---------- - collection_type : str, optional - Collection type to filter on. If not provided, all collection types - are included. - - Returns - ------- - This Products instance with the "Product Collection" filter applied. - - """ - clause = 'product_class eq "Product_Collection"' - self.__add_clause(clause) - - if collection_type: - clause = f'pds:Collection.pds:collection_type eq "{collection_type}"' - self.__add_clause(clause) - - return self - - def bundles(self): - """Adds a query clause selecting only "Bundle" type products on the current filter. - - Returns - ------- - This Products instance with the "Product Bundle" filter applied. 
- - """ - clause = 'product_class eq "Product_Bundle"' - self.__add_clause(clause) - return self - - def has_instrument(self, identifier: str): - """Adds a query clause selecting products having an instrument matching the provided identifier. - - Parameters - ---------- - identifier : str - Identifier (LIDVID) of the instrument. - - Returns - ------- - This Products instance with the "has instrument" filter applied. - - """ - clause = f'ref_lid_instrument eq "{identifier}"' - self.__add_clause(clause) - return self - - def has_instrument_host(self, identifier: str): - """Adds a query clause selecting products having an instrument host matching the provided identifier. - - Parameters - ---------- - identifier : str - Identifier (LIDVID) of the instrument host. - - Returns - ------- - This Products instance with the "has instrument host" filter applied. - - """ - clause = f'ref_lid_instrument_host eq "{identifier}"' - self.__add_clause(clause) - return self - - def has_processing_level(self, processing_level: PROCESSING_LEVELS = "raw"): - """Adds a query clause selecting products with a specific processing level. - - Parameters - ---------- - processing_level : str, optional - The processing level to filter on. Must be one of "telemetry", "raw", - "partially-processed", "calibrated", or "derived". Defaults to "raw". - - Returns - ------- - This Products instance with the "has processing level" filter applied. - - """ - clause = f'pds:Primary_Result_Summary.pds:processing_level eq "{processing_level.title()}"' - self.__add_clause(clause) - return self - - def get(self, identifier: str): - """Adds a query clause selecting the product with a LIDVID matching the provided value. - - Parameters - ---------- - identifier : str - LIDVID of the product to filter for. - - Returns - ------- - This Products instance with the "LIDVID identifier" filter applied. 
- - """ - self.__add_clause(f'lidvid like "{identifier}"') - return self - - def filter(self, clause: str): - """Selects products that match the provided query clause. - - Parameters - ---------- - clause : str - A custom query clause. - - Returns - ------- - This Products instance with the provided filtering clause applied. - """ - self.__add_clause(clause) - return self - - def _init_new_page(self): - """Quieries the PDS API for the next page of results. - - Any query clauses associated to this Products instance are included here. - - If there are results remaining from the previously acquired page, - they are yieled on each subsequent call to this method. - - Yields - ------ - product : pds.api_client.models.pds_product.PDSProduct - The next product within the current page fetched from the PDS Registry - API. - - Raises - ------ - StopIteration - Once all available pages of query results have been exhausted. - - """ - # Check if we've hit the expected number of pages (or exceeded in cases - # where no results were returned from the query) - if self._page_counter and self._page_counter >= self._expected_pages: - raise StopIteration - - kwargs = {"sort": [self.SORT_PROPERTY], "limit": self.PAGE_SIZE} - - if self._latest_harvest_time is not None: - kwargs["search_after"] = [self._latest_harvest_time] - - if len(self._q_string) > 0: - kwargs["q"] = f"({self._q_string})" - - results = self._products.product_list(**kwargs) - - # If this is the first page fetch, calculate total number of expected pages - # based on hit count - if self._expected_pages is None: - hits = results.summary.hits - - self._expected_pages = hits // self.PAGE_SIZE - if hits % self.PAGE_SIZE: - self._expected_pages += 1 - - self._page_counter = 0 - - for product in results.data: - yield product - self._latest_harvest_time = product.properties[self.SORT_PROPERTY][0] - - # If here, current page has been exhausted - self._page_counter += 1 - - def __iter__(self): - """Iterates over all products 
returned by the current query filter applied to this Products instance. - - This method handles pagination automatically by fetching additional pages - from the PDS Registry API as needed. Once all available pages and results - have been yielded, this method will reset this Products instance to a - default state which can be used to perform a new query. - - Yields - ------ - product : pds.api_client.models.pds_product.PDSProduct - The next product within the current page fetched from the PDS Registry - API. - - """ - while True: - try: - for product in self._init_new_page(): - yield product - except RuntimeError as err: - # Make sure we got the StopIteration that was converted to a RuntimeError, - # otherwise we need to re-raise - if "StopIteration" not in str(err): - raise err - - self.reset() - break - - def reset(self): - """Resets internal pagination state to default. - - This method should be called before making any modifications to the - query clause stored by this Products instance while still paginating - through the results of a previous query. - - """ - self._q_string = "" - self._expected_pages = None - self._page_counter = None - self._latest_harvest_time = None diff --git a/src/pds/peppi/products.py b/src/pds/peppi/products.py new file mode 100644 index 0000000..517203c --- /dev/null +++ b/src/pds/peppi/products.py @@ -0,0 +1,17 @@ +"""Main class of the library in this module.""" +from .client import PDSRegistryClient +from .result_set import ResultSet + + +class Products(ResultSet): + """Use to access any class of planetary products via the PDS Registry API.""" + + def __init__(self, client: PDSRegistryClient): + """Constructor of the products. 
+ + Parameters + ---------- + client : PDSRegistryClient + Client defining the connection with the PDS Search API + """ + ResultSet.__init__(self, client) diff --git a/src/pds/peppi/query_builder.py b/src/pds/peppi/query_builder.py new file mode 100644 index 0000000..2b3833e --- /dev/null +++ b/src/pds/peppi/query_builder.py @@ -0,0 +1,297 @@ +"""Module for the QueryBuilder. + +Contains all the methods used to elaborate the PDS4 Information Model queries through the PDS Search API. +""" +import logging +from datetime import datetime +from typing import Literal +from typing import Optional + +logger = logging.getLogger(__name__) + + +PROCESSING_LEVELS = Literal["telemetry", "raw", "partially-processed", "calibrated", "derived"] +"""Processing level values that can be used with has_processing_level()""" + + +class QueryBuilder: + """QueryBuilder provides methods to elaborate complex PDS queries.""" + + def __init__(self): + """Creates a new instance of the QueryBuilder class. + + Notes + ----- + Takes no arguments; initializes an empty query string and + an empty list of fields to be returned. + + """ + self._q_string = "" + self._fields: list[str] = [] + + def __add_clause(self, clause): + """Adds the provided clause to the query string to use on the next fetch of products from the Registry API. + + Repeated calls to this method results in a joining with any previously + added clauses via Logical AND. + + Lazy evaluation is used to only apply the filter when one iterates on this + Products instance. This way, multiple filters can be combined before the + request is actually sent. + + Notes + ----- + This method should not be called while there are still results to + iterate over from a previous query, as this could affect the results + of the next page fetch. The `reset()` method may be used to abandon + a query in progress so that this method may be called safely again. + + Parameters + ---------- + clause : str + The query clause to append. 
Clause should match the domain language + expected by the PDS Registry API + + Raises + ------ + RuntimeError + If this method is called while there are still results to be iterated + over from a previous query. + + """ + # TODO have something more agnostic of what the iterator is + # since the iterator is not managed by this present object + if hasattr(self, "_page_counter") and self._page_counter: + raise RuntimeError( + "Cannot modify query while paginating over previous query results.\n" + "Use the reset() method on this Products instance or exhaust all returned " + "results before assigning new query clauses." + ) + + clause = f"({clause})" + if self._q_string: + self._q_string += f" and {clause}" + else: + self._q_string = clause + + def has_target(self, identifier: str): + """Adds a query clause selecting products having a given target identifier. + + Parameters + ---------- + identifier : str + Identifier (LIDVID) of the target. + + Returns + ------- + This Products instance with the "has target" query filter applied. + + """ + clause = f'ref_lid_target eq "{identifier}"' + self.__add_clause(clause) + return self + + def has_investigation(self, identifier: str): + """Adds a query clause selecting products having a given investigation identifier. + + Parameters + ---------- + identifier : str + Identifier (LIDVID) of the target. + + Returns + ------- + This Products instance with the "has investigation" query filter applied. + + """ + clause = f'ref_lid_investigation eq "{identifier}"' + self.__add_clause(clause) + return self + + def before(self, dt: datetime): + """Adds a query clause selecting products with a start date before the given datetime. + + Parameters + ---------- + dt : datetime.datetime + Datetime object containing the desired time. + + Returns + ------- + This Products instance with the "before" filter applied. 
+ + """ + iso8601_datetime = dt.isoformat().replace("+00:00", "Z") + clause = f'pds:Time_Coordinates.pds:start_date_time le "{iso8601_datetime}"' + self.__add_clause(clause) + return self + + def after(self, dt: datetime): + """Adds a query clause selecting products with an end date after the given datetime. + + Parameters + ---------- + dt : datetime.datetime + Datetime object containing the desired time. + + Returns + ------- + This Products instance with the "after" filter applied. + + """ + iso8601_datetime = dt.isoformat().replace("+00:00", "Z") + clause = f'pds:Time_Coordinates.pds:stop_date_time ge "{iso8601_datetime}"' + self.__add_clause(clause) + return self + + def of_collection(self, identifier: str): + """Adds a query clause selecting products belonging to the given Parent Collection identifier. + + Parameters + ---------- + identifier : str + Identifier (LIDVID) of the Collection. + + Returns + ------- + This Products instance with the "Parent Collection" filter applied. + + """ + clause = f'ops:Provenance.ops:parent_collection_identifier eq "{identifier}"' + self.__add_clause(clause) + return self + + def observationals(self): + """Adds a query clause selecting only "Product Observational" type products on the current filter. + + Returns + ------- + This Products instance with the "Observational Product" filter applied. + + """ + clause = 'product_class eq "Product_Observational"' + self.__add_clause(clause) + return self + + def collections(self, collection_type: Optional[str] = None): + """Adds a query clause selecting only "Product Collection" type products on the current filter. + + Parameters + ---------- + collection_type : str, optional + Collection type to filter on. If not provided, all collection types + are included. + + Returns + ------- + This Products instance with the "Product Collection" filter applied. 
+ + """ + clause = 'product_class eq "Product_Collection"' + self.__add_clause(clause) + + if collection_type: + clause = f'pds:Collection.pds:collection_type eq "{collection_type}"' + self.__add_clause(clause) + + return self + + def bundles(self): + """Adds a query clause selecting only "Bundle" type products on the current filter. + + Returns + ------- + This Products instance with the "Product Bundle" filter applied. + + """ + clause = 'product_class eq "Product_Bundle"' + self.__add_clause(clause) + return self + + def has_instrument(self, identifier: str): + """Adds a query clause selecting products having an instrument matching the provided identifier. + + Parameters + ---------- + identifier : str + Identifier (LIDVID) of the instrument. + + Returns + ------- + This Products instance with the "has instrument" filter applied. + + """ + clause = f'ref_lid_instrument eq "{identifier}"' + self.__add_clause(clause) + return self + + def has_instrument_host(self, identifier: str): + """Adds a query clause selecting products having an instrument host matching the provided identifier. + + Parameters + ---------- + identifier : str + Identifier (LIDVID) of the instrument host. + + Returns + ------- + This Products instance with the "has instrument host" filter applied. + + """ + clause = f'ref_lid_instrument_host eq "{identifier}"' + self.__add_clause(clause) + return self + + def has_processing_level(self, processing_level: PROCESSING_LEVELS = "raw"): + """Adds a query clause selecting products with a specific processing level. + + Parameters + ---------- + processing_level : str, optional + The processing level to filter on. Must be one of "telemetry", "raw", + "partially-processed", "calibrated", or "derived". Defaults to "raw". + + Returns + ------- + This Products instance with the "has processing level" filter applied. 
+ + """ + clause = f'pds:Primary_Result_Summary.pds:processing_level eq "{processing_level.title()}"' + self.__add_clause(clause) + return self + + def get(self, identifier: str): + """Adds a query clause selecting the product with a LIDVID matching the provided value. + + Parameters + ---------- + identifier : str + LIDVID of the product to filter for. + + Returns + ------- + This Products instance with the "LIDVID identifier" filter applied. + + """ + self.__add_clause(f'lidvid like "{identifier}"') + return self + + def fields(self, fields: list): + """Reduce the list of fields returned, for improved efficiency.""" + self._fields = fields + return self + + def filter(self, clause: str): + """Selects products that match the provided query clause. + + Parameters + ---------- + clause : str + A custom query clause. + + Returns + ------- + This Products instance with the provided filtering clause applied. + """ + self.__add_clause(clause) + return self diff --git a/src/pds/peppi/result_set.py b/src/pds/peppi/result_set.py new file mode 100644 index 0000000..eccea74 --- /dev/null +++ b/src/pds/peppi/result_set.py @@ -0,0 +1,168 @@ +"""Module of the ResultSet.""" +import logging +from typing import Optional + +import pandas as pd +from pds.api_client.api.all_products_api import AllProductsApi + +from .client import PDSRegistryClient +from .query_builder import QueryBuilder + +logger = logging.getLogger(__name__) + + +class ResultSet(QueryBuilder): + """ResultSet of products on which a query has been applied. 
Iterable.""" + + SORT_PROPERTY = "ops:Harvest_Info.ops:harvest_date_time" + """Default property to sort results of a query by.""" + + PAGE_SIZE = 100 + """Default number of results returned in each page fetch from the PDS API.""" + + def __init__(self, client: PDSRegistryClient): + """Constructor of the ResultSet.""" + super().__init__() + self._products = AllProductsApi(client.api_client) + self._latest_harvest_time = None + self._page_counter = None + self._expected_pages = None + + def _init_new_page(self): + """Queries the PDS API for the next page of results. + + Any query clauses associated with this Products instance are included here. + + If there are results remaining from the previously acquired page, + they are yielded on each subsequent call to this method. + + Yields + ------ + product : pds.api_client.models.pds_product.PDSProduct + The next product within the current page fetched from the PDS Registry + API. + + Raises + ------ + StopIteration + Once all available pages of query results have been exhausted. 
+ + """ + # Check if we've hit the expected number of pages (or exceeded in cases + # where no results were returned from the query) + if self._page_counter and self._page_counter >= self._expected_pages: + raise StopIteration + + kwargs = {"sort": [self.SORT_PROPERTY], "limit": self.PAGE_SIZE} + + if self._latest_harvest_time is not None: + kwargs["search_after"] = [self._latest_harvest_time] + + if len(self._q_string) > 0: + kwargs["q"] = f"({self._q_string})" + + if len(self._fields) > 0: + # The sort property is used for pagination + if self.SORT_PROPERTY not in self._fields: + self._fields.append(self.SORT_PROPERTY) + + kwargs["fields"] = self._fields + + results = self._products.product_list(**kwargs) + + # If this is the first page fetch, calculate total number of expected pages + # based on hit count + if self._expected_pages is None: + hits = results.summary.hits + + self._expected_pages = hits // self.PAGE_SIZE + if hits % self.PAGE_SIZE: + self._expected_pages += 1 + + self._page_counter = 0 + + for product in results.data: + yield product + self._latest_harvest_time = product.properties[self.SORT_PROPERTY][0] + + # If here, current page has been exhausted + self._page_counter += 1 + + def __iter__(self): + """Iterates over all products returned by the current query filter applied to this Products instance. + + This method handles pagination automatically by fetching additional pages + from the PDS Registry API as needed. Once all available pages and results + have been yielded, this method will reset this Products instance to a + default state which can be used to perform a new query. + + Yields + ------ + product : pds.api_client.models.pds_product.PDSProduct + The next product within the current page fetched from the PDS Registry + API. 
+ + """ + while True: + try: + for product in self._init_new_page(): + yield product + except RuntimeError as err: + # Make sure we got the StopIteration that was converted to a RuntimeError, + # otherwise we need to re-raise + if "StopIteration" not in str(err): + raise err + + self.reset() + break + + def reset(self): + """Resets internal pagination state to default. + + This method should be called before making any modifications to the + query clause stored by this Products instance while still paginating + through the results of a previous query. + + """ + self._expected_pages = None + self._page_counter = None + self._latest_harvest_time = None + + def as_dataframe(self, max_rows: Optional[int] = None): + """Returns the found products as a pandas DataFrame. + + Loops on the products found and returns a pandas DataFrame with the product properties as columns + and their identifier as index. + + Parameters + ---------- + max_rows : int + Optional limit in the number of products returned in the dataframe. Convenient for test while developing. + Default is no limit (None) + + Returns + ------- + The products as a pandas dataframe. 
+ """ + result_as_dict_list = [] + lidvid_index = [] + n = 0 + for p in self: + result_as_dict_list.append(p.properties) + lidvid_index.append(p.id) + n += 1 + if max_rows and n >= max_rows: + break + self.reset() + + if n > 0: + df = pd.DataFrame.from_records(result_as_dict_list, index=lidvid_index) + # reduce useless arrays in dataframe columns + for column in df.columns: + only_1_element = df.apply(lambda x: len(x[column]) <= 1, axis=1) # noqa + if only_1_element.all(): + df[column] = df.apply(lambda x: x[column][0], axis=1) # noqa + return df + else: + logger.warning("Query with clause %s did not return any products.", self._q_string) # noqa + return None diff --git a/tests/pds/peppi/demo_candidates_A.py b/tests/pds/peppi/demo_candidates_A.py new file mode 100644 index 0000000..bc040bf --- /dev/null +++ b/tests/pds/peppi/demo_candidates_A.py @@ -0,0 +1,24 @@ +import pds.peppi as pep + +# ** option A ** +# a QueryBuilder has an explicit evaluation method called `crack` +# which triggers the query to be evaluated and returns a ResultSet object +# PRO: +# - explicit evaluation of the result +# - 'crack' could be kind of fun for peppi, but we could be more traditional with 'open' or 'execute' +# CON: +# - crack or open might sound useless in the client code, or unclear what it's useful for +lidvid = "urn:nasa:pds:context:target:asteroid.65803_didymos" +query = pep.QueryBuilder().has_target(lidvid).observationals() + +client = pep.PDSRegistryClient() +products = query.crack(client) + +# do something sequentially +for i, p in enumerate(products): + print(p.id) + if i > 10: + break + +# get all the results at once in a new object, here pandas dataframe +df = query.crack(client).as_dataframe(max_rows=10) diff --git a/tests/pds/peppi/demo_candidates_B.py b/tests/pds/peppi/demo_candidates_B.py new file mode 100644 index 0000000..94f7eb6 --- /dev/null +++ b/tests/pds/peppi/demo_candidates_B.py @@ -0,0 +1,21 @@ +import pds.peppi as pep + +# ** option B ** +# Same as 
before, but under the hood the Products inherit ResultSet and QueryBuilder +# to separate concerns in classes and simplify the code +# PRO: user code is short +# CON: blurred boundaries between Query and Result +client = pep.PDSRegistryClient() + +lidvid = "urn:nasa:pds:context:target:asteroid.65803_didymos" +products = pep.Products(client) +products = products.has_target(lidvid).observationals() + +# do something sequentially +for i, p in enumerate(products): + print(p.id) + if i > 10: + break + +# get all the results at once in a new object, here pandas dataframe +df = products.as_dataframe(max_rows=10) diff --git a/tests/pds/peppi/demo_candidates_C.py b/tests/pds/peppi/demo_candidates_C.py new file mode 100644 index 0000000..ec66a34 --- /dev/null +++ b/tests/pds/peppi/demo_candidates_C.py @@ -0,0 +1,21 @@ +import pds.peppi as pep + +# ** option C ** +# Products result is instantiated from both the connexion to the API and the query +# PRO: that sounds the most simple and readable implementation for the user +# CON: we loose the elegance of the Query being evaluated smoothly into a ResultSet + +client = pep.PDSRegistryClient() + +lidvid = "urn:nasa:pds:context:target:asteroid.65803_didymos" +query = pep.QueryBuilder().has_target(lidvid).observationals() +products = pep.Products(client, query) + +# do something sequentially +for i, p in enumerate(products): + print(p.id) + if i > 10: + break + +# get all the results at once in a new object, here pandas dataframe +df = products.as_dataframe(max_rows=10) diff --git a/tests/pds/peppi/test_client.py b/tests/pds/peppi/test_client.py index 4d49b39..629f3fb 100644 --- a/tests/pds/peppi/test_client.py +++ b/tests/pds/peppi/test_client.py @@ -24,13 +24,43 @@ def test_all(self): assert True + def test_fields(self): + selected_fields = ["pds:Primary_Result_Summary.pds:processing_level", "pds:File.pds:file_name"] + non_selected_fields_examples = ["pds:Time_Coordinates.pds:stop_date_time", "pds:Identification_Area.pds:title"] 
+ for p in self.products.fields(selected_fields): + break + + for field in selected_fields: + assert field in p.properties + + for field in non_selected_fields_examples: + assert field not in p.properties + + def test_as_dataframe(self): + selected_fields = ["pds:Time_Coordinates.pds:start_date_time", "pds:Time_Coordinates.pds:stop_date_time"] + df = ( + self.products.of_collection("urn:nasa:pds:apollo_pse:data_seed::1.0") + .fields(selected_fields) + .as_dataframe(max_rows=10) + ) + + assert len(df) == 10 + + for field in selected_fields: + assert field in df.columns + + assert isinstance(df["pds:Time_Coordinates.pds:start_date_time"].iloc[0], str) + + def test_empty_dataframe(self): + df = self.products.of_collection("non_existing_collection").as_dataframe() + assert df is None + def test_query_modification_during_pagination(self): - n = 0 - for p in self.products: - n += 1 + for i, p in enumerate(self.products): + i += 1 assert isinstance(p, PdsProduct) - if n > self.MAX_ITERATIONS: + if i > self.MAX_ITERATIONS: # Attempt to modify the query clause while there are still results # to paginate through. This should result in a RuntimeError. with self.assertRaises(RuntimeError): @@ -168,7 +198,7 @@ def test_has_instrument_host(self): break def test_has_processing_level(self): - for processing_level in get_args(pep.client.PROCESSING_LEVELS): + for processing_level in get_args(pep.query_builder.PROCESSING_LEVELS): n = 0 for p in self.products.has_processing_level(processing_level): n += 1