#!/bin/env python
# -*- coding: utf-8 -*-
# system modules
import datetime
import logging
import inspect
# internal modules
from pylawr.transform.transformer import Transformer
from pylawr.transform.memorymixin import MemoryMixin
from pylawr.field import tag_array
from pylawr.utilities import log_decorator
# external modules
import xarray as xr
import numpy as np
from pandas import to_datetime
logger = logging.getLogger(__name__)
TAG_NEG_LIN_REFL_REPL = "neg-lin-refl-repl"
"""
Tag to indicate that negative linear reflectivities were replaced with
something.
"""
TAG_NOISE_FILTER = "noise-filtered"
"""
Tag to indicate that a noise filter was applied
"""
[docs]class NoiseRemover(Transformer, MemoryMixin):
r"""
Noise filter that dynamically determines an appropriate
spatially independent noise threshold to subtract from the reflectivity
field.
**Assumptions:**
- reflectivity data is the **raw measurement**, i.e. no range dependency
correction etc. has happened yet
Attributes
----------
noise_percentile : float
percentile of all sorted values to use for determination of a noise
level. Defaults to ``10`` which means :math:`10\%`.
remembrance_time : float
time in seconds to do a moving average of older values. Defaults
to 300 seconds, 5 minutes.
max_noise_percentile_tendency : float
maximum relative tendency per time of the actual :any:`noise_percentile`
value. Defaults to :math:`\frac{20\%}{30s}`.
If a higher relative tendency occurs, the previous
:any:`noiselevel` will be used for filtering.
max_time_constant_noiselevel : float
maximum time in seconds where the :any:`noiselevel` can be constant
before it is reset to the default. Defaults to three hours.
noiselevel_constant_since : numpy.datetime64
the time where :any:`noiselevel` last changed
noiselevel_multiplier : float
the noiselevel is multiplied with this constant value. Defaults to 1.03.
times : numpy.ndarray(numpy.datetime64)
recorded threshold times
thresholds : numpy.ndarray
recorded thresholds
noiselevel : float
current noise level
"""
def __init__(
self,
noise_percentile=10,
max_noise_percentile_tendency=0.2/30,
max_time_constant_noiselevel=3*60*60,
noiselevel_multiplier=1.03,
remembrance_time=60*5,
noiselevel=8.0e-7
):
super().__init__()
self.noise_percentile = noise_percentile
self.max_noise_percentile_tendency = max_noise_percentile_tendency
self.max_time_constant_noiselevel = max_time_constant_noiselevel
self.noiselevel_multiplier = noiselevel_multiplier
self.remembrance_time = remembrance_time
self.default_noiselevel = noiselevel
self.times = None
self.thresholds = None
self._trainable_vars = ('thresholds', 'times')
# Properties
@property
def noiselevel(self):
return self.get_noiselevel()
@property
def times(self):
return self._times
@times.setter
def times(self, new_times):
"""
... sets recorded threshold times with new times. If `new_times` is
`None` `times` resets on default.
Parameters
----------
new_times: numpy.ndarray(numpy.datetime64)
new recorded threshold times
"""
if new_times is None:
self._times = np.array([], dtype="datetime64[ns]")
else:
self._times = np.asarray(new_times, dtype="datetime64[ns]")
@property
def thresholds(self):
return self._thresholds
@thresholds.setter
def thresholds(self, new_thresholds):
"""
... sets recorded thresholds. If `new_thresholds` is `None` `thresholds`
resets on default.
Parameters
----------
new_thresholds : numpy.ndarray(numpy.float)
new recorded thresholds
"""
if new_thresholds is None:
self._thresholds = np.array([], dtype="float")
else:
self._thresholds = np.asarray(new_thresholds, dtype=float)
@property
def noiselevel_constant_since(self):
"""
Get the index, where the noise levels changed the last time.
Returns
-------
time_obj : numpy.datetime64 or numpy.nan
The time of the last changed value within the thresholds. If the
index is np.nan, then no noise level was saved or the noise level
was saved without setting the appropriate time.
"""
try:
different = self.thresholds != self.thresholds[-1]
reversed_different = different[::-1]
ind = different.size - np.argmax(reversed_different)
ind = min(ind, different.size-1)
time_obj = self.times[ind]
except IndexError:
time_obj = datetime.datetime.now()
logger.debug(
'Determined {0:s} as last changed time'.format(str(time_obj))
)
return time_obj
@property
def xr_thresholds(self):
"""
Get the thresholds as xarray.DataArray.
Returns
-------
thres : xarray.DataArray
The thresholds as xarray.DataArray with times as axis.
"""
thres = xr.DataArray(
data=self.thresholds,
coords=dict(
times=self.times
),
dims=['times', ]
)
return thres
@property
def fitted(self):
return self.times.size > 0
# Methods
[docs] def to_xarray(self):
"""
Convert parameters to a dataset for easy serialisation
Returns
-------
xarray.Dataset
parameters as dataset
"""
# get argument names to __init__ function
var_keys = list(self._trainable_vars) + inspect.getfullargspec(
self.__init__).args
data_vars = {
n: {} for n in var_keys
if not n == "self"
}
# fill vars with data
for var, d in data_vars.items():
data_vars[var]["dims"] = ()
data_vars[var]["data"] = getattr(self, var)
data_vars[var]["attrs"] = {}
# set units by hand
data_vars["remembrance_time"]["attrs"]["unit"] = "s"
data_vars["noise_percentile"]["attrs"]["unit"] = "%"
data_vars["max_noise_percentile_tendency"]["attrs"]["unit"] = "1/s"
data_vars["max_time_constant_noiselevel"]["attrs"]["unit"] = "s"
data_vars["thresholds"]["attrs"]["unit"] = "Z"
data_vars["noiselevel"]["attrs"]["unit"] = "Z"
# set dimensions by hand
data_vars["times"]["dims"] = ("times", )
data_vars["thresholds"]["dims"] = ("times", )
# convert to xarray.Dataset
ds = xr.Dataset.from_dict(data_vars)
ds.attrs["type"] = self.__class__.__name__
return ds
[docs] def _get_remembrance_interval(self, time_obj):
"""
Calculate the remembrance interval based on given time object and set
remembrance time.
Parameters
----------
time_obj : `datetime-like`
The remembrance interval is calculated based on this time object.
Returns
-------
start_date : pandas.datetime
The start date of the remembrance interval. This is calculated
based on :math:`time_obj-remembrance_time`
end_date : pandas.datetime
The end date of the remembrance interval. Basically, this is the
given time object as pandas-datetime.
"""
end_date = to_datetime(np.datetime64(time_obj))
remembrance_delta = datetime.timedelta(
seconds=int(self.remembrance_time)
)
start_date = end_date - remembrance_delta
return start_date, end_date
[docs] def _determine_noiselevel(self, time_obj):
start_date, end_date = self._get_remembrance_interval(time_obj)
noiselevels = self.xr_thresholds.sel(times=slice(start_date, end_date))
noiselevel = self._get_mean_noise(noiselevels)
return noiselevel
[docs] @staticmethod
def _get_mean_noise(noiselevels):
"""
Calculate the mean noise level from given noise levels. At the moment
this is only the nan median of the noise levels. In the future it will
be possible to calculate the mean noise with a weighted mean.
Parameters
----------
noiselevels : :any: array-like
The mean from these noise levels is calculated.
Returns
-------
noise_level : float
The calculated noise level.
"""
return np.nanmedian(noiselevels)
[docs] def get_noiselevel(self, time_obj=None):
"""
Determine an appropriate noiselevel from the available data. If
``time`` is given, interpolate an appropriate noiselevel from
:any:`thresholds`. If that is impossible, use the value of
:any:`noiselevel`.
Parameters
----------
time_obj : datetime.datetime or numpy.datetime64, optional
The time used to determine the noiselevel. If None the
value of the last`noiselevel` is returned.
Returns
-------
float
the noiselevel to use
"""
if not self.fitted:
noiselevel = self.default_noiselevel
elif time_obj is None:
noiselevel = self._determine_noiselevel(self.times[-1])
else:
noiselevel = self._determine_noiselevel(time_obj)
return float(noiselevel)
[docs] def _is_rainy_field(self, array):
"""
Check if RadarField has more rain than a threshold. The threshold is
calculated as :math:`100 - noise_percentile`.
Parameters
----------
array : xarray.DataArray
This array is used to check if the field is rainy or not. The array
is converted into reflectivity and all reflectivity above 0 dBZ
will be set to rain.
Returns
-------
is_rainy : bool
If the rain fraction is larger than the threshold, this will be
True, else False.
"""
rainy_threshold = 100-self.noise_percentile
array_dbz = array.lawr.to_dbz()
rain_array = array_dbz > 0
rain_fraction = float(
np.mean(
rain_array
)
)
logger.debug("{:.2f}% of the dataarray suggests rain".format(
100*rain_fraction))
logger.debug(
"The 'a lot of rain'-threshold is {:.2f}".format(rainy_threshold)
)
is_rainy = rain_fraction * 100 > rainy_threshold
return is_rainy
[docs] def _get_old_noise_level(self, time_obj):
"""
Get the noise level from the old thresholds, which is the nearest to
the given time object.
Parameters
----------
time_obj : numpy.datetime64
The nearest noise level to this time object will be returned.
Returns
-------
old_noiselevel : float
The old_noiselevel is either determined from nearest old noise level
or if not fitted it is the default value.
"""
if self.fitted:
times_diff = self.times - time_obj
older_than_given = times_diff.astype(float) < 0
prev_ind = times_diff[older_than_given].argmax()
old_noiselevel = self.thresholds[prev_ind]
else:
old_noiselevel = self.noiselevel
return old_noiselevel
[docs] def _determine_noiselevel_from_array(self, array):
"""
Calculate the noise level from given array. At the moment, the noise
level is the `noise_percentile`-th percentile of the given array.
Parameters
----------
array : xarray.DataArray
This array is used to calculate the noise level.
Returns
-------
noiselevel : float
The calculated noise level, based on the given array.
"""
logger.debug(
'Using the {0:.2f} percentile as noise level'.format(
self.noise_percentile
)
)
noiselevel = float(
np.nanpercentile(
a=array,
q=self.noise_percentile,
interpolation="linear",
)
)
return noiselevel
[docs] def _save_noiselevel(self, noiselevel, time_obj):
"""
Save the given noiselevel and time object pair to the attributes of this
filter.
Parameters
----------
time_obj : numpy.datetime64
The valid time of the given noise level.
"""
logger.debug(
'Saving the noise level for time {0:s}'.format(str(time_obj))
)
self.times = np.append(self.times, time_obj)
self.thresholds = np.append(self.thresholds, noiselevel)
[docs] def _prune_thresholds(self, time_obj):
"""
Prune the thresholds based on the given time object and set remembrance
time.
Parameters
----------
time_obj : numpy.datetime64
This time object will be used as base time for the pruning process.
"""
remem_date, _ = self._get_remembrance_interval(time_obj)
pruned_thres = self.xr_thresholds.sel(times=slice(remem_date, None))
self.thresholds = pruned_thres.values
self.times = pruned_thres['times'].values
[docs] def _get_last_changed_time(self):
"""
Get the index, where the noise levels changed the last time.
Returns
-------
time_obj : numpy.datetime64 or numpy.nan
The time of the last changed value within the thresholds. If the
index is np.nan, then no noise level was saved or the noise level
was saved without setting the appropriate time.
"""
try:
different = self.thresholds != self.thresholds[-1]
reversed_different = different[::-1]
ind = different.size - np.argmax(reversed_different)
ind = min(ind, different.size-1)
time_obj = self.times[ind]
except IndexError:
time_obj = np.nan
logger.debug(
'Determined {0:s} as last changed time'.format(str(time_obj))
)
return time_obj
[docs] @log_decorator(logger)
def fit(self, array, time_obj=None):
"""
Given a reflectivity array, determine the :any:`noiselevel` for use in
:any:`transform`
Parameters
----------
array: array-like
The **linear** reflectivity :math:`Z`
time_obj : datetime.datetime or numpy.datetime64
The time ``array`` was recorded. Defaults to ``array.time`` if
available, otherwise :any:`datetime.datetime.now`.
"""
time_obj = self._get_time(array, time_obj)
first_guess = array.lawr.to_z()
field_wo_noise = self.transform(first_guess, time_obj=time_obj)
is_rainy = self._is_rainy_field(field_wo_noise)
if is_rainy:
logger.debug("Pretty much the whole dataarray suggests rain")
noiselevel = self._get_old_noise_level(time_obj)
else:
noiselevel = self._determine_noiselevel_from_array(first_guess)
logger.debug(
'Determined noiselevel for time {0:s} is: {1:e}'.format(
str(time_obj), noiselevel
)
)
self._save_noiselevel(noiselevel, time_obj)
self._prune_thresholds(time_obj)
constant_for_td = time_obj - self.noiselevel_constant_since
constant_for_s = int(constant_for_td)/1e9
logger.debug(
'The noise level is constant for: {0:.3f} s'.format(constant_for_s)
)
if constant_for_s > self.max_time_constant_noiselevel:
self.reset()