Skip to content

Commit

Permalink
Compute running vitals; Export results to json
Browse files Browse the repository at this point in the history
  • Loading branch information
prouast committed Jul 21, 2024
1 parent 960e17e commit a2a4196
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 49 deletions.
63 changes: 47 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,31 @@ pip install ./vitallens-python
To start using `vitallens-python`, first create an instance of `vitallens.VitalLens`.
It can be configured using the following parameters:

| Parameter | Description | Default |
|----------------|------------------------------------------------------------------------------------|--------------------|
| method | Inference method. {`Method.VITALLENS`, `Method.POS`, `Method.CHROM` or `Method.G`} | `Method.VITALLENS` |
| api_key | Usage key for the VitalLens API (required for `Method.VITALLENS`) | `None` |
| detect_faces | `True` if faces need to be detected, otherwise `False`. | `True` |
| fdet_max_faces | The maximum number of faces to detect (if necessary). | `2` |
| fdet_fs | Frequency [Hz] at which faces should be scanned - otherwise linearly interpolated. | `1.0` |
| Parameter | Description | Default |
|-------------------------|------------------------------------------------------------------------------------|--------------------|
| method | Inference method. {`Method.VITALLENS`, `Method.POS`, `Method.CHROM` or `Method.G`} | `Method.VITALLENS` |
| api_key | Usage key for the VitalLens API (required for `Method.VITALLENS`) | `None` |
| detect_faces | `True` if faces need to be detected, otherwise `False`. | `True` |
| estimate_running_vitals | Set `True` to compute running vitals (e.g., `running_heart_rate`). | `True` |
| fdet_max_faces | The maximum number of faces to detect (if necessary). | `1` |
| fdet_fs | Frequency [Hz] at which faces should be scanned - otherwise linearly interpolated. | `1.0` |
| export_to_json | If `True`, write results to a json file. | `True` |
| export_dir | The directory to which json files are written. | `.` |

Once instantiated, `vitallens.VitalLens` can be called to estimate vitals.
This can also be configured using the following parameters:

