Source code for pylawr.transform.temporal.extrapolation
#!/bin/env python
# -*- coding: utf-8 -*-
# system modules
import logging
# internal modules
from pylawr.transform.transformer import Transformer
from pylawr.transform.memorymixin import MemoryMixin
from pylawr.utilities import log_decorator
from pylawr.field import tag_array, get_verified_grid
from pylawr.grid.cartesian import CartesianGrid
# external modules
import xarray as xr
import numpy as np
from skimage.feature import match_template
logger = logging.getLogger(__name__)
TAG_EXTRAPOLATION = "extrapolated"
"""
Tag to indicate that an extrapolation was applied
"""
[docs]class Extrapolator(Transformer, MemoryMixin):
"""
This class can be used to extrapolate a field based on two previous fields
using template matching. The template matching finds similar areas between
two fields. Based on the distance between similar pixels a vector of pixel
movement between to time steps is calculated. With the vector the current
field can be shifted to a field of a following time step.
Attributes
----------
vector : tuple, float
fitted movement vector in meters per second
time : ``numpy.datetime64``
timestep of fitted vector
cut_percentage : float
Offset defines cut of range for old data.
correlation_threshold : float
Minimal threshold of maximal correlation of match template between
data arrays. Is the correlation lower than threshold, the vector of
movement is set to zero.
max_timediff : int
Maximal difference between time steps until warning.
"""
def __init__(self, cut_percentage=0.15, correlation_threshold=0.5,
max_timediff=60.*10):
super().__init__()
self._time = None
self._vector = None
self.cut_percentage = cut_percentage
self.correlation_threshold = correlation_threshold
self.max_timediff = max_timediff
self.direction = ['y', 'x']
self.time = None
self.vector = None
self._trainable_vars = ('vector', 'time')
@property
def fitted(self):
return self.time is not None
@property
def time(self):
return self._time
@time.setter
def time(self, new_time):
"""
... sets timestep of fitted vector with new time. If `new_time` is
`None` `time` resets on default.
Parameters
----------
new_time: ``numpy.datetime64`` or None
new timestep of fitted vector
"""
if new_time is None:
self._time = new_time
else:
self._time = np.asarray(new_time, dtype="datetime64[ns]")
@property
def vector(self):
return self._vector
@vector.setter
def vector(self, new_vector):
"""
... sets vector with new vector. If `new_vector` is
`None` `vector` resets on default.
Parameters
----------
new_vector: tuple, int or None
new fitted movement vector
"""
self._vector = new_vector
[docs] def _determine_timedelta(self, time, time_pre):
"""
Get the timedelta between two data arrays.
Parameters
----------
time : ``numpy.datetime64``
actual timestep.
time_pre : ``numpy.datetime64``
previous timestep.
Returns
-------
timedelta : ``numpy.timedelat64`` (seconds)
Timedelta in seconds.
"""
timedelta = (time
- time_pre
) / np.timedelta64(1, 's')
logger.debug("Timedelta is {0:f} s".format(timedelta))
if timedelta > self.max_timediff:
logger.warning("Timedelta exceeds {0:f} minutes".format(
self.max_timediff / 60.))
return timedelta
[docs] def calc_match_matrix(self, array, array_pre):
"""
Calculates the correlation matrix based on match template between two
arrays
Parameters
----------
array : :py:class:`xarray.DataArray`
Data of analogous actual timestep.
array_pre : :py:class:`xarray.DataArray`
Data of analogous previous timestep.
Returns
-------
:py:class:`numpy.ndarray` (float)
Correlation matrix
"""
# cut_values
cv = [int(round(self.cut_percentage * array_pre.values[0].shape[0])),
int(round(self.cut_percentage * array_pre.values[0].shape[1]))]
# match template based on correlation, nan must be replaced
match_mat = match_template(np.nan_to_num(array.values[0]),
np.nan_to_num(array_pre.values[0][
cv[0]:-cv[0],
cv[1]:-cv[1]]
)
)
return match_mat
[docs] @staticmethod
def _check_grids(grid_now, grid_pre):
"""
Checks if the grids of the arrays are equal and
child of :py:class:`pylawr.grid.cartesian.CartesianGrid`
Parameters
----------
grid_now : child of :py:class:`~pylawr.grid.base.BaseGrid`
grid of data of actual timestep
grid_pre : child of :py:class:`~pylawr.grid.base.BaseGrid`
grid of data of previous timestep
"""
if not isinstance(grid_now, CartesianGrid):
raise TypeError(
'Grid of actual timestep is not a cartesian grid.'
)
if not isinstance(grid_pre, CartesianGrid):
raise TypeError(
'Grid of previous timestep is not a cartesian grid.'
)
if not grid_now == grid_pre:
raise ValueError(
'The grids have different attributes.'
)
[docs] @log_decorator(logger)
def fit(self, array, array_pre, grid=None, *args, **kwargs):
"""
Fits a vector of movement between to arrays, of e.g. reflectivity,
based on match template.
Parameters
----------
array : :py:class:`xarray.DataArray`
Data of analogous actual timestep.
array_pre : :py:class:`xarray.DataArray`
Data of analogous previous timestep.
grid : :py:class:`pylawr.grid.cartesian.CartesianGrid` or None, optional
To fit the arrays they need to have the same grid specifications. If
no grid is given, the grids from given arrays are used.
"""
if grid is None:
grid = get_verified_grid(array)
self._check_grids(grid, get_verified_grid(array_pre))
elif not isinstance(grid, CartesianGrid):
raise TypeError(
'Given grid is not a CartesianGrid'
)
else:
array.lawr.check_grid(grid)
array_pre.lawr.check_grid(grid)
match_mat = self.calc_match_matrix(array, array_pre)
# It was assumed
# (... if the max_correlation is < 0.3 there is no rain)
# ... if the max_correlation is < 0.5 the speed is set to zero
if np.max(match_mat) >= self.correlation_threshold:
# calculate the movement based on the match-result
ij = np.unravel_index(np.argmax(match_mat),
match_mat.shape)
x, y = ij[::-1]
dist_x = np.around((x - np.array(match_mat.shape[1]) * .5 + .5)
).astype(int)
dist_y = np.around((y - np.array(match_mat.shape[0]) * .5 + .5)
).astype(int)
dist = np.array([dist_y, dist_x])
timedelta = self._determine_timedelta(self._get_time(array),
self._get_time(array_pre))
if timedelta == 0.:
logger.debug(
"The timedelta is zero. You fit the extrapolator to "
"the same timesteps".format(self.correlation_threshold)
)
self.vector = np.array([0, 0])
else:
self.vector = grid.resolution * dist / timedelta
else:
logger.debug(
"The correlation is under {0:.2f}. The speed is set to "
"zero!".format(self.correlation_threshold)
)
self.vector = np.array([0, 0])
logger.info('Fitted extrapolator with movement vector {0:f} m in x '
'and {1:f} m in y'.format(self.vector[1],
self.vector[0]))
self.time = array.time.values
[docs] @log_decorator(logger)
def transform(self, array, grid=None, time=None,
*args, **kwargs):
"""
Extrapolate the given given ``array`` with a cartesian ``grid``
according to the fitted movement vector.
Parameters
----------
array: :py:class:`xarray.DataArray`
The array to operate on
grid : child of :py:class:`pylawr.grid.base.BaseGrid` or None
To extrapolate the array needs to have a
:py:class:`pylawr.grid.cartesian.CartesianGrid`.
time : ``numpy.datetime64``
next time step to extrapolate to
args: sequence
Further positional arguments
kwargs: dict
Further keyword arguments
Returns
-------
:py:class:`xarray.DataArray`
The transformed array
"""
if grid is None:
grid = get_verified_grid(array)
timedelta_new = self._determine_timedelta(time,
self._get_time(array))
pixel_move = np.around(self.vector
/ grid.resolution
* timedelta_new).astype(int)
# expand field with nans and roll
field_new = np.pad(array.values,
((0, 0),
(abs(pixel_move[0]), abs(pixel_move[0])),
(abs(pixel_move[1]), abs(pixel_move[1]))),
'constant', constant_values=np.nan)
field_new = np.roll(field_new, pixel_move[0], axis=1)
field_new = np.roll(field_new, pixel_move[1], axis=2)
y_slice = slice(
abs(pixel_move[0]), (field_new.shape[-2]-abs(pixel_move[0]))
)
x_slice = slice(
abs(pixel_move[1]), (field_new.shape[-1]-abs(pixel_move[1]))
)
field_new = field_new[..., y_slice, x_slice]
# transform new array
transformed = array.copy()
transformed.values = field_new
transformed['time'] = [time]
transformed = transformed.lawr.set_grid_coordinates(grid)
tag_array(transformed, TAG_EXTRAPOLATION)
logger.info('Extrapolated reflectivity - '
'moved pixels '
'{0:d} in x and {1:d} in y'.format(pixel_move[1],
pixel_move[0]))
return transformed
[docs] def to_xarray(self):
"""
Serialize this filter's parameters to an :py:class:`xarray.Dataset`
Returns
-------
:py:class:`xarray.Dataset`
the filter's parameters as dataset
"""
data_vars = {
'cut_percentage': {
'dims': (),
'data': self.cut_percentage,
'attrs': {}
},
'correlation_threshold': {
'dims': (),
'data': self.correlation_threshold,
'attrs': {}
},
'max_timediff': {
'dims': (),
'data': self.max_timediff,
'attrs': {'unit': 's'}
},
'time': {
'dims': ('time', ),
'data': self.time,
'attrs': {}
},
'direction': {
'dims': ('direction', ),
'data': self.direction,
'attrs': {}
},
'vector': {
'dims': ('direction', ),
'data': self.vector,
'attrs': {
'unit': 'm/s',
'long_name': 'Movement vector'
}
}
}
ds = xr.Dataset.from_dict(data_vars)
ds.attrs["type"] = self.__class__.__name__
return ds