diff --git a/README.md b/README.md index f539c7f..ad9cf32 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ The script provides the following capabilities: * Exporting the AWS SSO credentials * Use the credentials via .aws/config * Assume a role via AWS SSO +* Supports automatic authentication refresh via AWS IAM Identity Center (https://docs.aws.amazon.com/cli/latest/userguide/sso-configure-profile-token.html) Please note that the script is called `aws2-wrap` to show that it works with AWS CLI v2, even though the CLI tool is no longer called `aws2`. @@ -125,4 +126,4 @@ Please also note that `make pylint` will only report errors. You *may* want to e ## Credits -Thanks to @nitrocode, @chenrui333, @l1n, @sodul, @damian-bisignano, @flyinprogrammer, @abeluck, @topu, @bigwheel, @krabbit, @jscook2345, @hieki, @blazdivjak, @fukushun1994, @johann8384, @ppezoldt, @atwoodjw, @lummish, @life36-vinny, @lukemassa and @axelri for their contributions. +Thanks to @matan129, @nitrocode, @chenrui333, @l1n, @sodul, @damian-bisignano, @flyinprogrammer, @abeluck, @topu, @bigwheel, @krabbit, @jscook2345, @hieki, @blazdivjak, @fukushun1994, @johann8384, @ppezoldt, @atwoodjw, @lummish, @life36-vinny, @lukemassa and @axelri for their contributions. diff --git a/aws2wrap/__init__.py b/aws2wrap/__init__.py index e7c158b..2a4cf57 100755 --- a/aws2wrap/__init__.py +++ b/aws2wrap/__init__.py @@ -26,7 +26,7 @@ import sys from datetime import datetime, timezone # pylint: disable=wrong-import-order from typing import (Any, Dict, List, # pylint: disable=wrong-import-order - Optional, Tuple, Union, cast) + Optional, Tuple, Union, Callable, cast) import psutil @@ -212,7 +212,10 @@ def retrieve_token_from_file( return blob["accessToken"] -def retrieve_token(sso_start_url: str, sso_region: str, profile_name: str, refresh_profile: Optional[str]) -> str: +def retrieve_token(sso_start_url: str, + sso_region: str, + profile_name: ProfileDef, + refresh_profile: Optional[ProfileDef]) -> str: """Get the access token back from the SSO cache. Args: @@ -235,9 +238,21 @@ def retrieve_token(sso_start_url: str, sso_region: str, profile_name: str, refre return retrieve_token_from_cache(sso_start_url, sso_region, profile_name) -def retrieve_token_from_cache(sso_start_url: str, sso_region: str, profile_name: str) -> str: - # Check each of the files in ~/.aws/sso/cache looking for one that references - # the specific SSO URL and region. If found then check the expiration. +def retrieve_token_from_cache(sso_start_url: str, sso_region: str, profile_name: ProfileDef) -> str: + """Check each of the files in ~/.aws/sso/cache looking for one that references + the specific SSO URL and region. If found then check the expiration. + + Args: + sso_start_url (str): The SSO URL to match for a valid token. + sso_region (str): The AWS region to match for a valid token. + profile_name (str): The desired profile to fetch the token for. + + Raises: + Aws2WrapError: No valid token found for the specified profile + + Returns: + str: The access token if matched and not expired. + """ cachedir_path = os.path.abspath(os.path.expanduser("~/.aws/sso/cache")) cachedir = pathlib.Path(cachedir_path) for cachefile in cachedir.iterdir(): @@ -247,19 +262,27 @@ def retrieve_token_from_cache(sso_start_url: str, sso_region: str, profile_name: raise Aws2WrapError(f"Please login with 'aws sso login --profile={profile_name}'") -def try_refreshing_tokens(profile_name): - # There's no direct way to refresh the tokens, but a quick STS api call does the trick. - # Note that `aws sso login ...` will *not* refresh the tokens but will invalidate the whole SSO session (if any) - # which is not what we want here. +def try_refreshing_tokens(profile_name: ProfileDef): + """Try to refresh any token that AWS CLI currently has for the desired profile. + + There's no direct way to refresh the tokens, but a quick STS api call does the trick. + Note that `aws sso login ...` will *not* refresh the tokens but will invalidate + the whole SSO session (if any) which is not what we want here. + + Args: + profile_name (str): Profile to try to refresh + """ call_aws_cli(["sts", "get-caller-identity"], profile_name) -def get_role_credentials(profile: ProfileDef, parent_profile_name: Optional[str] = None) -> Dict[str, Any]: +def get_role_credentials(profile: ProfileDef, + parent_profile_name: Optional[ProfileDef] = None) -> Dict[str, Any]: """Get the role credentials. Args: profile: An AWS profile object. - parent_profile_name: The name of the parent profile (which included this profile via source_profile), if any. + parent_profile_name: The name of the parent profile (which included this profile + via source_profile), if any. Returns: A dict of AWS credential values. Raises: @@ -290,7 +313,17 @@ def get_role_credentials(profile: ProfileDef, parent_profile_name: Optional[str] return output -def choose_refreshable_profile(parent_profile_name: Optional[str], profile: ProfileDef) -> Optional[str]: +def choose_refreshable_profile(parent_profile_name: Optional[ProfileDef], + profile: ProfileDef) -> Optional[ProfileDef]: + """Determine the name of the refreshable profile. + + Args: + parent_profile_name (Optional[str]): _description_ + profile (ProfileDef): _description_ + + Returns: + Optional[str]: name of the refreshable profile + """ if "sso_session" not in profile: # Not refreshable return None @@ -303,7 +336,28 @@ def choose_refreshable_profile(parent_profile_name: Optional[str], profile: Prof return retrieve_attribute(profile, "profile_name") -def call_aws_cli(args, profile_name, error_supplier=None, append_profile_option=True, env=None): +def call_aws_cli(args: list[str], + profile_name: ProfileDef, + error_supplier: Optional[Callable]=None, + append_profile_option: bool=True, + env: Optional[dict]=None) -> bytes: + """Generalised function to call AWS CLI + + Args: + args (list[str]): arguments to pass to AWS CLI + profile_name (ProfileDef): profile to use + error_supplier (Callable, optional): Function to call in the event of an error. + Defaults to None. + append_profile_option (bool, optional): Appends profile name to arguments. Defaults to True. + env (dict, optional): Mapping to define environment variables for the process. + Defaults to None. + + Raises: + error_supplier: Defined error function + + Returns: + bytes: standard output from running the command + """ # We call the aws2 CLI tool rather than trying to use boto3 because the latter is # currently a special version and this script is trying to avoid needing any extra # packages. @@ -331,7 +385,7 @@ def call_aws_cli(args, profile_name, error_supplier=None, append_profile_option= def get_assumed_role_credentials( profile: ProfileDef, - parent_profile_name: Optional[str] = None + parent_profile_name: Optional[ProfileDef] = None ) -> Dict[str, Dict[str, str]]: """Get the assumed role credentials specified by role_arn and source_profile. @@ -350,7 +404,7 @@ def get_assumed_role_credentials( # Get credentials of source_profile recursively. source_credentials = get_assumed_role_credentials( retrieve_attribute(profile, "source_profile"), - parent_profile_name=cast(str, profile["profile_name"]) + parent_profile_name=cast(ProfileDef, profile["profile_name"]) ) # Set credentials of source_profile. diff --git a/aws2wrap/version.py b/aws2wrap/version.py index 2975a74..71cffa2 100644 --- a/aws2wrap/version.py +++ b/aws2wrap/version.py @@ -1,2 +1,2 @@ # pylint: disable=missing-module-docstring -__version__ = '1.3.1' +__version__ = '1.4.0' diff --git a/setup.py b/setup.py index ded0986..ab09d9e 100644 --- a/setup.py +++ b/setup.py @@ -56,5 +56,5 @@ def get_version(): 'aws2-wrap = aws2wrap:main', ] }, - python_requires=">=3.6", + python_requires=">=3.8", )