| Parameter | Description | Default |
|---------------------|---------------------------------------------------------------------------------------|---------|
| video | The video to analyze. Either a path to a video file or `np.ndarray`. [More info here.](https://github.com/Rouast-Labs/vitallens-python/blob/ddcf48f29a2765fd98a7029c0f10075a33e44247/vitallens/client.py#L98) | |
| faces | Face detections. Ignored unless `detect_faces=False`. [More info here.](https://github.com/Rouast-Labs/vitallens-python/blob/ddcf48f29a2765fd98a7029c0f10075a33e44247/vitallens/client.py#L101) | `None` |
| video | The video to analyze. Either a path to a video file or `np.ndarray`. [More info here.](https://github.com/Rouast-Labs/vitallens-python/raw/main/vitallens/client.py#L114) | |
| faces | Face detections. Ignored unless `detect_faces=False`. [More info here.](https://github.com/Rouast-Labs/vitallens-python/raw/main/vitallens/client.py#L117) | `None` |
| fps | Sampling frequency of the input video. Required if video is `np.ndarray`. | `None` |
| override_fps_target | Target frequency for inference (optional - use methods's default otherwise). | `None` |
| export_filename | Filename for json export if applicable. | `None` |

The estimation results are returned as a `list`. It contains a `dict` for each distinct face, with the following structure:

```
```json
[
{
'face': {
Expand All @@ -84,13 +88,13 @@ The estimation results are returned as a `list`. It contains a `dict` for each d
},
'vital_signs': {
'heart_rate': {
'value': <Estimated value as float scalar>,
'unit': <Value unit>,
'confidence': <Estimation confidence as float scalar>,
'note': <Explanatory note>
},
'value': <Estimated global value as float scalar>,
'unit': <Value unit>,
'confidence': <Estimation confidence as float scalar>,
'note': <Explanatory note>
},
'respiratory_rate': {
'value': <Estimated value as float scalar>,
'value': <Estimated global value as float scalar>,
'unit': <Value unit>,
'confidence': <Estimation confidence as float scalar>,
'note': <Explanatory note>
Expand All @@ -117,6 +121,33 @@ The estimation results are returned as a `list`. It contains a `dict` for each d
]
```

If the video is long enough and `estimate_running_vitals=True`, the results additionally contain running vitals:

```json
[
{
...
'vital_signs': {
...
'running_heart_rate': {
'data': <Estimated value for each frame as np.ndarray of shape (n_frames,)>,
'unit': <Value unit>,
'confidence': <Estimation confidence for each frame as np.ndarray of shape (n_frames,)>,
'note': <Explanatory note>
},
'running_respiratory_rate': {
'data': <Estimated value for each frame as np.ndarray of shape (n_frames,)>,
'unit': <Value unit>,
'confidence': <Estimation confidence for each frame as np.ndarray of shape (n_frames,)>,
'note': <Explanatory note>
}
}
...
},
...
]
```

### Example: Use VitalLens API to estimate vitals from a video file

```python
Expand Down
28 changes: 22 additions & 6 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import numpy as np
import os
import pytest

import sys
Expand All @@ -29,29 +31,42 @@
@pytest.mark.parametrize("method", [Method.G, Method.CHROM, Method.POS])
@pytest.mark.parametrize("detect_faces", [True, False])
@pytest.mark.parametrize("file", [True, False])
def test_VitalLens(request, method, detect_faces, file):
vl = VitalLens(method=method, detect_faces=detect_faces)
@pytest.mark.parametrize("export", [True, False])
def test_VitalLens(request, method, detect_faces, file, export):
vl = VitalLens(method=method, detect_faces=detect_faces, export_to_json=export)
if file:
test_video_path = request.getfixturevalue('test_video_path')
result = vl(test_video_path, faces = None if detect_faces else [247, 57, 440, 334])
result = vl(test_video_path, faces = None if detect_faces else [247, 57, 440, 334], export_filename="test")
else:
test_video_ndarray = request.getfixturevalue('test_video_ndarray')
test_video_fps = request.getfixturevalue('test_video_fps')
result = vl(test_video_ndarray, fps=test_video_fps, faces = None if detect_faces else [247, 57, 440, 334])
result = vl(test_video_ndarray, fps=test_video_fps, faces = None if detect_faces else [247, 57, 440, 334], export_filename="test")
assert len(result) == 1
assert result[0]['face']['coordinates'].shape == (360, 4)
assert result[0]['face']['confidence'].shape == (360,)
assert result[0]['vital_signs']['ppg_waveform']['data'].shape == (360,)
assert result[0]['vital_signs']['ppg_waveform']['confidence'].shape == (360,)
np.testing.assert_allclose(result[0]['vital_signs']['heart_rate']['value'], 60, atol=10)
assert result[0]['vital_signs']['heart_rate']['confidence'] == 1.0
if export:
test_json_path = os.path.join("test.json")
assert os.path.exists(test_json_path)
with open(test_json_path, 'r') as f:
data = json.load(f)
assert np.asarray(data[0]['face']['coordinates']).shape == (360, 4)
assert np.asarray(data[0]['face']['confidence']).shape == (360,)
assert np.asarray(data[0]['vital_signs']['ppg_waveform']['data']).shape == (360,)
assert np.asarray(data[0]['vital_signs']['ppg_waveform']['confidence']).shape == (360,)
np.testing.assert_allclose(data[0]['vital_signs']['heart_rate']['value'], 60, atol=10)
assert data[0]['vital_signs']['heart_rate']['confidence'] == 1.0
os.remove(test_json_path)

def test_VitalLens_API(request):
api_key = request.getfixturevalue('test_dev_api_key')
vl = VitalLens(method=Method.VITALLENS, api_key=api_key, detect_faces=True)
vl = VitalLens(method=Method.VITALLENS, api_key=api_key, detect_faces=True, export_to_json=False)
test_video_ndarray = request.getfixturevalue('test_video_ndarray')
test_video_fps = request.getfixturevalue('test_video_fps')
result = vl(test_video_ndarray, fps=test_video_fps, faces=None)
result = vl(test_video_ndarray, fps=test_video_fps, faces=None, export_filename="test")
assert len(result) == 1
assert result[0]['face']['coordinates'].shape == (360, 4)
assert result[0]['vital_signs']['ppg_waveform']['data'].shape == (360,)
Expand All @@ -62,3 +77,4 @@ def test_VitalLens_API(request):
np.testing.assert_allclose(result[0]['vital_signs']['heart_rate']['confidence'], 1.0, atol=0.1)
np.testing.assert_allclose(result[0]['vital_signs']['respiratory_rate']['value'], 13.5, atol=0.5)
np.testing.assert_allclose(result[0]['vital_signs']['respiratory_rate']['confidence'], 1.0, atol=0.1)
assert not os.path.exists("test.json")
50 changes: 50 additions & 0 deletions tests/test_signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2024 Philipp Rouast
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import numpy as np
import pytest

import sys
sys.path.append('../vitallens-python')

from vitallens.signal import windowed_mean, windowed_freq

def test_windowed_mean():
x = np.asarray([0., 1., 2., 3., 4., 5., 6.])
y = np.asarray([1., 1., 2., 3., 4., 5., 5.])
out_y = windowed_mean(x=x, window_size=3, overlap=1)
np.testing.assert_equal(
out_y,
y)

@pytest.mark.parametrize("num", [100, 1000])
@pytest.mark.parametrize("freq", [2.35, 4.89, 13.55])
@pytest.mark.parametrize("window_size", [10, 20])
def test_estimate_freq_periodogram(num, freq, window_size):
# Test data
x = np.linspace(0, freq * 2 * np.pi, num=num)
np.random.seed(0)
y = 100 * np.sin(x) + np.random.normal(scale=8, size=num)
# Check a default use case with axis=-1
np.testing.assert_allclose(
windowed_freq(x=y, window_size=window_size, overlap=window_size//2, f_s=len(x), f_range=(max(freq-2,1),freq+2), f_res=0.05),
np.full((num,), fill_value=freq),
rtol=1)

73 changes: 64 additions & 9 deletions vitallens/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from datetime import datetime
from enum import IntEnum
import json
import logging
import numpy as np
import os
from typing import Union

from vitallens.constants import DISCLAIMER
from vitallens.constants import DISCLAIMER, SECONDS_PER_MINUTE
from vitallens.constants import CALC_HR_MIN, CALC_HR_MAX, CALC_HR_WINDOW_SIZE
from vitallens.constants import CALC_RR_MIN, CALC_RR_MAX, CALC_RR_WINDOW_SIZE
from vitallens.methods.g import GRPPGMethod
from vitallens.methods.chrom import CHROMRPPGMethod
from vitallens.methods.pos import POSRPPGMethod
from vitallens.methods.vitallens import VitalLensRPPGMethod
from vitallens.signal import windowed_freq, windowed_mean
from vitallens.ssd import FaceDetector
from vitallens.utils import load_config, probe_video_inputs, check_faces
from vitallens.utils import load_config, probe_video_inputs, check_faces, convert_ndarray_to_list

class Method(IntEnum):
VITALLENS = 1
Expand All @@ -45,22 +51,28 @@ def __init__(
method: Method = Method.VITALLENS,
api_key: str = None,
detect_faces: bool = True,
estimate_running_vitals: bool = True,
fdet_max_faces: int = 1,
fdet_fs: float = 1.0,
fdet_score_threshold: float = 0.9,
fdet_iou_threshold: float = 0.3
fdet_iou_threshold: float = 0.3,
export_to_json: bool = True,
export_dir: str = "."
):
"""Initialisation. Loads face detection model if necessary.
Args:
method: The rPPG method to be used for inference.
api_key: Usage key for the VitalLens API (required for Method.VITALLENS)
detect_faces: `True` if faces need to be detected, otherwise `False`.
estimate_running_vitals: Set `True` to compute running vitals (e.g., `running_heart_rate`).
fdet_max_faces: The maximum number of faces to detect (if necessary).
fdet_fs: Frequency [Hz] at which faces should be scanned. Detections are
linearly interpolated for remaining frames.
fdet_score_threshold: Face detection score threshold.
fdet_iou_threshold: Face detection iou threshold.
export_to_json: If `True`, write results to a json file.
export_dir: The directory to which json files are written.
"""
self.api_key = api_key
# Load the config and model
Expand All @@ -80,6 +92,9 @@ def __init__(
else:
raise ValueError("Method {} not implemented!".format(self.config['model']))
self.detect_faces = detect_faces
self.estimate_running_vitals = estimate_running_vitals
self.export_to_json = export_to_json
self.export_dir = export_dir
if detect_faces:
# Initialise face detector
self.face_detector = FaceDetector(
Expand All @@ -90,7 +105,8 @@ def __call__(
video: Union[np.ndarray, str],
faces: Union[np.ndarray, list] = None,
fps: float = None,
override_fps_target: float = None
override_fps_target: float = None,
export_filename: str = None
) -> list:
"""Run rPPG inference.
Expand All @@ -107,6 +123,7 @@ def __call__(
fps: Sampling frequency of the input video. Required if type(video) == np.ndarray.
override_fps_target: Target fps at which rPPG inference should be run (optional).
If not provided, will use default of the selected method.
export_filename: Filename for json export if applicable.
Returns:
result: Analysis results as a list of faces in the following format:
[
Expand All @@ -118,11 +135,11 @@ def __call__(
},
'vital_signs': {
'heart_rate': {
'value': <Estimated value as float scalar>,
'unit': <Value unit>,
'confidence': <Estimation confidence as float scalar>,
'note': <Explanatory note>
},
'value': <Estimated value as float scalar>,
'unit': <Value unit>,
'confidence': <Estimation confidence as float scalar>,
'note': <Explanatory note>
},
'respiratory_rate': {
'value': <Estimated value as float scalar>,
'unit': <Value unit>,
Expand Down Expand Up @@ -192,7 +209,45 @@ def __call__(
'confidence': conf[name],
'note': note[name]
}
if self.estimate_running_vitals:
try:
if 'ppg_waveform' in self.config['signals']:
window_size = int(CALC_HR_WINDOW_SIZE*SECONDS_PER_MINUTE)
running_hr = windowed_freq(
x=data['ppg_waveform'], f_s=fps, f_res=0.005,
f_range=(CALC_HR_MIN/SECONDS_PER_MINUTE, CALC_HR_MAX/SECONDS_PER_MINUTE),
window_size=window_size, overlap=window_size//2) * SECONDS_PER_MINUTE
running_conf = windowed_mean(
x=conf['ppg_waveform'], window_size=window_size, overlap=window_size//2)
vital_signs_results['running_heart_rate'] = {
'data': running_hr,
'unit': 'bpm',
'confidence': running_conf,
'note': 'Estimate of the running heart rate using VitalLens, along with frame-wise confidences between 0 and 1.',
}
if 'respiratory_waveform' in self.config['signals']:
window_size = int(CALC_RR_WINDOW_SIZE*SECONDS_PER_MINUTE)
running_rr = windowed_freq(
x=data['respiratory_waveform'], f_s=fps, f_res=0.005,
f_range=(CALC_RR_MIN/SECONDS_PER_MINUTE, CALC_RR_MAX/SECONDS_PER_MINUTE),
window_size=window_size, overlap=window_size//2) * SECONDS_PER_MINUTE
running_conf = windowed_mean(
x=conf['respiratory_waveform'], window_size=window_size, overlap=window_size//2)
vital_signs_results['running_respiratory_rate'] = {
'data': running_rr,
'unit': 'bpm',
'confidence': running_conf,
'note': 'Estimate of the running respiratory rate using VitalLens, along with frame-wise confidences between 0 and 1.',
}
except ValueError as e:
logging.warn("Issue while computing running vitals: {}".format(e))
face_result['vital_signs'] = vital_signs_results
face_result['message'] = DISCLAIMER
results.append(face_result)
# Export to json
if self.export_to_json:
os.makedirs(self.export_dir, exist_ok=True)
export_filename = "{}.json".format(export_filename) if export_filename is not None else 'vitallens_{}.json'.format(datetime.now().strftime("%Y%m%d_%H%M%S"))
with open(os.path.join(self.export_dir, export_filename), 'w') as f:
json.dump(convert_ndarray_to_list(results), f, indent=4)
return results
2 changes: 2 additions & 0 deletions vitallens/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
# Minima and maxima of derived vitals
CALC_HR_MIN = 40
CALC_HR_MAX = 240
CALC_HR_WINDOW_SIZE = 10
CALC_RR_MIN = 1
CALC_RR_MAX = 60
CALC_RR_WINDOW_SIZE = 20

# API settings
API_MIN_FRAMES = 16
Expand Down
2 changes: 1 addition & 1 deletion vitallens/methods/simple_rppg_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __call__(
data[name] = hr
unit[name] = 'bpm'
conf[name] = 1.0
note[name] = 'Estimate of the heart rate using {} method. This method is not capable of providing a confidence estimate, hence returning 1.'.format(self.model)
note[name] = 'Estimate of the global heart rate using {} method. This method is not capable of providing a confidence estimate, hence returning 1.'.format(self.model)
elif name == 'ppg_waveform':
data[name] = sig
unit[name] = 'unitless'
Expand Down
Loading

0 comments on commit a2a4196

Please sign in to comment.