Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge query params ourselves instead of relying on httpx to do it #68

Merged
merged 1 commit into from
Dec 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pyporscheconnectapi/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@

_LOGGER = logging.getLogger(__name__)


class Credentials(NamedTuple):
"""Store credentials for the Porsche Connect API."""

email: str
password: str


class Captcha(NamedTuple):
"""Store captcha data for the Porsche Connect API."""

Expand Down Expand Up @@ -117,9 +119,7 @@ async def ensure_valid_token(self, token: OAuth2Token):
token.update(token_data)
token.expires_at = token_data["expires_in"]
_LOGGER.debug("Refreshed Access Token: %s", token.access_token)
if (
token.access_token is None or token_is_expired is None
): # no token, get a new one
if token.access_token is None or token_is_expired is None: # no token, get a new one
auth_code = await self.fetch_authorization_code()
token_data = await self.fetch_access_token(auth_code)
token.update(token_data)
Expand Down Expand Up @@ -207,7 +207,10 @@ async def get_and_extract_location_params(self, url, params=None):
if params is None:
params = {}
resp = await self.client.get(
url, params=params, timeout=TIMEOUT, headers=self.headers,
url,
params=self._merge_query_params(url, params),
timeout=TIMEOUT,
headers=self.headers,
)
if resp.status_code != 302:
msg = "Could not fetch authorization code"
Expand All @@ -224,6 +227,14 @@ def _extract_params_from_url(self, url):
"""
return parse_qs(urlparse(url).query)

def _merge_query_params(self, url: str, params: dict[str, str]) -> dict[str, str]:
"""Merge query parameters into a new dictionary with the existing query parameters of a URL."""
parsed_url = urlparse(url)
query = parse_qs(parsed_url.query)
new_query = {k: v[0] for k, v in query.items()}
new_query.update(params)
return new_query

async def login_with_identifier(self, state: str):
"""Log into the Identifier First flow.

Expand Down Expand Up @@ -328,7 +339,10 @@ async def fetch_access_token(self, authorization_code):
_LOGGER.debug("Exchanging the authorization code for an access token.")

resp = await self.client.post(
TOKEN_URL, data=data, timeout=TIMEOUT, headers=self.headers,
TOKEN_URL,
data=data,
timeout=TIMEOUT,
headers=self.headers,
)
resp.raise_for_status()
return resp.json()
Expand All @@ -350,7 +364,10 @@ async def refresh_token(self, refresh_token):
_LOGGER.debug("Using the refresh token to get a new access token.")

resp = await self.client.post(
TOKEN_URL, data=data, timeout=TIMEOUT, headers=self.headers,
TOKEN_URL,
data=data,
timeout=TIMEOUT,
headers=self.headers,
)
resp.raise_for_status()
return resp.json()
Expand Down
Loading