#!/usr/bin/python
import datetime, os, copy
import numpy as np
import pprint as pp
import re
import errno
import ast
import traceback
from functools import reduce
import logging
logger = logging.getLogger(__name__)
def ident(x):
return x
[docs]
def get_converter_array(string, **kwargs):
"""colletion of converters that works on arrays
combines time, range and varconverters (i see no conceptual separation here)
the maskconverter becomes relevant, if the order is no
time, range, whatever (as in mira spec)
chaining example:
```var_conversion = 'z2lin,extrfromaxis2(0)'```
Returns:
(varconverter, maskconverter) which both are functions
"""
if ',' in string:
converters = [get_converter_array(s, **kwargs) for s in string.split(',')]
varfuncs = reversed([f[0] for f in converters])
maskfuncs = reversed([f[1] for f in converters])
varf = lambda x: reduce(lambda r, f: f(r), varfuncs, x)
maskf = lambda x: reduce(lambda r, f: f(r), maskfuncs, x)
return varf, maskf
elif string == 'since20010101':
return lambda x: x + dt_to_ts(datetime.datetime(2001, 1, 1)), ident
elif string == 'hours_since20150101':
return lambda x: x*60*60 + dt_to_ts(datetime.datetime(2015, 1, 1)), ident
elif string == 'unix':
return lambda x: x, ident
elif string == 'since19691231':
return lambda x: x + dt_to_ts(datetime.datetime(1969, 12, 31, 23)), ident
elif string == 'since19700101':
return lambda x: x + dt_to_ts(datetime.datetime(1970, 1, 1)), ident
elif string == 'days_since19700101':
return lambda x: x*24*60*60, ident
elif string == 'since19040101':
return lambda x: x + dt_to_ts(datetime.datetime(1904, 1, 1)), ident
elif string == 'beginofday':
if 'ncD' in kwargs.keys():
return (lambda h: (h.astype(np.float64) * 3600. + \
float(dt_to_ts(datetime.datetime(int(kwargs['ncD'].year),
int(kwargs['ncD'].month),
int(kwargs['ncD'].day))))),
ident)
elif string == 'from_global':
if 'ncD' in kwargs.keys():
return (lambda h: (np.array([dt_to_ts(datetime.datetime.strptime(
getattr(kwargs['ncD'], 'date [dd.mm.yyyy]') + '_' + getattr(kwargs['ncD'], 'interval start [HH:MM]'),
"%d.%m.%Y_%H:%M") )])), ident)
elif string == 'beginofdayminute_car': # Special formats for special people
if 'ncD' in kwargs.keys():
print('hello here dateconverter =======================================================================')
print(getattr(kwargs['ncD']['time'], 'units')[14:])
return (lambda h: (h.astype(np.float64) * 60 + \
(float(dt_to_ts(datetime.datetime.strptime(getattr(kwargs['ncD']['time'], 'units')[14:], "%Y-%m-%d %H:%M:%S"))))), ident)
elif string == "hours_since_year0":
return (lambda x: x*24*60*60 - 62167305599.99999,
ident)
elif string == "pollytime":
return (lambda x: np.array([x[i,1] + dt_to_ts(datetime.datetime.strptime(str(int(x[i,0])), "%Y%m%d"))\
for i in range(x.shape[0])]),
ident)
elif string == 'since20200101':
return lambda x: x + dt_to_ts(datetime.datetime(2020, 1, 1,)), ident
elif string == "km2m":
return lambda x: x * 1000., ident
elif string == "sealevel2range":
return lambda x: x - kwargs['altitude'], ident
elif string == 'z2lin':
return z2lin, ident
elif string == 'lin2z':
return lin2z, ident
elif string == 'switchsign':
return lambda x: -x, ident
elif string == "mira_azi_offset":
return lambda x: (x + kwargs['mira_azi_zero']) % 360, ident
elif string == 'transposedim':
#return np.transpose, np.transpose
return transpose_only, transpose_only
elif string == 'transposedim+invert3rd':
return transpose_and_invert, transpose_and_invert
elif string == 'divideby2':
return divide_by(2.), ident
elif string == 'keepNyquist':
return ident, ident
elif string == 'raw2Z':
return raw2Z(**kwargs), ident
elif string == "extract_level0":
return lambda x: x[:, 0], ident
elif string == "extract_level1":
return lambda x: x[:, 1], ident
elif string == "extract_level2":
return lambda x: x[:, 2], ident
elif string == 'extract_1st':
return lambda x: np.array(x[0])[np.newaxis,], ident
elif string == "none":
return ident, ident
elif 'extrfromaxis0' in string:
return get_extrfromaxis0(string), get_extrfromaxis0(string)
elif 'extrfromaxis1' in string:
return get_extrfromaxis1(string), get_extrfromaxis1(string)
elif 'extrfromaxis2' in string:
return get_extrfromaxis2(string), get_extrfromaxis2(string)
else:
raise ValueError("converter {} not defined".format(string))
def transpose_only(var):
return np.transpose(var)
def transpose_and_invert(var):
return np.transpose(var)[:, :, ::-1]
def divide_by(val):
return lambda var: var / val
[docs]
def flatten(xs):
"""flatten inhomogeneous deep lists
e.g. ``[[1,2,3],4,5,[6,[7,8],9],10]``
"""
result = []
if isinstance(xs, (list, tuple)):
for x in xs:
result.extend(flatten(x))
else:
result.append(xs)
return result
[docs]
def since2001_to_dt(s):
"""seconds since 2001-01-01 to datetime"""
# return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
return datetime.datetime(2001, 1, 1) + datetime.timedelta(seconds=s)
[docs]
def dt_to_ts(dt):
"""datetime to unix timestamp"""
# return dt.replace(tzinfo=datetime.timezone.utc).timestamp()
return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
[docs]
def ts_to_dt(ts):
"""unix timestamp to dt"""
return datetime.datetime.utcfromtimestamp(ts)
[docs]
def argnearest(array, value):
"""find the index of the nearest value in a sorted array
for example time or range axis
Args:
array (np.array): sorted array with values, list will be converted to 1D array
value: value to find
Returns:
index
"""
if type(array) == list:
array = np.array(array)
i = np.searchsorted(array, value) - 1
if not i == array.shape[0] - 1:
if np.abs(array[i] - value) > np.abs(array[i + 1] - value):
i = i + 1
return i
[docs]
def nearest(array, pivot):
"""find the nearest value to a given one
Args:
array (np.array): sorted array with values
pivot: value to find
Returns:
value with smallest distance
"""
return min(array, key=lambda x: abs(x - pivot))
[docs]
def lin2z(array):
"""linear values to dB (for np.array or single number)"""
return 10 * np.ma.log10(array)
[docs]
def z2lin(array):
"""dB to linear values (for np.array or single number)"""
return 10 ** (array / 10.)
[docs]
def raw2Z(array, **kwargs):
"""raw signal units (MRR-Pro) to reflectivity Z"""
return array * kwargs['wl']**4 / (np.pi**5) / 0.93 * 10**6
[docs]
def get_extrfromaxis0(string):
"""get function that extracts given index from axis2"""
m = re.search(r"\((\d+)\)", string)
ind = int(m.groups(0)[0])
return lambda x: x[ind,:,:]
[docs]
def get_extrfromaxis1(string):
"""get function that extracts given index from axis2"""
m = re.search(r"\((\d+)\)", string)
ind = int(m.groups(0)[0])
return lambda x: x[:,ind,:]
[docs]
def get_extrfromaxis2(string):
"""get function that extracts given index from axis2"""
m = re.search(r"\((\d+)\)", string)
ind = int(m.groups(0)[0])
return lambda x: x[:,:,ind]
[docs]
def fill_with(array, mask, fill):
"""fill an array where mask is true with fill value"""
filled = array.copy()
filled[mask] = fill
return filled
[docs]
def guess_str_to_dict(string):
"""try to convert a text string into a dict
intended to be used in the var_def
Returns:
dict with flag as key and description string
"""
if "{" in string:
#probalby already the stringified python format
return ast.literal_eval(string)
elif "\nValue" in string:
# the cloudnetpy format \nValue 0: desc\n ....
d = {}
for e in string.split('\nValue '):
if len(e) > 0:
k, v = e.split(':')
d[int(k)] = v.strip()
return d
elif "\n" in string:
# the cloudnet format 0: desc\n ....
d = {}
for e in string.split('\n'):
k, v = e.split(':')
d[int(k)] = v.strip()
return d
elif "\\n" in string:
# pollynet format
d = {}
for e in string.split('\\n'):
m = re.match(r'(\d{1,2}): (.*)', e.replace(r'\"', ''))
d[int(m.group(1))] = m.group(2).strip()
return d
else:
# unknown format
return string
def _method_info_from_argv(argv=None):
"""Command-line -> method call arg processing.
- positional args:
a b -> method('a', 'b')
- intifying args:
a 123 -> method('a', 123)
- json loading args:
a '["pi", 3.14, null]' -> method('a', ['pi', 3.14, None])
- keyword args:
a foo=bar -> method('a', foo='bar')
- using more of the above
1234 'extras=["r2"]' -> method(1234, extras=["r2"])
@param argv {list} Command line arg list. Defaults to `sys.argv`.
@returns (<method-name>, <args>, <kwargs>)
Reference: http://code.activestate.com/recipes/577122-transform-command-line-arguments-to-args-and-kwarg/
"""
import json
import sys
if argv is None:
argv = sys.argv
method_name, arg_strs = argv[0], argv[1:]
args = []
kwargs = {}
for s in arg_strs:
if s.count('=') == 1:
key, value = s.split('=', 1)
else:
key, value = None, s
try:
value = json.loads(value)
except ValueError:
pass
if key:
kwargs[key] = value
else:
args.append(value)
return method_name, args, kwargs
[docs]
def reshape_spectra(data):
"""This function reshapes time, range and var variables of a data container and returns numpy arrays.
Args:
data (dict): data container
Returns:
list with
- ts (numpy.array): time stamp numpy array, dim = (n_time,)
- rg (numpy.array): range stamp numpy array, dim = (n_range,)
- var (numpy.array): values of the spectra numpy array, dim = (n_time, n_range, n_vel)
"""
n_ts, n_rg, n_vel = data['ts'].size, data['rg'].size, data['vel'].size
if data['dimlabel'] == ['time', 'range', 'vel']:
ts = data['ts'].copy()
rg = data['rg'].copy()
var = data['var'].copy()
mask = data['mask'].copy()
elif data['dimlabel'] == ['time', 'vel']:
ts = data['ts'].copy()
rg = np.reshape(data['rg'], (n_rg,))
var = np.reshape(data['var'], (n_ts, 1, n_vel))
mask = np.reshape(data['mask'], (n_ts, 1, n_vel))
elif data['dimlabel'] == ['range', 'vel']:
ts = np.reshape(data['ts'].copy(), (n_ts,))
rg = data['rg'].copy()
var = np.reshape(data['var'], (1, n_rg, n_vel))
mask = np.reshape(data['mask'], (1, n_rg, n_vel))
elif data['dimlabel'] == ['vel']:
ts = np.reshape(data['ts'].copy(), (n_ts,))
rg = np.reshape(data['rg'], (n_rg,))
var = np.reshape(data['var'], (1, 1, n_vel))
mask = np.reshape(data['mask'], (1, 1, n_vel))
else:
raise TypeError('Wrong data format in plot_spectra')
return ts, rg, var, mask
[docs]
def isKthBitSet(n, k):
"""
Function to check if a certain bit of a number is set (required to analyse quality flags)
"""
if n & (1 << (k - 1)):
return 1
else:
return 0
[docs]
def pprint(data, verbose=False):
"""print a pretty representation of the data container"""
print(pformat(data, verbose=verbose))
[docs]
def interp_only_3rd_dim(arr, old, new):
"""function to interpolate only the velocity (3rd) axis"""
from scipy import interpolate
f = interpolate.interp1d(old, arr, axis=2,
bounds_error=False, fill_value=-999.)
new_arr = f(new)
return new_arr
[docs]
def put_in_container(data, data_container, **kwargs):
"""
This routine will generate a new larda container, replacing var in the data_container given with data
Kwargs:
all keys used in larda containers, that is
paraminfo (dict): information from params_[campaign].toml for the specific variable
rg_unit: range unit
mask: ndarray of same size as var, indicating masked values
ts: time stamp in unix time
var_lims: variable limits
colormap: colormap to be used for plotting
Return:
container (dict): larda data container
"""
container = copy.deepcopy(data_container)
container['var'] = data
container.update(kwargs)
return container
[docs]
def change_dir(folder_path, **kwargs):
"""
This routine changes to a folder or creates it (including subfolders) if it does not exist already.
Args:
folder_path (string): path of folder to switch into
"""
folder_path = folder_path.replace('//', '/', 1)
if not os.path.exists(os.path.dirname(folder_path)):
try:
os.makedirs(os.path.dirname(folder_path))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
os.chdir(folder_path)
logger.debug('\ncd to: {}'.format(folder_path))
[docs]
def make_dir(folder_path):
"""
This routine changes to a folder or creates it (including subfolders) if it does not exist already.
Args:
folder_path (string): path of folder to switch into
"""
if not os.path.exists(os.path.dirname(folder_path)):
try:
os.makedirs(os.path.dirname(folder_path))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
[docs]
def print_traceback(txt):
"""
Print the traceback to an error to screen.
Args:
- txt (string): error msg
"""
print(txt)
track = traceback.format_exc()
print(track)
[docs]
def smooth(y, box_pts, padding='constant'):
"""Smooth a one dimensional array using a rectangular window of box_pts points
Args:
y (np.array): array to be smoothed
box_pts: number of points of the rectangular smoothing window
Returns:
y_smooth (np.arrax): smoothed array
"""
box = np.ones(box_pts) / box_pts
if padding.lower() == 'constant':
return np.convolve(y, box, mode='full')[box_pts // 2:-box_pts // 2 + 1]
else:
return np.convolve(y, box, mode='same')