Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(auth): implement iap auth token #637

Merged
merged 10 commits into from
Sep 22, 2023
17 changes: 17 additions & 0 deletions auth/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
This library implements an ``IamClient`` class, which can be used to interact
with GCP public keys and URL sign blobs.

It also implements an ``IapToken`` class which is used for authorizing against
an `Identity-Aware Proxy`_ (IAP) secured GCP service. IAP uses identity tokens
which are specific to the target service and allows administrators to configure
a list of identities (ex. service accounts, users, or groups) that may access
the service. Therefore each ``IapToken`` instance corresponds to an ID token
which may be used to authorize against a single IAP service.

It additionally implements a ``Token`` class, which is used for authorizing
against Google Cloud. The other ``gcloud-aio-*`` package components accept a
``Token`` instance as an argument; you can define a single token for all of
Expand Down Expand Up @@ -52,13 +59,23 @@ routes, eg. we can list our project's uptime checks with a tool such as
-H "Authorization: Bearer $(python3 -c 'from gcloud.rest.auth import Token; print(Token().get())')" \
"https://monitoring.googleapis.com/v3/projects/PROJECT_ID/uptimeCheckConfigs"

Similarly it can be used to quickly test your IAP-secured endpoints:

.. code-block:: console

# using default application credentials
curl \
-H "Authorization: Bearer $(python3 -c 'from gcloud.rest.auth import IapToken; print(IapToken(APP_URL, service_account=SA))')" \
APP_URL

Contributing
------------

Please see our `contributing guide`_.

.. _contributing guide: https://github.com/talkiq/gcloud-aio/blob/master/.github/CONTRIBUTING.rst
.. _our docs: https://talkiq.github.io/gcloud-aio
.. _Identity-Aware Proxy: https://cloud.google.com/iap
.. _scopes: https://developers.google.com/identity/protocols/googlescopes

.. |pypi| image:: https://img.shields.io/pypi/v/gcloud-aio-auth.svg?style=flat-square
Expand Down
29 changes: 27 additions & 2 deletions auth/gcloud/aio/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,41 @@

```python
from gcloud.aio.auth import IamClient
from gcloud.aio.auth import IapToken
from gcloud.aio.auth import Token


client = IamClient()
pubkeys = await client.list_public_keys()

iap_token = IapToken('https://your.service.url.com')
print(await iap_token.get())

token = Token()
print(await token.get())
```

Additionally, the `Token` constructor accepts the following optional arguments:
The `IapToken` constructor accepts the following optional arguments:

* `service_file`: path to a [service account][service-account], authorized user
file, or any other application credentials. Alternatively, you can pass a
file-like object, like an `io.StringIO` instance, in case your credentials
are not stored in a file but in memory. If omitted, will attempt to find one
on your path or fallback to generating a token from GCE metadata.
* `session`: an `aiohttp.ClientSession` instance to be used for all requests.
If omitted, a default session will be created. If you use the default
session, you may be interested in using `Token()` as a context manager
(`async with Token(..) as token:`) or explicitly calling the `Token.close()`
method to ensure the session is cleaned up appropriately.
* `service_account`: an optional string denoting a GCP service account which
takes the form of an email address. Only valid (and required!) for
authentication with a project's authorized users. [Impersonating a service
account][impersonating-sa] is required when generating an ID token in this
case.

The `Token` constructor accepts the following optional arguments:

* `service_file`: path to a [service account][service-account] authorized user
* `service_file`: path to a [service account][service-account], authorized user
file, or any other application credentials. Alternatively, you can pass a
file-like object, like an `io.StringIO` instance, in case your credentials
are not stored in a file but in memory. If omitted, will attempt to find one
Expand All @@ -39,13 +61,15 @@
Only valid (and required!) for [service account][service-account]
authentication.

[impersonating-sa]: https://cloud.google.com/iap/docs/authentication-howto#obtaining_an_oidc_token_in_all_other_cases # pylint: ignore=line-too-long
[service-account]: https://console.cloud.google.com/iam-admin/serviceaccounts
"""
import importlib.metadata

from .build_constants import BUILD_GCLOUD_REST
from .iam import IamClient
from .session import AioSession
from .token import IapToken
from .token import Token
from .utils import decode
from .utils import encode
Expand All @@ -56,6 +80,7 @@
'AioSession',
'BUILD_GCLOUD_REST',
'IamClient',
'IapToken',
'Token',
'__version__',
'decode',
Expand Down
37 changes: 37 additions & 0 deletions auth/gcloud/aio/auth/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ async def delete(
) -> Response:
pass

@abstractmethod
async def head(
self, url: str, headers: Optional[Mapping[str, str]],
timeout: float, params: Optional[Mapping[str, Union[int, str]]],
allow_redirects: bool,
) -> Response:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could definitely work, although I would like to limit the sprawl of my changes and this is an extension of existing form. Since this is an abstract base class and this is an abstract method, you can't instantiate an object of this class so raising is redundant:

>>> import abc
>>> class Base(abc.ABC):
...     def fn(self):
...         return 42
...     @abc.abstractmethod
...     def abstract(self):
...         pass
... 
>>> b = Base()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: Can't instantiate abstract class Base with abstract method abstract


@abstractmethod
async def request(
self, method: str, url: str, headers: Mapping[str, str],
Expand Down Expand Up @@ -219,6 +227,21 @@ async def delete( # type: ignore[override]
await _raise_for_status(resp)
return resp

async def head( # type: ignore[override]
self, url: str,
headers: Optional[Mapping[str, str]] = None,
timeout: Timeout = 10,
params: Optional[Mapping[str, Union[int, str]]] = None,
allow_redirects: bool = False,
) -> aiohttp.ClientResponse:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is imported as from aiohttp import ClientResponse as Response. Perhaps change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the previous comment about wanting to avoid changing unrelated parts of the code.

@TheKevJames might be able to tell you more about this choice.

resp = await self.session.head(
url, headers=headers,
params=params, timeout=timeout,
allow_redirects=allow_redirects,
)
await _raise_for_status(resp)
return resp

async def request( # type: ignore[override]
self, method: str,
url: str, headers: Mapping[str, str],
Expand Down Expand Up @@ -321,6 +344,20 @@ async def delete(
resp.raise_for_status()
return resp

async def head(
self, url: str, headers: Optional[Mapping[str, str]] = None,
timeout: float = 10,
params: Optional[Mapping[str, Union[int, str]]] = None,
allow_redirects: bool = False,
) -> Response:
with self.google_api_lock:
resp = self.session.head(
url, params=params, headers=headers,
timeout=timeout, allow_redirects=allow_redirects,
)
resp.raise_for_status()
return resp

async def request(
self, method: str, url: str, headers: Mapping[str, str],
auto_raise_for_status: bool = True, **kwargs: Any,
Expand Down
Loading