#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
FA:
Contains the class for FA format.
Plus a function to guess a field type given its name.
"""
__all__ = ['FA', 'inquire_field_dict']
import datetime
import os
import copy
import numpy
import math
import re
from footprints import FPDict, FPList, proxy as fpx
from epygram import config, epygramError, util, epylog, profiles
from epygram.util import Angle
from epygram.base import Resource, FieldSet, FieldValidity
from epygram import arpifs4py
from epygram.geometries import H2DGeometry, SpectralGeometry
from epygram.fields import MiscField, H2DField
[docs]def inquire_field_dict(fieldname):
"""
Returns the info contained in the FA _field_dict for the requested field.
"""
matching_field = None
for fd in FA._field_dict:
dictitem = fd['name']
pattern = re.subn('\.', r'\.', dictitem)[0] # protect '.'
pattern = pattern.replace('?', '.') # change unix '?' to python '.'
pattern = pattern.replace('*', '.*') # change unix '*' to python '.*'
pattern += '(?!.)'
if re.match(pattern, fieldname):
matching_field = fd
break
if matching_field == None:
epylog.info("field '" + fieldname + "' is not referenced in\
Field_Dict_FA. Assume its type being a MiscField.")
matching_field = {'name':fieldname, 'type':'Misc',
'nature':'float', 'dimension':'1'}
return copy.deepcopy(matching_field)
def _gen_headername():
"""Generates a random headername for the FA software."""
import uuid
return str(uuid.uuid4()).replace('-', '')[0:16]
def _create_header_from_geometry(geometry, spectral_geometry=None):
"""
Create a header and returns its name, from a H2DGeometry and (preferably)
a SpectralGeometry.
Args:
- geometry: a H2DGeometry instance, from which the header is set.
- spectral_geometry: optional, a SpectralGeometry instance, from which
truncation is set in header. If not provided, in LAM case the X/Y
truncations are computed from field dimension, as linear grid.
In global case, an error is raised.
Contains a call to arpifs4py.wfacade (wrapper for FACADE routine).
"""
if not isinstance(geometry, H2DGeometry):
raise epygramError("geometry must be a H2DGeometry instance.")
if spectral_geometry != None and\
not isinstance(spectral_geometry, SpectralGeometry):
raise epygramError("spectral_geometry must be a SpectralGeometry\
instance.")
headername = _gen_headername()
CDNOMC = headername
JPXPAH = FA._FAsoftware_cst['JPXPAH']
JPXIND = FA._FAsoftware_cst['JPXIND']
if geometry.rectangular_grid:
if spectral_geometry != None:
truncation_in_X = spectral_geometry.truncation['in_X']
truncation_in_Y = spectral_geometry.truncation['in_Y']
else:
# default: linear truncation...
truncation_in_X = numpy.floor((geometry.dimensions['X'] - 1) / 2)
truncation_in_Y = numpy.floor((geometry.dimensions['Y'] - 1) / 2)
# scalars
if geometry.name == 'regular_lonlat':
KTYPTR = -11
else:
KTYPTR = -1 * truncation_in_X
if geometry.name in ('lambert', 'polar_stereographic'):
PSLAPO = geometry.projection['center_lon'].get('radians') - \
geometry.projection['reference_lon'].get('radians')
else:
PSLAPO = 0.0
PCLOPO = 0.0
PSLOPO = 0.0
if geometry.name == 'academic':
PCODIL = -1.0
else:
PCODIL = 0.0
if geometry.name == 'regular_lonlat':
KTRONC = 11
else:
KTRONC = truncation_in_Y
KNLATI = geometry.dimensions['Y']
KNXLON = geometry.dimensions['X']
# KNLOPA
KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64)
KNLOPA[0] = max(0, min(11, truncation_in_X, truncation_in_Y) - 1)
if geometry.name == 'regular_lonlat':
corners = geometry.gimme_corners_ij()
KNLOPA[1] = 0
KNLOPA[2] = 1 + corners['ll'][0]
KNLOPA[3] = 1 + corners['ur'][0]
KNLOPA[4] = 1 + corners['ll'][1]
KNLOPA[5] = 1 + corners['ur'][1]
KNLOPA[6] = 8
KNLOPA[7] = 8
else:
corners = geometry.gimme_corners_ij(subzone='CI')
if geometry.grid['LAMzone'] == 'CIE':
KNLOPA[1] = 1
KNLOPA[2] = 1 + corners['ll'][0]
KNLOPA[3] = 1 + corners['ur'][0]
KNLOPA[4] = 1 + corners['ll'][1]
KNLOPA[5] = 1 + corners['ur'][1]
KNLOPA[6] = geometry.dimensions['X_Iwidth']
KNLOPA[7] = geometry.dimensions['Y_Iwidth']
KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64)
# PSINLA
PSINLA = numpy.zeros(max((1 + geometry.dimensions['Y']) / 2, 18))
PSINLA[0] = -1
if geometry.name == 'regular_lonlat':
PSINLA[1] = -9
PSINLA[2] = geometry.grid['center_lon'].get('radians')
PSINLA[3] = geometry.grid['center_lat'].get('radians')
PSINLA[4] = geometry.grid['center_lon'].get('radians')
PSINLA[5] = geometry.grid['center_lat'].get('radians')
PSINLA[6] = geometry.grid['X_resolution'].get('radians')
PSINLA[7] = geometry.grid['Y_resolution'].get('radians')
PSINLA[8] = geometry.grid['X_resolution'].get('radians') * geometry.dimensions['X']
PSINLA[9] = geometry.grid['Y_resolution'].get('radians') * geometry.dimensions['Y']
PSINLA[10] = 0.0
PSINLA[11] = 0.0
elif geometry.projected_geometry:
if geometry.secant_projection:
raise epygramError("cannot write secant projected geometries\
in FA.")
PSINLA[1] = geometry.projection['reference_lat'].get('cos_sin')[1]
PSINLA[6] = geometry.grid['X_resolution']
PSINLA[7] = geometry.grid['Y_resolution']
PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X']
PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y']
PSINLA[10] = 2 * math.pi / PSINLA[8]
PSINLA[11] = 2 * math.pi / PSINLA[9]
PSINLA[2] = geometry.projection['reference_lon'].get('radians')
PSINLA[3] = geometry.projection['reference_lat'].get('radians')
PSINLA[4] = geometry.projection['center_lon'].get('radians')
PSINLA[5] = geometry.projection['center_lat'].get('radians')
if geometry.name != 'academic':
PSINLA[12] = Angle(geometry.ij2ll(*corners['ll'])[0], 'degrees').get('radians')
PSINLA[13] = Angle(geometry.ij2ll(*corners['ll'])[1], 'degrees').get('radians')
PSINLA[14] = Angle(geometry.ij2ll(*corners['ur'])[0], 'degrees').get('radians')
PSINLA[15] = Angle(geometry.ij2ll(*corners['ur'])[1], 'degrees').get('radians')
else:
PSINLA[6] = geometry.grid['X_resolution']
PSINLA[7] = geometry.grid['Y_resolution']
PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X']
PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y']
PSINLA[10] = 2 * math.pi / PSINLA[8]
PSINLA[11] = 2 * math.pi / PSINLA[9]
else: # global
if geometry.name == 'reduced_gauss':
KTYPTR = 1
PSLAPO = 0. # as observed in files; why not 1. ?
PCLOPO = 0. # as observed in files; why not 1. ?
PSLOPO = 0.
elif geometry.name == 'rotated_reduced_gauss':
KTYPTR = 2
PSLAPO = geometry.grid['pole_lat'].get('cos_sin')[1]
PCLOPO = geometry.grid['pole_lon'].get('cos_sin')[0]
PSLOPO = geometry.grid['pole_lon'].get('cos_sin')[1]
PCODIL = geometry.grid['dilatation_coef']
if spectral_geometry != None:
KTRONC = spectral_geometry.truncation['max']
else:
# default: linear truncation...
KTRONC = (geometry.dimensions['max_lon_number'] - 1) / 2
KNLATI = geometry.dimensions['lat_number']
KNXLON = geometry.dimensions['max_lon_number']
KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64)
KNLOPA[0:KNLATI / 2] = geometry.dimensions['lon_number_by_lat'][0:KNLATI / 2]
KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64)
if spectral_geometry != None:
KNOZPA[0:KNLATI / 2] = spectral_geometry.truncation['max_zonal_wavenumber_by_lat'][0:KNLATI / 2]
else:
KNOZPA[0:KNLATI / 2] = arpifs4py.w_trans_inq(geometry.dimensions['lat_number'],
KTRONC,
len(geometry.dimensions['lon_number_by_lat']),
numpy.array(geometry.dimensions['lon_number_by_lat']),
config.KNUMMAXRESOL)[2][:]
PSINLA = numpy.zeros((1 + geometry.dimensions['lat_number']) / 2,
dtype=numpy.float64)
PSINLA[0:KNLATI / 2] = numpy.array([s.get('cos_sin')[1] for s in
geometry.grid['latitudes'][0:KNLATI / 2]])
# vertical geometry
try:
KNIVER = geometry.vcoordinate['levels_number']
except KeyError:
KNIVER = 1
PREFER = FA.reference_pressure
try:
PAHYBR = numpy.array([0.] + geometry.vcoordinate['Ai']) / FA.reference_pressure
except KeyError:
PAHYBR = numpy.array([0., 0.])
try:PBHYBR = numpy.array([0.] + geometry.vcoordinate['Bi'])
except KeyError:
PBHYBR = numpy.array([0., 1.])
LDGARD = True
arpifs4py.wfacade(CDNOMC,
KTYPTR, PSLAPO, PCLOPO, PSLOPO,
PCODIL, KTRONC,
KNLATI, KNXLON,
len(KNLOPA), KNLOPA,
len(KNOZPA), KNOZPA,
len(PSINLA), PSINLA,
KNIVER, PREFER, PAHYBR, PBHYBR,
LDGARD)
return headername
[docs]class FA(Resource):
"""Class implementing all specificities for FA resource format."""
_footprint = dict(
attr=dict(
format=dict(
values=set(['FA']),
default='FA'),
headername=dict(
optional=True,
info="With openmode == 'w', name of an existing header, for the\
new FA to use its geometry."),
validity=dict(
type=FieldValidity,
optional=True,
info="With openmode == 'w', describes the temporal validity of\
the resource."),
default_compression=dict(
type=FPDict,
optional=True,
info="Default compression for writing fields in resource."),
cdiden=dict(
optional=True,
default='unknown',
access='rwx',
info="With openmode == 'w', identifies the FA by a keyword,\
usually the model abbreviation."),
processtype=dict(
optional=True,
default='analysis',
access='rwx',
info="With openmode == 'w', identifies the processus that\
produced the resource.")
)
)
# the Field Dictionary gathers info about fields nature
CSV_field_dictionaries = config.CSV_field_dictionaries_FA
# syntax: _field_dict = [{'name':'fieldname1', 'type':'...', ...},
# {'name':'fieldname2', 'type':'...', ...}, ...]
_field_dict = []
# reference pressure coefficient for converting hybrid A coefficients in FA
reference_pressure = config.FA_default_reference_pressure
@classmethod
def _FAsoft_init(cls):
"""Initialize the FA software maximum dimensions."""
cls._FAsoftware_cst = dict(zip(('JPXPAH', 'JPXIND', 'JPXGEO', 'JPXNIV'),
arpifs4py.get_facst()))
@classmethod
def _read_field_dict(cls, fd_abspath):
"""Reads the CSV fields dictionary of the format."""
field_dict, file_priority = util.read_CSV_as_dict(fd_abspath)
if file_priority == 'main':
cls._field_dict = field_dict
elif file_priority == 'underwrite':
for fd in field_dict:
found = False
for cfd in cls._field_dict:
found = fd['name'] == cfd['name']
if not found:
cls._field_dict.append(fd)
elif file_priority == 'overwrite':
for cfd in cls._field_dict:
found = False
for fd in field_dict:
found = fd['name'] == cfd['name']
if not found:
field_dict.append(cfd)
cls._field_dict = field_dict
def __init__(self, *args, **kwargs):
"""Constructor. See its footprint for arguments."""
self.isopen = False
# At creation of the first FA, initialize FA._field_dict
if self._field_dict == []:
self._read_field_dict(self.CSV_field_dictionaries['default'])
if os.path.exists(self.CSV_field_dictionaries['user']):
self._read_field_dict(self.CSV_field_dictionaries['user'])
super(FA, self).__init__(*args, **kwargs)
# Initialization of FA software (if necessary):
if not hasattr(self, '_FAsoftware_cst'):
self._FAsoft_init()
self.fieldscompression = {}
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, geometry=None, spectral_geometry=None, validity=None,
openmode=None):
"""
Opens a FA with ifsaux' FAITOU, and initializes some attributes.
Actually, as FAITOU needs an existing header with 'w' *openmode*,
opening file in 'w' *openmode* will require else an existing header to
which the resource is linked, or to create a header from a geometry via
*_create_header_from_geometry()* function. This explains the eventual
need for *geometry/spectral_geometry/validity* in *open()* method. If
neither headername nor geometry are available, resource is not opened:
the *open()* will be called again at first writing of a field in
resource.
Args: \n
- *geometry*: optional, must be a
:class:`epygram.geometries.H2DGeometry` instance.
- *spectral_geometry*: optional, must be a
:class:`epygram.geometries.SpectralGeometry` instance.
- *validity*: optional, must be a :class:`epygram.base.FieldValidity`
instance.
- *openmode*: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(FA, self).open(openmode=openmode)
if self.openmode in ('r', 'a'):
# FA already exists, including geometry and validity
if self.openmode in ('r', 'a') and self.headername == None:
self._attributes['headername'] = _gen_headername()
if geometry != None or validity != None:
epylog.warning(self.container.abspath + ": FA.open():\
geometry/validity argument will be ignored with\
this openmode ('r','a').")
# open, getting logical unit
self._unit = arpifs4py.wfaitou(self.container.abspath, 'OLD',
self.headername)
self.isopen = True
self.empty = False
# read info
self._attributes['cdiden'] = arpifs4py.wfalsif(self._unit)
self._read_geometry()
self._read_validity()
if self.openmode == 'a':
if self.default_compression == None:
self._attributes['default_compression'] = FPDict(self._getrunningcompression())
self._setrunningcompression(**self.default_compression)
elif self.openmode == 'w':
if geometry != None:
if not isinstance(geometry, H2DGeometry):
raise epygramError("validity must be a H2DGeometry instance.")
self._attributes['headername'] = _create_header_from_geometry(geometry, spectral_geometry)
if validity != None:
if not isinstance(validity, FieldValidity):
raise epygramError("validity must be a FieldValidity instance.")
self._attributes['validity'] = validity
if self.headername != None and self.validity != None:
# new FA, with an already existing header and validity
# set geometry from existing header
self._read_geometry()
# open
(self._unit) = arpifs4py.wfaitou(self.container.abspath, 'NEW',
self.headername)
self.isopen = True
self.empty = True
# set FA date
if self.validity.get() != self.validity.getbasis():
self.processtype = 'forecast'
self._set_validity()
# set CDIDEN
arpifs4py.wfautif(self._unit, self.cdiden)
if self.default_compression == None:
self._attributes['default_compression'] = FPDict(config.FA_default_compression)
self._setrunningcompression(**self.default_compression)
elif self.headername == None or self.validity == None:
# a header need to be created prior to opening:
# header definition (then opening) will be done from the
# geometry taken in the first field to be written in resource
pass
[docs] def close(self):
"""Closes a FA with ifsaux' FAIRME."""
if self.isopen:
try:
arpifs4py.wfairme(self._unit, 'KEEP')
except Exception:
raise IOError("closing " + self.container.abspath)
self.isopen = False
# Cleanings
if self.openmode == 'w' and self.empty:
os.remove(self.container.abspath)
################
# ABOUT FIELDS #
################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None):
"""
Returns a list of the fields from resource whose name match the given
seed.
Args: \n
- *seed*: might be a regular expression, a list of regular expressions
or *None*. If *None* (default), returns the list of all fields in
resource.
- *fieldtype*: optional, among ('H2D', 'Misc'). If provided, filters out
the fields not of the given type.
"""
fieldslist = []
if seed == None:
tmplist = self.listfields()
for f in tmplist:
if fieldtype == None or\
inquire_field_dict(f)['type'] == fieldtype:
fieldslist.append(f)
elif isinstance(seed, str):
tmplist = util.find_re_in_list(seed, self.listfields())
for f in tmplist:
if fieldtype == None or\
inquire_field_dict(f)['type'] == fieldtype:
fieldslist.append(f)
elif isinstance(seed, list):
tmplist = []
for s in seed:
tmplist += util.find_re_in_list(s, self.listfields())
for f in tmplist:
if fieldtype == None or\
inquire_field_dict(f)['type'] == fieldtype:
fieldslist.append(f)
if fieldslist == []:
raise epygramError("no field matching '" + seed + \
"' was found in resource " + \
self.container.abspath)
return fieldslist
[docs] def listfields(self):
"""
Returns a list containing the FA identifiers of all the fields of the
resource.
"""
return super(FA, self).listfields()
@Resource._openbeforedelayed
def _listfields(self):
"""Actual listfields() method for FA."""
records_number = arpifs4py.wlfinaf(self._unit)[0]
arpifs4py.wlfipos(self._unit) # rewind
fieldslist = []
for i in range(records_number):
fieldname = arpifs4py.wlficas(self._unit, True)[0]
if i >= 7 and fieldname != 'DATX-DES-DONNEES':
fieldslist.append(fieldname.strip())
# i >= 7: 7 first fields in LFI are the header ("cadre")
# 8th field added by P.Marguinaud, DATX-DES-DONNEES, to store dates
# with 1-second precision, from cy40t1 onwards.
return fieldslist
[docs] def sortfields(self):
"""
Returns a sorted list of fields with regards to their name and nature,
as a dict of lists.
"""
list3Dparams = []
list3D = []
list2D = []
# final lists
list3Dsp = []
list3Dgp = []
list2Dsp = []
list2Dgp = []
listMisc = []
for f in self.listfields():
info = inquire_field_dict(f)
# separate H2D from Misc
if info['type'] == 'H2D':
# separate 3D from 2D
if info['grib2_surface_type'] in ('119', '100', '103', '109',
'20'):
list3D.append(f)
list3Dparams.append(filter(lambda x: not x.isdigit(),
f[1:]))
else:
list2D.append(f)
else:
listMisc.append(f)
list3Dparams = list(set(list3Dparams))
# separate gp/sp
for f in list2D:
if self.fieldencoding(f)['spectral']:
list2Dsp.append(f)
else:
list2Dgp.append(f)
# sort 2D
list2Dsp.sort()
list2Dgp.sort()
# sort parameters
for p in sorted(list3Dparams):
interlist = []
for f in list3D:
if p in f:
interlist.append(f)
# sort by increasing level
interlist.sort()
# separate gp/sp
if self.fieldencoding(interlist[0])['spectral']:
list3Dsp.extend(interlist)
else:
list3Dgp.extend(interlist)
outlists = {'3D spectral fields':list3Dsp,
'3D gridpoint fields':list3Dgp,
'2D spectral fields':list2Dsp,
'2D gridpoint fields':list2Dgp,
'Misc-fields':listMisc}
return outlists
@Resource._openbeforedelayed
[docs] def fieldencoding(self, fieldname):
"""
Returns a dict containing info about how the field is encoded:
spectral? and compression. Interface to ifsaux' FANION.
"""
try:
(LDCOSP, KNGRIB, KNBITS, KSTRON, KPUILA) = arpifs4py.wfanion(self._unit,
fieldname[0:4],
0,
fieldname[4:])[1:6]
except RuntimeError:
raise RuntimeError("error while calling FANION: " + fieldname + \
" type is probably 'Misc' instead of 'H2D':\
please update Field_Dict_FA.csv accordingly\
(cf. config.py for location of field dict).")
encoding = {'spectral':LDCOSP, 'KNGRIB':KNGRIB, 'KNBITS':KNBITS,
'KSTRON':KSTRON, 'KPUILA':KPUILA}
return encoding
@Resource._openbeforedelayed
[docs] def readfield(self, fieldname, getdata=True,
footprints_builder=config.use_footprints_as_builder):
"""
Reads one field, given its FA name, and returns a Field instance.
Interface to Fortran routines from 'ifsaux'.
Args: \n
- *fieldname*: FA fieldname
- *getdata*: if *False*, only metadata are read, the field do not
contain data.
- *footprints_builder*: if *True*, uses footprints.proxy to build
fields. Defaults to False for performance reasons.
"""
if self.openmode == 'w':
raise epygramError("cannot read fields in resource if with\
openmode == 'w'.")
# Get field info
field_info = inquire_field_dict(fieldname)
if footprints_builder:
builder = fpx.field
else:
if field_info['type'] == 'H2D':
builder = H2DField
elif field_info['type'] == 'Misc':
builder = MiscField
if field_info['type'] == 'H2D':
encoding = self.fieldencoding(fieldname)
# Save compression in FA
compression = {'KNGRIB':encoding['KNGRIB'],
'KNBPDG':encoding['KNBITS'],
'KNBCSP':encoding['KNBITS'],
'KSTRON':encoding['KSTRON'],
'KPUILA':encoding['KPUILA']}
self.fieldscompression[fieldname] = compression
# vertical geometry
vcoordinate = copy.deepcopy(self.geometry.vcoordinate)
vcoordinate.update(field_info)
vcoordinate.pop('type')
vcoordinate.pop('name')
# get level for Sigma/Pressure levels in field name (deeply linked
# to FA naming conventions)
if vcoordinate['grib2_surface_type'] == '119': # hybrid-pressure
vcoordinate['Z'] = int(fieldname[1:4])
elif vcoordinate['grib2_surface_type'] == '100': # isobaric (P)
vcoordinate['Z'] = int(fieldname[1:6])
# problem linked to number of digits:
if vcoordinate['Z'] == 0:
vcoordinate['Z'] = 100000
elif vcoordinate['grib2_surface_type'] == '103': # height
try:
vcoordinate['Z'] = int(fieldname[1:6])
except ValueError:
if 'level' in field_info:
vcoordinate['Z'] = field_info['level']
else:
vcoordinate['Z'] = 255
elif vcoordinate['grib2_surface_type'] == '109': # PV
vcoordinate['Z'] = float(fieldname[1:4]) / 10. # to handle PVU
elif vcoordinate['grib2_surface_type'] == 1: # surface
vcoordinate.pop('Ai')
vcoordinate.pop('Bi')
vcoordinate.pop('ABgrid_position')
vcoordinate.pop('levels')
# Prepare field dimensions
spectral = encoding['spectral']
if spectral:
truncation = self.spectral_geometry.truncation
if 'fourier' in self.spectral_geometry.space:
# LAM
SPdatasize = arpifs4py.w_etrans_inq(self.geometry.dimensions['X'],
self.geometry.dimensions['Y'],
self.geometry.dimensions['X_CIzone'],
self.geometry.dimensions['Y_CIzone'],
truncation['in_X'], truncation['in_Y'],
config.KNUMMAXRESOL)[1]
elif self.spectral_geometry.space == 'legendre':
# Global
total_system_memory = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
memory_needed_for_transforms = truncation['max'] ** 3 / 2 * 8
if memory_needed_for_transforms >= config.prevent_swapping_legendre * total_system_memory:
raise epygramError('Legendre spectral transforms need ' + \
str(int(float(memory_needed_for_transforms) / 1024 ** 2.)) + \
' MB memory, while only ' + \
str(int(float(total_system_memory) / 1024 ** 2.)) + \
' MB is available: SWAPPING prevented !')
SPdatasize = arpifs4py.w_trans_inq(self.geometry.dimensions['lat_number'],
truncation['max'],
len(self.geometry.dimensions['lon_number_by_lat']),
numpy.array(self.geometry.dimensions['lon_number_by_lat']),
config.KNUMMAXRESOL)[1]
SPdatasize *= 2 # complex coefficients
datasize = SPdatasize
spectral_geometry = self.spectral_geometry
else:
if self.geometry.rectangular_grid:
GPdatasize = self.geometry.dimensions['X'] * self.geometry.dimensions['Y']
else:
GPdatasize = sum(self.geometry.dimensions['lon_number_by_lat'])
datasize = GPdatasize
spectral_geometry = None
# Make geometry object
kwargs_geom = dict(structure='H2D',
name=self.geometry.name,
grid=FPDict(self.geometry.grid),
dimensions=FPDict(self.geometry.dimensions),
vcoordinate=FPDict(vcoordinate),
position_on_grid=FPDict(self.geometry.position_on_grid),
geoid=config.FA_default_geoid)
if self.geometry.projected_geometry:
kwargs_geom['projection'] = FPDict(self.geometry.projection)
geometry = fpx.geometry(**kwargs_geom)
# Get data if requested
if getdata:
if field_info['type'] == 'Misc':
field_length = arpifs4py.wlfinfo(self._unit, fieldname)[0]
data = arpifs4py.wfalais(self._unit, fieldname, field_length)
if field_info['dimension'] == '0':
if field_info['nature'] == 'int':
dataOut = data.view('int64')[0]
elif field_info['nature'] == 'str':
dataInt = data.view('int64')
dataOut = ""
for num in dataInt:
dataOut += chr(num)
elif field_info['nature'] == 'bool':
dataOut = bool(data.view('int64')[0])
elif field_info['nature'] == 'float':
dataOut = data[0]
else:
raise NotImplementedError("reading of datatype " + \
field_info['nature'] + ".")
else:
# copy is necessary for garbage collector
if field_info['nature'] == 'int':
dataOut = numpy.copy(data.view('int64')[:])
elif field_info['nature'] == 'float':
dataOut = numpy.copy(data)
elif field_info['nature'] == 'str':
#FIXME: table of int(str) = table of tables ???
dataOut = numpy.copy(data)
elif field_info['nature'] == 'bool':
dataOut = numpy.copy(data.view('bool')[:])
else:
raise NotImplementedError("reading of datatype " + \
field_info['nature'] + " array.")
data = dataOut
elif field_info['type'] == 'H2D':
data = numpy.array(arpifs4py.wfacile(datasize, self._unit,
fieldname[0:4], 0,
fieldname[4:], spectral))
# Create field
if field_info['type'] == 'H2D':
# Create H2D field
field = builder(fid=FPDict({self.format:fieldname}),
geometry=geometry,
validity=self.validity,
spectral_geometry=spectral_geometry,
processtype=self.processtype)
if 'gauss' in self.geometry.name and config.FA_buffered_gauss_grid:
field.geometry._bound_gauss_grid = self.geometry.get_lonlat_grid
elif field_info['type'] == 'Misc':
# Create Misc field
field = builder(fid=FPDict({self.format:fieldname}))
if getdata:
if field_info['type'] == 'H2D' and not field.spectral:
data = geometry.reshape_data(data)
field.setdata(data)
return field
[docs] def readfields(self, requestedfields=None, getdata=True):
"""
Returns a :class:`epygram.base.FieldSet` containing requested fields
read in the resource.
Args: \n
- *requestedfields*: might be \n
- a regular expression (e.g. 'S\*WIND.[U,V].PHYS')
- a list of FA fields identifiers with regular expressions (e.g.
['SURFTEMPERATURE', 'S0[10-20]WIND.?.PHYS'])
- if not specified, interpretated as all fields that will be found in
resource
- *getdata*: optional, if *False*, only metadata are read, the fields
do not contain data. Default is *True*.
"""
requestedfields = self.find_fields_in_resource(requestedfields)
if requestedfields == []:
raise epygramError("unable to find requested fields in resource.")
return super(FA, self).readfields(requestedfields, getdata)
[docs] def writefield(self, field, compression=None):
"""
Write a field in the resource.
Args: \n
- *field*: a :class:`epygram.base.Field` instance or
:class:`epygram.fields.H2DField`.
- *compression*: optional, a (possibly partial) dict containing
parameters for field compression (in case of a
:class:`epygram.fields.H2DField`). Ex: {'KNGRIB': 2, 'KDMOPL': 5,
'KPUILA': 1, 'KSTRON': 10, 'KNBPDG': 24, 'KNBCSP': 24}
"""
if self.openmode == 'r':
raise IOError("cannot write field in a FA with openmode 'r'.")
if not self.isopen:
if not isinstance(field, H2DField):
# FA need a geometry to be open. Maybe this FA has not been
# given one at opening. For opening it, either write a H2DField
# in, or call its method open(geometry, validity), geometry
# being a H2DGeometry, validity being a FieldValidity.
raise epygramError("cannot write a this kind of field on a\
non-open FA.")
if self.validity == None:
self._attributes['validity'] = field.validity
self._attributes['headername'] = _create_header_from_geometry(field.geometry,
field.spectral_geometry)
self.open()
if not self.empty and field.fid['FA'] in self.listfields():
epylog.info("there already is a field with the same name in this\
FA: overwrite.")
if isinstance(field, MiscField):
data = field.data
if field.shape in ((1,), ()):
if 'int' in field.datatype.name:
dataReal = data.view('float64')
elif 'str' in field.datatype.name:
data = str(data) # ndarray of str -> simple str
dataReal = numpy.array([ord(d) for d in data]).view('float64')
elif 'bool' in field.datatype.name:
dataReal = numpy.array(1 if data else 0).view('float64')
elif 'float' in field.datatype.name:
dataReal = data
else:
raise NotImplementedError("writing of datatype " + \
field.datatype.name + ".")
else:
try:
dataReal = numpy.copy(data.view('float64'))
except Exception:
raise NotImplementedError("writing of datatype " + \
field.datatype.__name__ + \
" array.")
arpifs4py.wfaisan(self._unit, field.fid['FA'],
dataReal.size, dataReal)
elif isinstance(field, H2DField):
if [self.geometry.name, self.geometry.grid,
self.geometry.dimensions] != \
[field.geometry.name, field.geometry.grid,
field.geometry.dimensions]:
raise epygramError("gridpoint geometry incompatibility:\
a FA can hold only one geometry.")
if field.spectral_geometry != None and\
field.spectral_geometry != self.spectral_geometry:
# compatibility check
raise epygramError("spectral geometry incompatibility:\
a FA can hold only one geometry.")
data = numpy.ma.copy(field.data).flatten()
if isinstance(data, numpy.ma.core.MaskedArray):
data = numpy.copy(data[data.mask == False].data)
if compression != None:
modified_compression = True
elif self.fieldscompression.has_key(field.fid['FA']):
compression = self.fieldscompression[field.fid['FA']]
modified_compression = True
else:
modified_compression = False
if modified_compression:
self._setrunningcompression(**compression)
arpifs4py.wfaienc(self._unit,
field.fid['FA'][0:4], 0, field.fid['FA'][4:],
len(data), data, field.spectral)
if modified_compression:
# set back to default
self._setrunningcompression(**self.default_compression)
if not self.fieldscompression.has_key(field.fid['FA']):
self.fieldscompression[field.fid['FA']] = compression
if self.empty: self.empty = False
[docs] def writefields(self, fieldset, compression=None):
"""
Write the fields of the *fieldset* in the resource.
Args: \n
- *fieldset*: must be a :class:`epygram.base.FieldSet` instance.
- *compression*: must be a list of compression dicts
(cf. *writefield()* method), of length equal to the length of the
*fieldset*, and with the same order.
"""
if not isinstance(fieldset, FieldSet):
raise epygramError("'fieldset' argument must be of kind FieldSet.")
if len(fieldset) != len(compression):
raise epygramError("fieldset and compression must have the same\
length.")
# Be sure the first field to be written is an H2DField,
# for being able to set the header if necessary
if not self.isopen and not isinstance(fieldset[0], H2DField):
for f in range(len(fieldset)):
if isinstance(fieldset[f], H2DField):
fieldset.insert(0, fieldset.pop(f))
if compression != None:
compression.insert(0, compression.pop(f))
break
if compression != None:
# loop separated from the above one,
# because fieldset is there-above modified
for f in range(len(fieldset)):
self.writefield(fieldset[f], compression[f])
else:
super(FA, self).writefields(fieldset)
[docs] def rename_field(self, fieldname, new_name):
"""
Renames a field "in place".
"""
arpifs4py.wlfiren(self._unit, fieldname, new_name)
[docs] def delfield(self, fieldname):
"""
Deletes a field from file "in place".
"""
arpifs4py.wlfisup(self._unit, fieldname)
@Resource._openbeforedelayed
@Resource._openbeforedelayed
[docs] def what(self, out, fieldscompression=False, sortfields=False):
"""
Writes in file a summary of the contents of the FA.
Args: \n
- *out*: the output open file-like object (duck-typing: *out*.write()
only is needed).
- *fieldscompression*: **True** if information about fields compression
is requested.
- *sortfields*: **True** if the fields have to be sorted by type.
"""
firstcolumn_width = 50
secondcolumn_width = 16
sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \
secondcolumn_width + 1) + '\n'
first_H2DField = [f for f in self.listfields() if
inquire_field_dict(f)['type'] == 'H2D'][0]
firstfield = self.readfield(first_H2DField, getdata=False)
if not firstfield.spectral and self.spectral_geometry != None:
firstfield._attributes['spectral_geometry'] = self.spectral_geometry
v_geometry = self.geometry.vcoordinate
listoffields = self.listfields()
if sortfields:
sortedfields = self.sortfields()
def write_formatted(dest, label, value):
dest.write('{:<{width}}'.format(label, width=firstcolumn_width) \
+ ':' \
+ '{:>{width}}'.format(str(value), width=secondcolumn_width) \
+ '\n')
def write_formatted_col(dest, label, value):
dest.write('{:>{width}}'.format(label, width=firstcolumn_width) \
+ ':' \
+ '{:>{width}}'.format(str(value), width=secondcolumn_width) \
+ '\n')
def write_formatted_fields(dest, label, value, compression=None):
if compression == None:
dest.write('{:<{width}}'.format(label, width=20) \
+ ':' \
+ '{:^{width}}'.format(str(value), width=10) \
+ '\n')
else:
line = '{:<{width}}'.format(label, width=20) \
+ ':' \
+ '{:^{width}}'.format(str(value), width=10) \
+ ':' \
+ compression \
+ '\n'
dest.write(line)
out.write("### IDENTIFIER (CDIDEN): " + self.cdiden + "\n")
out.write("\n")
firstfield.what(out, vertical_geometry=False)
out.write("#########################\n")
out.write("### VERTICAL GEOMETRY ###\n")
out.write("#########################\n")
write_formatted(out, "Number of vertical levels",
v_geometry['levels_number'])
out.write("Hybrid-pressure coord. A coefficients\n")
for c in range(len(v_geometry['Ai'])):
write_formatted_col(out, str(c + 1), v_geometry['Ai'][c])
out.write(sepline)
out.write("Hybrid-pressure coord. B coefficients\n")
for c in range(len(v_geometry['Bi'])):
write_formatted_col(out, str(c + 1), v_geometry['Bi'][c])
out.write(sepline)
out.write("\n")
out.write("######################\n")
out.write("### LIST OF FIELDS ###\n")
out.write("######################\n")
if sortfields:
listoffields = []
for k in sorted(sortedfields.keys()):
listoffields.append(k)
listoffields.append('--------------------')
listoffields.extend(sortedfields[k])
listoffields.append('--------------------')
numfields = sum([len(v) for v in sortedfields.values()])
else:
numfields = len(listoffields)
out.write("Number: " + str(numfields) + "\n")
if not fieldscompression:
write_formatted_fields(out, "Field name", "Spectral")
else:
params = ['KNGRIB', 'KNBITS', 'KSTRON', 'KPUILA']
width_cp = 8
compressionline = ""
for p in params:
compressionline += '{:^{width}}'.format(p, width=width_cp)
write_formatted_fields(out, "Field name", "Spectral",
compressionline)
out.write(sepline)
for f in listoffields:
if inquire_field_dict(f)['type'] == 'H2D':
encoding = self.fieldencoding(f)
if not fieldscompression:
write_formatted_fields(out, f, encoding['spectral'])
else:
compressionline = ""
for p in params:
try:
compressionline += '{:^{width}}'.format(str(encoding[p]), width=width_cp)
except KeyError:
compressionline += '{:^{width}}'.format('-', width=width_cp)
write_formatted_fields(out, f, encoding['spectral'],
compression=compressionline)
else:
write_formatted_fields(out, f, '-')
out.write(sepline)
##############
# the FA WAY #
##############
def _read_geometry(self):
"""
Reads the geometry in the FA header.
Interface to Fortran routines from 'ifsaux'.
"""
(KTYPTR, PSLAPO, PCLOPO, PSLOPO,
PCODIL, KTRONC,
KNLATI, KNXLON, KNLOPA, KNOZPA, PSINLA,
KNIVER, PREFER, PAHYBR, PBHYBR) = arpifs4py.wfacies(self._FAsoftware_cst['JPXPAH'],
self._FAsoftware_cst['JPXIND'],
self._FAsoftware_cst['JPXGEO'],
self._FAsoftware_cst['JPXNIV'],
self.headername)[:-1]
vcoordinate_read_in_header = {'grib2_surface_type':'119',
'levels_number':KNIVER,
'Ai':FPList([c * PREFER for c in PAHYBR[1:KNIVER + 1]]),
'Bi':FPList([c for c in PBHYBR[1:KNIVER + 1]]),
'ABgrid_position':'flux'
}
FA.reference_pressure = PREFER
rectangular_grid = KTYPTR <= 0
if rectangular_grid:
LMAP = int(PCODIL) != -1
# LAM or regular lat/lon
projected_geometry = int(PSINLA[1]) != -9
if projected_geometry:
# LAM (projection)
if LMAP and int(PSINLA[0]) == 0:
raise NotImplementedError("'old' FA header.")
if LMAP:
projection = {'reference_lon':Angle(PSINLA[2], 'radians'),
'reference_lat':Angle(PSINLA[3], 'radians'),
'center_lon':Angle(PSINLA[4], 'radians'),
'center_lat':Angle(PSINLA[5], 'radians')
}
if int(PSINLA[0]) != 0:
grid = {'X_resolution':PSINLA[6],
'Y_resolution':PSINLA[7]
}
else:
grid = {'X_resolution':PSINLA[14],
'Y_resolution':PSINLA[15]
}
dimensions = {'X':KNXLON,
'Y':KNLATI,
'X_CIzone':KNLOPA[3] - KNLOPA[2] + 1,
'Y_CIzone':KNLOPA[5] - KNLOPA[4] + 1,
'X_Iwidth':KNLOPA[6],
'Y_Iwidth':KNLOPA[7],
'X_Czone':KNLOPA[3] - KNLOPA[2] + 1 - 2 * KNLOPA[6],
'Y_Czone':KNLOPA[5] - KNLOPA[4] + 1 - 2 * KNLOPA[7],
}
if KNLOPA[1] == 0: #C+I
grid['LAMzone'] = 'CI'
dimensions['X'] = dimensions['X_CIzone']
dimensions['Y'] = dimensions['Y_CIzone']
elif abs(KNLOPA[1]) == 1: #C+I+E
grid['LAMzone'] = 'CIE'
dimensions['X_CIoffset'] = KNLOPA[2] - 1
dimensions['Y_CIoffset'] = KNLOPA[4] - 1
if abs(PSINLA[1]) <= config.epsilon:
geometryname = 'mercator'
elif 1.0 - abs(PSINLA[1]) <= config.epsilon:
geometryname = 'polar_stereographic'
elif config.epsilon < abs(PSINLA[1]) < 1.0 - config.epsilon:
geometryname = 'lambert'
spectral_space = 'bi-fourier'
spectral_trunc = {'in_X':KTYPTR * -1,
'in_Y':KTRONC,
'shape':'elliptic'}
if not LMAP:
if dimensions['X'] == 1:
spectral_space = 'fourier'
spectral_trunc = {'in_Y':KTRONC,
'in_X':KTYPTR * -1}
projection = None
geometryname = 'academic'
elif not projected_geometry:
# regular lat/lon
projection = None
geometryname = 'regular_lonlat'
grid = {'center_lon':Angle(PSINLA[4], 'radians'),
'center_lat':Angle(PSINLA[5], 'radians'),
'X_resolution':Angle(PSINLA[6], 'radians'),
'Y_resolution':Angle(PSINLA[7], 'radians')
}
dimensions = {'X':KNXLON,
'Y':KNLATI
}
spectral_space = None
else:
# ARPEGE global
projection = None
# reconstruction of tables on both hemispheres
KNLOPA = KNLOPA[:KNLATI / 2]
KNOZPA = KNOZPA[:KNLATI / 2]
PSINLA = PSINLA[:KNLATI / 2]
lon_number_by_lat = [n for n in KNLOPA] + [KNLOPA[-(n + 1)] for n in
range(0, len(KNLOPA))]
max_zonal_wavenumber_by_lat = [n for n in KNOZPA] + \
[KNOZPA[-(n + 1)] for n in
range(0, len(KNOZPA))]
latitudes = [Angle((math.cos(math.asin(sinlat)), sinlat), 'cos_sin') for sinlat in PSINLA] \
+ [Angle((math.cos(math.asin(PSINLA[-(n + 1)])), -PSINLA[-(n + 1)]), 'cos_sin') for n in range(0, len(PSINLA))]
grid = {'dilatation_coef':PCODIL,
'latitudes':FPList([l for l in latitudes])
}
if KTYPTR == 1:
geometryname = 'reduced_gauss'
elif KTYPTR == 2:
geometryname = 'rotated_reduced_gauss'
grid['pole_lat'] = Angle((math.cos(math.asin(PSLAPO)), PSLAPO),
'cos_sin')
grid['pole_lon'] = Angle((PCLOPO, PSLOPO), 'cos_sin')
dimensions = {'max_lon_number':KNXLON,
'lat_number':KNLATI,
'lon_number_by_lat':FPList([n for n in
lon_number_by_lat])
}
spectral_space = 'legendre'
spectral_trunc = {'max':KTRONC,
'shape':'triangular',
'max_zonal_wavenumber_by_lat':FPList([k for k in
max_zonal_wavenumber_by_lat])
}
kwargs_geom = dict(structure='H2D',
name=geometryname,
grid=FPDict(grid),
dimensions=FPDict(dimensions),
vcoordinate=FPDict(vcoordinate_read_in_header),
position_on_grid=FPDict({'vertical':'mass',
'horizontal':'center'}),
geoid=FPDict(config.FA_default_geoid))
if projection != None:
kwargs_geom['projection'] = FPDict(projection)
self.geometry = fpx.geometry(**kwargs_geom)
if spectral_space != None:
self.spectral_geometry = SpectralGeometry(space=spectral_space, truncation=FPDict(spectral_trunc))
else:
self.spectral_geometry = None
@Resource._openbeforedelayed
def _read_validity(self):
"""
Reads the validity in the FA header.
Interface to Fortran routines from 'ifsaux'.
"""
KDATEF = arpifs4py.wfadiex(self._unit)
year = int(KDATEF[0])
month = int(KDATEF[1])
day = int(KDATEF[2])
hour = int(KDATEF[3])
minute = int(KDATEF[4])
second = int(KDATEF[13]) - hour * 3600 - minute * 60
processtype = int(KDATEF[8])
if processtype == 0:
self.processtype = 'analysis'
elif processtype == 1:
self.processtype = 'initialization'
elif processtype == 10:
self.processtype = 'forecast'
term_in_seconds = int(KDATEF[14])
cumulationstart_in_seconds = int(KDATEF[15])
cumulativeduration_in_seconds = term_in_seconds - cumulationstart_in_seconds
basis = datetime.datetime(year, month, day, hour, minute, second)
term = datetime.timedelta(seconds=term_in_seconds)
cumulativeduration = datetime.timedelta(seconds=cumulativeduration_in_seconds)
self._attributes['validity'] = FieldValidity(basis=basis, term=term, cumulativeduration=cumulativeduration)
@Resource._openbeforedelayed
def _set_validity(self, termunit='hours'):
"""
Sets date, hour and the processtype in the resource.
"""
if not self.isopen:
raise epygramError("_set_validity must be called after FA is open.")
basis = self.validity.getbasis()
KDATEF = numpy.zeros(22, dtype=numpy.int64)
KDATEF[0] = int(basis.year)
KDATEF[1] = int(basis.month)
KDATEF[2] = int(basis.day)
KDATEF[3] = int(basis.hour)
KDATEF[4] = int(basis.minute)
if termunit == 'hours':
KDATEF[5] = 1
KDATEF[6] = self.validity.term('IntHours')
KDATEF[9] = self.validity.term('IntHours') - \
self.validity.cumulativeduration('IntHours')
elif termunit == 'days':
KDATEF[5] = 2
KDATEF[6] = self.validity.term('IntHours') / 24
KDATEF[9] = (self.validity.term('IntHours') - \
self.validity.cumulativeduration('IntHours')) / 24
else:
raise NotImplementedError("term unit other than hours/days ?")
#KDATEF[7] = 0
if self.processtype == 'analysis':
processtype = 0
elif self.processtype == 'initialization':
processtype = 1
elif self.processtype == 'forecast':
processtype = 10
else:
processtype = 255 # unknown processtype
KDATEF[8] = processtype
#KDATEF[10] = 0
if not config.FA_fandax:
arpifs4py.wfandar(self._unit, KDATEF)
else:
# fandax
KDATEF[11] = 1
KDATEF[13] = int(basis.second) + \
int(basis.minute) * 60 + \
int(basis.hour) * 3600
KDATEF[14] = self.validity.term(fmt='IntSeconds')
KDATEF[15] = self.validity.term(fmt='IntSeconds') - \
self.validity.cumulativeduration(fmt='IntSeconds')
arpifs4py.wfandax(self._unit, KDATEF)
@Resource._openbeforedelayed
def _getrunningcompression(self):
"""
Returns the current compression parameters of the FA (at time of writing).
Interface to ifsaux' FAVEUR.
"""
comp = dict()
(comp['KNGRIB'], comp['KNBPDG'], comp['KNBCSP'], comp['KSTRON'],
comp['KPUILA'], comp['KDMOPL']) = arpifs4py.wfaveur(self._unit)
return comp
@Resource._openbeforedelayed
def _setrunningcompression(self, **kwargs):
"""
Sets the compression parameters of the FA.
Interface to FAGOTE (cf. FAGOTE documentation for significance of
arguments).
"""
if self.openmode == 'r':
raise IOError("method _setrunningcompression() can only be called\
if 'openmode' in('w', 'a').")
comp = copy.deepcopy(self.default_compression)
for k in kwargs.keys():
if k in self.default_compression.keys():
comp[k] = kwargs[k]
else:
raise epygramError("unknown parameter: " + k + \
" passed as argument.")
arpifs4py.wfagote(self._unit, comp['KNGRIB'], comp['KNBPDG'],
comp['KNBCSP'], comp['KSTRON'], comp['KPUILA'],
comp['KDMOPL'])