Source code for pyLARDA.Connector

#!/usr/bin/python3

import os, sys, time
import glob
import copy
import re
import datetime
import calendar
import pprint
import functools
import pprint as pprint2
from pathlib import Path

from typing import Callable

import pyLARDA.NcReader as NcReader
import pyLARDA.RPGReader as RPGReader
import pyLARDA.ParameterInfo as ParameterInfo
#import pyLARDA.DataBuffer as DataBuffer
#import pyLARDA.MeteoReader as MeteoReader
#import pyLARDA.Spec as Spec
import pyLARDA.peakTree as peakTree
import pyLARDA.trace_reader as trace_reader
import pyLARDA.helpers as h
import pyLARDA.Transformations as Transf

import numpy as np
from operator import itemgetter
import collections
import json
import requests, msgpack
from tqdm import tqdm
#import cbor2

import logging
logger = logging.getLogger(__name__)

DATEstrfmt = "%Y%m%d-%H%M%S"

[docs]def convert_regex_date_to_dt(re_date): """convert a re_date dict to datetime .. warning:: When using 2 digit years (i.e. RPG) a 20 will be added in front Args: re_date (dict): result of the regex search with keys Returns: datetime """ l = [] if len(re_date['year']) == 2: re_date['year'] = '20' + re_date['year'] for k in ['year', 'month', 'day', 'hour', 'minute', 'second']: if k in re_date.keys() and re_date[k] is not None: l.append(int(re_date[k])) return datetime.datetime(*l)
[docs]def convert_to_datestring(datepattern, f): """convert the date in a (file-)string to dt Args: datepatttern: a python regex definition with named groups f: the string Returns: datetime """ try: dt = convert_regex_date_to_dt( re.search(datepattern, f).groupdict()) except AttributeError: logger.warning(f'No matching data pattern "{datepattern}" in file: "{f}"') return -1 return dt.strftime(DATEstrfmt)
[docs]def setupreader(paraminfo) -> Callable: """obtain the reader from the paraminfo """ if paraminfo["ncreader"] == 'timeheight_limrad94': reader = NcReader.timeheightreader_rpgfmcw(paraminfo) elif paraminfo["ncreader"] in ['spec_rpg94binary', 'timeheight_rpg94binary', 'time_rpg94binary']: reader = RPGReader.rpgfmcw_binary(paraminfo) elif paraminfo["ncreader"] in ['time_hatprobinary', 'timeheight_hatprobinary']: reader = RPGReader.hatpro_binary(paraminfo) elif paraminfo["ncreader"] == 'spec_limrad94': reader = NcReader.specreader_rpgfmcw(paraminfo) elif paraminfo["ncreader"] == 'spec_rpgpy': reader = NcReader.specreader_rpgpy(paraminfo) elif paraminfo["ncreader"] == 'spec_kazr': reader = NcReader.specreader_kazr(paraminfo) elif paraminfo["ncreader"] in ['aux', 'aux_all_ts', 'aux_ts_slice']: reader = NcReader.auxreader(paraminfo) elif paraminfo["ncreader"] in ['scan_timeheight', 'scan_time']: reader = NcReader.scanreader_mira(paraminfo) elif paraminfo['ncreader'] == 'peakTree': reader = peakTree.peakTree_reader(paraminfo) elif paraminfo['ncreader'] == 'trace': reader = trace_reader.trace_reader(paraminfo) elif paraminfo['ncreader'] == 'trace2': reader = trace_reader.trace_reader2(paraminfo) elif paraminfo["ncreader"] == 'pollyraw': reader = NcReader.reader_pollyraw(paraminfo) elif paraminfo["ncreader"] == 'mrrpro_spec': paraminfo.update({"ncreader": "spec", "compute_velbins":"mrrpro"}) reader = NcReader.reader(paraminfo) elif paraminfo["ncreader"] == "wyoming_sounding_txt": reader = NcReader.reader_wyoming_sounding(paraminfo) elif paraminfo["ncreader"] == 'psd': reader = NcReader.psd_reader(paraminfo) else: reader = NcReader.reader(paraminfo) return reader
[docs]def setup_valid_date_filter(valid_dates) -> Callable: """validator function for chunks of valid dates Args: valid_dates: list of [begin, end] in 'YYYYMMDD' Returns: a single argument ('YYYYMMDD-HHMMSS') validator function """ def date_filter(e): datepair, f = e f_b, f_e = datepair #print(valid_dates, datepair, f_b, f_e) #print([(f_b >= valid[0] and f_e <= valid[1]) for valid in valid_dates]) return any([(f_b[:-7] >= valid[0] and f_e[:-7] <= valid[1]) for valid in valid_dates]) return date_filter
[docs]def path_walk(top, prefilter='.*', topdown = False, followlinks = False): """pendant for os.walk """ current_level = len(str(top).split('/')) filter_at_level = '/'.join(prefilter.split('/')[:current_level]) regex = re.compile(filter_at_level) #print(current_level, filter_at_level) #print(str(top).split('/')) #print(str(prefilter).split('/')) #stime = time.time() #names = list(top.iterdir()) #print(" {:5.3f}s".format(time.time() - stime)) #stime = time.time() names = [f for f in top.iterdir() if regex.search(str(f))] #print(" {:5.3f}s".format(time.time() - stime)) #print(len(names), names[:30]) dirs = [node for node in names if node.is_dir() is True] nondirs = [node for node in names if node.is_dir() is False] if topdown: yield top, dirs, nondirs for name in dirs: if followlinks or name.is_symlink() is False: for x in path_walk(name, prefilter, topdown, followlinks): yield x if topdown is not True: yield top, dirs, nondirs
def end_1sec_earlier(date): dt = datetime.datetime.strptime(date, DATEstrfmt) return (dt-datetime.timedelta(seconds=1)).strftime(DATEstrfmt)
[docs]def guess_end(dates): """estimate the end of a file Returns: list of pairs [begin, end] """ if len(dates) > 1: guessed_duration = (datetime.datetime.strptime(dates[-1], DATEstrfmt) - datetime.datetime.strptime(dates[-2], DATEstrfmt)) else: guessed_duration = datetime.timedelta(seconds=(24*60*60)-1) # quick fix guessed duration not longer than 24 h if guessed_duration >= datetime.timedelta(days=1): guessed_duration = datetime.timedelta(seconds=(24*60*60)-1) last_d = ( datetime.datetime.strptime(dates[-1], DATEstrfmt) + guessed_duration ).strftime(DATEstrfmt) ends = [end_1sec_earlier(d) for d in dates[1:]] + [last_d] return list(zip(dates, ends))
[docs]class Connector_remote: """connect the data (from the a remote source) to larda Args: camp_name (str): campaign name system (str): system identifier plain_dict (dict): connector meta info uri (str): address of the remote source """ def __init__(self, camp_name, system, plain_dict, uri): self.camp_name = camp_name self.system = system self.params_list = list(plain_dict['params'].keys()) print(self.system, self.params_list) self.plain_dict = plain_dict self.uri = uri
[docs] def collect(self, param, time_interval, *further_intervals, **kwargs) -> dict: """collect the data from a parameter for the given intervals Args: param (str) identifying the parameter time_interval: list of begin and end datetime *further_intervals: range, velocity, ... **interp_rg_join: interpolate range during join Returns: data_container """ resp_format = 'msgpack' interval = ["-".join([str(h.dt_to_ts(dt)) for dt in time_interval])] interval += ["-".join([str(i) for i in pair]) for pair in further_intervals] stream = True if resp_format == "msgpack" else False params = {"interval": ','.join(interval), 'rformat': resp_format} params.update(kwargs) resp = requests.get(self.uri + '/api/{}/{}/{}'.format(self.camp_name, self.system, param), params=params, stream=stream) logger.debug("fetching data from: {}".format(resp.url)) if resp_format == "msgpack": block_size = 1024 pbar = tqdm(unit="B", total=(int(resp.headers.get('content-length', 0))//block_size)*block_size, unit_divisor=1024, unit_scale=True) content = bytearray() for data in resp.iter_content(block_size): content.extend(data) pbar.update(len(data)) if resp.status_code != 200: if resp_format == "msgpack": print("Error at Backend") print(content.decode("unicode_escape")) else: print(resp.json()) raise ConnectionError("bad status code of response {}".format(resp.status_code)) starttime = time.time() # if resp_format == 'bin': # data_container = cbor2.loads(resp.content) if resp_format == 'msgpack': logger.info("msgpack version {}".format(msgpack.version)) if msgpack.version[0] < 1: data_container = msgpack.loads(content, encoding='utf-8') else: data_container = msgpack.loads(content, strict_map_key=False) elif resp_format == 'json': data_container = resp.json() #print("{:5.3f}s decode data".format(time.time() - starttime)) starttime = time.time() for k in ['ts', 'rg', 'vel', 'var', 'mask', 'vel_ch2', 'vel_ch3', 'aux']: if k in data_container and type(data_container[k]) == list: data_container[k] = np.array(data_container[k]) logger.info("loaded data container from remote: {}".format(data_container.keys())) #print("{:5.3f}s converted to np arrays".format(time.time() - starttime)) return data_container
[docs] def description(self, param): """get the description str""" resp = requests.get(self.uri + '/description/{}/{}/{}'.format(self.camp_name, self.system, param)) if resp.status_code != 200: raise ConnectionError("bad status code of response {}".format(resp.status_code)) logger.warning(resp.text) return resp.text
[docs] def get_as_plain_dict(self) -> dict: """put the most important information of the connector into a plain dict (for http tranfer)""" return self.plain_dict
[docs]def walk_str(pathinfo): """match the names and subdirs with regex using string only works for unix systems, but is reasonably fast Args: pathinfo: dict Returns: all_files """ assert os.name == 'posix', 'walk_str only works with string based filepath' all_files = [] current_regex = pathinfo['matching_subdirs'] if 'matching_subdirs' in pathinfo else '' current_re = re.compile(current_regex) prefilter = pathinfo['base_dir'] + pathinfo['prefilter_subdirs'] if 'prefilter_subdirs' in pathinfo else '.*' for root, d, files in os.walk(pathinfo['base_dir'], topdown=True): #print('walk ', root, len(list(files)), files[:10]) root = root[:-1] if (root[-1] == '/') else root current_level = len(root.split('/')) filter_at_level = '/'.join(prefilter.split('/')[:current_level+1]) regex = re.compile(filter_at_level) #print('root ', root) #print('filter at level', filter_at_level) #print(str(root).split('/')) #print(str(prefilter).split('/')) #print('d before', len(d), d) #print([f"{root}/{f}" for f in d]) d[:] = [f for f in d if regex.search(f"{root}/{f}")] #print('d after', len(d), d) #abs_filepaths = [f for f in files if re.search(current_regex, str(f))] #abs_filepaths = [f for f in files if current_re.search(f)] abs_filepaths = [f"{root}/{f}" for f in files if current_re.search(f"{root}/{f}")] #logger.debug("valid_files {} {}".format(root, [f for f in files if re.search(current_regex, str(f))])) #print("skipped_files {} {}".format(root, [f for f in files if not re.search(current_regex, str(f))])) all_files += abs_filepaths #files = [f for f in os.listdir('.') if re.match(r'[0-9]+.*\.jpg', f)] return all_files
[docs]def walk_pathlib(pathinfo): """match the names and subdirs with regex using pathlib should give cross-platform compatability, but comes with a performance penalty (>3x) Args: pathinfo: dict Returns: all_files """ all_files = [] current_regex = pathinfo['matching_subdirs'] if 'matching_subdirs' in pathinfo else '' current_re = re.compile(current_regex) prefilter = pathinfo['base_dir'] + pathinfo['prefilter_subdirs'] if 'prefilter_subdirs' in pathinfo else '.*' for root, _, files in path_walk(Path(pathinfo['base_dir']), prefilter): print('walk ', root, len(list(files)), files[:10]) #abs_filepaths = [f for f in files if re.search(current_regex, str(f))] abs_filepaths = [f for f in files if current_re.search(str(f))] #logger.debug("valid_files {} {}".format(root, [f for f in files if re.search(current_regex, str(f))])) #print("skipped_files {} {}".format(root, [f for f in files if not re.search(current_regex, str(f))])) all_files += abs_filepaths #files = [f for f in os.listdir('.') if re.match(r'[0-9]+.*\.jpg', f)] return all_files
[docs]class Connector: """connect the data (from the ncfiles/local sources) to larda Args: system (str): system identifier system_info (dict): dict info loaded from toml valid_dates (list of lists): list of begin and end datetime description_dir (optional): dir with the description rst """ def __init__(self, system, system_info, valid_dates, description_dir=None): self.system = system self.system_info = system_info self.valid_dates = valid_dates self.params_list = list(system_info["params"].keys()) self.description_dir = description_dir logger.info("params in this connector {} {}".format(self.system, self.params_list)) logger.debug('connector.system_info {}'.format(system_info)) def __str__(self): s = "connector for system {} \ncontains parameters: ".format(self.system) s += " ".join(self.params_list) return s
[docs] def build_filehandler(self): """scrape the directories and build the filehandler """ pathdict = self.system_info['path'] filehandler = {} for key, pathinfo in pathdict.items(): # 1. match the names and subdirs with regex #all_files = walk_pathlib(pathinfo) all_files = walk_str(pathinfo) # remove basedir (not sure if that is a good idea) all_files = [str(p).replace(pathinfo['base_dir'], "./") for p in all_files] #logger.debug('filelist {} {}'.format(len(all_files), all_files[:10])) # 2. extract the dates with another regex dates = [convert_to_datestring(pathinfo["date_in_filename"], str(f))\ for f in all_files] all_files = [f for _, f in sorted(zip(dates, all_files), key=lambda pair: pair[0])] dates = sorted(dates) # 3. estimate the duration a file covers date_pairs = guess_end(dates) if dates else [] # 4. validate with the durations valid_date_filter = setup_valid_date_filter(self.valid_dates) singlehandler = list(filter( valid_date_filter, list(zip(date_pairs, all_files)))) filehandler[key] = singlehandler #pprint.pprint(filehandler) self.filehandler = filehandler
[docs] def save_filehandler(self, path, camp_name): """save the filehandler to json file""" savename = 'connector_{}.json'.format(self.system) pretty = {'indent': 2, 'sort_keys':True} #pretty = {} if not os.path.isdir(path+'/'+camp_name): os.makedirs(path+'/'+camp_name) with open(path+'/'+camp_name+'/'+savename, 'w') as outfile: json.dump(self.filehandler, outfile, **pretty) logger.info('saved connector to {}/{}/{}'.format(path,camp_name,savename))
[docs] def load_filehandler(self, path, camp_name): """load the filehandler from the json file""" filename = "connector_{}.json".format(self.system) starttime = time.time() with open(path+'/'+camp_name+'/'+filename) as json_data: self.filehandler = json.load(json_data) logger.info("read in json filehandler {}: {}".format(self.system, time.time() - starttime))
[docs] def collect(self, param, time_interval, *further_intervals, **kwargs) -> dict: """collect the data from a parameter for the given intervals Args: param (str) identifying the parameter time_interval: list of begin and end datetime *further_intervals: range, velocity, ... **interp_rg_join: interpolate range during join Returns: data_container """ paraminfo = self.system_info["params"][param] if 'interp_rg_join' not in paraminfo: # default value paraminfo['interp_rg_join'] = False if 'interp_rg_join' in kwargs: paraminfo['interp_rg_join'] = kwargs['interp_rg_join'] base_dir = self.system_info['path'][paraminfo['which_path']]["base_dir"] logger.debug("paraminfo at collect {}".format(paraminfo)) if len(time_interval) == 2: begin, end = [dt.strftime(DATEstrfmt) for dt in time_interval] # cover all three cases: 1. file only covers first part # 2. file covers middle part 3. file covers end #print(begin, end) flist = [e for e in self.filehandler[paraminfo['which_path']] \ if (e[0][0] <= begin < e[0][1]) or (e[0][0] > begin and e[0][1] < end) or (e[0][0] <= end <= e[0][1])] assert len(flist) > 0, "no files available" elif len(time_interval) == 1: begin = time_interval[0].strftime(DATEstrfmt) flist = [e for e in self.filehandler[paraminfo['which_path']] if e[0][0] <= begin < e[0][1]] assert len(flist) == 1, "flist too long or too short: {}".format(len(flist)) #[print(e, (e[0][0] <= begin and e[0][1] > begin), (e[0][0] > begin and e[0][1] < end), (e[0][0] <= end and e[0][1] >= end)) for e in flist] load_data = setupreader(paraminfo) datalist = [load_data(Path(base_dir + e[1]), time_interval, *further_intervals) for e in flist] # [print(e.keys) if e != None else print("NONE!") for e in datalist] # reader returns none, if it detects no data prior to begin # now these none values are filtered from the list assert len(datalist) > 0, 'No data found for parameter: {}'.format(param) datalist = list(filter(lambda x: x != None, datalist)) #Transf.join(datalist[0], datalist[1]) data = functools.reduce(Transf.join, datalist) return data
[docs] def collect_path(self, param, time_interval, *further_intervals, **kwargs) -> dict: """" Returns: data_container """ assert 'paths' in kwargs, "Without filepaths reading is tricky" flist = kwargs['paths'] flist = [Path(f) if type(f) == str else f for f in flist] paraminfo = self.system_info["params"][param] load_data = setupreader(paraminfo) datalist = [load_data(e, time_interval, *further_intervals) for e in flist] assert len(datalist) > 0, 'No data found for parameter: {}'.format(param) datalist = list(filter(lambda x: x != None, datalist)) #Transf.join(datalist[0], datalist[1]) data = functools.reduce(Transf.join, datalist) return data
def description(self, param) -> str: paraminfo = self.system_info["params"][param] #print('connector local paraminfo: ' + paraminfo['variable_name']) # Prints the nicely formatted dictionary # this is the python pprint function, not the larda.helpers function pp = pprint2.PrettyPrinter(indent=4) logger.info(pp.pformat(paraminfo)) if 'description_file' not in paraminfo: return 'no description file defined in config' if self.description_dir == None: return 'description dir not set' description_file = self.description_dir / paraminfo['description_file'] logger.info('load description file {}'.format(description_file)) with open(description_file, 'r', encoding="utf-8") as f: descr = f.read() descr = "\n"+descr+"\n" logger.warning(descr) return descr
[docs] def get_as_plain_dict(self) -> dict: """put the most important information of the connector into a plain dict (for http tranfer) Returns: connector information .. code:: {params: {param_name: fileidentifier, ...}, avail: {fileidentifier: {"YYYYMMDD": no_files, ...}, ...} """ return { 'params': {e: self.system_info['params'][e]['which_path'] for e in self.params_list}, 'avail': {k: self.files_per_day(k) for k in self.filehandler.keys()} }
[docs] def files_per_day(self, which_path) -> dict: """replaces ``days_available`` and ``day_available`` Returns: dict with days and no of files .. code:: {'YYYYMMDD': no of files, ...} """ fh = self.filehandler[which_path] groupedby_day = collections.defaultdict(list) for d, f in fh: groupedby_day[d[0][:8]] += [f] no_files_per_day = {k: len(v) for k, v in groupedby_day.items()} return no_files_per_day