Skip to content

Commit

Permalink
Add explicit CSMS TLS marker (#95)
Browse files Browse the repository at this point in the history
* add explicit tls marker

Signed-off-by: Fabian Klemm <[email protected]>

* extend csms options to enable client certificate verification

Signed-off-by: Fabian Klemm <[email protected]>

* bugfix: if is none

Signed-off-by: Fabian Klemm <[email protected]>

---------

Signed-off-by: Fabian Klemm <[email protected]>
  • Loading branch information
klemmpnx authored Nov 8, 2023
1 parent eb96d43 commit a162afe
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 9 deletions.
2 changes: 1 addition & 1 deletion everest-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ An important function that you will frequently use when writing test cases is th
- **ocpp_version**: Can be "ocpp1.6" or "ocpp2.0.1" and is used to setup EVerest and the central system for the specific OCPP version
- **ocpp_config**: Specification of the .json OCPP config file. Used in `ocpp_config` fixture and used as template configuration (if not specified, the OCPP config as specified in the EVerest configuration is used)
- **inject_csms_mock**: (currently only OCPP 2.0.1) If set, the `central_system_v201` will wrap any csms handler method into an unittest mock. In particular, this allows changing the CSMS behavior even after the chargepoint is started by setting side effects of the mock. See `everest.testing.ocpp_utils.charge_point_v201.inject_csms_v201_mock` docstring for an example.

- **csms_tls**: Enable/disable TLS for the CSMS websocket server. If given without arguments, enables TLS. First argument can be `False` to explicitly disable TLS. Further optional keyword arguments `certificate`, `private_key`,`passphrase`, `root_ca` , and `verify_client_certificate` allow to overwrite SSL context options.

## Add a conftest.py

Expand Down
50 changes: 50 additions & 0 deletions everest-testing/src/everest/testing/ocpp_utils/central_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest

import asyncio
import ssl
import time
import logging
from contextlib import asynccontextmanager
Expand All @@ -10,6 +11,9 @@
from unittest.mock import Mock

import websockets
from pytest import FixtureRequest

from everest.testing.ocpp_utils.charge_point_utils import OcppTestConfiguration
from ocpp.routing import create_route_map, on
from ocpp.charge_point import ChargePoint

Expand Down Expand Up @@ -206,3 +210,49 @@ def _method(*args, **kwargs):
for action_name, action_method in charge_point_action_handlers.items():
cs.function_overrides.append((action_name, catch_mock(mock, action_name, action_method)))
return mock


def determine_ssl_context(request: FixtureRequest, test_config: OcppTestConfiguration) -> ssl.SSLContext | None:
""" Determine CSMS SSL Context: Default take from test_config, can be overwritten by csms_tls marker """

csms_tls_enabled = test_config.csms_tls_enabled
if test_config.certificate_info:
csms_tls_cert = test_config.certificate_info.csms_cert
csms_tls_key = test_config.certificate_info.csms_key
csms_tls_passphrase = test_config.certificate_info.csms_passphrase
csms_tls_root_ca = test_config.certificate_info.csms_root_ca
else:
csms_tls_cert = None
csms_tls_key = None
csms_tls_passphrase = None
csms_tls_root_ca = None
csms_tls_verify_client_certificate = test_config.csms_tls_verify_client_certificate

if csms_tls_marker := request.node.get_closest_marker("csms_tls"):
if csms_tls_marker.args:
csms_tls_enabled = csms_tls_marker.args[0]
else:
csms_tls_enabled = True # provided marker always enabled tls if not explicitly set to False
marker_kwargs = csms_tls_marker.kwargs
if "certificate" in marker_kwargs:
csms_tls_cert = marker_kwargs["certificate"]
if "private_key" in marker_kwargs:
csms_tls_key = marker_kwargs["private_key"]
if "passphrase" in marker_kwargs:
csms_tls_passphrase = marker_kwargs["passphrase"]
if "root_ca" in marker_kwargs:
csms_tls_root_ca = marker_kwargs["root_ca"]
if "verify_client_certificate" in marker_kwargs:
csms_tls_verify_client_certificate = marker_kwargs["verify_client_certificate"]

if csms_tls_enabled:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(csms_tls_cert,
csms_tls_key,
csms_tls_passphrase)
if csms_tls_verify_client_certificate:
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(csms_tls_root_ca)
return ssl_context
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class FirmwareInfo:

@dataclass
class OcppTestConfiguration:
csms_tls_enabled: bool = False
csms_tls_verify_client_certificate: bool = False
csms_port: str = 9000
csms_host: str = "127.0.0.1"
charge_point_info: ChargePointInfo = ChargePointInfo()
Expand Down
14 changes: 6 additions & 8 deletions everest-testing/src/everest/testing/ocpp_utils/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import ssl
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from threading import Thread

Expand All @@ -19,7 +20,8 @@
from everest.testing.core_utils.common import OCPPVersion
from everest.testing.core_utils.configuration.everest_environment_setup import EverestEnvironmentOCPPConfiguration
from everest.testing.core_utils.controller.everest_test_controller import EverestTestController
from everest.testing.ocpp_utils.central_system import CentralSystem, inject_csms_v201_mock, inject_csms_v16_mock
from everest.testing.ocpp_utils.central_system import CentralSystem, inject_csms_v201_mock, inject_csms_v16_mock, \
determine_ssl_context
from everest.testing.ocpp_utils.charge_point_utils import TestUtility, OcppTestConfiguration

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
Expand Down Expand Up @@ -52,13 +54,9 @@ async def central_system(request, ocpp_version: OCPPVersion, test_config):
"""Fixture for CentralSystem. Can be started as TLS or
plain websocket depending on the request parameter.
"""
if (hasattr(request, 'param')):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(test_config.certificate_info.csms_cert,
test_config.certificate_info.csms_key,
test_config.certificate_info.csms_passphrase)
else:
ssl_context = None

ssl_context = determine_ssl_context(request, test_config)

cs = CentralSystem(test_config.charge_point_info.charge_point_id,
ocpp_version=ocpp_version)

Expand Down

0 comments on commit a162afe

Please sign in to comment.