# Source code for pylawr.datahandler.lawr
#!/bin/env python
# -*- coding: utf-8 -*-
# System modules
import logging
import io
import datetime
import pytz
# External modules
import numpy as np
import xarray as xr
# Internal modules
from pylawr.datahandler.base import DataHandler
from pylawr.utilities.conventions import naming_convention
logger = logging.getLogger(__name__)
class LawrHandler(DataHandler):
    """
    The LawrHandler is constructed to read in LawrText files.

    The content of the file handler is read once and cached as in-memory
    text buffer. The first line of this buffer is treated as header line,
    all following lines starting with ``ppw`` are treated as data lines.

    Parameters
    ----------
    fh : filelike object
        The data is read from this filelike object.
    """
    def __init__(self, fh):
        # NOTE(review): ``self._fh`` is read by the ``file`` property and is
        # presumably set by ``DataHandler.__init__`` -- confirm in base class.
        super().__init__(fh=fh)
        # Both attributes are filled lazily on first access.
        self._header = None
        self._file = None

    @property
    def file(self):
        """
        Get the content of the file handler as in-memory text buffer.

        The file handler is read completely on the first access. Bytes are
        decoded as UTF-8. Afterwards, the buffer is cached and reused.

        Returns
        -------
        file : io.StringIO
            The cached text buffer with the content of the file handler.
        """
        if self._file is None:
            data = self._fh.read()
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            self._file = io.StringIO(data)
            # Rewind the original handler such that it can be reused by
            # others after the content was copied into the buffer.
            try:
                self._fh.seek(0)
            except io.UnsupportedOperation:
                # Seek is not supported with given file handler
                pass
        return self._file

    @property
    def header(self):
        """
        Get the decoded header of the file as dict.

        The header is decoded from the first line of the file on the first
        access and cached afterwards.
        """
        if self._header is None:
            self.file.seek(0)
            raw_header = self.file.readline()
            self._header = self._decode_header(raw_header)
            self.file.seek(0)
        return self._header

    def _get_available_dates(self):
        """
        Get a list of the available dates as datetime.datetime.

        Returns
        -------
        available_dates : list(datetime.datetime)
            A list with a single entry, the decoded datetime of the header.
        """
        return [self.header['datetime'], ]

    def _decode_header(self, header):
        """
        Decode the given header line into a lawr header.

        The first three whitespace-separated entries are interpreted as
        radar type, datetime string and timezone string. All remaining
        entries are searched for key-value pairs.

        Parameters
        ----------
        header : str
            This header line is decoded.

        Returns
        -------
        decoded_header : dict(str, value)
            The decoded header dict.

        Raises
        ------
        IndexError
            If the header line has less than three entries.
        ValueError
            If the datetime string or a value of a key-value pair cannot be
            converted.
        """
        prepared_header = self._prepare_header(header)
        decoded_header = dict(
            radartype=prepared_header[0],
            datetime=self._decode_datetime(*prepared_header[1:3]),
            datestr=prepared_header[1],
            timezone=prepared_header[2])
        decoded_header.update(self._decode_keyval(prepared_header[3:]))
        return decoded_header

    @staticmethod
    def _prepare_header(header):
        """
        Clean the header string and split the header string.

        Parameters
        ----------
        header : str
            This header is cleaned and split at whitespace.

        Returns
        -------
        prepared_header : list
            The cleaned and split substrings of the header.
        """
        prepared_header = header.split()
        return prepared_header

    @staticmethod
    def _decode_datetime(dt_str, tz_str=None):
        """
        Decode a given datetime string to a datetime object in UTC.

        Parameters
        ----------
        dt_str : str
            This datetime string is decoded and transformed to a datetime
            object. The format should be %y%m%d%H%M%S.
        tz_str : str or None, optional
            The timezone information string. The timezone is converted to pytz
            timezone. If the timezone is None, the timezone will not be set.
            Default is None.

        Returns
        -------
        decoded_dt : datetime.datetime
            The decoded datetime object. If a timezone string was given, the
            object will be localized and returned in UTC without
            timezone information.
        """
        decoded_dt = datetime.datetime.strptime(dt_str, '%y%m%d%H%M%S')
        if tz_str:
            tzinfo = pytz.timezone(tz_str)
            # Localize with pytz first, then shift to UTC and drop the
            # timezone information to get a naive UTC datetime.
            decoded_dt = tzinfo.localize(decoded_dt)
            decoded_dt = decoded_dt.astimezone(pytz.utc)
            decoded_dt = decoded_dt.replace(tzinfo=None)
        return decoded_dt

    @staticmethod
    def _decode_keyval(header_list):
        """
        A key value dict will be created by given list. Equal signs are
        searched, the list value before the sign is used as key and the value
        after the sign is used as value. The equal sign has to be a
        standalone entry of the list.

        Parameters
        ----------
        header_list : list(str)
            The equal signs are searched within this list.

        Returns
        -------
        decoded_keyval : dict(str, float)
            The decoded key-value dictionary. If the dict is empty there was
            no key value pair found.
        """
        # Equal signs at the first or last position are skipped, because
        # they have no key or no value, respectively.
        decoded_keyval = {
            header_list[k-1]: float(header_list[k+1])
            for k, sub in enumerate(header_list)
            if sub == '=' and k > 0 and k+1 < len(header_list)
        }
        return decoded_keyval

    def _decode_data(self, data_lines):
        """
        Decode the given data lines into a numpy array.

        Only lines starting with ``ppw`` are decoded, the first
        whitespace-separated entry of these lines is skipped.

        Parameters
        ----------
        data_lines : list(str)
            The data lines which should be decoded.

        Returns
        -------
        decoded_array : numpy.ndarray
            The decoded data array. The first dimension are the azimuth angles,
            the second dimension is the range.
        """
        decoded_lines = [self._prepare_dataline(line)[1:]
                         for line in data_lines if line.startswith('ppw')]
        decoded_array = np.array(decoded_lines, dtype=np.float32)
        return decoded_array

    @staticmethod
    def _prepare_dataline(data_line):
        """
        Clean and split the given data line.

        Parameters
        ----------
        data_line : str
            This data line is cleaned and split at whitespace.

        Returns
        -------
        prepared_line : list
            The cleaned and split data line.
        """
        prepared_line = data_line.split()
        return prepared_line

    def get_reflectivity(self):
        """
        Get the data from the file handler.

        Returns
        -------
        radar_field : xr.DataArray
            The decoded radar field with the dimensions time, azimuth and
            range. The decoded header, except the datetime, is set as
            attributes.

        Raises
        ------
        ValueError
            If the header or the data lines cannot be decoded or if the
            number of decoded azimuth angles differs from the header
            value ``n_p``.
        """
        self.file.seek(0)
        raw_header = self.file.readline()
        raw_data = self.file.readlines()
        try:
            self._header = decoded_header = self._decode_header(raw_header)
            decoded_data = self._decode_data(raw_data)
            azi_len, range_len = decoded_data.shape
            expected_azimuths = decoded_header['n_p']
        except (ValueError, IndexError, KeyError) as err:
            # A truncated header raises an IndexError, a missing `n_p` or an
            # unknown timezone a KeyError. All of them indicate a corrupted
            # file and are translated into the same error type.
            raise ValueError('The given file is corrupted and couldn\'t '
                             'be decoded!') from err
        if azi_len != expected_azimuths:
            missing_angles = int(expected_azimuths - azi_len)
            raise ValueError(
                'There are {0:d} missing azimuth angles within the '
                'file!'.format(missing_angles)
            )
        # Decoded header is already UTC
        utc_date = decoded_header['datetime'].replace(tzinfo=None)
        # The coordinates are index-based, the file provides no physical
        # azimuth or range values.
        coordinates = {
            'time': (('time',),
                     np.array((utc_date, ), dtype='datetime64[ns]'),
                     naming_convention['time']),
            'azimuth': (('azimuth',),
                        np.arange(azi_len),
                        naming_convention['azimuth']),
            'range': (('range',),
                      np.arange(range_len),
                      naming_convention['range']),
        }
        attrs = dict(unit='dBZ')
        attrs.update({key: value for key, value in decoded_header.items()
                      if key != 'datetime'})
        radar_field = xr.DataArray(
            data=decoded_data[np.newaxis, ...],
            coords=coordinates,
            dims=['time', 'azimuth', 'range'],
            attrs=attrs,
        )
        # NOTE(review): `lawr` is presumably an xarray accessor registered
        # elsewhere within the package -- confirm.
        radar_field = radar_field.lawr.set_variable('dbz')
        self.file.seek(0)
        return radar_field