#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
DDHLFA:
Contains the class for DDH-in-LFA format.
"""
__all__ = ['DDHLFA']
import math
import datetime
import footprints
from footprints import FPDict, FPList
from epygram import epylog, config, epygramError
from epygram.util import Angle
from epygram.base import FieldValidity, FieldSet
from .LFA import LFA
from epygram import arpifs4py
from epygram.geometries import V1DGeometry, PointGeometry
from epygram.fields import V1DField, PointField, MiscField
from epygram import profiles
[docs]class DDHLFA(LFA):
"""
Class implementing all specificities for DDHLFA resource format.
"""
_footprint = dict(
attr=dict(
format=dict(
values=set(['DDHLFA']),
default='DDHLFA'),
validity=dict(
type=FieldValidity,
optional=True,
info="Describes the temporal validity of the resource."),
domains=dict(
type=FPDict,
optional=True,
info="Describes the domains covered by the resource."),
levels=dict(
type=FPDict,
optional=True,
info="Number of levels for variables/tendencies ('VT') and\
fluxes ('F')."),
xpid=dict(
optional=True,
info="Experiment identifier.")
)
)
def __init__(self, *args, **kwargs):
"""
Constructor. See its footprint for arguments.
"""
self.isopen = False
super(LFA, self).__init__(*args, **kwargs)
if self.openmode != 'r':
raise NotImplementedError("openmode != 'r' for DDHLFA.")
elif self.openmode == 'r':
if self.domains != None or self.validity != None:
epylog.warning(self.container.abspath + ": DDHLFA.__init__():\
domains/validity argument will be ignored with\
this openmode ('r').")
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, openmode=None):
"""
Opens the DDHLFA in Fortran sense, and initializes domains, validity
and vertical geometry.
- *openmode*: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(DDHLFA, self).open(openmode=openmode)
if self.openmode == 'r':
# read info
self._attributes['xpid'] = super(DDHLFA, self).readfield('INDICE EXPERIENCE').data[0]
self._read_geometry()
self._read_validity()
else:
raise NotImplementedError("openmode != 'r' for DDHLFA.")
################
# ABOUT FIELDS #
################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None):
"""
Returns a list of the fields from resource whose name match the given
*seed*.
Args: \n
- *seed*: might be a regular expression, a list of regular expressions
or *None*. If *None* (default), returns the list of all fields in
resource.
- *fieldtype*: optional, among ('V1D', 'Point', 'Misc'). If provided,
filters out the fields not of the given type.
"""
fieldslist = super(DDHLFA, self).find_fields_in_resource(seed=seed)
if fieldtype != None:
fs = FieldSet()
for f in fieldslist:
fld = self.readfield(f, getdata=False)
if isinstance(fld, FieldSet):
fs.append(fld[0])
else:
fs.append(fld)
if fieldtype == 'V1D':
fieldslist = [f.fid['DDHLFA'] for f in fs
if isinstance(f, V1DField)]
elif fieldtype == 'Point':
fieldslist = [f.fid['DDHLFA'] for f in fs
if isinstance(f, PointField)]
elif fieldtype == 'Misc':
fieldslist = [f.fid['DDHLFA'] for f in fs
if isinstance(f, MiscField)]
else:
raise epygramError("unknown fieldtype for DDHLFA: " + fieldtype)
return fieldslist
@LFA._openbeforedelayed
[docs] def readfield(self, fieldname, getdata=True,
footprints_builder=config.use_footprints_as_builder):
"""
Reads a field in resource.
- Documentation fields ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER',
'ECHEANCE', 'DOCDnnn') are returned as :class:`epygram.formats.LFA`
returns.
- Profile/surface fields are returned as a
:class:`epygram.base.FieldSet` of 1D/Point fields, one for each
domain.
- *footprints_builder*: if *True*, uses footprints.proxy to build
fields. Defaults to False for performance reasons.
"""
if footprints_builder:
field_builder = footprints.proxy.field
geom_builder = footprints.proxy.geometry
else:
if fieldname[0] in ('S', 'G'):
field_builder = PointField
geom_builder = PointGeometry
else:
field_builder = V1DField
geom_builder = V1DGeometry
field_from_LFA = super(DDHLFA, self).readfield(fieldname,
getdata=getdata)
if fieldname in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER', 'ECHEANCE')\
or fieldname[0:4] == 'DOCD':
# documentation
field_from_LFA._attributes['fid'] = FPDict({'DDHLFA':field_from_LFA.fid['LFA']})
toreturn = field_from_LFA
else:
# true field
validity = FieldValidity(basis=self.validity.getbasis())
# distinction between initial, final and integrated fields
if fieldname[0:2] == 'SV':
validity.set(date_time=self.validity.get())
elif fieldname[0:2] == 'SF':
validity.set(date_time=self.validity.get(),
cumulativeduration=self.validity.cumulativeduration())
elif fieldname[0] in ('V', 'S'):
if fieldname[-1] == '0':
validity.set(date_time=self.validity.getbasis())
elif fieldname[-1] == '1':
validity.set(date_time=self.validity.get())
elif fieldname[0] in ('T', 'F', 'G') or fieldname == 'PPP':
validity.set(date_time=self.validity.get(),
cumulativeduration=self.validity.cumulativeduration())
toreturn = FieldSet()
if fieldname[0] in ('S', 'G'):
# surface fields
for d in range(self.domains['number']):
if getdata:
value = field_from_LFA.data[d]
location = self.domains['geometry'][d]
pgeometry = geom_builder(structure='Point',
hlocation=FPDict(location))
field = field_builder(fid=FPDict({self.format:fieldname}),
geometry=pgeometry,
validity=validity,
processtype=self.processtype)
if getdata:
field.setdata(value)
toreturn.append(field)
else:
# profile fields
if not getdata:
fieldlevels = arpifs4py.wlfacas(self._unit, fieldname)[1]\
/ self.domains['number']
else:
fieldlevels = len(field_from_LFA.data)\
/ self.domains['number']
if fieldlevels == self.levels['VT']:
position_on_grid = 'mass'
gridposition = 'mass'
gridsize = self.levels['VT']
elif fieldlevels == self.levels['F']:
position_on_grid = 'flux'
gridposition = 'flux'
gridsize = self.levels['F']
for d in range(self.domains['number']):
domain = self.domains['geometry'][d]
if fieldlevels == self.levels['VT']:
if validity.term('IntHours') == 0:
pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_init']
else:
pressure_vertical_grid = self.domains['vertical_grid'][d]['masslevels_pressure_term']
elif fieldlevels == self.levels['F']:
if validity.term('IntHours') == 0:
pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_init']
else:
pressure_vertical_grid = self.domains['vertical_grid'][d]['fluxlevels_pressure_term']
vgeometry = geom_builder(structure='V1D',
coordinate='pressure',
grid=FPDict({'gridposition':gridposition,
'gridsize':gridsize,
'fieldlevels':fieldlevels,
'levels':tuple(pressure_vertical_grid)}),
position_on_grid=position_on_grid,
hlocation=FPDict(domain))
if getdata:
profile = field_from_LFA.data[d * fieldlevels:(d + 1) * fieldlevels]
field = field_builder(fid=FPDict({self.format:fieldname}),
geometry=vgeometry,
validity=validity,
processtype=self.processtype)
if getdata:
field.setdata(profile)
toreturn.append(field)
return toreturn
[docs] def readfields(self, requestedfields, getdata=True):
"""
Inactivation of readfields because readfield already returns a FieldSet.
"""
raise epygramError("readfields() method disabled for DDHLFA.")
#################
# DDH interface #
#################
@LFA._openbeforedelayed
[docs] def what(self, out, sortfields=False):
"""
Writes in file a summary of the contents of the DDHLFA.
Args: \n
- *out*: the output open file-like object (duck-typing: *out*.write()
only is needed).
- *sortfields*: **True** if the fields have to be sorted by type.
"""
firstcolumn_width = 50
secondcolumn_width = 16
sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \
secondcolumn_width + 1) + '\n'
domains = self.domains
mass_vertical_levels = self.readfield('VPP0')[0].geometry.grid['gridsize']
listoffields = self.listfields()
validity = self.validity
def write_formatted(dest, label, value):
dest.write('{:<{width}}'.format(label, width=firstcolumn_width)\
+ ':' \
+ '{:>{width}}'.format(str(value),
width=secondcolumn_width)\
+ '\n')
def write_formatted_col(dest, label, value):
dest.write('{:>{width}}'.format(label, width=firstcolumn_width)\
+ ':' \
+ '{:>{width}}'.format(str(value),
width=secondcolumn_width)\
+ '\n')
def write_formatted_fields(dest, label, value, compression=None):
if compression == None:
dest.write('{:<{width}}'.format(label, width=20)\
+ ':' \
+ '{:^{width}}'.format(str(value), width=10)\
+ '\n')
else:
line = '{:<{width}}'.format(label, width=20) \
+ ':' \
+ '{:^{width}}'.format(str(value), width=10) \
+ ':' \
+ compression \
+ '\n'
dest.write(line)
# Write out
out.write("### IDENTIFIER (INDICE EXPERIENCE): " + self.xpid + "\n")
out.write("\n")
out.write("################\n")
out.write("### VALIDITY ###\n")
out.write("################\n")
write_formatted(out, "Validity", validity.get())
write_formatted(out, "Basis", validity.getbasis())
write_formatted(out, "Term", validity.term())
write_formatted(out, "Duration for cumulative quantities",
validity.cumulativeduration())
write_formatted(out, "Kind of producting process", self.processtype)
out.write(sepline)
out.write("\n")
out.write("###############\n")
out.write("### DOMAINS ###\n")
out.write("###############\n")
out.write("Domain(s): " + str(domains['number']) + " " + \
domains['geometry'][0]['type'] + "\n")
for d in range(domains['number']):
geom = domains['geometry'][d]
if geom['type'] != 'globe':
out.write("# " + str(d + 1) + ":\n")
if geom['type'] == 'ij_point':
write_formatted(out, "Longitude", geom['lon'].get('degrees'))
write_formatted(out, "Latitude", geom['lat'].get('degrees'))
write_formatted(out, "Point index (X)", geom['jlon'])
write_formatted(out, "Point index (Y)", geom['jgl'])
elif geom['type'] == 'point':
write_formatted(out, "Longitude", geom['lon'].get('degrees'))
write_formatted(out, "Latitude", geom['lat'].get('degrees'))
write_formatted(out, "Point index (X)", geom['jlon'])
write_formatted(out, "Point index (Y)", geom['jgl'])
write_formatted(out, "User Longitude",
geom['user_lon'].get('degrees'))
write_formatted(out, "User Latitude",
geom['user_lat'].get('degrees'))
elif geom['type'] in ('quadrilateral', 'rectangle'):
write_formatted(out, "Longitude 1", geom['lon1'].get('degrees'))
write_formatted(out, "Latitude 1", geom['lat1'].get('degrees'))
write_formatted(out, "Longitude 2", geom['lon2'].get('degrees'))
write_formatted(out, "Latitude 2", geom['lat2'].get('degrees'))
write_formatted(out, "Longitude 3", geom['lon3'].get('degrees'))
write_formatted(out, "Latitude 3", geom['lat3'].get('degrees'))
write_formatted(out, "Longitude 4", geom['lon4'].get('degrees'))
write_formatted(out, "Latitude 4", geom['lat4'].get('degrees'))
elif geom['type'] == 'zonal_bands':
write_formatted(out, "Center Latitude",
geom['lat'].get('degrees'))
out.write(sepline)
out.write("\n")
out.write("#########################\n")
out.write("### VERTICAL GEOMETRY ###\n")
out.write("#########################\n")
write_formatted(out, "Number of vertical levels for variables",
mass_vertical_levels)
write_formatted(out, "Number of vertical levels for fluxes",
mass_vertical_levels + 1)
out.write(sepline)
out.write("\n")
out.write("######################\n")
out.write("### LIST OF FIELDS ###\n")
out.write("######################\n")
numfields = len(listoffields)
out.write("Number: " + str(numfields) + "\n")
out.write(sepline)
if sortfields:
out.write("Documentation:\n")
out.write("-------------\n")
docfields = [f for f in listoffields if \
f in ('INDICE EXPERIENCE', 'DATE', 'DOCFICHIER',
'ECHEANCE') or f[0:4] == 'DOCD']
for f in docfields:
out.write(f + "\n")
out.write("-----------------\n")
out.write("Vertical profiles:\n")
out.write("-----------------\n")
profilefields = [f for f in listoffields
if f[0] not in ('S', 'G') and f not in docfields]
for f in profilefields:
out.write(f + "\n")
out.write("--------------\n")
out.write("Surface fields:\n")
out.write("--------------\n")
surfacefields = [f for f in listoffields
if f not in docfields and f not in profilefields]
for f in surfacefields:
out.write(f + "\n")
else:
for f in listoffields:
out.write(f + "\n")
out.write(sepline)
@LFA._openbeforedelayed
def _read_geometry(self):
"""
Reads DOCFICHIER and DOCDnnn fields, to fill-in domains and levels
attributes. Cf. J.M. Piriou's documentation about DDH.
"""
domains = {}
DOCFICHIER = self.readfield('DOCFICHIER')
vpp_init = super(DDHLFA, self).readfield('VPP0')
vpp_term = super(DDHLFA, self).readfield('VPP1')
NFLEV = DOCFICHIER.data[5]
self._attributes['levels'] = FPDict({'VT':NFLEV, 'F':NFLEV + 1})
if DOCFICHIER.data[0] == 1:
domains['type'] = 'limited_area_domains'
elif DOCFICHIER.data[0] == 5:
domains['type'] = 'global_domain'
elif DOCFICHIER.data[0] == 6:
domains['type'] = 'zonal_bands'
else:
raise NotImplementedError("DOCFICHIER[0] not among (1,5,6).")
domains['number'] = DOCFICHIER.data[14]
domains['geometry'] = []
domains['vertical_grid'] = []
for d in range(1, domains['number'] + 1):
DOCD = self.readfield('DOCD' + '{:0>{width}}'.format(str(d),
width=3))
geom = {}
if DOCD.data[10] == 1.:
geom['type'] = 'ij_point'
geom['lon'] = Angle(DOCD.data[2], 'radians')
geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians')
geom['jlon'] = int(DOCD.data[4])
geom['jlgl'] = int(DOCD.data[5])
elif DOCD.data[10] == 4.:
geom['type'] = 'point'
geom['lon'] = Angle(DOCD.data[2], 'radians')
geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians')
geom['jlon'] = int(DOCD.data[4])
geom['jgl'] = int(DOCD.data[5])
geom['user_lon'] = Angle(DOCD.data[6], 'radians')
geom['user_lat'] = Angle(math.asin(DOCD.data[7]), 'radians')
elif DOCD.data[10] == 2.:
geom['type'] = 'quadrilateral'
geom['lon1'] = Angle(DOCD.data[2], 'radians')
geom['lat1'] = Angle(math.asin(DOCD.data[3]), 'radians')
geom['lon2'] = Angle(DOCD.data[4], 'radians')
geom['lat2'] = Angle(math.asin(DOCD.data[5]), 'radians')
geom['lon3'] = Angle(DOCD.data[6], 'radians')
geom['lat3'] = Angle(math.asin(DOCD.data[7]), 'radians')
geom['lon4'] = Angle(DOCD.data[8], 'radians')
geom['lat4'] = Angle(math.asin(DOCD.data[9]), 'radians')
elif DOCD.data[10] == 3.:
geom['type'] = 'rectangle'
geom['lon1'] = Angle(DOCD.data[2], 'radians')
geom['lat1'] = Angle(math.asin(DOCD.data[3]), 'radians')
geom['lon2'] = Angle(DOCD.data[4], 'radians')
geom['lat2'] = Angle(math.asin(DOCD.data[5]), 'radians')
geom['lon3'] = Angle(DOCD.data[6], 'radians')
geom['lat3'] = Angle(math.asin(DOCD.data[7]), 'radians')
geom['lon4'] = Angle(DOCD.data[8], 'radians')
geom['lat4'] = Angle(math.asin(DOCD.data[9]), 'radians')
elif DOCD.data[10] == 5.:
geom['type'] = 'globe'
elif DOCD.data[10] == 6.:
geom['type'] = 'zonal_bands'
geom['lat'] = Angle(math.asin(DOCD.data[3]), 'radians')
geom['band'] = DOCD.data[1]
geom['bands_number'] = DOCD.data[2]
else:
raise ValueError('unknown value: ' + str(DOCD.data[10]) + \
' for DOCD' + '{:0>{width}}'.format(str(d),
width=3))
# vertical grid
dm1 = d - 1
vgrid = {}
fluxlevels_pressure_init = vpp_init.data[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT']
] * config.g0
fluxlevels_pressure_init = [sum(fluxlevels_pressure_init[0:i])
for i in range(1, len(fluxlevels_pressure_init) + 1)]
vgrid['fluxlevels_pressure_init'] = FPList([0.] + fluxlevels_pressure_init)
vgrid['masslevels_pressure_init'] = FPList(profiles.flux2masspressures(fluxlevels_pressure_init))
fluxlevels_pressure_term = vpp_term.data[dm1 * self.levels['VT']:(dm1 + 1) * self.levels['VT']
] * config.g0
fluxlevels_pressure_term = [sum(fluxlevels_pressure_term[0:i])
for i in range(1, len(fluxlevels_pressure_term) + 1)]
vgrid['fluxlevels_pressure_term'] = FPList([0.] + fluxlevels_pressure_term)
vgrid['masslevels_pressure_term'] = FPList(profiles.flux2masspressures(fluxlevels_pressure_term))
domains['geometry'].append(FPDict(geom))
domains['vertical_grid'].append(FPDict(vgrid))
self._attributes['domains'] = FPDict(domains)
@LFA._openbeforedelayed
def _read_validity(self):
"""
Reads DATE and ECHEANCE fields, to fill-in a FieldValidity object.
Cf. J.M. Piriou's documentation about DDH.
"""
DATE = self.readfield('DATE')
year = DATE.data[0]
month = DATE.data[1]
day = DATE.data[2]
hour = DATE.data[3]
minute = DATE.data[4]
processtype = DATE.data[8]
if processtype == 0:
self.processtype = 'analysis'
elif processtype == 1:
self.processtype = 'initialization'
elif processtype == 10:
self.processtype = 'forecast'
# cumulated_timesteps = DATE.data[9]
basis = datetime.datetime(year, month, day, hour, minute)
ECHEANCE = self.readfield('ECHEANCE')
term = datetime.timedelta(seconds=int(ECHEANCE.data[0]))
cumulativeduration = term
self._attributes['validity'] = FieldValidity(basis=basis,
term=term,
cumulativeduration=cumulativeduration)