Skip to content

Commit

Permalink
extend csms options to enable client certificate verification
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Klemm <[email protected]>
  • Loading branch information
klemmpnx committed Nov 7, 2023
1 parent 88a5d77 commit 14c7ccb
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
2 changes: 1 addition & 1 deletion everest-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ An important function that you will frequently use when writing test cases is th
- **ocpp_version**: Can be "ocpp1.6" or "ocpp2.0.1" and is used to setup EVerest and the central system for the specific OCPP version
- **ocpp_config**: Specification of the .json OCPP config file. Used in `ocpp_config` fixture and used as template configuration (if not specified, the OCPP config as specified in the EVerest configuration is used)
- **inject_csms_mock**: (currently only OCPP 2.0.1) If set, the `central_system_v201` will wrap any csms handler method into an unittest mock. In particular, this allows changing the CSMS behavior even after the chargepoint is started by setting side effects of the mock. See `everest.testing.ocpp_utils.charge_point_v201.inject_csms_v201_mock` docstring for an example.
- **csms_tls**: Enable/disable TLS for the CSMS websocket server. If given without arguments, enables TLS. First argument can be `False` to explicitly disable TLS. Further optional keyword arguments `certificate`, `private_key`,`passphrase` allow to overwrite SSL context options.
- **csms_tls**: Enable/disable TLS for the CSMS websocket server. If given without arguments, enables TLS. First argument can be `False` to explicitly disable TLS. Further optional keyword arguments `certificate`, `private_key`,`passphrase`, `root_ca` , and `verify_client_certificate` allow to overwrite SSL context options.

Check warning on line 69 in everest-testing/README.md

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

everest-testing/README.md#L69

[list-item-indent] Incorrect list-item indent: add 2 spaces

## Add a conftest.py

Expand Down
44 changes: 44 additions & 0 deletions everest-testing/src/everest/testing/ocpp_utils/central_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest

import asyncio
import ssl
import time
import logging
from contextlib import asynccontextmanager
Expand All @@ -10,6 +11,9 @@
from unittest.mock import Mock

import websockets
from pytest import FixtureRequest

from everest.testing.ocpp_utils.charge_point_utils import OcppTestConfiguration
from ocpp.routing import create_route_map, on
from ocpp.charge_point import ChargePoint

Expand Down Expand Up @@ -206,3 +210,43 @@ def _method(*args, **kwargs):
for action_name, action_method in charge_point_action_handlers.items():
cs.function_overrides.append((action_name, catch_mock(mock, action_name, action_method)))
return mock


def determine_ssl_context(request: FixtureRequest, test_config: OcppTestConfiguration) -> ssl.SSLContext | None:
""" Determine CSMS SSL Context: Default take from test_config, can be overwritten by csms_tls marker """

csms_tls_enabled = test_config.csms_tls_enabled
csms_tls_cert = test_config.certificate_info.csms_cert
csms_tls_key = test_config.certificate_info.csms_key
csms_tls_passphrase = test_config.certificate_info.csms_passphrase
csms_tls_root_ca = test_config.certificate_info.csms_root_ca
csms_tls_verify_client_certificate = test_config.csms_tls_verify_client_certificate

if csms_tls_marker := request.node.get_closest_marker("csms_tls"):
if csms_tls_marker.args:
csms_tls_enabled = csms_tls_marker.args[0]
else:
csms_tls_enabled = True # provided marker always enabled tls if not explicitly set to False
marker_kwargs = csms_tls_marker.kwargs
if "certificate" in marker_kwargs:
csms_tls_cert = marker_kwargs["certificate"]
if "private_key" in marker_kwargs:
csms_tls_key = marker_kwargs["private_key"]
if "passphrase" in marker_kwargs:
csms_tls_passphrase = marker_kwargs["passphrase"]
if "root_ca" in marker_kwargs:
csms_tls_root_ca = marker_kwargs["root_ca"]
if "verify_client_certificate" in marker_kwargs:
csms_tls_verify_client_certificate = marker_kwargs["verify_client_certificate"]

if csms_tls_enabled:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(csms_tls_cert,
csms_tls_key,
csms_tls_passphrase)
if csms_tls_verify_client_certificate:
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(csms_tls_root_ca)
return ssl_context
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class FirmwareInfo:

@dataclass
class OcppTestConfiguration:
csms_tls_enabled = False
csms_tls_enabled: bool = False
csms_tls_verify_client_certificate: bool = False
csms_port: str = 9000
csms_host: str = "127.0.0.1"
charge_point_info: ChargePointInfo = ChargePointInfo()
Expand Down
18 changes: 3 additions & 15 deletions everest-testing/src/everest/testing/ocpp_utils/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from everest.testing.core_utils.common import OCPPVersion
from everest.testing.core_utils.configuration.everest_environment_setup import EverestEnvironmentOCPPConfiguration
from everest.testing.core_utils.controller.everest_test_controller import EverestTestController
from everest.testing.ocpp_utils.central_system import CentralSystem, inject_csms_v201_mock, inject_csms_v16_mock
from everest.testing.ocpp_utils.central_system import CentralSystem, inject_csms_v201_mock, inject_csms_v16_mock, \
determine_ssl_context
from everest.testing.ocpp_utils.charge_point_utils import TestUtility, OcppTestConfiguration

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
Expand Down Expand Up @@ -54,20 +55,7 @@ async def central_system(request, ocpp_version: OCPPVersion, test_config):
plain websocket depending on the request parameter.
"""

# Determine CSMS SSL Context: Default take from test_config, can be overwritten by csms_tls marker
csms_tls_marker = request.node.get_closest_marker("csms_tls")
csms_tls_marker_tls_enabled = csms_tls_marker and not csms_tls_marker.args[0:1] == [False] # explicit enable tls by marker; this is the case if marker is set and first argument is not "False"
csms_tls_marker_tls_disabled = csms_tls_marker and csms_tls_marker.args[0:1] == [False] # explicit disable tls by marker, overwrites test_config
if csms_tls_marker_tls_enabled or (test_config.csms_tls_enabled and not csms_tls_marker_tls_disabled):

# if tls enabled, marker options and test_config are merged; marker options overwrite test_config options
csms_tls_marker_kwargs = csms_tls_marker.kwargs if csms_tls_marker else {}
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(csms_tls_marker_kwargs.get("certificate", test_config.certificate_info.csms_cert),
csms_tls_marker_kwargs.get("private_key", test_config.certificate_info.csms_key),
csms_tls_marker_kwargs.get("passphrase", test_config.certificate_info.csms_passphrase))
else:
ssl_context = None
ssl_context = determine_ssl_context(request, test_config)

cs = CentralSystem(test_config.charge_point_info.charge_point_id,
ocpp_version=ocpp_version)
Expand Down

0 comments on commit 14c7ccb

Please sign in to comment.