Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate sample_rate #268

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-2019, windows-latest, macos-latest]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
exclude: # Python < v3.8 does not support Apple Silicon ARM64.
- python-version: "3.7"
os: macos-latest
include: # So run those legacy versions on Intel CPUs.
- python-version: "3.7"
os: macos-13
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v3
- name: Setup Python
Expand Down
12 changes: 7 additions & 5 deletions pyedflib/_extensions/_pyedflib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ cdef class CyEdfReader:
Note that edflib.c is encapsulated so there is no direct access to the file
from here unless I add a raw interface or something

EDF/BDF+ files are arranged into N signals sampled at rate Fs. The data is actually stored in chunks called "datarecords" which have a file specific size.
EDF/BDF+ files are arranged into N signals sampled at rate Fs.
The data is actually stored in chunks called "datarecords" which have a
file specific size.

A typical way to use this to read an EEG file would be to choose a certain
number of seconds per page to display. Then figure out how many data records
Expand Down Expand Up @@ -553,11 +555,11 @@ def set_equipment(handle, equipment):
def set_samples_per_record(handle, edfsignal, smp_per_record ):
    """
    int set_samples_per_record(int handle, int edfsignal, int smp_per_record )

    Sets how many samples are in each data record for this signal.
    This is not the sampling frequency (Hz), which is calculated as
    smp_per_record/record_duration.
    """
    # The two values only coincide when record_duration=1, so the name of the
    # C library function called below is slightly mislabeled for this purpose.
    return c_edf.edf_set_samplefrequency(handle, edfsignal, smp_per_record)

