From 42a5a1dc58cd5f5304ceb87d704c4a863011bd5d Mon Sep 17 00:00:00 2001 From: Jamie Pringle Date: Fri, 10 Feb 2023 13:53:53 -0500 Subject: [PATCH] Add sig_v5 support to duo_client_python calls and v2 integrations handler (#188) * Add sig_v5 support to duo_client_python calls and update integrations to v2 handler * Fix billing tests * Update docstring for integrations to call out that sso feature is not released --------- Co-authored-by: Jamie Pringle --- duo_client/admin.py | 85 ++++++++++++--- duo_client/client.py | 118 +++++++++++++++++---- examples/create_integration_sso_generic.py | 67 ++++++++++++ tests/accountAdmin/test_billing.py | 23 ++-- tests/admin/test_admins.py | 15 +-- tests/admin/test_groups.py | 5 +- tests/admin/test_integration.py | 78 ++++++++++++++ tests/admin/test_integrations.py | 10 +- tests/admin/test_logo.py | 4 +- tests/admin/test_settings.py | 66 ++++++------ tests/admin/test_users.py | 63 +++++------ tests/test_client.py | 16 +++ 12 files changed, 425 insertions(+), 125 deletions(-) create mode 100644 examples/create_integration_sso_generic.py create mode 100644 tests/admin/test_integration.py diff --git a/duo_client/admin.py b/duo_client/admin.py index 9d527b4..d9e56a3 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -216,11 +216,43 @@ class Admin(client.Client): account_id = None + admin_sig_version = 5 - def api_call(self, method, path, params): + def api_call(self, method, path, params, sig_version=admin_sig_version): if self.account_id is not None: params['account_id'] = self.account_id - return super(Admin, self).api_call(method, path, params) + return super(Admin, self).api_call( + method, + path, + params, + sig_version=sig_version + ) + + def json_api_call(self, method, path, params, sig_version=admin_sig_version): + return super(Admin, self).json_api_call( + method, + path, + params, + sig_version=sig_version + ) + + def json_paging_api_call(self, method, path, params, sig_version=admin_sig_version): + return super(Admin, self).json_paging_api_call( + method, + path, + params, + sig_version=sig_version + ) + + def json_cursor_api_call(self, method, path, params, get_records_func, sig_version=admin_sig_version): + return super(Admin, self).json_cursor_api_call( + method, + path, + params, + get_records_func, + sig_version=sig_version + ) + @classmethod def _canonicalize_ip_whitelist(klass, ip_whitelist): @@ -2378,8 +2410,8 @@ def get_integrations_generator(self): """ return self.json_paging_api_call( 'GET', - '/admin/v1/integrations', - {} + '/admin/v2/integrations', + {}, ) def get_integrations(self, limit=None, offset=0): @@ -2398,8 +2430,8 @@ def get_integrations(self, limit=None, offset=0): if limit: return self.json_api_call( 'GET', - '/admin/v1/integrations', - {'limit': limit, 'offset': offset} + '/admin/v2/integrations', + {'limit': limit, 'offset': offset}, ) return list(self.get_integrations_generator()) @@ -2417,8 +2449,8 @@ def get_integration(self, integration_key): params = {} response = self.json_api_call( 'GET', - '/admin/v1/integrations/' + integration_key, - params + '/admin/v2/integrations/' + integration_key, + params, ) return response @@ -2441,7 +2473,8 @@ def create_integration(self, ip_whitelist=None, ip_whitelist_enroll_policy=None, groups_allowed=None, - self_service_allowed=None): + self_service_allowed=None, + sso=None): """Creates a new integration. name - The name of the integration (required) @@ -2467,6 +2500,9 @@ def create_integration(self, adminapi_write_resource - |None groups_allowed - self_service_allowed - |None + sso - (optional) + New argument for unreleased feature. Will return an error if used. + Client will be updated again in the future when feature is released. Returns the created integration. @@ -2514,9 +2550,12 @@ def create_integration(self, params['groups_allowed'] = groups_allowed if self_service_allowed is not None: params['self_service_allowed'] = '1' if self_service_allowed else '0' + if sso is not None: + params['sso'] = sso response = self.json_api_call('POST', - '/admin/v1/integrations', - params) + '/admin/v2/integrations', + params, + ) return response def delete_integration(self, integration_key): @@ -2528,8 +2567,12 @@ def delete_integration(self, integration_key): """ integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) - path = '/admin/v1/integrations/%s' % integration_key - return self.json_api_call('DELETE', path, {}) + path = '/admin/v2/integrations/%s' % integration_key + return self.json_api_call( + 'DELETE', + path, + {}, + ) def update_integration(self, integration_key, @@ -2551,7 +2594,8 @@ def update_integration(self, ip_whitelist=None, ip_whitelist_enroll_policy=None, groups_allowed=None, - self_service_allowed=None): + self_service_allowed=None, + sso=None): """Updates an integration. integration_key - The key of the integration to update. (required) @@ -2576,6 +2620,9 @@ def update_integration(self, reset_secret_key - |None groups_allowed - self_service_allowed - True|False|None + sso - (optional) + New argument for unreleased feature. Will return an error if used. + Client will be updated again in the future when feature is released. If any value other than None is provided for 'reset_secret_key' (for example, 1), then a new secret key will be generated for the @@ -2587,7 +2634,7 @@ def update_integration(self, """ integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) - path = '/admin/v1/integrations/%s' % integration_key + path = '/admin/v2/integrations/%s' % integration_key params = {} if name is not None: params['name'] = name @@ -2629,11 +2676,17 @@ def update_integration(self, params['groups_allowed'] = groups_allowed if self_service_allowed is not None: params['self_service_allowed'] = '1' if self_service_allowed else '0' + if sso is not None: + params['sso'] = sso if not params: raise TypeError("No new values were provided") - response = self.json_api_call('POST', path, params) + response = self.json_api_call( + 'POST', + path, + params, + ) return response def get_admins(self, limit=None, offset=0): diff --git a/duo_client/client.py b/duo_client/client.py index 26491a1..b635e57 100644 --- a/duo_client/client.py +++ b/duo_client/client.py @@ -54,7 +54,58 @@ def canon_params(params): return '&'.join(args) -def canonicalize(method, host, uri, params, date, sig_version, body=None): +def canon_x_duo_headers(additional_headers): + """ + Args: + additional_headers: Dict + Returns: + stringified version of all headers that start with 'X-Duo*'. Which is then hashed. + Note: the keys are also lower-cased for signing. + """ + if additional_headers is None: + additional_headers = {} + + canon_list = [] + added_headers = [] # store headers we've added, use for duplicate checking (case insensitive) + for header_name in sorted(additional_headers.keys()): + # Extract header value and set key to lower case from now on. + value = additional_headers[header_name] + header_name = header_name.lower() if header_name is not None else None + + # Validation gate. We will raise if a problem is found here. + _validate_additional_header(header_name, value, added_headers) + + # Add to the list of values to canonicalize: + canon_list.extend([header_name, value]) + added_headers.append(header_name) + + canon = '\x00'.join(canon_list) + return hashlib.sha512(canon.encode('utf-8')).hexdigest() + + +def _validate_additional_header(header_name, value, added_headers): + """ + Args: + header_name: str + value: str + added_headers: list[str] - headers we've already added - check for duplicates (case insensitive) + Returns: None + + Validates additional headers added to request - headers must comply with the following rules (for V5 sig_version) + """ + if header_name is None or value is None: + raise ValueError("Not allowed 'None' as a header name or value") + if '\x00' in header_name: + raise ValueError("Not allowed 'Null' character in header name") + if '\x00' in value: + raise ValueError("Not allowed 'Null' character in header value") + if not header_name.lower().startswith('x-duo-'): + raise ValueError("Additional headers must start with \'X-Duo-\'") + if header_name.lower() in added_headers: + raise ValueError("Duplicate header passed, header={}".format(header_name)) + + +def canonicalize(method, host, uri, params, date, sig_version, body=None, additional_headers=None): """ Return a canonical string version of the given request attributes. @@ -91,17 +142,27 @@ def canonicalize(method, host, uri, params, date, sig_version, body=None): canon_params(params), hashlib.sha512(body.encode('utf-8')).hexdigest(), ] + elif sig_version == 5: + canon = [ + date, + method.upper(), + host.lower(), + uri, + canon_params(params), + hashlib.sha512(body.encode('utf-8')).hexdigest(), + canon_x_duo_headers(additional_headers), # hashed in canon_x_duo_headers + ] else: raise ValueError("Unknown signature version: {}".format(sig_version)) return '\n'.join(canon) def sign(ikey, skey, method, host, uri, date, sig_version, params, body=None, - digestmod=hashlib.sha512): + digestmod=hashlib.sha512, additional_headers=None): """ Return basic authorization header line with a Duo Web API signature. """ - canonical = canonicalize(method, host, uri, params, date, sig_version, body=body) + canonical = canonicalize(method, host, uri, params, date, sig_version, body=body, additional_headers=additional_headers) if isinstance(skey, six.text_type): skey = skey.encode('utf-8') if isinstance(canonical, six.text_type): @@ -153,7 +214,7 @@ def __init__(self, ikey, skey, host, user_agent=('Duo API Python/' + __version__), timeout=socket._GLOBAL_DEFAULT_TIMEOUT, paging_limit=100, - digestmod=hashlib.sha512, + digestmod=hashlib.sha512, sig_version=2, port=None ): @@ -189,9 +250,6 @@ def __init__(self, ikey, skey, host, if sig_version == 3: raise ValueError('sig_version 3 not supported') - if sig_version == 4 and digestmod != hashlib.sha512: - raise ValueError('sha512 required for sig_version 4') - def set_proxy(self, host, port=None, headers=None, proxy_type='CONNECT'): """ @@ -207,7 +265,14 @@ def set_proxy(self, host, port=None, headers=None, self.proxy_port = port self.proxy_type = proxy_type - def api_call(self, method, path, params): + def api_call( + self, + method, + path, + params, + additional_headers=None, + sig_version=None, + ): """ Call a Duo API method. Return a (response, data) tuple. @@ -215,20 +280,30 @@ def api_call(self, method, path, params): * path: Full path of the API endpoint. E.g. "/auth/v2/ping". * params: dict mapping from parameter name to stringified value, or a dict to be converted to json. + * sig_version: signature version integer """ params_go_in_body = method in ('POST', 'PUT', 'PATCH') - if self.sig_version in (1, 2): + digestmod = self.digestmod + if additional_headers is None: + additional_headers = {} + if sig_version is None: + sig_version = self.sig_version + + if sig_version in (1, 2): params = normalize_params(params) # v1 and v2 canonicalization don't distinguish between # params and body. There's no separate body input. body = None - elif self.sig_version == 4: + elif sig_version in (4, 5): + digestmod = hashlib.sha512 if params_go_in_body: body = self.canon_json(params) params = {} else: body = '' params = normalize_params(params) + else: + raise ValueError(f"unsupported sig_version {sig_version}") if self.sig_timezone == 'UTC': now = email.utils.formatdate() @@ -244,20 +319,25 @@ def api_call(self, method, path, params): self.host, path, now, - self.sig_version, + sig_version, params, body=body, - digestmod=self.digestmod) + digestmod=digestmod, + additional_headers=additional_headers) headers = { 'Authorization': auth, 'Date': now, } + if sig_version == 5: + for k, v in additional_headers.items(): + headers[k] = v + if self.user_agent: headers['User-Agent'] = self.user_agent if params_go_in_body: - if self.sig_version == 4: + if sig_version in (4, 5): headers['Content-type'] = 'application/json' else: headers['Content-type'] = 'application/x-www-form-urlencoded' @@ -382,16 +462,16 @@ def normalize_paging_args(self, limit=None, offset=0): return (limit, offset) - def json_api_call(self, method, path, params): + def json_api_call(self, method, path, params, sig_version=None): """ Call a Duo API method which is expected to return a JSON body with a 200 status. Return the response data structure or raise RuntimeError. """ - (response, data) = self.api_call(method, path, params) + (response, data) = self.api_call(method, path, params, sig_version=sig_version) return self.parse_json_response(response, data) - def json_paging_api_call(self, method, path, params): + def json_paging_api_call(self, method, path, params, sig_version=None): """ Call a Duo API method which is expected to return a JSON body with a 200 status. Return a generator that can be used to get @@ -405,13 +485,13 @@ def json_paging_api_call(self, method, path, params): while next_offset is not None: params['offset'] = str(next_offset) - (response, data) = self.api_call(method, path, params) + (response, data) = self.api_call(method, path, params, sig_version=sig_version) (objects, metadata) = self.parse_json_response_and_metadata(response, data) next_offset = metadata.get('next_offset', None) for obj in objects: yield obj - def json_cursor_api_call(self, method, path, params, get_records_func): + def json_cursor_api_call(self, method, path, params, get_records_func, sig_version=None): """ Call a Duo API endpoint which utilizes a cursor in some responses to page through a set of data. This cursor is supplied through the optional @@ -440,7 +520,7 @@ def json_cursor_api_call(self, method, path, params, get_records_func): while True: if next_offset is not None: params['offset'] = str(next_offset) - (http_resp, http_resp_data) = self.api_call(method, path, params) + (http_resp, http_resp_data) = self.api_call(method, path, params, sig_version=sig_version) (response, metadata) = self.parse_json_response_and_metadata( http_resp, http_resp_data, diff --git a/examples/create_integration_sso_generic.py b/examples/create_integration_sso_generic.py new file mode 100644 index 0000000..da3450b --- /dev/null +++ b/examples/create_integration_sso_generic.py @@ -0,0 +1,67 @@ +#!/usr/bin/python +from __future__ import absolute_import +from __future__ import print_function +import pprint +import sys + +import duo_client +from six.moves import input + +argv_iter = iter(sys.argv[1:]) +def get_next_arg(prompt): + try: + return next(argv_iter) + except StopIteration: + return input(prompt) + +ikey = get_next_arg('Admin API integration key ("DI..."): ') +skey = get_next_arg('integration secret key: ') +host = get_next_arg('API hostname ("api-....duosecurity.com"): ') + +# Configuration and information about objects to create. +admin_api = duo_client.Admin( + ikey, + skey, + host, +) + +integration = admin_api.create_integration( + name='api-created integration', + integration_type='sso-generic', + sso={ + "saml_config": { + "entity_id": "entity_id", + "acs_urls": [ + { + "url": "https://example.com/acs", + "binding": None, + "isDefault": None, + "index": None, + } + ], + "nameid_format": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified", + "nameid_attribute": "mail", + "sign_assertion": False, + "sign_response": True, + "signing_algorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256", + "mapped_attrs": {}, + "relaystate": "https://example.com/relaystate", + "slo_url": "https://example.com/slo", + "spinitiated_url": "https://example.com/spurl", + "static_attrs": {}, + "role_attrs": { + "bob": { + "ted": ["DGS08MMO53GNRLSFW0D0", "DGETXINZ6CSJO4LRSVKV"], + "frank": ["DGETXINZ6CSJO4LRSVKV"], + } + }, + "attribute_transformations": { + "attribute_1": 'use ""\nprepend text="dev-"', + "attribute_2": 'use ""\nappend additional_attr=""', + } + } + }, +) + +print('Created integration:') +pprint.pprint(integration) diff --git a/tests/accountAdmin/test_billing.py b/tests/accountAdmin/test_billing.py index 74318b0..21c1d8e 100644 --- a/tests/accountAdmin/test_billing.py +++ b/tests/accountAdmin/test_billing.py @@ -1,3 +1,5 @@ +import json + from .. import util import duo_client.admin from .base import TestAccountAdmin @@ -22,14 +24,13 @@ def test_set_business_billing_edition(self): """ response = self.client.set_edition('PLATFORM') uri = response['uri'] - args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/edition') - self.assertEqual(util.params_to_dict(args), + self.assertEqual(json.loads(response['body']), { - 'edition': ['PLATFORM'], - 'account_id': [self.client.account_id], + 'edition': 'PLATFORM', + 'account_id': self.client.account_id, }) def test_set_enterprise_billing_edition(self): @@ -37,14 +38,13 @@ def test_set_enterprise_billing_edition(self): """ response = self.client.set_edition('ENTERPRISE') uri = response['uri'] - args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/edition') - self.assertEqual(util.params_to_dict(args), + self.assertEqual(json.loads(response['body']), { - 'edition': ['ENTERPRISE'], - 'account_id': [self.client.account_id], + 'edition': 'ENTERPRISE', + 'account_id': self.client.account_id, }) def test_get_telephony_credits(self): @@ -65,12 +65,11 @@ def test_set_telephony_credits(self): """ response = self.client.set_telephony_credits(10) uri = response['uri'] - args = response['body'] self.assertEqual(response['method'], 'POST') self.assertEqual(uri, '/admin/v1/billing/telephony_credits') - self.assertEqual(util.params_to_dict(args), + self.assertEqual(json.loads(response['body']), { - 'credits': ['10'], - 'account_id': [self.client.account_id], + 'credits': '10', + 'account_id': self.client.account_id, }) diff --git a/tests/admin/test_admins.py b/tests/admin/test_admins.py index 807891b..60f50a4 100644 --- a/tests/admin/test_admins.py +++ b/tests/admin/test_admins.py @@ -1,3 +1,4 @@ +import json from .. import util import duo_client.admin from .base import TestAdmin @@ -155,10 +156,10 @@ def test_update_admin_password_mgmt_status(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/admins/DFAKEADMINID/password_mgmt') self.assertEqual( - util.params_to_dict(response['body']), + json.loads(response['body']), { - 'account_id': [self.client.account_id], - 'has_external_password_mgmt': ['False'] + 'account_id': self.client.account_id, + 'has_external_password_mgmt': 'False' }) def test_update_admin_password_mgmt_status_set_password(self): @@ -168,9 +169,9 @@ def test_update_admin_password_mgmt_status_set_password(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/admins/DFAKEADMINID/password_mgmt') self.assertEqual( - util.params_to_dict(response['body']), + json.loads(response['body']), { - 'account_id': [self.client.account_id], - 'has_external_password_mgmt': ['True'], - 'password': ['dolphins'] + 'account_id': self.client.account_id, + 'has_external_password_mgmt': 'True', + 'password': 'dolphins' }) diff --git a/tests/admin/test_groups.py b/tests/admin/test_groups.py index 7e516e1..53de0a3 100644 --- a/tests/admin/test_groups.py +++ b/tests/admin/test_groups.py @@ -1,3 +1,4 @@ +import json import warnings from .. import util import duo_client.admin @@ -212,5 +213,5 @@ def test_modify_group(self): response = self.client.modify_group('ABC123') self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/groups/ABC123') - self.assertEqual(util.params_to_dict(response['body']), - {'account_id': [self.client.account_id]}) + self.assertEqual(json.loads(response['body']), + {'account_id': self.client.account_id}) diff --git a/tests/admin/test_integration.py b/tests/admin/test_integration.py new file mode 100644 index 0000000..77dcc6b --- /dev/null +++ b/tests/admin/test_integration.py @@ -0,0 +1,78 @@ +from .. import util +import json +import duo_client.admin +from .base import TestAdmin + + +class TestIntegration(TestAdmin): + def setUp(self): + super(TestIntegration, self).setUp() + self.integration_key = "DISRYL7L8LZ5YXNWKGNK" + + def test_get_integration(self): + response = self.client.get_integration(self.integration_key) + (uri, args) = response['uri'].split('?') + + self.assertEqual(response['method'], 'GET') + self.assertEqual(uri, '/admin/v2/integrations/{}'.format(self.integration_key)) + self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_delete_integration(self): + response = self.client.delete_integration(self.integration_key) + (uri, args) = response['uri'].split('?') + + self.assertEqual(response['method'], 'DELETE') + self.assertEqual(uri, '/admin/v2/integrations/{}'.format(self.integration_key)) + self.assertEqual(util.params_to_dict(args), {'account_id': [self.client.account_id]}) + + def test_create_integration(self): + response = self.client.create_integration( + name="New integration name", + integration_type="sso-generic", + sso={ + "idp_metadata": None, + "saml_config": {} + }, + ) + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v2/integrations') + self.assertEqual(json.loads(response['body']), + { + "account_id": self.client.account_id, + "name": "New integration name", + "type": "sso-generic", + "sso": { + "idp_metadata": None, + "saml_config": {} + }, + } + ) + + def test_update_integration_success(self): + response = self.client.update_integration( + self.integration_key, + name="Integration name", + sso={ + "saml_config": { + "nameid_attribute": "mail", + } + }, + ) + + self.assertEqual(response['method'], 'POST') + self.assertEqual(response['uri'], '/admin/v2/integrations/{}'.format(self.integration_key)) + self.assertEqual(json.loads(response['body']), + { + "account_id": self.client.account_id, + "name": "Integration name", + "sso": { + "saml_config": { + "nameid_attribute": "mail", + } + }, + } + ) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/admin/test_integrations.py b/tests/admin/test_integrations.py index 6844ba3..59052c2 100644 --- a/tests/admin/test_integrations.py +++ b/tests/admin/test_integrations.py @@ -11,7 +11,7 @@ def test_get_integrations_generator(self): response = next(generator) self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v1/integrations') + self.assertEqual(uri, '/admin/v2/integrations') self.assertEqual( util.params_to_dict(args), { @@ -27,7 +27,7 @@ def test_get_integrations(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v1/integrations') + self.assertEqual(uri, '/admin/v2/integrations') self.assertEqual( util.params_to_dict(args), { @@ -43,7 +43,7 @@ def test_get_integrations_with_limit(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v1/integrations') + self.assertEqual(uri, '/admin/v2/integrations') self.assertEqual( util.params_to_dict(args), { @@ -59,7 +59,7 @@ def test_get_integrations_with_limit_offset(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v1/integrations') + self.assertEqual(uri, '/admin/v2/integrations') self.assertEqual( util.params_to_dict(args), { @@ -75,7 +75,7 @@ def test_get_integrations_with_offset(self): response = response[0] self.assertEqual(response['method'], 'GET') (uri, args) = response['uri'].split('?') - self.assertEqual(uri, '/admin/v1/integrations') + self.assertEqual(uri, '/admin/v2/integrations') self.assertEqual( util.params_to_dict(args), { diff --git a/tests/admin/test_logo.py b/tests/admin/test_logo.py index b666347..5834ff5 100644 --- a/tests/admin/test_logo.py +++ b/tests/admin/test_logo.py @@ -1,3 +1,4 @@ +import json from .base import TestAdmin import os import base64 @@ -21,5 +22,6 @@ def test_update_logo(self): # Validate response: self.assertTrue( - 'logo={}'.format(base64_logo) in response['body'] + json.loads(response['body']).get('logo'), + base64_logo ) diff --git a/tests/admin/test_settings.py b/tests/admin/test_settings.py index 2d4e600..c03344a 100644 --- a/tests/admin/test_settings.py +++ b/tests/admin/test_settings.py @@ -1,3 +1,4 @@ +import json from .. import util import duo_client.admin from .base import TestAdmin @@ -8,6 +9,7 @@ class TestSettings(TestAdmin): def test_update_settings(self): """ Test updating settings """ + self.maxDiff = None response = self.client_list.update_settings( lockout_threshold=10, lockout_expire_duration=60, @@ -44,37 +46,37 @@ def test_update_settings(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/settings') self.assertEqual( - util.params_to_dict(response['body']), + json.loads(response['body']), { - 'account_id': [self.client.account_id], - 'lockout_threshold': ['10'], - 'lockout_expire_duration': ['60'], - 'inactive_user_expiration': ['30'], - 'pending_deletion_days': ['5'], - 'log_retention_days': ['180'], - 'sms_batch': ['5'], - 'sms_expiration': ['60'], - 'sms_refresh': ['1'], - 'sms_message': ['test_message'], - 'fraud_email': ['test@example.com'], - 'fraud_email_enabled': ['1'], - 'keypress_confirm': ['0'], - 'keypress_fraud': ['9'], - 'timezone': ['UTC'], - 'telephony_warning_min': ['50'], - 'caller_id': ['+15035551000'], - 'user_telephony_cost_max': ['10'], - 'minimum_password_length': ['12'], - 'password_requires_upper_alpha': ['1'], - 'password_requires_lower_alpha': ['1'], - 'password_requires_numeric': ['1'], - 'password_requires_special': ['1'], - 'helpdesk_bypass': ['allow'], - 'helpdesk_bypass_expiration': ['60'], - 'helpdesk_message': ['test_message'], - 'helpdesk_can_send_enroll_email': ['1'], - 'reactivation_url': ['https://www.example.com'], - 'reactivation_integration_key': ['DINTEGRATIONKEYTEST0'], - 'security_checkup_enabled': ['1'], - 'user_managers_can_put_users_in_bypass': ['0'], + 'account_id': self.client.account_id, + 'lockout_threshold': '10', + 'lockout_expire_duration': '60', + 'inactive_user_expiration': '30', + 'pending_deletion_days': '5', + 'log_retention_days': '180', + 'sms_batch': '5', + 'sms_expiration': '60', + 'sms_refresh': '1', + 'sms_message': 'test_message', + 'fraud_email': 'test@example.com', + 'fraud_email_enabled': '1', + 'keypress_confirm': '0', + 'keypress_fraud': '9', + 'timezone': 'UTC', + 'telephony_warning_min': '50', + 'caller_id': '+15035551000', + 'user_telephony_cost_max': '10', + 'minimum_password_length': '12', + 'password_requires_upper_alpha': '1', + 'password_requires_lower_alpha': '1', + 'password_requires_numeric': '1', + 'password_requires_special': '1', + 'helpdesk_bypass': 'allow', + 'helpdesk_bypass_expiration': '60', + 'helpdesk_message': 'test_message', + 'helpdesk_can_send_enroll_email': '1', + 'reactivation_url': 'https://www.example.com', + 'reactivation_integration_key': 'DINTEGRATIONKEYTEST0', + 'security_checkup_enabled': '1', + 'user_managers_can_put_users_in_bypass': '0', }) diff --git a/tests/admin/test_users.py b/tests/admin/test_users.py index 6ad8323..8cd99c6 100644 --- a/tests/admin/test_users.py +++ b/tests/admin/test_users.py @@ -1,3 +1,4 @@ +import json from .. import util import duo_client.admin from .base import TestAdmin @@ -111,28 +112,28 @@ def test_add_user(self): self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/users') self.assertEqual( - util.params_to_dict(response['body']), + json.loads(response['body']), { - 'realname': ['bar'], - 'notes': ['notes'], - 'username': ['foo'], - 'status': ['active'], - 'email': ['foobar@baz.com'], - 'firstname': ['fName'], - 'lastname': ['lName'], - 'account_id': [self.client.account_id], - 'alias1': ['alias1'], - 'alias2': ['alias2'], - 'alias3': ['alias3'], - 'alias4': ['alias4'], + 'realname': 'bar', + 'notes': 'notes', + 'username': 'foo', + 'status': 'active', + 'email': 'foobar@baz.com', + 'firstname': 'fName', + 'lastname': 'lName', + 'account_id': self.client.account_id, + 'alias1': 'alias1', + 'alias2': 'alias2', + 'alias3': 'alias3', + 'alias4': 'alias4', }) # defaults response = self.client.add_user('bar') self.assertEqual(response['method'], 'POST') self.assertEqual(response['uri'], '/admin/v1/users') self.assertEqual( - util.params_to_dict(response['body']), - {'username':['bar'], 'account_id':[self.client.account_id]}) + json.loads(response['body']), + {'username':'bar', 'account_id':self.client.account_id}) def test_update_user(self): response = self.client.update_user( @@ -144,21 +145,21 @@ def test_update_user(self): self.assertEqual( response['uri'], '/admin/v1/users/DU012345678901234567') self.assertEqual( - util.params_to_dict(response['body']), + json.loads(response['body']), { - 'account_id':[self.client.account_id], - 'realname': ['bar'], - 'notes': ['notes'], - 'username': ['foo'], - 'status': ['active'], - 'email': ['foobar@baz.com'], - 'firstname': ['fName'], - 'lastname': ['lName'], - 'account_id': [self.client.account_id], - 'alias1': ['alias1'], - 'alias2': ['alias2'], - 'alias3': ['alias3'], - 'alias4': ['alias4'], + 'account_id':self.client.account_id, + 'realname': 'bar', + 'notes': 'notes', + 'username': 'foo', + 'status': 'active', + 'email': 'foobar@baz.com', + 'firstname': 'fName', + 'lastname': 'lName', + 'account_id': self.client.account_id, + 'alias1': 'alias1', + 'alias2': 'alias2', + 'alias3': 'alias3', + 'alias4': 'alias4', }) def test_sync_user(self): @@ -169,5 +170,5 @@ def test_sync_user(self): self.assertEqual(response['uri'], '/admin/v1/users/directorysync/test_dir_key/syncuser') self.assertEqual( - util.params_to_dict(response['body']), - {'username': ['foo'], 'account_id': [self.client.account_id]}) + json.loads(response['body']), + {'username': 'foo', 'account_id': self.client.account_id}) diff --git a/tests/test_client.py b/tests/test_client.py index 8ce7dfa..035ad37 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -160,6 +160,22 @@ def test_v4_with_json(self): self.assertEqual(actual, expected) + def test_v5_with_json(self): + hashed_body = hashlib.sha512(JSON_STRING.encode('utf-8')).hexdigest() + headers = {"X-Duo-Header-1": "header_value_1"} + expected = ( + 'Tue, 17 Nov 2020 14:12:00\n' + 'POST\n' + 'foo.bar52.com\n' + '/Foo/BaR2/qux\n\n' + hashed_body + +'\n630b4bfe7e9abd03da2eee8f0a5d4e60a254ec880a839bcc2223bb5b9443e8ef24d58f0' + '254f1f5934bf8c017ebd0fd5b1acf86766bdbe74185e712a4092df3ed') + params = {} + body = duo_client.client.Client.canon_json(JSON_BODY) + actual = duo_client.client.canonicalize( + 'POST', 'foO.BaR52.cOm', '/Foo/BaR2/qux', params, 'Tue, 17 Nov 2020 14:12:00', + sig_version=5, body=body, additional_headers=headers) + def test_invalid_signature_version_raises(self): params = duo_client.client.Client.canon_json(JSON_BODY) with self.assertRaises(ValueError) as e: