Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make client auto choose prediction endpoint #425

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions gordo_components/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from gordo_components import serializer
from gordo_components.client import io as gordo_io
from gordo_components.client.io import HttpUnprocessableEntity
from gordo_components.client.forwarders import PredictionForwarder
from gordo_components.client.utils import EndpointMetadata, PredictionResult
from gordo_components.dataset.datasets import TimeSeriesDataset
Expand All @@ -45,7 +46,6 @@ def __init__(
port: int = 443,
scheme: str = "https",
gordo_version: str = "v0",
prediction_path: str = "/anomaly/prediction",
metadata: typing.Optional[dict] = None,
data_provider: typing.Optional[GordoBaseDataProvider] = None,
prediction_forwarder: typing.Optional[PredictionForwarder] = None,
Expand All @@ -72,8 +72,6 @@ def __init__(
The request scheme to use, ie 'https'.
gordo_version: str
The version of major gordo the services are using, ie. 'v0'.
prediction_path: str
Url path to use for making predictions, default '/anomaly/prediction'
metadata: Optional[dict]
Arbitrary mapping of key-value pairs to save to influx with
prediction runs in 'tags' property
Expand Down Expand Up @@ -105,7 +103,9 @@ def __init__(
self.endpoints = self._endpoints_from_watchman(self.watchman_endpoint)
self.prediction_forwarder = prediction_forwarder
self.data_provider = data_provider
self.prediction_path = prediction_path

# Default; falling back to /prediction on HTTP status code 422
self.prediction_path = "/anomaly/prediction"
self.batch_size = batch_size
self.parallelism = parallelism
self.forward_resampled_sensors = forward_resampled_sensors
Expand Down Expand Up @@ -385,23 +385,28 @@ async def _process_post_prediction_task(
PredictionResult
"""

json = {
"X": server_utils.multi_lvl_column_dataframe_to_dict(X.iloc[chunk]),
"y": server_utils.multi_lvl_column_dataframe_to_dict(y.iloc[chunk])
if y is not None
else None,
}

for i in itertools.count(start=1):
try:
resp = await gordo_io.post_json(
f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json={
"X": server_utils.multi_lvl_column_dataframe_to_dict(
X.iloc[chunk]
),
"y": server_utils.multi_lvl_column_dataframe_to_dict(
y.iloc[chunk]
)
if y is not None
else None,
},
)

try:
resp = await gordo_io.post_json(
url=f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json=json,
)
except HttpUnprocessableEntity:
self.prediction_path = "/prediction"
resp = await gordo_io.post_json(
url=f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json=json,
)
# If it was an IO or TimeoutError, we can retry
except (
IOError,
Expand Down Expand Up @@ -517,12 +522,22 @@ async def _process_get_prediction_task(
-------
PredictionResult
"""
json = {"start": start.isoformat(), "end": end.isoformat()}

try:
response = await gordo_io.fetch_json(
f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json={"start": start.isoformat(), "end": end.isoformat()},
)
try:
response = await gordo_io.fetch_json(
f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json=json,
)
except HttpUnprocessableEntity:
self.prediction_path = "/prediction"
response = await gordo_io.fetch_json(
f"{endpoint.endpoint}{self.prediction_path}",
session=session,
json=json,
)
except IOError as exc:
msg = (
f"Failed to get predictions for dates {start} -> {end} "
Expand Down
14 changes: 13 additions & 1 deletion gordo_components/client/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@
from werkzeug.exceptions import BadRequest


class HttpUnprocessableEntity(Exception):
    """
    Raised when the server responds with HTTP status 422 (Unprocessable Entity).

    In this client it signals that ``/anomaly/prediction`` was called on a
    model that does not support anomaly behavior, so the caller should fall
    back to the plain ``/prediction`` endpoint.

    Note: derives from ``Exception`` (not ``BaseException``) so that generic
    ``except Exception`` handlers and error-reporting layers can see it;
    ``BaseException`` is reserved for interpreter-exit signals such as
    ``KeyboardInterrupt`` and ``SystemExit``.
    """

    pass


async def fetch_json(
url: str, *args, session: Optional[aiohttp.ClientSession] = None, **kwargs
) -> dict:
Expand Down Expand Up @@ -73,7 +83,9 @@ async def _handle_json(resp: aiohttp.ClientResponse) -> dict:
content = await resp.content.read()
msg = f"Failed to get JSON with status code: {resp.status}: {content}"

if 400 <= resp.status <= 499:
if resp.status == 422:
raise HttpUnprocessableEntity()
elif 400 <= resp.status <= 499:
raise BadRequest(msg)
else:
raise IOError(msg)