diff --git a/awsfindingsmanagerlib/awsfindingsmanagerlib.py b/awsfindingsmanagerlib/awsfindingsmanagerlib.py index bc9f6a2..2db3fb8 100755 --- a/awsfindingsmanagerlib/awsfindingsmanagerlib.py +++ b/awsfindingsmanagerlib/awsfindingsmanagerlib.py @@ -608,6 +608,11 @@ def _get_security_hub_client(region: str): raise NoRegion(f'Security Hub client requires a valid region set to connect, message was:{msg}') from None return client + def _get_security_hub_paginator_iterator(self, region: str, operation_name: str, query_filter: dict): + security_hub = self._get_security_hub_client(region=region) + paginator = security_hub.get_paginator(operation_name) + return paginator.paginate(Filters=query_filter) + @staticmethod def _get_ec2_client(region: str): kwargs = {} @@ -712,18 +717,18 @@ def _get_findings(self, query_filter: Dict): regions_to_retrieve = [aggregating_region] if aggregating_region else self.regions for region in regions_to_retrieve: self._logger.debug(f'Trying to get findings for region {region}') - security_hub = self._get_security_hub_client(region=region) - paginator = security_hub.get_paginator('get_findings') - iterator = paginator.paginate(Filters=query_filter) + iterator = self._get_security_hub_paginator_iterator(region, 'get_findings', query_filter) try: for page in iterator: for finding_data in page['Findings']: finding = Finding(finding_data) self._logger.debug(f'Adding finding with id {finding.id}') findings.add(finding) - except (security_hub.exceptions.InvalidAccessException, security_hub.exceptions.AccessDeniedException): - self._logger.debug(f'No access for Security Hub for region {region}.') - continue + except botocore.exceptions.ClientError as error: + if error.response['Error']['Code'] in ['AccessDeniedException', 'InvalidAccessException']: + self._logger.debug(f'No access for Security Hub for region {region}.') + continue + raise error return list(findings) @staticmethod diff --git a/tests/test_awsfindingsmanagerlib.py b/tests/test_awsfindingsmanagerlib.py deleted file mode 100644 index ad9ea11..0000000 --- a/tests/test_awsfindingsmanagerlib.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# File: test_awsfindingsmanagerlib.py -# -# Copyright 2023 Marwin Baumann -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -test_awsfindingsmanagerlib ----------------------------------- -Tests for `awsfindingsmanagerlib` module. - -.. _Google Python Style Guide: - http://google.github.io/styleguide/pyguide.html - -""" - -from unittest import TestCase -from unittest.mock import patch, MagicMock -from betamax.fixtures import unittest -from awsfindingsmanagerlib import FindingsManager, Local -import json - -__author__ = '''Marwin Baumann ''' -__docformat__ = '''google''' -__date__ = '''21-11-2023''' -__copyright__ = '''Copyright 2023, Marwin Baumann''' -__credits__ = ["Marwin Baumann"] -__license__ = '''Apache Software License 2.0''' -__maintainer__ = '''Marwin Baumann''' -__email__ = '''''' -__status__ = '''Development''' # "Prototype", "Development", "Production". - - -class TestAwsfindingsmanagerlib(unittest.BetamaxTestCase): - - def setUp(self): - """ - Test set up - - This is where you can setup things that you use throughout the tests. This method is called before every test. - """ - pass - - def tearDown(self): - """ - Test tear down - - This is where you should tear down what you've setup in setUp before. This method is called after every test. - """ - pass - -class TestBasicRun(TestCase): - - @patch('awsfindingsmanagerlib.FindingsManager._batch_update_findings') - @patch('awsfindingsmanagerlib.FindingsManager._get_sts_client') - @patch('awsfindingsmanagerlib.FindingsManager._get_security_hub_client') - @patch('awsfindingsmanagerlib.FindingsManager._get_ec2_client') - def test_basic_run(self, mock_ec2: MagicMock, mock_sec_hub_get: MagicMock, mock_sts: MagicMock, mock_update: MagicMock): - # basic init - local_backend = Local(path='./tests/fixtures/suppressions.yaml') - rules = local_backend.get_rules() - - findings_manager = FindingsManager() - findings_manager.register_rules(rules) - - # configuring mock sec hub to return fixture findings - mock_sec_hub_client = MagicMock() - mock_sec_hub_get.return_value = mock_sec_hub_client - - mock_sec_hub_paginator = MagicMock() - mock_sec_hub_client.get_paginator.return_value = mock_sec_hub_paginator - - with open('./tests/fixtures/findings.json', encoding='utf-8') as findings_file: - mock_sec_hub_paginator.paginate.return_value = [json.load(findings_file)] - - # basic suppression in action - self.assertTrue(findings_manager.suppress_matching_findings()) - - # load expected api call payload - with open('tests/fixtures/batch_update_findings.json', encoding='utf-8') as updates_file: - batch_update_findings = json.load(updates_file) - - # compare expected to actual api call payload - # sadly this does not work: mock_update.assert_called_once_with(mock_sec_hub_client, batch_update_findings) - # because FindingIdentifiers is a randomly ordered collection - mock_update.assert_called_once() - - update_args = mock_update.call_args.args[1] - self.assertEqual(update_args.keys(), batch_update_findings.keys()) - - self.assertEquals(update_args['Note'], batch_update_findings['Note']) - self.assertEquals(update_args['Workflow'], batch_update_findings['Workflow']) - - self.assertEqual(len(update_args['FindingIdentifiers']), len(batch_update_findings['FindingIdentifiers'])) - for item in update_args['FindingIdentifiers']: - self.assertIn(item, batch_update_findings['FindingIdentifiers']) - for item in batch_update_findings['FindingIdentifiers']: - self.assertIn(item, update_args['FindingIdentifiers']) diff --git a/tests/test_requests.py b/tests/test_requests.py new file mode 100644 index 0000000..926e413 --- /dev/null +++ b/tests/test_requests.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# File: test_awsfindingsmanagerlib.py +# +# Copyright 2023 Marwin Baumann +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +test_awsfindingsmanagerlib +---------------------------------- +Tests for `awsfindingsmanagerlib` module. + +.. _Google Python Style Guide: + http://google.github.io/styleguide/pyguide.html + +""" + +from betamax.fixtures import unittest + +__author__ = '''Marwin Baumann ''' +__docformat__ = '''google''' +__date__ = '''21-11-2023''' +__copyright__ = '''Copyright 2023, Marwin Baumann''' +__credits__ = ["Marwin Baumann"] +__license__ = '''Apache Software License 2.0''' +__maintainer__ = '''Marwin Baumann''' +__email__ = '''''' +__status__ = '''Development''' # "Prototype", "Development", "Production". + + +class TestAwsfindingsmanagerlib(unittest.BetamaxTestCase): + + def setUp(self): + """ + Test set up + + This is where you can setup things that you use throughout the tests. This method is called before every test. + """ + pass + + def tearDown(self): + """ + Test tear down + + This is where you should tear down what you've setup in setUp before. This method is called after every test. + """ + pass diff --git a/tests/test_suppressions.py b/tests/test_suppressions.py new file mode 100644 index 0000000..a065ba0 --- /dev/null +++ b/tests/test_suppressions.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# File: test_suppressions.py +# +# Copyright 2024 Carlo van Overbeek +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +test_suppressions +---------------------------------- +Tests for `awsfindingsmanagerlib` module. + +.. _Google Python Style Guide: + http://google.github.io/styleguide/pyguide.html + +""" + +from unittest.mock import patch, MagicMock +from .utils import FindingsManager, TestCaseWithBatchUpdateFindings +from awsfindingsmanagerlib import Local +import json + +__author__ = '''Carlo van Overbeek ''' +__docformat__ = '''google''' +__date__ = '''26-06-2024''' +__copyright__ = '''Copyright 2024, Carlo van Overbeek''' +__credits__ = ["Carlo van Overbeek"] +__license__ = '''Apache Software License 2.0''' +__maintainer__ = '''Carlo van Overbeek''' +__email__ = '''''' +__status__ = '''Development''' # "Prototype", "Development", "Production". + + +with open('tests/fixtures/findings.json', encoding='utf-8') as findings_file: + findings_fixture = [json.load(findings_file)] + +with open('tests/fixtures/batch_update_findings.json', encoding='utf-8') as updates_file: + batch_update_findings_fixture = json.load(updates_file) + +class TestBasicRun(TestCaseWithBatchUpdateFindings): + + @patch('awsfindingsmanagerlib.FindingsManager._get_security_hub_paginator_iterator', lambda *_: findings_fixture) + @patch('awsfindingsmanagerlib.FindingsManager._batch_update_findings') + def test_basic_run(self, _batch_update_findings_mocked: MagicMock): + # basic init + local_backend = Local(path='./tests/fixtures/suppressions.yaml') + rules = local_backend.get_rules() + + findings_manager = FindingsManager() + findings_manager.register_rules(rules) + + # basic suppression in action + self.assertTrue(findings_manager.suppress_matching_findings()) + + # created payload validation + self.assert_batch_update_findings_called_once_with(batch_update_findings_fixture, _batch_update_findings_mocked) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..eece61c --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# File: utils.py +# +# Copyright 2024 Carlo van Overbeek +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +utils +---------------------------------- +Test utils for `awsfindingsmanagerlib` module. + +.. _Google Python Style Guide: + http://google.github.io/styleguide/pyguide.html + +""" + +from awsfindingsmanagerlib import FindingsManager as FindingsManagerToMock +from unittest.mock import MagicMock +from unittest import TestCase + +__author__ = '''Carlo van Overbeek ''' +__docformat__ = '''google''' +__date__ = '''26-06-2024''' +__copyright__ = '''Copyright 2024, Carlo van Overbeek''' +__credits__ = ["Carlo van Overbeek"] +__license__ = '''Apache Software License 2.0''' +__maintainer__ = '''Carlo van Overbeek''' +__email__ = '''''' +__status__ = '''Development''' # "Prototype", "Development", "Production". + + +class FindingsManager(FindingsManagerToMock): + + @staticmethod + def _get_ec2_client(region: str): + return MagicMock() + + @staticmethod + def _get_security_hub_client(region: str): + return MagicMock() + + @staticmethod + def _get_sts_client(): + return MagicMock() + + +class TestCaseWithBatchUpdateFindings(TestCase): + + def assert_batch_update_findings_called_once_with(self, batch_update_findings_expected: dict, _batch_update_findings_mocked: MagicMock): + """ + Compare expected to actual (=mocked) api call payload. + + Sadly, something like this does not work: _batch_update_findings_mocked.assert_called_once_with(ANY, batch_update_findings), + because FindingIdentifiers is a randomly ordered collection. + """ + _batch_update_findings_mocked.assert_called_once() + + received_args = _batch_update_findings_mocked.call_args.args[1] + + self.assertEqual(batch_update_findings_expected.keys(), received_args.keys()) + + self.assertEqual(batch_update_findings_expected['Note'], received_args['Note']) + self.assertEqual(batch_update_findings_expected['Workflow'], received_args['Workflow']) + + self.assertEqual(len(batch_update_findings_expected['FindingIdentifiers']), len(received_args['FindingIdentifiers'])) + + for item in batch_update_findings_expected['FindingIdentifiers']: + self.assertIn(item, received_args['FindingIdentifiers'])