saftig.filtering¶
Implementations of filtering techniques with a common interface.
Submodules¶
Attributes¶
Classes¶
common interface definition for Filter implementations |
|
Satic Wiener filter implementation |
|
Updating Wiener filter implementation |
|
LMS filter implementation |
|
Experimental non-linear LMS-like filter implementation |
|
LMS filter implementation in C (faster but harder to adjust) |
Package Contents¶
- class saftig.filtering.FilterBase(n_filter, idx_target, n_channel=1)¶
common interface definition for Filter implementations
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
- requires_apply_target: bool¶
- n_filter: int¶
- n_channel: int¶
- idx_target: int¶
- filter_name: str¶
- static supports_saving_loading()¶
Indicates whether saving and loading is supported Due to the way dataclasses work with inheritance, class values with default values don’t work in the parent dataclass. Thus, this is a function
- abstract condition(witness, target)¶
Use an input dataset to condition the filter
- Parameters:
witness (collections.abc.Sequence | collections.abc.Sequence[collections.abc.Sequence]) – Witness sensor data
target (collections.abc.Sequence) – Target sensor data
- Return type:
None
- abstract apply(witness, target, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data (1D or 2D array)
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data (1D array)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) – if True, the filter state will be changed. If false, the filter state will remain
- Returns:
prediction
- Return type:
numpy.typing.NDArray
- check_data_dimensions(witness, target=None)¶
Check the dimensions of the provided input data and apply make_2d_array()
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (Optional[collections.abc.Sequence | numpy.typing.NDArray]) – Target sensor data
- Returns:
data as (target, witness)
- Raises:
AssertionError
- Return type:
tuple[numpy.typing.NDArray, numpy.typing.NDArray]
- as_dict()¶
Returns a dictionary that represents the state of this filter.
- Return type:
dict
- classmethod from_dict(input_dict)¶
Create a filter instance from a dictionary that was created from as_dict()
- Return type:
FilterTypeT
- classmethod _make_filename(filename)¶
- save(filename, warn_incompatible=False)¶
Save the filter state as a numpy file
The given filename will be autocompleted with a “.<filter_name>.npz” filename extension, unless a matching extension is detected.
- warn_incompatible: set to True to warn for object types might not
compatible with np.save(allow_pickle=False) during developement
- classmethod load(filename)¶
Load a filter state from the supplied filename.
The given filename will be autocompleted with a “.<filter_name>.npz” filename extension, unless a matching extension is detected.
- Return type:
FilterTypeT
- class saftig.filtering.WienerFilter(n_filter, idx_target, n_channel=1)¶
Bases:
saftig.filtering.common.FilterBaseSatic Wiener filter implementation
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
>>> import saftig as sg >>> n_filter = 128 >>> witness, target = sg.evaluation.TestDataGenerator(0.1).generate(int(1e5)) >>> filt = sg.filtering.WienerFilter(n_filter, 0, 1) >>> _coefficients, full_rank = filt.condition(witness, target) >>> full_rank True >>> prediction = filt.apply(witness, target) # check on the data used for conditioning >>> residual_rms = sg.evaluation.rms(target-prediction) >>> residual_rms > 0.05 and residual_rms < 0.15 # the expected RMS in this test scenario is 0.1 True
- filter_state: numpy.typing.NDArray | None = None¶
- filter_name: str = 'WF'¶
- requires_apply_target = False¶
- condition(witness, target)¶
Use an input dataset to condition the filter
- Parameters:
witness (collections.abc.Sequence) – Witness sensor data
target (collections.abc.Sequence) – Target sensor data
- apply(witness, target=None, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (Optional[collections.abc.Sequence | numpy.typing.NDArray]) – Target sensor data (is ignored)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) – ignored
- Returns:
prediction
- Return type:
numpy.typing.NDArray
- class saftig.filtering.UpdatingWienerFilter(n_filter, idx_target, n_channel=1, context_pre=0, context_post=0)¶
Bases:
saftig.filtering.common.FilterBaseUpdating Wiener filter implementation
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
context_pre (int) – how many additional samples before the current block are used to update the filters
context_post (int) – how many additional samples after the current block are used to update the filters
>>> import saftig as sg >>> n_filter = 128 >>> witness, target = sg.evaluation.TestDataGenerator(0.1).generate(int(1e5)) >>> filt = sg.filtering.UpdatingWienerFilter(n_filter, 0, 1, context_pre=20*n_filter, context_post=20*n_filter) >>> prediction = filt.apply(witness, target) # check on the data used for conditioning >>> residual_rms = sg.evaluation.rms(target-prediction) >>> residual_rms > 0.05 and residual_rms < 0.15 # the expected RMS in this test scenario is 0.1 True
- context_pre: int¶
- context_post: int¶
- filter_name: str = 'UWF'¶
- filter_state: numpy.typing.NDArray | None = None¶
- static supports_saving_loading()¶
Indicates whether saving and loading is supported.
- condition(witness, target, hide_warning=False)¶
Placeholder for compatibility to other filters; does nothing!
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) –
target (collections.abc.Sequence | numpy.typing.NDArray) –
hide_warning (bool) –
- Return type:
None
- apply(witness, target, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data (is ignored)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) – ignored
- Returns:
prediction, bool indicating if all WF updates had full rank
- Return type:
numpy.typing.NDArray
- class saftig.filtering.LMSFilter(n_filter, idx_target, n_channel=1, normalized=True, step_scale=0.1, coefficient_clipping=np.nan)¶
Bases:
saftig.filtering.common.FilterBaseLMS filter implementation
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
normalized (bool) – if True: NLMS, else LMS
coefficient_clipping (float) – If set to a positive float, FIR filter coefficients will be limited to this value. This can increase filter stability.
step_scale (float) – the learning rate of the LMS filter
>>> import saftig as sg >>> n_filter = 128 >>> witness, target = sg.evaluation.TestDataGenerator(0.1).generate(int(1e5)) >>> filt = sg.filtering.LMSFilter(n_filter, 0, 1) >>> filt.condition(witness, target) >>> prediction = filt.apply(witness, target) # check on the data used for conditioning >>> residual_rms = sg.evaluation.rms(target-prediction) >>> residual_rms > 0.05 and residual_rms < 0.15 # the expected RMS in this test scenario is 0.1 True
- filter_state: numpy.typing.NDArray¶
- normalized: bool¶
- step_scale: float¶
- coefficient_clipping: float¶
- filter_name: str = 'LMS'¶
- reset()¶
reset the filter coefficients to zero
- condition(witness, target)¶
Use an input dataset to condition the filter
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data
- apply(witness, target, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data (is ignored)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) – if True, the filter state will be changed. If false, the filter state will remain
- Returns:
prediction
- Return type:
numpy.typing.NDArray
- class saftig.filtering.PolynomialLMSFilter(n_filter, idx_target, n_channel=1, normalized=True, step_scale=0.5, coefficient_clipping=np.nan, order=1)¶
Bases:
saftig.filtering.common.FilterBaseExperimental non-linear LMS-like filter implementation Implements: \(x[n] = \sum_p\sum_i\sum_t {w_i[n-t]}^pH_{it}\) where p is the polynomial order, i the channel and t the index within the filter
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
normalized (bool) – If True: NLMS, else LMS
step_scale (float) – The learning rate of the LMS filter
coefficient_clipping (float) – If set to a positive float, FIR filter coefficients will be limited to this value. This can increase filter stability.
order (int) – Polynomial order of the filter
>>> import saftig as sg >>> n_filter = 128 >>> witness, target = sg.evaluation.TestDataGenerator(0.1).generate(int(1e5)) >>> filt = sg.filtering.PolynomialLMSFilter(n_filter, 0, 1, step_scale=0.1, order=2, coefficient_clipping=4) >>> filt.condition(witness, target) >>> prediction = filt.apply(witness, target) # check on the data used for conditioning >>> residual_rms = sg.evaluation.rms((target-prediction)[1000:]) >>> residual_rms > 0.05 and residual_rms < 0.15 # the expected RMS in this test scenario is 0.1 True
- filter_state: numpy.typing.NDArray¶
- normalized: bool¶
- step_scale: float¶
- coefficient_clipping: float¶
- order: int¶
- filter_name: str = 'PolyLMS'¶
- reset()¶
reset the filter coefficients to zero
- condition(witness, target)¶
Use an input dataset to condition the filter
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data
- apply(witness, target, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data (is ignored)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) – if True, the filter state will be changed. If false, the filter state will remain
- Returns:
prediction
- Return type:
numpy.typing.NDArray
- class saftig.filtering.LMSFilterC(n_filter, idx_target, n_channel, step_scale=0.1, normalized=True, coefficient_clipping=None)¶
Bases:
saftig.filtering.common.FilterBaseLMS filter implementation in C (faster but harder to adjust)
- Parameters:
n_filter (int) – Length of the FIR filter (how many samples are in the input window per output sample)
idx_target (int) – Position of the prediction
n_channel (int) – Number of witness sensor channels
normalized (bool) – if True: NLMS, else LMS
step_scale (float) – the learning rate of the LMS filter
coefficient_clipping (float | None) –
>>> import saftig as sg >>> n_filter = 128 >>> witness, target = sg.evaluation.TestDataGenerator(0.1).generate(int(1e5)) >>> filt = sg.filtering.LMSFilterC(n_filter, 0, 1) >>> filt.condition(witness, target) >>> prediction = filt.apply(witness, target) # check on the data used for conditioning >>> residual_rms = sg.evaluation.rms(target-prediction) >>> residual_rms > 0.05 and residual_rms < 0.15 # the expected RMS in this test scenario is 0.1 True
- filter_name: str = 'LMS_C'¶
- filter¶
- static supports_saving_loading()¶
Indicates whether saving and loading is supported.
- reset()¶
reset the filter coefficients to zero
- Return type:
None
- condition(witness, target)¶
Use an input dataset to condition the filter
- Parameters:
witness (collections.abc.Sequence[float] | collections.abc.Sequence[collections.abc.Sequence[float]]) – Witness sensor data
target (collections.abc.Sequence[float]) – Target sensor data
- apply(witness, target, pad=True, update_state=False)¶
Apply the filter to input data
- Parameters:
witness (collections.abc.Sequence | numpy.typing.NDArray) – Witness sensor data
target (collections.abc.Sequence | numpy.typing.NDArray) – Target sensor data (is ignored)
pad (bool) – if True, apply padding zeros so that the length matches the target signal
update_state (bool) –
- Returns:
prediction
- Return type:
numpy.typing.NDArray
- saftig.filtering.all_filters¶