Source code for epygram.formats.LFI

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LFA:

Contains the class to handle LFA format.
"""

__all__ = ['LFI']

import datetime
import os
import tempfile
import shutil
import copy
import numpy
import math
import re

from footprints import FPDict, FPList, proxy as fpx

from epygram import config, epygramError, util, epylog, profiles
from epygram.util import Angle
from epygram.base import Resource, FieldSet, FieldValidity
from epygram import arpifs4py
from epygram.geometries import H2DGeometry
from epygram.fields import H2DField, MiscField

#TODO LFI
    #add writing support
    #add compression support when writing
    #when writing field, check consistency between LFI_COMPRESSED and compression flag
    #add extraction of sections and profiles in pressure coordinate
    #add secant geometry support A01RK.1.AST01.001.Z.lfi
    #optimize use of listfields and listLFInames
    #2D linear and cubic interpolations must be replaced by 1D interpolations in case of vertical 2D simulations
    #check if function profiles.pressure2altitude must be renamed as it seams to be valid only for hybridP coordinate

def inquire_field_dict(fieldname):
    """
    Returns the info contained in the LFI _field_dict for the requested field. 
    """

    matching_field = None
    for fd in LFI._field_dict:
        dictitem = fd['name']
        pattern = re.subn('\.', r'\.', dictitem)[0]  # protect '.'
        pattern = pattern.replace('?', '.')  # change unix '?' to python '.' (any char)
        pattern = pattern.replace('*', '.*')  # change unix '*' to python '.*' (several any char)
        pattern += '(?!.)'
        if re.match(pattern, fieldname):
            matching_field = fd
            break

    if matching_field == None:
        epylog.info("field '" + fieldname + "' is not referenced in Field_Dict_LFI. Assume its type being a MiscField.")
        matching_field = {'name':fieldname, 'type':'Misc', 'nature':'float', 'dimension':'1'}

    return copy.deepcopy(matching_field)

[docs]class LFI(Resource): """ Class implementing all specificities for LFI resource format. """ _footprint = dict( attr=dict( format=dict( values=set(['LFI']), default='LFI'), # compressed = dict( # optional = True, # default = False, # info = "Compression flag."), ) ) # the Field Dictionary gathers info about fields nature CSV_field_dictionaries = config.CSV_field_dictionaries_LFI # syntax: _field_dict = [{'name':'fieldname1', 'type':'...', ...}, {'name':'fieldname2', 'type':'...', ...}, ...] _field_dict = [] @classmethod def _LFIsoft_init(cls): """ Initialize the LFI software maximum dimensions. """ cls._LFIsoftware_cst = {'JPXKRK':100} @classmethod def _read_field_dict(cls, fd_abspath): """ Reads the CSV fields dictionary of the format. """ field_dict, file_priority = util.read_CSV_as_dict(fd_abspath) if file_priority == 'main': cls._field_dict = field_dict elif file_priority == 'underwrite': for fd in field_dict: found = False for cfd in cls._field_dict: found = fd['name'] == cfd['name'] if not found: cls._field_dict.append(fd) elif file_priority == 'overwrite': for cfd in cls._field_dict: found = False for fd in field_dict: found = fd['name'] == cfd['name'] if not found: field_dict.append(cfd) cls._field_dict = field_dict def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False self.geometry = None self.validity = None self.compressed = None # At creation of the first LFI, initialize LFI._field_dict if self._field_dict == []: self._read_field_dict(self.CSV_field_dictionaries['default']) if os.path.exists(self.CSV_field_dictionaries['user']): self._read_field_dict(self.CSV_field_dictionaries['user']) super(LFI, self).__init__(*args, **kwargs) # Initialization of LFI software (if necessary): if not hasattr(self, '_LFIsoftware_cst'): self._LFIsoft_init() if not self.fmtdelayedopen: self.open()
[docs] def open(self): """ Opens a LFI with ifsaux' LFIOUV, and initializes some attributes. """ # use alias (shortened filename), in order to prevent filenames too long for the FA software self.container.alias = tempfile.mkstemp(prefix='LFI.')[1] os.remove(self.container.alias) os.symlink(self.container.abspath, self.container.alias) if self.openmode in ('r', 'a'): # LFI already exists # open, getting logical unit self._unit = arpifs4py.wlfiouv(self.container.alias, 'OLD') self.isopen = True self.empty = False elif self.openmode == 'w': # new LFI # open, getting logical unit self._unit = arpifs4py.wfaitou(self.container.alias, 'NEW', self.headername) self.isopen = True self.empty = True
[docs] def close(self): """ Closes a LFI with ifsaux' LFIFER. """ if self.isopen: try: arpifs4py.wlfifer(self._unit, 'KEEP') except Exception: raise IOError("closing " + self.container.abspath) self.isopen = False # Cleanings if self.openmode in ('r', 'a'): os.remove(self.container.alias) elif self.openmode == 'w': if self.empty: os.remove(self.container.alias) else: shutil.move(self.container.alias, self.container.abspath) ################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None): """ Returns a list of the fields from resource whose identifier match the given seed. Args: \n - *seed*: might be a tuple of regular expressions, a list of regular expressions tuples or *None*. If *None* (default), returns the list of all fields in resource. - *fieldtype*: optional, among ('H2D', 'Misc'). If provided, filters out the fields not of the given type. """ fieldslist = [] if seed == None: tmplist = self.listfields() for f in tmplist: if fieldtype == None or inquire_field_dict(f[0])['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, tuple): tmplist = util.find_re_in_list(seed, self.listfields()) for f in tmplist: if fieldtype == None or inquire_field_dict(f[0])['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, list): tmplist = [] for s in seed: tmplist += util.find_re_in_list(s, self.listfields()) for f in tmplist: if fieldtype == None or inquire_field_dict(f[0])['type'] == fieldtype: fieldslist.append(f) if fieldslist == []: raise epygramError("no field matching '" + str(seed) + "' was found in resource " + self.container.abspath) return fieldslist
[docs] def listfields(self): """ Returns a list containing the LFI identifiers of all the fields of the resource. """ return super(LFI, self).listfields()
@Resource._openbeforedelayed def _listfields(self): """ Actual listfields() method for LFI. """ fieldslist = [] for fieldname in self._listLFInames(): field_info = inquire_field_dict(fieldname) if field_info['type'] == '3D': if self.geometry == None: self._read_geometry() kmax = self.geometry.vcoordinate['levels_number'] for level in range(kmax): fieldslist.append((fieldname, level)) else: fieldslist.append((fieldname, None)) return fieldslist @Resource._openbeforedelayed def _listLFInames(self): """ List of LFI article names contained in file. """ records_number = arpifs4py.wlfinaf(self._unit)[0] arpifs4py.wlfipos(self._unit) # rewind nameslist = [] for i in range(records_number): nameslist.append(arpifs4py.wlficas(self._unit, True)[0].strip()) return nameslist
[docs] def sortfields(self): """ Returns a sorted list of fields with regards to their name and nature, as a dict of lists. """ listMisc = [] list3D = [] list2D = [] for field in [f for (f, l) in self.listfields()]: info = inquire_field_dict(field) if info['type'] == 'H2D': list2D.append(field) elif info['type'] == '3D': list3D.append(field) else: listMisc.append(field) # sort list2D.sort() list3D.sort() listMisc.sort() outlists = {'3D fields':list(set(list3D)), '2D fields':list(set(list2D)), 'Misc-fields':list(set(listMisc))} return outlists
@Resource._openbeforedelayed
[docs] def readfield(self, fieldidentifier, getdata=True): """ Reads one field, given its identifier (tuple (LFI name, level)), and returns a Field instance. Interface to Fortran routines from 'ifsaux'. Args: \n - *fieldidentifier*: (LFI fieldname, level). - *getdata*: optional, if *False*, only metadata are read, the field do not contain data. Default is *True*. """ if self.openmode == 'w': raise epygramError("cannot read fields in resource if with openmode == 'w'.") if type(fieldidentifier) != type(tuple()) or len(fieldidentifier) != 2: raise epygramError("fieldidentifier of a LFI field is a tuple (name, level).") # Get field info fieldname, level = fieldidentifier field_info = inquire_field_dict(fieldname) if field_info['type'] in ['H2D', '3D']: if self.geometry == None: self._read_geometry() if self.validity == None: self._read_validity() # Make geometry object kwargs_geom = dict(structure='H2D', name=self.geometry.name, grid=FPDict(self.geometry.grid), dimensions=FPDict(self.geometry.dimensions), geoid=config.LFI_default_geoid, position_on_grid=None ) if hasattr(self.geometry, 'projection'): kwargs_geom['projection'] = FPDict(self.geometry.projection) if self.geometry.vcoordinate != None: # vertical geometry vcoordinate = copy.deepcopy(self.geometry.vcoordinate) vcoordinate.update(field_info) vcoordinate.pop('type') vcoordinate.pop('name') if field_info['type'] == 'H2D': vcoordinate['Z'] = field_info['level'] if level in field_info else 0 else: vcoordinate['Z'] = level kwargs_geom['vcoordinate'] = FPDict(vcoordinate) # Get data if requested or only metadata field_length = arpifs4py.wlfinfo(self._unit, fieldname)[0] #Record length lengthToRead = min(field_length, 2 + LFI._LFIsoftware_cst['JPXKRK'] + 3) if not getdata else field_length #Length needed rawData = arpifs4py.wlfilec(self._unit, fieldname, lengthToRead, getdata) #Reading (h, v) = {0:('__unknown__', '__unknown__'), 1:('center', 'mass'), 2:('center-left', 'mass'), 3:('lower-center', 'mass'), 4:('center', 'flux'), 5:('lower-left', 'mass'), 6:('center-left', 'flux'), 7:('lower-center', 'flux'), 8:('lower-left', 'flux')}[rawData[0]] gridIndicator = FPDict({'vertical':v, 'horizontal':h}) comment_length = rawData[1] if comment_length > LFI._LFIsoftware_cst['JPXKRK']: raise epygramError("comment length is superior to the limit.") comment = "" for i in rawData[2:comment_length + 2]: comment += chr(i) data = rawData[comment_length + 2:] if getdata: if fieldname != 'LFI_COMPRESSED': if self.compressed == None: if 'LFI_COMPRESSED' in self._listLFInames(): self.compressed = self.readfield(('LFI_COMPRESSED', 0)).data else: self.compressed = False if self.compressed: (lengthAfter, compressionType) = arpifs4py.wget_compheader(len(data), data, lengthToRead) if lengthAfter > 0: data = arpifs4py.wdecompress_field(len(data), data, compressionType, lengthAfter) if field_info['type'] == 'Misc': if field_info['dimension'] == '0': if field_info['nature'] == 'int': dataOut = data[0] elif field_info['nature'] == 'str': dataOut = "" for num in data: dataOut += chr(num) elif field_info['nature'] == 'bool': dataOut = bool(data[0]) elif field_info['nature'] == 'float': dataOut = data.view('float64')[0] else: raise NotImplementedError("reading of datatype " + field_info['nature'] + ".") else: if field_info['nature'] == 'int': dataOut = numpy.copy(data) elif field_info['nature'] == 'float': dataOut = numpy.copy(data.view('float64')[:]) elif field_info['nature'] == 'str': #FIXME: table of int(str) = table of tables ??? dataOut = numpy.copy(data) elif field_info['nature'] == 'bool': dataOut = data != 0 else: raise NotImplementedError("reading of datatype " + field_info['nature'] + " array.") data = dataOut elif field_info['type'] == 'H2D': if level != None: raise epygramError("level must be 0 for a one-level field.") offset = 0 nature = field_info['nature'] if 'nature' in field_info else 'float' if nature == 'float': data = numpy.array(data.view('float64')[offset:offset + self.geometry.dimensions['X'] * self.geometry.dimensions['Y']]) elif nature == 'int': data = numpy.array(data[offset:offset + self.geometry.dimensions['X'] * self.geometry.dimensions['Y']]) else: raise NotImplementedError("reading of datatype " + field_info['nature'] + " field.") elif field_info['type'] == '3D': kmax = self.geometry.vcoordinate['levels_number'] if level not in range(kmax): raise epygramError("level must be between 0 and kmax for a 3D field (level=" + str(level) + ").") offset = level * self.geometry.dimensions['X'] * self.geometry.dimensions['Y'] data = numpy.array(data.view('float64')[offset:offset + self.geometry.dimensions['X'] * self.geometry.dimensions['Y']]) # Create field if field_info['type'] in ['H2D', '3D']: # Create H2D field kwargs_geom['position_on_grid'] = gridIndicator geometry = fpx.geometry(**kwargs_geom) field = H2DField(fid=FPDict({self.format:(fieldname, level)}), geometry=geometry, validity=self.validity, processtype='forecast', comment=comment) elif field_info['type'] == 'Misc': # Create Misc field class empty(object): pass geometry = empty() geometry.position_on_grid = gridIndicator field = MiscField(fid=FPDict({self.format:fieldname}), comment=comment) field.geometry = geometry if getdata: if field_info['type'] in ['H2D', '3D']: data = geometry.reshape_data(data) field.setdata(data) return field
[docs] def readfields(self, requestedfields=None, getdata=True): """ Returns a :class:`epygram.base.FieldSet` containing requested fields read in the resource. Args: \n - *requestedfields*: might be \n - a tuple of regular expressions (e.g. ('RVT', '?')) - a list of LFI fields identifiers with regular expressions (e.g. [('COVER???', 0), ('RVT', '*')]) - if not specified, interpretated as all fields that will be found in resource - *getdata*: optional, if *False*, only metadata are read, the fields do not contain data. Default is *True*. """ requestedfields = self.find_fields_in_resource(requestedfields) if requestedfields == []: raise epygramError("unable to find requested fields in resource.") return super(LFI, self).readfields(requestedfields, getdata) # def writefield(self, field): # """ # Write a field in the resource. # # Args: \n # - *field*: a :class:`epygram.base.Field` instance or :class:`epygram.fields.H2DField`. # """ # # if self.openmode == 'r': # raise IOError("cannot write field in a FA with openmode 'r'.") # # if not self.isopen: # if not isinstance(field, H2DField): # raise epygramError("cannot write a this kind of field on a non-open FA.\n \ # FA need a geometry to be open. Maybe this FA has not been given one at opening.\n \ # For opening it, either write a H2DField in, or call its method open(geometry, validity),\n \ # geometry being a H2DGeometry, validity being a FieldValidity.") # if self.validity == None: # self._attributes['validity'] = field.validity # self._attributes['headername'] = _create_header_from_geometry(field.geometry, field.spectral_geometry) # self.open() # # if not self.empty and field.fid['FA'] in self.listfields(): # raise epygramError("there already is a field with the same name in this FA.") # # if isinstance(field, MiscField): # data = field.data # if field.shape in ((1,), ()): # if 'int' in field.datatype.name: # dataReal = data.view('float64') # elif 'str' in field.datatype.name: # data = str(data) # ndarray of str -> simple str # dataReal = numpy.array([ord(d) for d in data]).view('float64') # elif 'bool' in field.datatype.name: # dataReal = numpy.array(1 if data else 0).view('float64') # elif 'float' in field.datatype.name: # dataReal = data # else: # raise NotImplementedError("writing of datatype "+field.datatype.name+".") # else: # try: # dataReal = numpy.copy(data.view('float64')) # except Exception: # raise NotImplementedError("writing of datatype "+field.datatype.__name__+" array.") # arpifs4py.wfaisan(self._unit, field.fid['FA'], dataReal.size, dataReal) # # elif isinstance(field, H2DField): # if [self.geometry.name, self.geometry.grid, self.geometry.dimensions] != \ # [field.geometry.name, field.geometry.grid, field.geometry.dimensions]: # raise epygramError("gridpoint geometry incompatibility: a FA can hold only one geometry.") # if field.spectral_geometry != None and field.spectral_geometry != self.spectral_geometry: # # compatibility check # raise epygramError("spectral geometry incompatibility: a FA can hold only one geometry.") # data = numpy.ma.copy(field.data).flatten() # if isinstance(data, numpy.ma.core.MaskedArray): # data=numpy.copy(data[data.mask==False].data) # if compression != None: # modified_compression = True # elif self.fieldscompression.has_key(field.fid['FA']): # compression = self.fieldscompression[field.fid['FA']] # modified_compression = True # else: # modified_compression = False # if modified_compression: # self._setrunningcompression(**compression) # arpifs4py.wfaienc(self._unit, field.fid['FA'][0:4], 0, field.fid['FA'][4:], len(data), data, field.spectral) # if modified_compression: # self._setrunningcompression(**self.default_compression) # set back to default # if not self.fieldscompression.has_key(field.fid['FA']): # self.fieldscompression[field.fid['FA']] = compression # if self.empty: self.empty = False # def writefields(self, fieldset, compression=None): # """ # Write the fields of the *fieldset* in the resource. # # Args: \n # - *fieldset*: must be a :class:`epygram.base.FieldSet` instance. # - *compression*: must be a list of compression dicts (cf. *writefield()* method), # of length equal to the length of the *fieldset*, and with the same order. # """ # # if not isinstance(fieldset, FieldSet): # raise epygramError("'fieldset' argument must be of kind FieldSet.") # if len(fieldset) != len(compression): # raise epygramError("fieldset and compression must have the same length.") # # # Be sure the first field to be written is an H2DField, # # for being able to set the header if necessary # if not self.isopen and not isinstance(fieldset[0], H2DField): # for f in range(len(fieldset)): # if isinstance(fieldset[f], H2DField): # fieldset.insert(0, fieldset.pop(f)) # if compression != None: # compression.insert(0, compression.pop(f)) # break # # if compression != None: # # loop separated from the above one, because fieldset is there-above modified # for f in range(len(fieldset)): # self.writefield(fieldset[f], compression[f]) # else: # super(LFI, self).writefields(fieldset)
[docs] def extractprofile(self, fid, lon, lat, vertical_coordinate=None, interpolation='nearest', excludeExtraVPoints=True): """ Extracts a vertical profile from the LFI resource, given its fid and the geographic location (*lon*/*lat*) of the profile. Args: \n - *fid* must have syntax: ('PARAMETER', '\*'), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in LFI. - *lon* is the longitude of the desired point. - *lat* is the latitude of the desired point. - *vertical_coordinate* defines the requested vertical coordinate of the V2DField (cf. :class:`epygram.geometries.V1DGeometry` coordinate possible values). - *interpolation* defines the interpolation function used to compute the profile at requested lon/lat from the fields grid: - if 'nearest' (default), extracts profile at the horizontal nearest neighboring gridpoint; - if 'linear', computes profile with horizontal linear spline interpolation; - if 'cubic', computes profile with horizontal cubic spline interpolation. - *excludeExtraVPoints*: if True uppermost and lowermost points (they have no physical meaning) are excluded. """ try: N = len(lon) except Exception: N = 1 lon = numpy.array([lon]) lat = numpy.array([lat]) l = len(fid[2:]) fidlist = self.find_fields_in_resource(seed=fid, fieldtype='3D') if fidlist == []: raise epygramError("cannot find profile for" + fid + " in resource.") fs = self.readfields(fidlist, getdata=False) fs.sort(attribute=['geometry', 'vcoordinate'], key='Z') fidlist = fs.listfields(typefmt='LFI') if excludeExtraVPoints: fidlist = fidlist[1:-1] # Build commons field = fs[0] # location for i in range(N): if not field.geometry.point_is_inside_domain_ll(lon[i], lat[i]): raise ValueError("point (" + str(lon[i]) + ", " + \ str(lat[i]) + ") is out of field domain.") if interpolation == 'nearest': true_loc = field.geometry.ij2ll(*field.geometry.nearest_points(lon, lat, 'nearest')) comments = [] for i in range(N): distance = field.geometry.distance((lon[i], lat[i]), (true_loc[0][i], true_loc[1][i])) az = field.geometry.azimuth((lon[i], lat[i]), (true_loc[0][i], true_loc[1][i])) if -22.5 < az <= 22.5: direction = 'N' elif -77.5 < az <= -22.5: direction = 'NW' elif -112.5 < az <= -77.5: direction = 'W' elif -157.5 < az <= -112.5: direction = 'SW' elif -180.5 <= az <= -157.5 or 157.5 < az <= 180.: direction = 'S' elif 22.5 < az <= 77.5: direction = 'NE' elif 77.5 < az <= 112.5: direction = 'E' elif 112.5 < az <= 157.5: direction = 'SE' gridpointstr = "(" + '{:.{precision}{type}}'.format(true_loc[0][i], type='F', precision=4) + ", " + \ '{:.{precision}{type}}'.format(true_loc[1][i], type='F', precision=4) + ")" comment = "Profile @ " + str(int(distance)) + "m " + direction + \ " from " + str((lon[i], lat[i])) + "\n" + \ "( = nearest gridpoint: " + gridpointstr + ")" comments.append(comment) elif interpolation in ('linear', 'cubic'): if interpolation == 'linear': interpstr = 'linearly' elif interpolation == 'cubic': interpstr = 'cubically' comments = [] for i in range(N): comment = "Profile " + interpstr + " interpolated @ " + str((lon[i], lat[i])) comments.append(comment) else: raise NotImplementedError(interpolation + "interpolation.") # build vertical geometry fieldlevels_number = len(fidlist) gridsize = fieldlevels_number coordinate = 'hybrid_height' gridposition = 'flux' position_on_grid = field.geometry.position_on_grid['vertical'] if excludeExtraVPoints: grid = {'levels':FPDict({'Ai':tuple(self.geometry.vcoordinate['Ai'][1:-1]), 'Bi':tuple(self.geometry.vcoordinate['Bi'][1:-1])})} else: grid = {'levels':FPDict({'Ai':tuple(self.geometry.vcoordinate['Ai']), 'Bi':tuple(self.geometry.vcoordinate['Bi'])})} grid['fieldlevels_number'] = fieldlevels_number grid['gridsize'] = gridsize grid['gridposition'] = gridposition vgeometries = [] for i in range(N): vgeometry = fpx.geometry(structure='V1D', coordinate=coordinate, grid=FPDict(grid), position_on_grid=position_on_grid, hlocation=FPDict(zip(('lon', 'lat'), (lon[i], lat[i]))) ) vgeometries.append(vgeometry) # profile values = [] for f in fidlist: # field field = self.readfield(f) # read profile value values.append(field.getvalue_ll(lon, lat, interpolation=interpolation)) # build profile as a V1DField profilesSet = FieldSet() for i in range(N): profile = fpx.field(fid=FPDict({'LFI':fid}), geometry=vgeometries[i], validity=self.validity, comment=comments[i]) if N == 1: profile.setdata(values) else: profile.setdata([v[i] for v in values]) profilesSet.append(profile) # preparation for vertical coords conversion if vertical_coordinate not in (None, vgeometries[0].coordinate): if vgeometries[0].coordinate == 'hybrid_height' and \ vertical_coordinate in ('altitude', 'height'): zsfield = self.readfield(('ZS', None)) zs_values = zsfield.getvalue_ll(lon, lat, interpolation=interpolation) for i in range(N): # effective vertical coords conversion if vertical_coordinate not in (None, profilesSet[i].geometry.coordinate): if profilesSet[i].geometry.coordinate == 'hybrid_height' and \ vertical_coordinate in ('altitude', 'height'): profilesSet[i].geometry.hybridH2altitude(zs_values[i], gridposition=field.geometry.position_on_grid['vertical'], conv2height=vertical_coordinate == 'height') else: raise NotImplementedError("this vertical coordinate conversion.") if N == 1: profile = profilesSet[0] else: profile = profilesSet return profile
@Resource._openbeforedelayed
[docs] def extractsection(self, fid, end1, end2, transect=None, points_number=None, resolution=None, vertical_coordinate=None, interpolation='linear', cheap_alt=False, excludeExtraVPoints=True): """ Extracts a vertical section from the LFI resource, given its fid and the geographic (lon/lat) coordinates of its ends. The section is returned as a V2DField. Args: \n - *fid* must have syntax: ('PARAMETER', '\*'), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in LFI. - *end1* must be a tuple (lon, lat). - *end2* must be a tuple (lon, lat). - *transect* is the list of (lon,lat) tuples at which points compising the section are extracted. If None, defaults to linearily spaced positions computed from *points_number*. - *points_number* defines the total number of horizontal points of the section (including ends). If None, defaults to a number computed from the *ends* and the *resolution*. - *resolution* defines the horizontal resolution to be given to the field. If None, defaults to the horizontal resolution of the field. - *vertical_coordinate* defines the requested vertical coordinate of the V2DField (cf. :class:`epygram.geometries.V1DGeometry` coordinate possible values). - *interpolation* defines the interpolation function used to compute the profile points locations from the fields grid: \n - if 'nearest', each horizontal point of the section is taken as the horizontal nearest neighboring gridpoint; - if 'linear' (default), each horizontal point of the section is computed with linear spline interpolation; - if 'cubic', each horizontal point of the section is computed with linear spline interpolation. - *cheap_alt*: if True and *vertical_coordinate* is 'pressure', the computation of pressures is done without taking hydrometeors into account (in R computation). - *excludeExtraVPoints*: if True uppermost and lowermost points (they have no physical meaning) are excluded. """ if transect == None: if resolution != None and points_number != None: raise epygramError("only one of resolution and points_number can be given.") if self.geometry == None: self._read_geometry() distance = self.geometry.distance(end1, end2) if resolution == None and points_number == None: resolution = 0.5 * (self.geometry.resolution_ll(*end1) + self.geometry.resolution_ll(*end2)) if resolution > distance: raise epygramError("'ends' are too near: pure interpolation between two gridpoints.") elif points_number != None and points_number < 2: raise epygramError("'points_number' must be at least 2.") if resolution != None: points_number = int(numpy.rint(distance / resolution)) + 1 epylog.info('section computed on ' + str(points_number) + \ ' points == ' + str(resolution) + ' m resolution.') if points_number >= 3: transect = self.geometry.linspace(end1, end2, points_number) elif points_number == 2: transect = [end1, end2] else: raise epygramError("cannot make a section with less than 2 points.") else: if resolution != None or points_number != None: raise epygramError("if transect is given, resolution and points_number must be None.") if len(transect) < 2: raise epygramError("transect must contain at least two points.") profileslist = self.extractprofile(fid, [p[0] for p in transect], [p[1] for p in transect], interpolation=interpolation, vertical_coordinate=vertical_coordinate, excludeExtraVPoints=excludeExtraVPoints) geometry = fpx.geometry(structure='V2D', grid=FPList([p.geometry for p in profileslist])) geometry.distance = self.geometry.distance geometry.azimuth = self.geometry.azimuth section = fpx.field(fid=FPDict({'section':fid}), geometry=geometry, validity=profileslist[0].validity) section.setdata(numpy.array([p.data for p in profileslist]).transpose()) return section ########### # pre-app # ###########
@Resource._openbeforedelayed
[docs] def what(self, out, fieldsextrainfo=False, sortfields=False): """ Writes in file a summary of the contents of the LFI. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *sortfields*: **True** if the fields have to be sorted by type. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n' first_H2DField = [(f, l) for (f, l) in self.listfields() if inquire_field_dict(f)['type'] in ['H2D', '3D']][0] firstfield = self.readfield(first_H2DField, getdata=False) v_geometry = self.geometry.vcoordinate listfields = self.listfields() onelevel = {} for (f, l) in listfields: onelevel[f] = l listoffields = [f for (f, l) in listfields] if sortfields: sortedfields = self.sortfields() def write_formatted(dest, label, value): dest.write('{:<{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_col(dest, label, value): dest.write('{:>{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_fields(dest, label, gridIndicator=None, comment=None): if gridIndicator == None and comment == None: dest.write('{:<{width}}'.format(label, width=20) \ + '\n') else: dest.write('{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(gridIndicator), width=10) \ + ':' \ + comment \ + '\n') firstfield.what(out, vertical_geometry=False) out.write("#########################\n") out.write("### VERTICAL GEOMETRY ###\n") out.write("#########################\n") write_formatted(out, "Number of vertical levels", v_geometry['levels_number']) out.write("Hybrid-height coord. A coefficients\n") for c in range(len(v_geometry['Ai'])): write_formatted_col(out, str(c + 1), v_geometry['Ai'][c]) out.write(sepline) out.write("Hybrid-height coord. B coefficients\n") for c in range(len(v_geometry['Bi'])): write_formatted_col(out, str(c + 1), v_geometry['Bi'][c]) out.write(sepline) out.write("\n") out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") if sortfields: listoffields = [] for k in sorted(sortedfields.keys()): listoffields.append(k) listoffields.append('--------------------') listoffields.extend(sortedfields[k]) listoffields.append('--------------------') numfields = sum([len(v) for v in sortedfields.values()]) else: numfields = len(listoffields) out.write("Number: " + str(numfields) + "\n") if fieldsextrainfo: write_formatted_fields(out, "Field name", "Grid ind.", "Comment") else: write_formatted_fields(out, "Field name") out.write(sepline) done = [] for f in listoffields: if f not in done: if fieldsextrainfo: field = self.readfield((f, onelevel[f])) if hasattr(field, 'geometry'): gridIndicator = {('__unknown__', '__unknown__'):0, ('center', 'mass'):1, ('center-left', 'mass'):2, ('lower-center', 'mass'):3, ('center', 'flux'):4, ('lower-left', 'mass'):5, ('center-left', 'flux'):6, ('lower-center', 'flux'):7, ('lower-left', 'flux'):8}[field.geometry.position_on_grid['horizontal'], field.geometry.position_on_grid['vertical']] else: gridIndicator = '-' write_formatted_fields(out, f, gridIndicator, field.comment) else: write_formatted_fields(out, f) done.append(f) out.write(sepline) ############## # the LFI WAY # ##############
@Resource._openbeforedelayed def _read_geometry(self): """ Reads the geometry in the LFI articles. Interface to Fortran routines from 'ifsaux'. """ listnames = self._listLFInames() if 'CARTESIAN' in listnames: cartesian = self.readfield(('CARTESIAN', 0)).data else: cartesian = False if cartesian: imax = int(self.readfield(('IMAX', 0)).data) jmax = int(self.readfield(('JMAX', 0)).data) xhat = self.readfield(('XHAT', 0)).data yhat = self.readfield(('YHAT', 0)).data if 'KMAX' in listnames: kmax = int(self.readfield(('KMAX', 0)).data) if kmax > 1: zhat = self.readfield(('ZHAT', 0)).data kmax += 2 else: kmax = 0 grid = {'X_resolution':xhat[1] - xhat[0], 'Y_resolution':yhat[1] - yhat[0], 'LAMzone':'CIE' } dimensions = {'X':imax + 2, 'Y':1 if (cartesian and jmax == 1) else jmax + 2, 'X_CIzone':imax, 'Y_CIzone':jmax, 'X_Iwidth':imax, 'Y_Iwidth':jmax, 'X_Czone':imax, 'Y_Czone':jmax, 'X_CIoffset':1, 'Y_CIoffset':1 } geometryname = 'academic' kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), geoid=FPDict(config.LFI_default_geoid), ) if kmax > 1: H = zhat[-1] if 'SLEVE' in listnames: sleve = self.readfield(('SLEVE', 0)).data else: sleve = False vcoordinate = {'grib2_surface_type':'118' if not sleve else '255', 'levels_number':kmax, 'Ai':FPList([c for c in zhat[0:kmax + 2]]), # conversion for footprints purpose... :-/ 'Bi':FPList([1 - c / H for c in zhat[0:kmax + 2]]), 'ABgrid_position':'flux' } kwargs_geom['vcoordinate'] = FPDict(vcoordinate) else: lat0 = self.readfield(('LAT0', 0)).data lon0 = self.readfield(('LON0', 0)).data lat1 = self.readfield(('LATORI' if 'LATORI' in listnames else 'LATOR', 0)).data lon1 = self.readfield(('LONORI' if 'LONORI' in listnames else 'LONOR', 0)).data rpk = self.readfield(('RPK', 0)).data beta = self.readfield(('BETA', 0)).data imax = int(self.readfield(('IMAX', 0)).data) jmax = int(self.readfield(('JMAX', 0)).data) xhat = self.readfield(('XHAT', 0)).data yhat = self.readfield(('YHAT', 0)).data if 'KMAX' in listnames: kmax = int(self.readfield(('KMAX', 0)).data) if kmax > 1: zhat = self.readfield(('ZHAT', 0)).data kmax += 2 else: kmax = 0 projection = {'reference_lon':Angle(lon0, 'degrees'), 'center_lon':Angle(lon1, 'degrees'), #CAUTION: not center but lower left point! 'center_lat':Angle(lat1, 'degrees') #CAUTION: not center but lower left point! } if abs(rpk - math.sin(math.radians(lat0))) <= 1.E-4: projection['reference_lat'] = Angle(lat0, 'degrees') else: print rpk, lat0, math.sin(math.radians(lat0)), rpk - math.sin(math.radians(lat0)), config.epsilon raise NotImplementedError('secant case not implemented') #When this case will be implemented, we will be able to change the 1.E-4 in the test above into config.epsilon #projection['secant_lat']= mercator and polar stereographic #projection['secant_lat1']= lambert #projection['secant_lat2']= lambert grid = {'X_resolution':xhat[1] - xhat[0], 'Y_resolution':yhat[1] - yhat[0], 'LAMzone':'CIE' } dimensions = {'X':imax + 2, 'Y':jmax + 2, 'X_CIzone':imax, 'Y_CIzone':jmax, 'X_Iwidth':imax, 'Y_Iwidth':jmax, 'X_Czone':imax, 'Y_Czone':jmax, 'X_CIoffset':1, 'Y_CIoffset':1 } if abs(lat0) <= config.epsilon: geometryname = 'mercator' if abs(90. - abs(lat0)) <= config.epsilon: geometryname = 'polar_stereographic' else: geometryname = 'lambert' #Wrong geometry object with lower left point instead of center point kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), geoid=FPDict(config.LFI_default_geoid), projection=FPDict(projection) ) if kmax > 1: H = zhat[-1] if 'SLEVE' in listnames: sleve = self.readfield(('SLEVE', 0)).data else: sleve = False vcoordinate = {'grib2_surface_type':'118' if not sleve else '255', 'levels_number':kmax, 'Ai':FPList([c for c in zhat[0:kmax + 2]]), # conversion for footprints purpose... :-/ 'Bi':FPList([1 - c / H for c in zhat[0:kmax + 2]]), 'ABgrid_position':'flux' } kwargs_geom['vcoordinate'] = FPDict(vcoordinate) kwargs_geom['position_on_grid'] = FPDict({'vertical':'mass', 'horizontal':'center'}) geometry = fpx.geometry(**kwargs_geom) #Good geometry object centerPoint = geometry.gimme_corners_ll(subzone='CIE')['ur'] projection['center_lon'] = Angle(centerPoint[0], 'degrees') projection['center_lat'] = Angle(centerPoint[1], 'degrees') kwargs_geom['projection'] = FPDict(projection) self.geometry = fpx.geometry(**kwargs_geom) @Resource._openbeforedelayed def _read_validity(self): """ Reads the validity in the LFI articles. """ listnames = self._listLFInames() kwargs = {} if 'DTEXP%TDATE' in listnames: dtexpDate = self.readfield(('DTEXP%TDATE', 0)).data dtexpTime = float(self.readfield(('DTEXP%TIME', 0)).data) kwargs['basis'] = datetime.datetime(dtexpDate[0], dtexpDate[1], dtexpDate[2]) + datetime.timedelta(seconds=dtexpTime) if 'DTCUR%TDATE' in listnames: dtcurDate = self.readfield(('DTCUR%TDATE', 0)).data dtcurTime = float(self.readfield(('DTCUR%TIME', 0)).data) kwargs['term'] = datetime.datetime(dtcurDate[0], dtcurDate[1], dtcurDate[2]) + datetime.timedelta(seconds=dtcurTime) - kwargs['basis'] self.validity = FieldValidity(**kwargs) # @Resource._openbeforedelayed # def _set_validity(self, termunit='hours'): # """ # Sets date, hour and the processtype in the resource. # """ # # if not self.isopen: # raise epygramError("_set_validity must be called after FA is open.") # # basis = self.validity.getbasis() # KDATEF = numpy.zeros(22, dtype=numpy.int64) # KDATEF[0] = int(basis.year) # KDATEF[1] = int(basis.month) # KDATEF[2] = int(basis.day) # KDATEF[3] = int(basis.hour) # KDATEF[4] = int(basis.minute) # if termunit == 'hours': # KDATEF[5] = 1 # KDATEF[6] = self.validity.term('IntHours') # KDATEF[9] = self.validity.term('IntHours') - self.validity.cumulativeduration('IntHours') # elif termunit == 'days': # KDATEF[5] = 2 # KDATEF[6] = self.validity.term('IntHours') / 24 # KDATEF[9] = (self.validity.term('IntHours') - self.validity.cumulativeduration('IntHours')) / 24 # else: # raise NotImplementedError("term unit other than hours/days ?") # #KDATEF[7] = 0 # if self.processtype == 'analysis': processtype = 0 # elif self.processtype == 'initialization': processtype = 1 # elif self.processtype == 'forecast': processtype = 10 # else: processtype = 255 # unknown processtype # KDATEF[8] = processtype # #KDATEF[10] = 0 # if not config.FA_fandax: # arpifs4py.wfandar(self._unit, KDATEF) # else: # # fandax # KDATEF[11] = 1 # KDATEF[13] = int(basis.second) + int(basis.hour)*3600 + int(basis.minute)*60 # KDATEF[14] = self.validity.term(fmt='IntSeconds') # KDATEF[15] = self.validity.term(fmt='IntSeconds') - self.validity.cumulativeduration(fmt='IntSeconds') # arpifs4py.wfandax(self._unit, KDATEF)