def set_admincode(handle, admincode):
Expand Down
1 change: 0 additions & 1 deletion pyedflib/edfreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def getSignalHeader(
return {
"label": self.getLabel(chn),
"dimension": self.getPhysicalDimension(chn),
"sample_rate": self.getSampleFrequency(chn), # backwards compatibility
"sample_frequency": self.getSampleFrequency(chn),
"physical_max": self.getPhysicalMaximum(chn),
"physical_min": self.getPhysicalMinimum(chn),
Expand Down
164 changes: 106 additions & 58 deletions pyedflib/edfwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from datetime import date, datetime
from types import TracebackType
from typing import Any, Union, Optional, List, Dict, Type
import math
from functools import reduce
from fractions import Fraction

import numpy as np

Expand Down Expand Up @@ -182,6 +185,89 @@ def gender2int(gender: Union[int, str, None]) -> Optional[int]:
return sex2int(gender)


def _calculate_record_duration(freqs, max_str_len=8, max_val=60):
    """
    Find a record duration X in (0, max_val] that fits every frequency.

    For each freq in freqs the product freq * X must be an integer, so that
    one data record holds a whole number of samples of every signal, and
    len(str(X).rstrip('0').rstrip('.')) must not exceed max_str_len.

    Parameters
    ----------
    freqs : iterable of int or float
        Sample frequencies of all channels. Frequencies equal to zero are
        ignored.
    max_str_len : int, optional
        Maximum number of characters of the textual form of X.
        The default is 8.
    max_val : int or float, optional
        Largest record duration that is accepted. The default is 60.

    Returns
    -------
    float
        A suitable record duration X. If no frequency has a fractional
        part this is 1.0.

    Raises
    ------
    ValueError
        If no suitable record duration is found.
    """
    # Convert input to float just in case
    freqs = [float(f) for f in freqs]

    # Express every non-zero frequency as a fraction n/d. Integer frequencies
    # are kept (with d == 1): they do not matter while X is an integer, but
    # they DO constrain X as soon as a fractional X is considered below.
    frac_list = []
    has_fractional = False
    for f in freqs:
        if f == 0.0:
            continue
        nearest = round(f)
        if abs(f - nearest) < 1e-12:
            # nearly an integer, treat it as an integer
            frac_list.append(Fraction(nearest))
        else:
            has_fractional = True
            frac_list.append(Fraction(f).limit_denominator(10_000_000))

    # If there's no non-integer frequency, X=1 is trivially fine
    if not has_fractional:
        return 1.0

    def lcm(a, b):
        return abs(a * b) // math.gcd(a, b)

    # L is the LCM of all denominators, so freq_i * L is always an integer
    L = reduce(lcm, (fr.denominator for fr in frac_list), 1)

    # With freq_i = n_i / d_i we get freq_i * (L / d) = n_i * (L / d_i) / d.
    # X = L / d is therefore valid exactly when d divides the gcd of all
    # scaled numerators n_i * (L / d_i).
    gcd_val = 0
    for fr in frac_list:
        gcd_val = math.gcd(gcd_val, fr.numerator * (L // fr.denominator))

    # Edge case: if gcd_val == 0, just return 1.0 to avoid dividing by zero
    if gcd_val == 0:
        return 1.0

    def all_divisors(num):
        """Return all divisors of num in ascending order."""
        small, large = [], []
        i = 1
        while i * i <= num:
            if num % i == 0:
                small.append(i)
                if i != num // i:
                    large.append(num // i)
            i += 1
        return small + large[::-1]

    # Ascending divisors give descending candidates, so the first hit is the
    # longest admissible record duration.
    for d in all_divisors(gcd_val):
        candidate = L / d
        if candidate > max_val:
            continue
        c_str = str(candidate).rstrip('0').rstrip('.')
        if len(c_str) <= max_str_len:
            return float(candidate)

    raise ValueError(
        f"No suitable record_duration (≤ {max_val}) found with ≤ {max_str_len} ASCII chars "
        f"for frequencies: {freqs}"
    )



class ChannelDoesNotExist(Exception):
def __init__(self, value: Any) -> None:
self.parameter = value
Expand Down Expand Up @@ -226,7 +312,6 @@ def __init__(self, file_name: str, n_channels: int, file_type: int = FILETYPE_ED

'label' : channel label (string, <= 16 characters, must be unique)
'dimension' : physical dimension (e.g., mV) (string, <= 8 characters)
'sample_rate' : sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead.
'sample_frequency' : number of samples per record (int)
'physical_max' : maximum physical value (float)
'physical_min' : minimum physical value (float)
Expand All @@ -253,13 +338,13 @@ def __init__(self, file_name: str, n_channels: int, file_type: int = FILETYPE_ED
self.sample_buffer: List[List] = []
for i in np.arange(self.n_channels):
if self.file_type == FILETYPE_BDFPLUS or self.file_type == FILETYPE_BDF:
self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_rate': 100,
'sample_frequency': None, 'physical_max': 1.0, 'physical_min': -1.0,
self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': 100,
'physical_max': 1.0, 'physical_min': -1.0,
'digital_max': 8388607,'digital_min': -8388608,
'prefilter': '', 'transducer': ''})
elif self.file_type == FILETYPE_EDFPLUS or self.file_type == FILETYPE_EDF:
self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_rate': 100,
'sample_frequency': None, 'physical_max': 1.0, 'physical_min': -1.0,
self.channels.append({'label': f'ch{i}', 'dimension': 'mV', 'sample_frequency': 100,
'physical_max': 1.0, 'physical_min': -1.0,
'digital_max': 32767, 'digital_min': -32768,
'prefilter': '', 'transducer': ''})

Expand Down Expand Up @@ -292,7 +377,17 @@ def update_header(self) -> None:
# the channels, we need to find a common denominator so that all sample
# frequencies can be represented accurately.
# this can be overwritten by explicitly calling setDatarecordDuration
self._calculate_optimal_record_duration()

for ch in self.channels:
# raise exception, can be removed in later release
if 'sample_rate' in ch:
raise FutureWarning('Use of `sample_rate` is deprecated, use `sample_frequency` instead')

sample_freqs = [ch['sample_frequency'] for ch in self.channels]
if not self._enforce_record_duration and not any([f is None for f in sample_freqs]):
assert all([isinstance(f, (float, int)) for f in sample_freqs]), \
f'{sample_freqs=} contains non int/float'
self.record_duration = _calculate_record_duration(sample_freqs)

set_technician(self.handle, du(self.technician))
set_recording_additional(self.handle, du(self.recording_additional))
Expand Down Expand Up @@ -330,6 +425,8 @@ def update_header(self) -> None:
set_transducer(self.handle, i, du(self.channels[i]['transducer']))
set_prefilter(self.handle, i, du(self.channels[i]['prefilter']))



def setHeader(self, fileHeader: Dict[str, Union[str, float, int, None]]) -> None:
"""
Sets the file header
Expand All @@ -355,7 +452,6 @@ def setSignalHeader(self, edfsignal: int, channel_info: Dict[str, Union[str, int

'label' : channel label (string, <= 16 characters, must be unique)
'dimension' : physical dimension (e.g., mV) (string, <= 8 characters)
'sample_rate' : sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead.
'sample_frequency' : number of samples per record (int)
'physical_max' : maximum physical value (float)
'physical_min' : minimum physical value (float)
Expand All @@ -379,8 +475,6 @@ def setSignalHeaders(self, signalHeaders: List[Dict[str, Union[str, int, float,
channel label (string, <= 16 characters, must be unique)
'dimension' : str
physical dimension (e.g., mV) (string, <= 8 characters)
'sample_rate' :
sample frequency in hertz (int). Deprecated: use 'sample_frequency' instead.
'sample_frequency' : int
number of samples per record
'physical_max' : float
Expand Down Expand Up @@ -526,10 +620,11 @@ def setDatarecordDuration(self, record_duration: Union[float, int]) -> None:
This function is NOT REQUIRED but can be called after opening a file
in writemode and before the first sample write action. This function
can be used when you want to use a samplefrequency which is not an
integer. For example, if you want to use a samplerate of 0.5 Hz, set
integer. For example, if you want to use a sample frequency of 0.5 Hz, set
the samplefrequency to 5 Hz and the datarecord duration to 10 seconds.
Do not use this function, except when absolutely necessary!
"""
warnings.warn('Forcing a specific record_duration might alter calculated sample_frequencies when reading the file')
self._enforce_record_duration = True
self.record_duration = record_duration
self.update_header()
Expand Down Expand Up @@ -837,12 +932,6 @@ def writeSamples(self, data_list: Union[List[np.ndarray], np.ndarray], digital:

All parameters must be already written into the bdf/edf-file.
"""
there_are_blank_sample_frequencies = any([channel.get('sample_frequency') is None
for channel in self.channels])
if there_are_blank_sample_frequencies:
warnings.warn("The 'sample_rate' parameter is deprecated. Please use "
"'sample_frequency' instead.", DeprecationWarning)

if (len(data_list)) == 0:
raise WrongInputSize('Data list is empty')
if (len(data_list) != len(self.channels)):
Expand Down Expand Up @@ -949,8 +1038,7 @@ def get_smp_per_record(self, ch_idx: int) -> int:
gets the calculated number of samples that need to be fit into one
record (i.e. window/block of data) with the given record duration.
"""
fs = self._get_sample_frequency(ch_idx)
if fs is None: return None
fs = self.channels[ch_idx]['sample_frequency']

record_duration = self.record_duration
smp_per_record = fs*record_duration
Expand All @@ -960,43 +1048,3 @@ def get_smp_per_record(self, ch_idx: int) -> int:
f'smp_per_record={smp_per_record}, record_duration={record_duration} seconds,' +
f'calculated sample_frequency will be {np.round(smp_per_record)/record_duration}')
return int(np.round(smp_per_record))


def _calculate_optimal_record_duration(self) -> None:
    """
    Pick the record duration (in seconds) shared by all channels.

    The duration is the smallest whole number of seconds for which
    fs*duration is an integer for every channel, i.e. for which the number
    of samples per data record (smp_per_record) is a whole number.
    If all sampling frequencies are integers the result is simply 1.

    Does nothing when a record duration has been enforced explicitly.
    """
    if self._enforce_record_duration:
        return

    all_fs = [self._get_sample_frequency(idx) for idx in range(len(self.channels))]

    # Brute-force search, should cover 99% of cases.
    # TODO: optimize this process
    # NOTE: range(1, 60) stops at 59, so a duration of 60 is never tried.
    record_duration = 0
    for seconds in range(1, 60):
        samples = [fs*seconds for fs in all_fs]
        if all([count==int(count) for count in samples]):
            record_duration = seconds
            break

    assert record_duration>0, f'cannot accurately represent sampling frequencies with data record durations between 1-60s: {all_fs}'
    assert record_duration<=60, 'record duration must be below 60 seconds'
    self.record_duration = record_duration

def _get_sample_frequency(self, channelIndex: int) -> Union[int, float]:
    """
    Return the sample frequency stored for the channel at channelIndex.

    'sample_frequency' is preferred; the deprecated 'sample_rate' entry is
    used only when 'sample_frequency' is missing or None. A
    DeprecationWarning is emitted whenever the channel still carries a
    'sample_rate' entry.
    """
    channel = self.channels[channelIndex]
    if 'sample_rate' in channel:
        warnings.warn("`sample_rate` is deprecated and will be removed in a future release. \
Please use `sample_frequency` instead", DeprecationWarning)

    frequency = channel.get('sample_frequency')
    if frequency is None:
        # fall back to the legacy key while it is still supported
        return channel['sample_rate']
    return frequency
27 changes: 7 additions & 20 deletions pyedflib/highlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@
# from . import EdfReader


def _get_sample_frequency(signal_header: dict) -> int:
    """
    Return the sample frequency of a signal header.

    'sample_frequency' is preferred; the deprecated 'sample_rate' entry is
    used only when 'sample_frequency' is missing or None, so that either
    key can be supplied while users switch to the new interface.
    """
    frequency = signal_header.get('sample_frequency')
    if frequency is not None:
        return frequency
    return signal_header['sample_rate']


def tqdm(iterable: Iterable, *args: Any, **kwargs: Any) -> Iterable:
"""
This is an optional dependency that shows a progress bar for some
Expand Down Expand Up @@ -236,8 +227,7 @@ def make_header(
def make_signal_header(
label: str,
dimension: str = 'uV',
sample_rate: Union[int, float] = 256,
sample_frequency: Optional[Union[int, float]] = None,
sample_frequency: Optional[Union[int, float]] = 256,
physical_min: float = -200,
physical_max: float = 200,
digital_min: Union[float, int] = -32768,
Expand All @@ -257,8 +247,6 @@ def make_signal_header(
the name of the channel.
dimension : str, optional
dimension, eg mV. The default is 'uV'.
sample_rate : int, optional
sampling frequency. The default is 256. Deprecated: use 'sample_frequency' instead.
sample_frequency : int, optional
sampling frequency. The default is 256.
physical_min : float, optional
Expand All @@ -283,7 +271,6 @@ def make_signal_header(

signal_header = {'label': label,
'dimension': dimension,
'sample_rate': sample_rate,
'sample_frequency': sample_frequency,
'physical_min': physical_min,
'physical_max': physical_max,
Expand All @@ -297,8 +284,7 @@ def make_signal_header(
def make_signal_headers(
list_of_labels: List[str],
dimension: str = 'uV',
sample_rate: int = 256,
sample_frequency: Optional[Union[int, float]] = None,
sample_frequency: Optional[Union[int, float]] = 256,
physical_min: float = -200.0,
physical_max: float = 200.0,
digital_min: Union[float,int] = -32768,
Expand All @@ -316,8 +302,6 @@ def make_signal_headers(
A list with labels for each channel.
dimension : str, optional
dimension, eg mV. The default is 'uV'.
sample_rate : int, optional
sampling frequency. The default is 256. Deprecated: use 'sample_frequency' instead.
sample_frequency : int, optional
sampling frequency. The default is 256.
physical_min : float, optional
Expand All @@ -341,7 +325,7 @@ def make_signal_headers(
"""
signal_headers = []
for label in list_of_labels:
header = make_signal_header(label, dimension=dimension, sample_rate=sample_rate,
header = make_signal_header(label, dimension=dimension,
sample_frequency=sample_frequency,
physical_min=physical_min, physical_max=physical_max,
digital_min=digital_min, digital_max=digital_max,
Expand Down Expand Up @@ -435,7 +419,7 @@ def read_edf(
signals.append(signal)

# we can only return a np.array if all signals have the same samplefreq
sfreqs = [_get_sample_frequency(shead) for shead in signal_headers]
sfreqs = [shead['sample_frequency'] for shead in signal_headers]
all_sfreq_same = sfreqs[1:]==sfreqs[:-1]
if all_sfreq_same:
dtype = np.int32 if digital else float
Expand Down Expand Up @@ -518,6 +502,9 @@ def write_edf(

# check dmin, dmax and pmin, pmax dont exceed signal min/max
for sig, shead in zip(signals, signal_headers):
if 'sample_rate' in shead:
raise FutureWarning('Use of `sample_rate` is deprecated, use `sample_frequency` instead')

dmin, dmax = shead['digital_min'], shead['digital_max']
pmin, pmax = shead['physical_min'], shead['physical_max']
label = shead['label']
Expand Down
Loading
Loading