Source code for epygram.formats.DDHLFA

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
DDHLFA:

Contains the class for DDH-in-LFA format.
"""

__all__ = ['DDHLFA']

import math
import datetime

import footprints
from footprints import FPDict, FPList

from epygram import epylog, config, epygramError
from epygram.util import Angle
from epygram.base import FieldValidity, FieldSet
from .LFA import LFA
from epygram import arpifs4py
from epygram.geometries import V1DGeometry, PointGeometry
from epygram.fields import V1DField, PointField, MiscField
from epygram import profiles



[docs]class DDHLFA(LFA): """ Class implementing all specificities for DDHLFA resource format. """ _footprint = dict( attr=dict( format=dict( values=set(['DDHLFA']), default='DDHLFA'), validity=dict( type=FieldValidity, optional=True, info="Describes the temporal validity of the resource."), domains=dict( type=FPDict, optional=True, info="Describes the domains covered by the resource."), levels=dict( type=FPDict, optional=True, info="Number of levels for variables/tendencies ('VT') and\ fluxes ('F')."), xpid=dict( optional=True, info="Experiment identifier.") ) ) def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False super(LFA, self).__init__(*args, **kwargs) if self.openmode != 'r': raise NotImplementedError("openmode != 'r' for DDHLFA.") elif self.openmode == 'r': if self.domains != None or self.validity != None: epylog.warning(self.container.abspath + ": DDHLFA.__init__():\ domains/validity argument will be ignored with\ this openmode ('r').") if not self.fmtdelayedopen: self.open()
[docs] def open(self, openmode=None): """ Opens the DDHLFA in Fortran sense, and initializes domains, validity and vertical geometry. - *openmode*: optional, to open with a specific openmode, eventually different from the one specified at initialization. """ super(DDHLFA, self).open(openmode=openmode) if self.openmode == 'r': # read info self._attributes['xpid'] = super(DDHLFA, self).readfield('INDICE EXPERIENCE').data[0] self._read_geometry() self._read_validity() else: raise NotImplementedError("openmode != 'r' for DDHLFA.") ################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None): """ Returns a list of the fields from resource whose name match the given *seed*. Args: \n - *seed*: might be a regular expression, a list of regular expressions or *None*. If *None* (default), returns the list of all fields in resource. - *fieldtype*: optional, among ('V1D', 'Point', 'Misc'). If provided, filters out the fields not of the given type. """ fieldslist = super(DDHLFA, self).find_fields_in_resource(seed=seed) if fieldtype != None: fs = FieldSet() for f in fieldslist: fld = self.readfield(f, getdata=False) if isinstance(fld, FieldSet): fs.append(fld[0]) else: fs.append(fld) if fieldtype == 'V1D': fieldslist = [f.fid['DDHLFA'] for f in fs if isinstance(f, V1DField)] elif fieldtype == 'Point': fieldslist = [f.fid['DDHLFA'] for f in fs if isinstance(f, PointField)] elif fieldtype == 'Misc': fieldslist = [f.fid['DDHLFA'] for f in fs if isinstance(f, MiscField)] else: raise epygramError("unknown fieldtype for DDHLFA: " + fieldtype) return fieldslist
@LFA._openbeforedelayed
[docs] def readfield(self, fieldname, getdata=True, footprints_builder=config.use_footprints_as_builder): """ Reads a field in resource. - Documentation fields ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER', 'ECHEANCE', 'DOCDnnn') are returned as :class:`epygram.formats.LFA` returns. - Profile/surface fields are returned as a :class:`epygram.base.FieldSet` of 1D/Point fields, one for each domain. - *footprints_builder*: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. """ if footprints_builder: field_builder = footprints.proxy.field geom_builder = footprints.proxy.geometry else: if fieldname[0] in ('S', 'G'): field_builder = PointField geom_builder = PointGeometry else: field_builder = V1DField geom_builder = V1DGeometry field_from_LFA = super(DDHLFA, self).readfield(fieldname, getdata=getdata) if fieldname in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER', 'ECHEANCE')\ or fieldname[0:4] == 'DOCD': # documentation field_from_LFA._attributes['fid'] = FPDict({'DDHLFA':field_from_LFA.fid['LFA']}) toreturn = field_from_LFA else: # true field validity = FieldValidity(basis=self.validity.getbasis()) # distinction between initial, final and integrated fields if fieldname[0:2] == 'SV': validity.set(date_time=self.validity.get()) elif fieldname[0:2] == 'SF': validity.set(date_time=self.validity.get(), cumulativeduration=self.validity.cumulativeduration()) elif fieldname[0] in ('V', 'S'): if fieldname[-1] == '0': validity.set(date_time=self.validity.getbasis()) elif fieldname[-1] == '1': validity.set(date_time=self.validity.get()) elif fieldname[0] in ('T', 'F', 'G') or fieldname == 'PPP': validity.set(date_time=self.validity.get(), cumulativeduration=self.validity.cumulativeduration()) toreturn = FieldSet() if fieldname[0] in ('S', 'G'): # surface fields for d in range(self.domains['number']): if getdata: value = field_from_LFA.data[d] location = self.domains['geometry'][d] pgeometry = geom_builder(structure='Point', hlocation=FPDict(location)) field = field_builder(fid=FPDict({self.format:fieldname}), geometry=pgeometry, validity=validity, processtype=self.processtype) if getdata: field.setdata(value) toreturn.append(field) else: # profile fields if not getdata: fieldlevels = arpifs4py.wlfacas(self._unit, fieldname)[1]\ / self.domains['number'] else: fieldlevels = len(field_from_LFA.data)\ / self.domains['number'] if fieldlevels == self.levels['VT']: position_on_grid = 'mass' gridposition = 'mass' gridsize = self.levels['VT'] elif fieldlevels == self.levels['F']: position_on_grid = 'flux' gridposition = 'flux' gridsize = self.levels['F'] for d in range(self.domains['number']): domain = self.domains['geometry'][d] if fieldlevels == self.levels['VT']: if validity.term('IntHours') == 0: pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_init'] else: pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_term'] elif fieldlevels == self.levels['F']: if validity.term('IntHours') == 0: pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_init'] else: pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_term'] vgeometry = geom_builder(structure='V1D', coordinate='pressure', grid=FPDict({'gridposition':gridposition, 'gridsize':gridsize, 'fieldlevels':fieldlevels, 'levels':tuple(pressure_vertical_grid)}), position_on_grid=position_on_grid, hlocation=FPDict(domain)) if getdata: profile = field_from_LFA.data[d * fieldlevels:(d + 1) * fieldlevels] field = field_builder(fid=FPDict({self.format:fieldname}), geometry=vgeometry, validity=validity, processtype=self.processtype) if getdata: field.setdata(profile) toreturn.append(field) return toreturn
[docs] def readfields(self, requestedfields, getdata=True): """ Inactivation of readfields because readfield already returns a FieldSet. """ raise epygramError("readfields() method disabled for DDHLFA.") ################# # DDH interface # #################
@LFA._openbeforedelayed
[docs] def what(self, out, sortfields=False): """ Writes in file a summary of the contents of the DDHLFA. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *sortfields*: **True** if the fields have to be sorted by type. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \ secondcolumn_width + 1) + '\n' domains = self.domains mass_vertical_levels = self.readfield('VPP0')[0].geometry.grid['gridsize'] listoffields = self.listfields() validity = self.validity def write_formatted(dest, label, value): dest.write('{:<{width}}'.format(label, width=firstcolumn_width)\ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width)\ + '\n') def write_formatted_col(dest, label, value): dest.write('{:>{width}}'.format(label, width=firstcolumn_width)\ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width)\ + '\n') def write_formatted_fields(dest, label, value, compression=None): if compression == None: dest.write('{:<{width}}'.format(label, width=20)\ + ':' \ + '{:^{width}}'.format(str(value), width=10)\ + '\n') else: line = '{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(value), width=10) \ + ':' \ + compression \ + '\n' dest.write(line) # Write out out.write("### IDENTIFIER (INDICE EXPERIENCE): " + self.xpid + "\n") out.write("\n") out.write("################\n") out.write("### VALIDITY ###\n") out.write("################\n") write_formatted(out, "Validity", validity.get()) write_formatted(out, "Basis", validity.getbasis()) write_formatted(out, "Term", validity.term()) write_formatted(out, "Duration for cumulative quantities", validity.cumulativeduration()) write_formatted(out, "Kind of producting process", self.processtype) out.write(sepline) out.write("\n") out.write("###############\n") out.write("### DOMAINS ###\n") out.write("###############\n") out.write("Domain(s): " + str(domains['number']) + " " + \ domains['geometry'][0]['type'] + "\n") for d in range(domains['number']): geom = domains['geometry'][d] if geom['type'] != 'globe': out.write("# " + str(d + 1) + ":\n") if geom['type'] == 'ij_point': write_formatted(out, "Longitude", geom['lon'].get('degrees')) write_formatted(out, "Latitude", geom['lat'].get('degrees')) write_formatted(out, "Point index (X)", geom['jlon']) write_formatted(out, "Point index (Y)", geom['jgl']) elif geom['type'] == 'point': write_formatted(out, "Longitude", geom['lon'].get('degrees')) write_formatted(out, "Latitude", geom['lat'].get('degrees')) write_formatted(out, "Point index (X)", geom['jlon']) write_formatted(out, "Point index (Y)", geom['jgl']) write_formatted(out, "User Longitude", geom['user_lon'].get('degrees')) write_formatted(out, "User Latitude", geom['user_lat'].get('degrees')) elif geom['type'] in ('quadrilateral', 'rectangle'): write_formatted(out, "Longitude 1", geom['lon1'].get('degrees')) write_formatted(out, "Latitude 1", geom['lat1'].get('degrees')) write_formatted(out, "Longitude 2", geom['lon2'].get('degrees')) write_formatted(out, "Latitude 2", geom['lat2'].get('degrees')) write_formatted(out, "Longitude 3", geom['lon3'].get('degrees')) write_formatted(out, "Latitude 3", geom['lat3'].get('degrees')) write_formatted(out, "Longitude 4", geom['lon4'].get('degrees')) write_formatted(out, "Latitude 4", geom['lat4'].get('degrees')) elif geom['type'] == 'zonal_bands': write_formatted(out, "Center Latitude", geom['lat'].get('degrees')) out.write(sepline) out.write("\n") out.write("#########################\n") out.write("### VERTICAL GEOMETRY ###\n") out.write("#########################\n") write_formatted(out, "Number of vertical levels for variables", mass_vertical_levels) write_formatted(out, "Number of vertical levels for fluxes", mass_vertical_levels + 1) out.write(sepline) out.write("\n") out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") numfields = len(listoffields) out.write("Number: " + str(numfields) + "\n") out.write(sepline) if sortfields: out.write("Documentation:\n") out.write("-------------\n") docfields = [f for f in listoffields if \ f in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER', 'ECHEANCE') or f[0:4] == 'DOCD'] for f in docfields: out.write(f + "\n") out.write("-----------------\n") out.write("Vertical profiles:\n") out.write("-----------------\n") profilefields = [f for f in listoffields if f[0] not in ('S', 'G') and f not in docfields] for f in profilefields: out.write(f + "\n") out.write("--------------\n") out.write("Surface fields:\n") out.write("--------------\n") surfacefields = [f for f in listoffields if f not in docfields and f not in profilefields] for f in surfacefields: out.write(f + "\n") else: for f in listoffields: out.write(f + "\n") out.write(sepline)
@LFA._openbeforedelayed def _read_geometry(self): """ Reads DOCFICHIER and DOCDnnn fields, to fill-in domains and levels attributes. Cf. J.M. Piriou's documentation about DDH. """ domains = {} DOCFICHIER = self.readfield('DOCFICHIER') vpp_init = super(DDHLFA, self).readfield('VPP0') vpp_term = super(DDHLFA, self).readfield('VPP1') NFLEV = DOCFICHIER.data[5] self._attributes['levels'] = FPDict({'VT':NFLEV, 'F':NFLEV + 1}) if DOCFICHIER.data[0] == 1: domains['type'] = 'limited_area_domains' elif DOCFICHIER.data[0] == 5: domains['type'] = 'global_domain' elif DOCFICHIER.data[0] == 6: domains['type'] = 'zonal_bands' else: raise NotImplementedError("DOCFICHIER[0] not among (1,5,6).") domains['number'] = DOCFICHIER.data[14] domains['geometry'] = [] domains['vertical_grid'] = [] for d in range(1, domains['number'] + 1): DOCD = self.readfield('DOCD' + '{:0>{width}}'.format(str(d), width=3)) geom = {} if DOCD.data[10] == 1.: geom['type'] = 'ij_point' geom['lon'] = Angle(DOCD.data[2], 'radians') geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians') geom['jlon'] = int(DOCD.data[4]) geom['jlgl'] = int(DOCD.data[5]) elif DOCD.data[10] == 4.: geom['type'] = 'point' geom['lon'] = Angle(DOCD.data[2], 'radians') geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians') geom['jlon'] = int(DOCD.data[4]) geom['jgl'] = int(DOCD.data[5]) geom['user_lon'] = Angle(DOCD.data[6], 'radians') geom['user_lat'] = Angle(math.asin(DOCD.data[7]), 'radians') elif DOCD.data[10] == 2.: geom['type'] = 'quadrilateral' geom['lon1'] = Angle(DOCD.data[2], 'radians') geom['lat1'] = Angle(math.asin(DOCD.data[3]), 'radians') geom['lon2'] = Angle(DOCD.data[4], 'radians') geom['lat2'] = Angle(math.asin(DOCD.data[5]), 'radians') geom['lon3'] = Angle(DOCD.data[6], 'radians') geom['lat3'] = Angle(math.asin(DOCD.data[7]), 'radians') geom['lon4'] = Angle(DOCD.data[8], 'radians') geom['lat4'] = Angle(math.asin(DOCD.data[9]), 'radians') elif DOCD.data[10] == 3.: geom['type'] = 'rectangle' geom['lon1'] = Angle(DOCD.data[2], 'radians') geom['lat1'] = Angle(math.asin(DOCD.data[3]), 'radians') geom['lon2'] = Angle(DOCD.data[4], 'radians') geom['lat2'] = Angle(math.asin(DOCD.data[5]), 'radians') geom['lon3'] = Angle(DOCD.data[6], 'radians') geom['lat3'] = Angle(math.asin(DOCD.data[7]), 'radians') geom['lon4'] = Angle(DOCD.data[8], 'radians') geom['lat4'] = Angle(math.asin(DOCD.data[9]), 'radians') elif DOCD.data[10] == 5.: geom['type'] = 'globe' elif DOCD.data[10] == 6.: geom['type'] = 'zonal_bands' geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians') geom['band'] = DOCD.data[1] geom['bands_number'] = DOCD.data[2] else: raise ValueError('unknown value: ' + str(DOCD.data[10]) + \ ' for DOCD' + '{:0>{width}}'.format(str(d), width=3)) # vertical grid dm1 = d - 1 vgrid = {} fluxlevels_pressure_init = vpp_init.data[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT'] ] * config.g0 fluxlevels_pressure_init = [sum(fluxlevels_pressure_init[0:i]) for i in range(1, len(fluxlevels_pressure_init) + 1)] vgrid['fluxlevels_pressure_init'] = FPList([0.] + fluxlevels_pressure_init) vgrid['masslevels_pressure_init'] = FPList(profiles.flux2masspressures(fluxlevels_pressure_init)) fluxlevels_pressure_term = vpp_term.data[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT'] ] * config.g0 fluxlevels_pressure_term = [sum(fluxlevels_pressure_term[0:i]) for i in range(1, len(fluxlevels_pressure_term) + 1)] vgrid['fluxlevels_pressure_term'] = FPList([0.] + fluxlevels_pressure_term) vgrid['masslevels_pressure_term'] = FPList(profiles.flux2masspressures(fluxlevels_pressure_term)) domains['geometry'].append(FPDict(geom)) domains['vertical_grid'].append(FPDict(vgrid)) self._attributes['domains'] = FPDict(domains) @LFA._openbeforedelayed def _read_validity(self): """ Reads DATE and ECHEANCE fields, to fill-in a FieldValidity object. Cf. J.M. Piriou's documentation about DDH. """ DATE = self.readfield('DATE') year = DATE.data[0] month = DATE.data[1] day = DATE.data[2] hour = DATE.data[3] minute = DATE.data[4] processtype = DATE.data[8] if processtype == 0: self.processtype = 'analysis' elif processtype == 1: self.processtype = 'initialization' elif processtype == 10: self.processtype = 'forecast' # cumulated_timesteps = DATE.data[9] basis = datetime.datetime(year, month, day, hour, minute) ECHEANCE = self.readfield('ECHEANCE') term = datetime.timedelta(seconds=int(ECHEANCE.data[0])) cumulativeduration = term self._attributes['validity'] = FieldValidity(basis=basis, term=term, cumulativeduration=cumulativeduration)