From 1bf1dcfcc30e47ee18514dd5e34b4f6d4175e195 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Fri, 17 Jan 2025 16:29:34 +0100 Subject: [PATCH 1/6] deprecate sample_rate in EdfReader and EdfReader tests --- pyedflib/_extensions/_pyedflib.pyx | 11 +++++++---- pyedflib/edfreader.py | 1 - pyedflib/edfwriter.py | 24 ++++++------------------ pyedflib/tests/test_edfreader.py | 27 +++++++++++++-------------- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/pyedflib/_extensions/_pyedflib.pyx b/pyedflib/_extensions/_pyedflib.pyx index 60874531..0e47ce01 100644 --- a/pyedflib/_extensions/_pyedflib.pyx +++ b/pyedflib/_extensions/_pyedflib.pyx @@ -123,7 +123,9 @@ cdef class CyEdfReader: Note that edflib.c is encapsulated so there is no direct access to the file from here unless I add a raw interface or something - EDF/BDF+ files are arranged into N signals sampled at rate Fs. The data is actually stored in chunks called "datarecords" which have a file specific size. + EDF/BDF+ files are arranged into N signals sampled at rate Fs. + The data is actually stored in chunks called "datarecords" which have a + file specific size. A typical way to use this to read an EEG file would be to choose a certain number of seconds per page to display. Then figureout how many data records @@ -554,9 +556,10 @@ def set_samples_per_record(handle, edfsignal, smp_per_record ): """ int set_samples_per_record(int handle, int edfsignal, int smp_per_record ) - sets how many samples are in the record for this signal. - this is not the sampling frequency (Hz), (which is calculated by - by smp_per_record/record_duration). + sets how many samples are in each record for this signal. + this is not the sampling frequency (Hz), unless record_duration=1 + The sample frequency is calculated by smp_per_record/record_duration. + The function call to the C library is therefore slightly mislabeled """ return c_edf.edf_set_samplefrequency(handle, edfsignal, smp_per_record) diff --git a/pyedflib/edfreader.py b/pyedflib/edfreader.py index 116766c4..889ae2be 100644 --- a/pyedflib/edfreader.py +++ b/pyedflib/edfreader.py @@ -206,7 +206,6 @@ def getSignalHeader( return { "label": self.getLabel(chn), "dimension": self.getPhysicalDimension(chn), - "sample_rate": self.getSampleFrequency(chn), # backwards compatibility "sample_frequency": self.getSampleFrequency(chn), "physical_max": self.getPhysicalMaximum(chn), "physical_min": self.getPhysicalMinimum(chn), diff --git a/pyedflib/edfwriter.py b/pyedflib/edfwriter.py index ea9d3e9e..5cf06a1e 100644 --- a/pyedflib/edfwriter.py +++ b/pyedflib/edfwriter.py @@ -226,7 +226,6 @@ def __init__(self, file_name: str, n_channels: int, file_type: int = FILETYPE_ED 'label' : channel label (string, <= 16 characters, must be unique) 'dimension' : physical dimension (e.g., mV) (string, <= 8 characters) - 'sample_rate' : sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead. 'sample_frequency' : number of samples per record (int) 'physical_max' : maximum physical value (float) 'physical_min' : minimum physical value (float) @@ -253,13 +252,13 @@ def __init__(self, file_name: str, n_channels: int, file_type: int = FILETYPE_ED self.sample_buffer: List[List] = [] for i in np.arange(self.n_channels): if self.file_type == FILETYPE_BDFPLUS or self.file_type == FILETYPE_BDF: - self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_rate': 100, - 'sample_frequency': None, 'physical_max': 1.0, 'physical_min': -1.0, + self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': None, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607,'digital_min': -8388608, 'prefilter': '', 'transducer': ''}) elif self.file_type == FILETYPE_EDFPLUS or self.file_type == FILETYPE_EDF: - self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_rate': 100, - 'sample_frequency': None, 'physical_max': 1.0, 'physical_min': -1.0, + self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': None, + 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': '', 'transducer': ''}) @@ -949,7 +948,7 @@ def get_smp_per_record(self, ch_idx: int) -> int: gets the calculated number of samples that need to be fit into one record (i.e. window/block of data) with the given record duration. """ - fs = self._get_sample_frequency(ch_idx) + fs = self.channels[ch_idx]['sample_frequency'] if fs is None: return None record_duration = self.record_duration @@ -972,7 +971,7 @@ def _calculate_optimal_record_duration(self) -> None: """ if self._enforce_record_duration: return allint = lambda int_list: all([n==int(n) for n in int_list]) - all_fs = [self._get_sample_frequency(i) for i,_ in enumerate(self.channels)] + all_fs = [self.channels[i]['sample_frequency'] for i,_ in enumerate(self.channels)] # calculate the optimal record duration to represent all frequencies. # this is achieved when fs*duration=int, i.e. the number of samples @@ -989,14 +988,3 @@ def _calculate_optimal_record_duration(self) -> None: assert record_duration>0, f'cannot accurately represent sampling frequencies with data record durations between 1-60s: {all_fs}' assert record_duration<=60, 'record duration must be below 60 seconds' self.record_duration = record_duration - - def _get_sample_frequency(self, channelIndex: int) -> Union[int, float]: - # Temporary conditional assignment while we deprecate 'sample_rate' as a channel attribute - # in favor of 'sample_frequency', supporting the use of either to give - # users time to switch to the new interface. - if 'sample_rate' in self.channels[channelIndex]: - warnings.warn("`sample_rate` is deprecated and will be removed in a future release. \ - Please use `sample_frequency` instead", DeprecationWarning) - return (self.channels[channelIndex]['sample_rate'] - if self.channels[channelIndex].get('sample_frequency') is None - else self.channels[channelIndex]['sample_frequency']) diff --git a/pyedflib/tests/test_edfreader.py b/pyedflib/tests/test_edfreader.py index 648e5c27..5f5bec4d 100644 --- a/pyedflib/tests/test_edfreader.py +++ b/pyedflib/tests/test_edfreader.py @@ -260,9 +260,10 @@ def test_EdfReader_ReadAnnotations(self): del f + def test_EdfReader_Check_Size(self): sample_frequency = 100 - channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 256, + channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_frequency': sample_frequency, 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} @@ -276,19 +277,18 @@ def test_EdfReader_Check_Size(self): f = pyedflib.EdfReader(self.bdf_broken_file, pyedflib.READ_ALL_ANNOTATIONS, pyedflib.DO_NOT_CHECK_FILE_SIZE) np.testing.assert_equal(f.getTechnician(), 'tec1') - np.testing.assert_equal(f.getLabel(0), 'test_label') np.testing.assert_equal(f.getPhysicalDimension(0), 'mV') np.testing.assert_equal(f.getPrefilter(0), 'pre1') np.testing.assert_equal(f.getTransducer(0), 'trans1') np.testing.assert_equal(f.getSampleFrequency(0), sample_frequency) - # When both 'sample_rate' and 'sample_frequency' are present, we write - # the file using the latter, which means that when we read it back, - # only the 'sample_frequency' value is present. - expected_signal_header = {**channel_info, 'sample_rate': sample_frequency} - np.testing.assert_equal(f.getSignalHeader(0), expected_signal_header) - np.testing.assert_equal(f.getSignalHeaders(), [expected_signal_header]) + # sample_rate was deprecated, check that it does not appear anymore + sheads = f.getSignalHeaders() + for shead in sheads: + assert not 'sample_rate' in shead + assert 'sample_frequency' in shead + del f def test_EdfReader_Close_file(self): @@ -327,12 +327,11 @@ def test_BdfReader_Read_accented_file(self): np.testing.assert_equal(f.getPrefilter(0), 'pre1') np.testing.assert_equal(f.getTransducer(0), 'trans1') - # When both 'sample_rate' and 'sample_frequency' are present, we write - # the file using the latter, which means that when we read it back, - # only the 'sample_frequency' value is present. - expected_signal_header = {**channel_info, 'sample_rate': sample_frequency} - np.testing.assert_equal(f.getSignalHeader(0), expected_signal_header) - np.testing.assert_equal(f.getSignalHeaders(), [expected_signal_header]) + # sample_rate was deprecated, check that it does not appear anymore + sheads = f.getSignalHeaders() + for shead in sheads: + assert not 'sample_rate' in shead + assert 'sample_frequency' in shead del f From c8ab27150e850bdcb234c1d3c8d00d93a5cbf808 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Sat, 18 Jan 2025 10:02:28 +0100 Subject: [PATCH 2/6] deprecate sample_rate in edfwriter and tests --- pyedflib/edfwriter.py | 149 ++++++++++++++++++++++--------- pyedflib/highlevel.py | 27 ++---- pyedflib/tests/test_edfreader.py | 4 +- pyedflib/tests/test_edfwriter.py | 113 +++++++++++++++-------- pyedflib/tests/test_highlevel.py | 26 +++--- 5 files changed, 206 insertions(+), 113 deletions(-) diff --git a/pyedflib/edfwriter.py b/pyedflib/edfwriter.py index 5cf06a1e..6860aafa 100644 --- a/pyedflib/edfwriter.py +++ b/pyedflib/edfwriter.py @@ -10,6 +10,9 @@ from datetime import date, datetime from types import TracebackType from typing import Any, Union, Optional, List, Dict, Type +import math +from functools import reduce +from fractions import Fraction import numpy as np @@ -182,6 +185,89 @@ def gender2int(gender: Union[int, str, None]) -> Optional[int]: return sex2int(gender) +def _calculate_record_duration(freqs, max_str_len=8, max_val=60): + """ + Finds a 'record duration' X in (0, max_val] such that for each freq in freqs: + * freq * X is an integer (or freq is zero / already integer) + * len(str(X).rstrip('0').rstrip('.')) <= max_str_len + + Returns: + float: A suitable multiplier X. + Raises: + ValueError: if no suitable multiplier is found. + """ + # Convert input to float just in case + freqs = [float(f) for f in freqs] + + # Ignore frequencies that are 0.0 or already integral — these don't impose constraints + nonint_freqs = [] + for f in freqs: + if f == 0.0: + continue + # If nearly an integer, treat it as integer + # or do an exact check if you prefer: if f.is_integer(): + if abs(f - round(f)) < 1e-12: + continue + nonint_freqs.append(f) + + # If there's no non-integer frequency, X=1 is trivially fine + if not nonint_freqs: + return 1.0 + + # Convert each frequency to a fraction with a bounded denominator + frac_list = [Fraction(f).limit_denominator(10_000_000) for f in nonint_freqs] + + # Compute the LCM of all denominators + def lcm(a, b): + return abs(a * b) // math.gcd(a, b) + + denominators = [fr.denominator for fr in frac_list] + L = reduce(lcm, denominators, 1) + + # For each freq_i = nᵢ / dᵢ, define Mᵢ = L / dᵢ + # We want X = L / d for some d that divides EVERY (nᵢ × Mᵢ). + numerators = [fr.numerator for fr in frac_list] + Ms = [L // fr.denominator for fr in frac_list] + + # G = gcd of all (nᵢ × Mᵢ) + gcd_val = 0 + for n, M in zip(numerators, Ms): + gcd_val = math.gcd(gcd_val, n * M) + + # Edge case: if gcd_val == 0, just return 1.0 to avoid dividing by zero + if gcd_val == 0: + return 1.0 + + # Enumerate all divisors of gcd_val in ascending order + def all_divisors(num): + divs = [] + i = 1 + while i * i <= num: + if num % i == 0: + divs.append(i) + if i != num // i: + divs.append(num // i) + i += 1 + return sorted(divs) + + divisors = all_divisors(gcd_val) + + # For each divisor d, form X = L / d and see if it's <= max_val and short enough + for d in divisors: + candidate = L / d + if candidate <= max_val: + # Build a short string + c_str = str(candidate).rstrip('0').rstrip('.') + if len(c_str) <= max_str_len: + return float(candidate) + + raise ValueError( + f"No suitable record_duration (≤ {max_val}) found with ≤ {max_str_len} ASCII chars " + f"for frequencies: {freqs}" + ) + + + class ChannelDoesNotExist(Exception): def __init__(self, value: Any) -> None: self.parameter = value @@ -252,12 +338,12 @@ def __init__(self, file_name: str, n_channels: int, file_type: int = FILETYPE_ED self.sample_buffer: List[List] = [] for i in np.arange(self.n_channels): if self.file_type == FILETYPE_BDFPLUS or self.file_type == FILETYPE_BDF: - self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': None, + self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': 100, 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607,'digital_min': -8388608, 'prefilter': '', 'transducer': ''}) elif self.file_type == FILETYPE_EDFPLUS or self.file_type == FILETYPE_EDF: - self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': None, + self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': 100, 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 32767, 'digital_min': -32768, 'prefilter': '', 'transducer': ''}) @@ -291,7 +377,17 @@ def update_header(self) -> None: # the channels, we need to find a common denominator so that all sample # frequencies can be represented accurately. # this can be overwritten by explicitly calling setDatarecordDuration - self._calculate_optimal_record_duration() + + for ch in self.channels: + # raise exception, can be removed in later release + if 'sample_rate' in ch: + raise ValueError('Use of `sample_rate` is deprecated, use `sample_frequency` instead') + + sample_freqs = [ch['sample_frequency'] for ch in self.channels] + if not self._enforce_record_duration and not any([f is None for f in sample_freqs]): + assert all([isinstance(f, (float, int)) for f in sample_freqs]), \ + f'{sample_freqs=} contains non int/float' + self.record_duration = _calculate_record_duration(sample_freqs) set_technician(self.handle, du(self.technician)) set_recording_additional(self.handle, du(self.recording_additional)) @@ -329,6 +425,8 @@ def update_header(self) -> None: set_transducer(self.handle, i, du(self.channels[i]['transducer'])) set_prefilter(self.handle, i, du(self.channels[i]['prefilter'])) + + def setHeader(self, fileHeader: Dict[str, Union[str, float, int, None]]) -> None: """ Sets the file header @@ -354,7 +452,6 @@ def setSignalHeader(self, edfsignal: int, channel_info: Dict[str, Union[str, int 'label' : channel label (string, <= 16 characters, must be unique) 'dimension' : physical dimension (e.g., mV) (string, <= 8 characters) - 'sample_rate' : sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead. 'sample_frequency' : number of samples per record (int) 'physical_max' : maximum physical value (float) 'physical_min' : minimum physical value (float) @@ -378,8 +475,6 @@ def setSignalHeaders(self, signalHeaders: List[Dict[str, Union[str, int, float, channel label (string, <= 16 characters, must be unique) 'dimension' : str physical dimension (e.g., mV) (string, <= 8 characters) - 'sample_rate' : - sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead. 'sample_frequency' : int number of samples per record 'physical_max' : float @@ -525,10 +620,11 @@ def setDatarecordDuration(self, record_duration: Union[float, int]) -> None: This function is NOT REQUIRED but can be called after opening a file in writemode and before the first sample write action. This function can be used when you want to use a samplefrequency which is not an - integer. For example, if you want to use a samplerate of 0.5 Hz, set + integer. For example, if you want to use a samplefreq of 0.5 Hz, set the samplefrequency to 5 Hz and the datarecord duration to 10 seconds. Do not use this function, except when absolutely necessary! """ + warnings.warn('Forcing a specific record_duration might alter calculated sample_frequencies when reading the file') self._enforce_record_duration = True self.record_duration = record_duration self.update_header() @@ -836,12 +932,6 @@ def writeSamples(self, data_list: Union[List[np.ndarray], np.ndarray], digital: All parameters must be already written into the bdf/edf-file. """ - there_are_blank_sample_frequencies = any([channel.get('sample_frequency') is None - for channel in self.channels]) - if there_are_blank_sample_frequencies: - warnings.warn("The 'sample_rate' parameter is deprecated. Please use " - "'sample_frequency' instead.", DeprecationWarning) - if (len(data_list)) == 0: raise WrongInputSize('Data list is empty') if (len(data_list) != len(self.channels)): @@ -949,7 +1039,9 @@ def get_smp_per_record(self, ch_idx: int) -> int: record (i.e. window/block of data) with the given record duration. """ fs = self.channels[ch_idx]['sample_frequency'] - if fs is None: return None + # if fs is None: + # print('NONE') + # return None record_duration = self.record_duration smp_per_record = fs*record_duration @@ -959,32 +1051,3 @@ def get_smp_per_record(self, ch_idx: int) -> int: f'smp_per_record={smp_per_record}, record_duration={record_duration} seconds,' + f'calculated sample_frequency will be {np.round(smp_per_record)/record_duration}') return int(np.round(smp_per_record)) - - - def _calculate_optimal_record_duration(self) -> None: - """ - calculate optimal denominator (record duration in seconds) - for all sample frequencies such that smp_per_record is an integer - for all channels. - - If all sampling frequencies are integers, this will simply be 1. - """ - if self._enforce_record_duration: return - allint = lambda int_list: all([n==int(n) for n in int_list]) - all_fs = [self.channels[i]['sample_frequency'] for i,_ in enumerate(self.channels)] - - # calculate the optimal record duration to represent all frequencies. - # this is achieved when fs*duration=int, i.e. the number of samples - # in one data record can be represented by an int (smp_per_record) - # if all sampling frequencies are ints, this will be simply 1 - # for now this brute force solution should cover 99% of cases. - # TODO: optimize this process - - record_duration = 0 - for i in range(1, 60): - if allint([x*i for x in all_fs]): - record_duration = i - break - assert record_duration>0, f'cannot accurately represent sampling frequencies with data record durations between 1-60s: {all_fs}' - assert record_duration<=60, 'record duration must be below 60 seconds' - self.record_duration = record_duration diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index 1895805f..c1841d0b 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -34,15 +34,6 @@ # from . import EdfReader -def _get_sample_frequency(signal_header: dict) -> int: - # Temporary conditional assignment while we deprecate 'sample_rate' as a channel attribute - # in favor of 'sample_frequency', supporting the use of either to give - # users time to switch to the new interface. - return (signal_header['sample_rate'] - if signal_header.get('sample_frequency') is None - else signal_header['sample_frequency']) - - def tqdm(iterable: Iterable, *args: Any, **kwargs: Any) -> Iterable: """ These is an optional dependency that shows a progress bar for some @@ -236,8 +227,7 @@ def make_header( def make_signal_header( label: str, dimension: str = 'uV', - sample_rate: Union[int, float] = 256, - sample_frequency: Optional[Union[int, float]] = None, + sample_frequency: Optional[Union[int, float]] = 256, physical_min: float = -200, physical_max: float = 200, digital_min: Union[float, int] = -32768, @@ -257,8 +247,6 @@ def make_signal_header( the name of the channel. dimension : str, optional dimension, eg mV. The default is 'uV'. - sample_rate : int, optional - sampling frequency. The default is 256. Deprecated: use 'sample_frequency' instead. sample_frequency : int, optional sampling frequency. The default is 256. physical_min : float, optional @@ -283,7 +271,6 @@ def make_signal_header( signal_header = {'label': label, 'dimension': dimension, - 'sample_rate': sample_rate, 'sample_frequency': sample_frequency, 'physical_min': physical_min, 'physical_max': physical_max, @@ -297,8 +284,7 @@ def make_signal_header( def make_signal_headers( list_of_labels: List[str], dimension: str = 'uV', - sample_rate: int = 256, - sample_frequency: Optional[Union[int, float]] = None, + sample_frequency: Optional[Union[int, float]] = 256, physical_min: float = -200.0, physical_max: float = 200.0, digital_min: Union[float,int] = -32768, @@ -316,8 +302,6 @@ def make_signal_headers( A list with labels for each channel. dimension : str, optional dimension, eg mV. The default is 'uV'. - sample_rate : int, optional - sampling frequency. The default is 256. Deprecated: use 'sample_frequency' instead. sample_frequency : int, optional sampling frequency. The default is 256. physical_min : float, optional @@ -341,7 +325,7 @@ def make_signal_headers( """ signal_headers = [] for label in list_of_labels: - header = make_signal_header(label, dimension=dimension, sample_rate=sample_rate, + header = make_signal_header(label, dimension=dimension, sample_frequency=sample_frequency, physical_min=physical_min, physical_max=physical_max, digital_min=digital_min, digital_max=digital_max, @@ -435,7 +419,7 @@ def read_edf( signals.append(signal) # we can only return a np.array if all signals have the same samplefreq - sfreqs = [_get_sample_frequency(shead) for shead in signal_headers] + sfreqs = [shead['sample_frequency'] for shead in signal_headers] all_sfreq_same = sfreqs[1:]==sfreqs[:-1] if all_sfreq_same: dtype = np.int32 if digital else float @@ -518,6 +502,9 @@ def write_edf( # check dmin, dmax and pmin, pmax dont exceed signal min/max for sig, shead in zip(signals, signal_headers): + if 'sample_rate' in shead: + raise ValueError('Use of `sample_rate` is deprecated, use `sample_frequency` instead') + dmin, dmax = shead['digital_min'], shead['digital_max'] pmin, pmax = shead['physical_min'], shead['physical_max'] label = shead['label'] diff --git a/pyedflib/tests/test_edfreader.py b/pyedflib/tests/test_edfreader.py index 5f5bec4d..17bddc25 100644 --- a/pyedflib/tests/test_edfreader.py +++ b/pyedflib/tests/test_edfreader.py @@ -308,7 +308,7 @@ def test_EdfReader_Close_file(self): def test_BdfReader_Read_accented_file(self): sample_frequency = 100 - channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 256, + channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_frequency': sample_frequency, 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} @@ -351,7 +351,7 @@ def test_read_incorrect_file(self): # now we create a file that is not EDF/BDF compliant # this should raise an OSError and not a FileNotFoundError with self.assertRaises(OSError) as cm: - channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_rate': 256, + channel_info = {'label': 'test_label', 'dimension': 'mV', 'sample_frequency': 100, 'physical_max': 1.0, 'physical_min': -1.0, 'digital_max': 8388607, 'digital_min': -8388608, 'prefilter': 'pre1', 'transducer': 'trans1'} diff --git a/pyedflib/tests/test_edfwriter.py b/pyedflib/tests/test_edfwriter.py index af14b52b..b8cd31d9 100644 --- a/pyedflib/tests/test_edfwriter.py +++ b/pyedflib/tests/test_edfwriter.py @@ -15,6 +15,7 @@ import pyedflib from pyedflib.edfreader import EdfReader from pyedflib.edfwriter import ChannelDoesNotExist, EdfWriter, WrongInputSize +from pyedflib.edfwriter import _calculate_record_duration class TestEdfWriter(unittest.TestCase): @@ -867,22 +868,28 @@ def test_non_one_second_record_duration(self): f.close() del f - def test_sample_rate_backwards_compatibility(self): + + def test_force_record_duration(self): + """forcing a specific record duration should alter the sample_freq""" channel_count = 4 - record_duration = 1 - # Choosing a weird sample rate to make sure it doesn't equal any defaults - sample_rate = 42 - record_count = 10 - sample_count_per_channel = sample_rate * record_count + record_duration = 0.33333 # should not be able to represent 256 Hz + samples_per_record = 256 + sample_frequency = 256 + sample_freq_exp = int(256*record_duration)/record_duration + record_count = 4 + f = pyedflib.EdfWriter(self.edf_data_file, channel_count, file_type=pyedflib.FILETYPE_EDF) + with self.assertWarns(UserWarning): + f.setDatarecordDuration(record_duration) physMax = 1 physMin = -physMax digMax = 32767 digMin = -digMax - base_signal_header = lambda idx: { + f.setSignalHeaders([{ 'label': f'test_label{idx}', + 'sample_frequency': sample_frequency, 'dimension': 'mV', 'physical_min': physMin, 'physical_max': physMax, @@ -890,44 +897,53 @@ def test_sample_rate_backwards_compatibility(self): 'digital_max': digMax, 'transducer': f'trans{idx}', 'prefilter': f'pre{idx}' - } + } for idx in range(channel_count)]) - with self.subTest("when 'sample_frequency' param is missing"): - f = pyedflib.EdfWriter(self.edf_data_file, channel_count, file_type=pyedflib.FILETYPE_EDF) - f.setDatarecordDuration(record_duration) + f.writeSamples(np.random.rand(channel_count, samples_per_record*4)) + f.close() + del f - f.setSignalHeaders([{ - 'sample_rate': sample_rate, - **base_signal_header(idx) - } for idx in range(channel_count)]) + f = pyedflib.EdfReader(self.edf_data_file) - with self.assertWarnsRegex(DeprecationWarning, "'sample_rate' parameter is deprecated"): - f.writeSamples(np.random.rand(channel_count, sample_count_per_channel)) - del f + for signal_header in f.getSignalHeaders(): + self.assertEqual(signal_header['sample_frequency'], sample_freq_exp) - f = pyedflib.EdfReader(self.edf_data_file) + self.assertEqual(f.datarecord_duration, record_duration) + f.close() + del f - for signal_header in f.getSignalHeaders(): - self.assertEqual(signal_header['sample_rate'], sample_rate) - del f + def test_sample_rate_deprecation(self): + channel_count = 4 + record_duration = 1 + # Choosing a weird sample rate to make sure it doesn't equal any defaults + sample_rate = 42 - with self.subTest("when 'sample_frequency' param is present"): - f = pyedflib.EdfWriter(self.edf_data_file, channel_count, file_type=pyedflib.FILETYPE_EDF) - f.setDatarecordDuration(record_duration) + physMax = 1 + physMin = -physMax + digMax = 32767 + digMin = -digMax + + base_signal_header = lambda idx: { + 'label': f'test_label{idx}', + 'dimension': 'mV', + 'physical_min': physMin, + 'physical_max': physMax, + 'digital_min': digMin, + 'digital_max': digMax, + 'transducer': f'trans{idx}', + 'prefilter': f'pre{idx}' + } + + f = pyedflib.EdfWriter(self.edf_data_file, channel_count, file_type=pyedflib.FILETYPE_EDF) + f.setDatarecordDuration(record_duration) + with self.assertRaises(ValueError): f.setSignalHeaders([{ - 'sample_frequency': sample_rate, + 'sample_rate': sample_rate, **base_signal_header(idx) } for idx in range(channel_count)]) - f.writeSamples(np.random.rand(channel_count, sample_count_per_channel)) - - del f - - f = pyedflib.EdfReader(self.edf_data_file) - for signal_header in f.getSignalHeaders(): - self.assertEqual(signal_header['sample_frequency'], sample_rate) + del f - del f def test_EdfWriter_more_than_80_chars(self): @@ -1126,8 +1142,7 @@ def test_write_annotations_long_long(self): with EdfWriter(self.edf_data_file, len(ch_names)) as f: f.setSignalHeaders([{'label': 'EEG1', 'dimension': 'uV', - 'sample_rate': 256, - 'sample_frequency': None, + 'sample_frequency': 256, 'physical_min': -200.0, 'physical_max': 200.0, 'digital_min': -32768, @@ -1136,8 +1151,7 @@ def test_write_annotations_long_long(self): 'prefilter': ''}, {'label': 'EEG2', 'dimension': 'uV', - 'sample_rate': 256, - 'sample_frequency': None, + 'sample_frequency': 256, 'physical_min': -200.0, 'physical_max': 200.0, 'digital_min': -32768, @@ -1154,6 +1168,29 @@ def test_write_annotations_long_long(self): self.assertEqual( len(annotations[0]), 48) np.testing.assert_array_equal(annotations[0], np.arange(0, 342000, 3600*2)) + def test_find_record_duration(self): + """check that _calculate_record_duration finds optimal values""" + freqs = np.arange(10) + duration = _calculate_record_duration(freqs) + assert duration==1 + + freqs = np.arange(10)/3 + duration = _calculate_record_duration(freqs) + assert duration==3 + + freqs = np.array([0.32, 1.4, 2.3, 6.4, 3.1]) + duration = _calculate_record_duration(freqs) + assert duration == 50 + + # float rounding errors should be handled correctly + freqs = np.array([199.99999999, 255.99999999]) + duration = _calculate_record_duration(freqs) + assert duration == 1 + + for _ in range(10): + freqs = np.random.randint(1, 10000, 25,).astype(int) + duration = _calculate_record_duration(freqs) + assert duration == 1 if __name__ == '__main__': # run_module_suite(argv=sys.argv) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index bada0866..fb24e541 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -119,11 +119,7 @@ def test_read_write_edf(self): self.assertEqual(len(signals2), 5) self.assertEqual(len(signals2), len(signal_headers2)) for shead1, shead2 in zip(signal_headers1, signal_headers2): - # When only 'sample_rate' is present, we use its value to write - # the file, ignoring 'sample_frequency', which means that when - # we read it back only the 'sample_rate' value is present. - self.assertDictEqual({**shead1, 'sample_frequency': shead1['sample_rate']}, - shead2) + self.assertDictEqual(shead1, shead2) np.testing.assert_allclose(signals, signals2, atol=0.01) if file_type in [-1, 1, 3]: self.assertDictEqual(header, header2) @@ -141,11 +137,8 @@ def test_read_write_edf(self): self.assertEqual(len(signals2), len(signal_headers2)) np.testing.assert_array_equal(signals, signals2) for shead1, shead2 in zip(signal_headers1, signal_headers2): - # When only 'sample_rate' is present, we use its value to write - # the file, ignoring 'sample_frequency', which means that when - # we read it back only the 'sample_rate' value is present. - self.assertDictEqual({**shead1, 'sample_frequency': shead1['sample_rate']}, - shead2) + self.assertDictEqual(shead1, shead2) + # EDF/BDF header writing does not correctly work yet if file_type in [-1, 1, 3]: self.assertDictEqual(header, header2) @@ -415,6 +408,19 @@ def test_annotation_bytestring(self): _,_,header3 = highlevel.read_edf(self.edfplus_data_file) self.assertEqual(header2['annotations'], header3['annotations']) + def test_deprecated_sample_rate(self): + header = highlevel.make_header(technician='tech', recording_additional='radd', + patientname='name', patient_additional='padd', + patientcode='42', equipment='eeg', admincode='420', + sex='Male', birthdate='05.09.1980') + annotations = [[0.01, b'-1', 'begin'],[0.5, b'-1', 'middle'],[10, -1, 'end']] + header['annotations'] = annotations + signal_headers = highlevel.make_signal_headers(['ch'+str(i) for i in range(3)]) + signal_headers[0]['sample_rate'] = 256 + signals = np.random.rand(3, 256*300)*200 #5 minutes of eeg + with self.assertRaises(ValueError): + highlevel.write_edf(self.edfplus_data_file, signals, signal_headers, header) + if __name__ == '__main__': # run_module_suite(argv=sys.argv) From e0c84ba286485d107a49bc5a5e4e16b7178a69c8 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Sat, 18 Jan 2025 10:08:30 +0100 Subject: [PATCH 3/6] cleanup --- pyedflib/edfwriter.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyedflib/edfwriter.py b/pyedflib/edfwriter.py index 6860aafa..0b5dd158 100644 --- a/pyedflib/edfwriter.py +++ b/pyedflib/edfwriter.py @@ -1039,9 +1039,6 @@ def get_smp_per_record(self, ch_idx: int) -> int: record (i.e. window/block of data) with the given record duration. """ fs = self.channels[ch_idx]['sample_frequency'] - # if fs is None: - # print('NONE') - # return None record_duration = self.record_duration smp_per_record = fs*record_duration From 135b814cb00cc5d4666385b1051be68866201fc5 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Sat, 18 Jan 2025 10:11:46 +0100 Subject: [PATCH 4/6] remove 3.7 from test suite --- .github/workflows/tests.yml | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ef4f1639..65d91235 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,13 +9,8 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-2019, windows-latest, macos-latest] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] - exclude: # Python < v3.8 does not support Apple Silicon ARM64. - - python-version: "3.7" - os: macos-latest - include: # So run those legacy versions on Intel CPUs. - - python-version: "3.7" - os: macos-13 + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + steps: - uses: actions/checkout@v3 - name: Setup Python From 553751bbed34d4b2b0c52f53daee134a269b6daf Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Mon, 20 Jan 2025 20:26:32 +0100 Subject: [PATCH 5/6] added suggestions by @cbrnr --- pyedflib/_extensions/_pyedflib.pyx | 9 ++++----- pyedflib/edfwriter.py | 4 ++-- pyedflib/highlevel.py | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pyedflib/_extensions/_pyedflib.pyx b/pyedflib/_extensions/_pyedflib.pyx index 0e47ce01..9631f7be 100644 --- a/pyedflib/_extensions/_pyedflib.pyx +++ b/pyedflib/_extensions/_pyedflib.pyx @@ -555,12 +555,11 @@ def set_equipment(handle, equipment): def set_samples_per_record(handle, edfsignal, smp_per_record ): """ int set_samples_per_record(int handle, int edfsignal, int smp_per_record ) - - sets how many samples are in each record for this signal. - this is not the sampling frequency (Hz), unless record_duration=1 - The sample frequency is calculated by smp_per_record/record_duration. - The function call to the C library is therefore slightly mislabeled """ + # sets how many samples are in each record for this signal. + # this is not the sampling frequency (Hz), unless record_duration=1 + # The sample frequency is calculated by smp_per_record/record_duration. + # The function call to the C library is therefore slightly mislabeled return c_edf.edf_set_samplefrequency(handle, edfsignal, smp_per_record) def set_admincode(handle, admincode): diff --git a/pyedflib/edfwriter.py b/pyedflib/edfwriter.py index 0b5dd158..9e9ec6ff 100644 --- a/pyedflib/edfwriter.py +++ b/pyedflib/edfwriter.py @@ -381,7 +381,7 @@ def update_header(self) -> None: for ch in self.channels: # raise exception, can be removed in later release if 'sample_rate' in ch: - raise ValueError('Use of `sample_rate` is deprecated, use `sample_frequency` instead') + raise FutureWarning('Use of `sample_rate` is deprecated, use `sample_frequency` instead') sample_freqs = [ch['sample_frequency'] for ch in self.channels] if not self._enforce_record_duration and not any([f is None for f in sample_freqs]): @@ -620,7 +620,7 @@ def setDatarecordDuration(self, record_duration: Union[float, int]) -> None: This function is NOT REQUIRED but can be called after opening a file in writemode and before the first sample write action. This function can be used when you want to use a samplefrequency which is not an - integer. For example, if you want to use a samplefreq of 0.5 Hz, set + integer. For example, if you want to use a sample frequency of 0.5 Hz, set the samplefrequency to 5 Hz and the datarecord duration to 10 seconds. Do not use this function, except when absolutely necessary! """ diff --git a/pyedflib/highlevel.py b/pyedflib/highlevel.py index c1841d0b..bd9f083b 100644 --- a/pyedflib/highlevel.py +++ b/pyedflib/highlevel.py @@ -503,7 +503,7 @@ def write_edf( # check dmin, dmax and pmin, pmax dont exceed signal min/max for sig, shead in zip(signals, signal_headers): if 'sample_rate' in shead: - raise ValueError('Use of `sample_rate` is deprecated, use `sample_frequency` instead') + raise FutureWarning('Use of `sample_rate` is deprecated, use `sample_frequency` instead') dmin, dmax = shead['digital_min'], shead['digital_max'] pmin, pmax = shead['physical_min'], shead['physical_max'] From 75540e2f33308a7751aebb937deb3f8c1451aac6 Mon Sep 17 00:00:00 2001 From: skjerns <14980558+skjerns@users.noreply.github.com> Date: Mon, 20 Jan 2025 20:29:00 +0100 Subject: [PATCH 6/6] fix tests --- .github/workflows/tests.yml | 2 +- pyedflib/tests/test_edfwriter.py | 2 +- pyedflib/tests/test_highlevel.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 65d91235..80d7d79e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-2019, windows-latest, macos-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v3 diff --git a/pyedflib/tests/test_edfwriter.py b/pyedflib/tests/test_edfwriter.py index b8cd31d9..d047d5e1 100644 --- a/pyedflib/tests/test_edfwriter.py +++ b/pyedflib/tests/test_edfwriter.py @@ -937,7 +937,7 @@ def test_sample_rate_deprecation(self): f = pyedflib.EdfWriter(self.edf_data_file, channel_count, file_type=pyedflib.FILETYPE_EDF) f.setDatarecordDuration(record_duration) - with self.assertRaises(ValueError): + with self.assertRaises(FutureWarning): f.setSignalHeaders([{ 'sample_rate': sample_rate, **base_signal_header(idx) diff --git a/pyedflib/tests/test_highlevel.py b/pyedflib/tests/test_highlevel.py index fb24e541..7cb11f5f 100644 --- a/pyedflib/tests/test_highlevel.py +++ b/pyedflib/tests/test_highlevel.py @@ -418,7 +418,7 @@ def test_deprecated_sample_rate(self): signal_headers = highlevel.make_signal_headers(['ch'+str(i) for i in range(3)]) signal_headers[0]['sample_rate'] = 256 signals = np.random.rand(3, 256*300)*200 #5 minutes of eeg - with self.assertRaises(ValueError): + with self.assertRaises(FutureWarning): highlevel.write_edf(self.edfplus_data_file, signals, signal_headers, header)