Source code for epygram.formats.FA

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
FA:

Contains the class for FA format.

Plus a function to guess a field type given its name.
"""

__all__ = ['FA', 'inquire_field_dict']

import datetime
import os
import copy
import numpy
import math
import re

from footprints import FPDict, FPList, proxy as fpx

from epygram import config, epygramError, util, epylog, profiles
from epygram.util import Angle
from epygram.base import Resource, FieldSet, FieldValidity
from epygram import arpifs4py
from epygram.geometries import H2DGeometry, SpectralGeometry
from epygram.fields import MiscField, H2DField



[docs]def inquire_field_dict(fieldname): """ Returns the info contained in the FA _field_dict for the requested field. """ matching_field = None for fd in FA._field_dict: dictitem = fd['name'] pattern = re.subn('\.', r'\.', dictitem)[0] # protect '.' pattern = pattern.replace('?', '.') # change unix '?' to python '.' pattern = pattern.replace('*', '.*') # change unix '*' to python '.*' pattern += '(?!.)' if re.match(pattern, fieldname): matching_field = fd break if matching_field == None: epylog.info("field '" + fieldname + "' is not referenced in\ Field_Dict_FA. Assume its type being a MiscField.") matching_field = {'name':fieldname, 'type':'Misc', 'nature':'float', 'dimension':'1'} return copy.deepcopy(matching_field)
def _gen_headername(): """Generates a random headername for the FA software.""" import uuid return str(uuid.uuid4()).replace('-', '')[0:16] def _create_header_from_geometry(geometry, spectral_geometry=None): """ Create a header and returns its name, from a H2DGeometry and (preferably) a SpectralGeometry. Args: - geometry: a H2DGeometry instance, from which the header is set. - spectral_geometry: optional, a SpectralGeometry instance, from which truncation is set in header. If not provided, in LAM case the X/Y truncations are computed from field dimension, as linear grid. In global case, an error is raised. Contains a call to arpifs4py.wfacade (wrapper for FACADE routine). """ if not isinstance(geometry, H2DGeometry): raise epygramError("geometry must be a H2DGeometry instance.") if spectral_geometry != None and\ not isinstance(spectral_geometry, SpectralGeometry): raise epygramError("spectral_geometry must be a SpectralGeometry\ instance.") headername = _gen_headername() CDNOMC = headername JPXPAH = FA._FAsoftware_cst['JPXPAH'] JPXIND = FA._FAsoftware_cst['JPXIND'] if geometry.rectangular_grid: if spectral_geometry != None: truncation_in_X = spectral_geometry.truncation['in_X'] truncation_in_Y = spectral_geometry.truncation['in_Y'] else: # default: linear truncation... truncation_in_X = numpy.floor((geometry.dimensions['X'] - 1) / 2) truncation_in_Y = numpy.floor((geometry.dimensions['Y'] - 1) / 2) # scalars if geometry.name == 'regular_lonlat': KTYPTR = -11 else: KTYPTR = -1 * truncation_in_X if geometry.name in ('lambert', 'polar_stereographic'): PSLAPO = geometry.projection['center_lon'].get('radians') - \ geometry.projection['reference_lon'].get('radians') else: PSLAPO = 0.0 PCLOPO = 0.0 PSLOPO = 0.0 if geometry.name == 'academic': PCODIL = -1.0 else: PCODIL = 0.0 if geometry.name == 'regular_lonlat': KTRONC = 11 else: KTRONC = truncation_in_Y KNLATI = geometry.dimensions['Y'] KNXLON = geometry.dimensions['X'] # KNLOPA KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64) KNLOPA[0] = max(0, min(11, truncation_in_X, truncation_in_Y) - 1) if geometry.name == 'regular_lonlat': corners = geometry.gimme_corners_ij() KNLOPA[1] = 0 KNLOPA[2] = 1 + corners['ll'][0] KNLOPA[3] = 1 + corners['ur'][0] KNLOPA[4] = 1 + corners['ll'][1] KNLOPA[5] = 1 + corners['ur'][1] KNLOPA[6] = 8 KNLOPA[7] = 8 else: corners = geometry.gimme_corners_ij(subzone='CI') if geometry.grid['LAMzone'] == 'CIE': KNLOPA[1] = 1 KNLOPA[2] = 1 + corners['ll'][0] KNLOPA[3] = 1 + corners['ur'][0] KNLOPA[4] = 1 + corners['ll'][1] KNLOPA[5] = 1 + corners['ur'][1] KNLOPA[6] = geometry.dimensions['X_Iwidth'] KNLOPA[7] = geometry.dimensions['Y_Iwidth'] KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64) # PSINLA PSINLA = numpy.zeros(max((1 + geometry.dimensions['Y']) / 2, 18)) PSINLA[0] = -1 if geometry.name == 'regular_lonlat': PSINLA[1] = -9 PSINLA[2] = geometry.grid['center_lon'].get('radians') PSINLA[3] = geometry.grid['center_lat'].get('radians') PSINLA[4] = geometry.grid['center_lon'].get('radians') PSINLA[5] = geometry.grid['center_lat'].get('radians') PSINLA[6] = geometry.grid['X_resolution'].get('radians') PSINLA[7] = geometry.grid['Y_resolution'].get('radians') PSINLA[8] = geometry.grid['X_resolution'].get('radians') * geometry.dimensions['X'] PSINLA[9] = geometry.grid['Y_resolution'].get('radians') * geometry.dimensions['Y'] PSINLA[10] = 0.0 PSINLA[11] = 0.0 elif geometry.projected_geometry: if geometry.secant_projection: raise epygramError("cannot write secant projected geometries\ in FA.") PSINLA[1] = geometry.projection['reference_lat'].get('cos_sin')[1] PSINLA[6] = geometry.grid['X_resolution'] PSINLA[7] = geometry.grid['Y_resolution'] PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X'] PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y'] PSINLA[10] = 2 * math.pi / PSINLA[8] PSINLA[11] = 2 * math.pi / PSINLA[9] PSINLA[2] = geometry.projection['reference_lon'].get('radians') PSINLA[3] = geometry.projection['reference_lat'].get('radians') PSINLA[4] = geometry.projection['center_lon'].get('radians') PSINLA[5] = geometry.projection['center_lat'].get('radians') if geometry.name != 'academic': PSINLA[12] = Angle(geometry.ij2ll(*corners['ll'])[0], 'degrees').get('radians') PSINLA[13] = Angle(geometry.ij2ll(*corners['ll'])[1], 'degrees').get('radians') PSINLA[14] = Angle(geometry.ij2ll(*corners['ur'])[0], 'degrees').get('radians') PSINLA[15] = Angle(geometry.ij2ll(*corners['ur'])[1], 'degrees').get('radians') else: PSINLA[6] = geometry.grid['X_resolution'] PSINLA[7] = geometry.grid['Y_resolution'] PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X'] PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y'] PSINLA[10] = 2 * math.pi / PSINLA[8] PSINLA[11] = 2 * math.pi / PSINLA[9] else: # global if geometry.name == 'reduced_gauss': KTYPTR = 1 PSLAPO = 0. # as observed in files; why not 1. ? PCLOPO = 0. # as observed in files; why not 1. ? PSLOPO = 0. elif geometry.name == 'rotated_reduced_gauss': KTYPTR = 2 PSLAPO = geometry.grid['pole_lat'].get('cos_sin')[1] PCLOPO = geometry.grid['pole_lon'].get('cos_sin')[0] PSLOPO = geometry.grid['pole_lon'].get('cos_sin')[1] PCODIL = geometry.grid['dilatation_coef'] if spectral_geometry != None: KTRONC = spectral_geometry.truncation['max'] else: # default: linear truncation... KTRONC = (geometry.dimensions['max_lon_number'] - 1) / 2 KNLATI = geometry.dimensions['lat_number'] KNXLON = geometry.dimensions['max_lon_number'] KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64) KNLOPA[0:KNLATI / 2] = geometry.dimensions['lon_number_by_lat'][0:KNLATI / 2] KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64) if spectral_geometry != None: KNOZPA[0:KNLATI / 2] = spectral_geometry.truncation['max_zonal_wavenumber_by_lat'][0:KNLATI / 2] else: KNOZPA[0:KNLATI / 2] = arpifs4py.w_trans_inq(geometry.dimensions['lat_number'], KTRONC, len(geometry.dimensions['lon_number_by_lat']), numpy.array(geometry.dimensions['lon_number_by_lat']), config.KNUMMAXRESOL)[2][:] PSINLA = numpy.zeros((1 + geometry.dimensions['lat_number']) / 2, dtype=numpy.float64) PSINLA[0:KNLATI / 2] = numpy.array([s.get('cos_sin')[1] for s in geometry.grid['latitudes'][0:KNLATI / 2]]) # vertical geometry try: KNIVER = geometry.vcoordinate['levels_number'] except KeyError: KNIVER = 1 PREFER = FA.reference_pressure try: PAHYBR = numpy.array([0.] + geometry.vcoordinate['Ai']) / FA.reference_pressure except KeyError: PAHYBR = numpy.array([0., 0.]) try:PBHYBR = numpy.array([0.] + geometry.vcoordinate['Bi']) except KeyError: PBHYBR = numpy.array([0., 1.]) LDGARD = True arpifs4py.wfacade(CDNOMC, KTYPTR, PSLAPO, PCLOPO, PSLOPO, PCODIL, KTRONC, KNLATI, KNXLON, len(KNLOPA), KNLOPA, len(KNOZPA), KNOZPA, len(PSINLA), PSINLA, KNIVER, PREFER, PAHYBR, PBHYBR, LDGARD) return headername
[docs]class FA(Resource): """Class implementing all specificities for FA resource format.""" _footprint = dict( attr=dict( format=dict( values=set(['FA']), default='FA'), headername=dict( optional=True, info="With openmode == 'w', name of an existing header, for the\ new FA to use its geometry."), validity=dict( type=FieldValidity, optional=True, info="With openmode == 'w', describes the temporal validity of\ the resource."), default_compression=dict( type=FPDict, optional=True, info="Default compression for writing fields in resource."), cdiden=dict( optional=True, default='unknown', access='rwx', info="With openmode == 'w', identifies the FA by a keyword,\ usually the model abbreviation."), processtype=dict( optional=True, default='analysis', access='rwx', info="With openmode == 'w', identifies the processus that\ produced the resource.") ) ) # the Field Dictionary gathers info about fields nature CSV_field_dictionaries = config.CSV_field_dictionaries_FA # syntax: _field_dict = [{'name':'fieldname1', 'type':'...', ...}, # {'name':'fieldname2', 'type':'...', ...}, ...] _field_dict = [] # reference pressure coefficient for converting hybrid A coefficients in FA reference_pressure = config.FA_default_reference_pressure @classmethod def _FAsoft_init(cls): """Initialize the FA software maximum dimensions.""" cls._FAsoftware_cst = dict(zip(('JPXPAH', 'JPXIND', 'JPXGEO', 'JPXNIV'), arpifs4py.get_facst())) @classmethod def _read_field_dict(cls, fd_abspath): """Reads the CSV fields dictionary of the format.""" field_dict, file_priority = util.read_CSV_as_dict(fd_abspath) if file_priority == 'main': cls._field_dict = field_dict elif file_priority == 'underwrite': for fd in field_dict: found = False for cfd in cls._field_dict: found = fd['name'] == cfd['name'] if not found: cls._field_dict.append(fd) elif file_priority == 'overwrite': for cfd in cls._field_dict: found = False for fd in field_dict: found = fd['name'] == cfd['name'] if not found: field_dict.append(cfd) cls._field_dict = field_dict def __init__(self, *args, **kwargs): """Constructor. See its footprint for arguments.""" self.isopen = False # At creation of the first FA, initialize FA._field_dict if self._field_dict == []: self._read_field_dict(self.CSV_field_dictionaries['default']) if os.path.exists(self.CSV_field_dictionaries['user']): self._read_field_dict(self.CSV_field_dictionaries['user']) super(FA, self).__init__(*args, **kwargs) # Initialization of FA software (if necessary): if not hasattr(self, '_FAsoftware_cst'): self._FAsoft_init() self.fieldscompression = {} if not self.fmtdelayedopen: self.open()
[docs] def open(self, geometry=None, spectral_geometry=None, validity=None, openmode=None): """ Opens a FA with ifsaux' FAITOU, and initializes some attributes. Actually, as FAITOU needs an existing header with 'w' *openmode*, opening file in 'w' *openmode* will require else an existing header to which the resource is linked, or to create a header from a geometry via *_create_header_from_geometry()* function. This explains the eventual need for *geometry/spectral_geometry/validity* in *open()* method. If neither headername nor geometry are available, resource is not opened: the *open()* will be called again at first writing of a field in resource. Args: \n - *geometry*: optional, must be a :class:`epygram.geometries.H2DGeometry` instance. - *spectral_geometry*: optional, must be a :class:`epygram.geometries.SpectralGeometry` instance. - *validity*: optional, must be a :class:`epygram.base.FieldValidity` instance. - *openmode*: optional, to open with a specific openmode, eventually different from the one specified at initialization. """ super(FA, self).open(openmode=openmode) if self.openmode in ('r', 'a'): # FA already exists, including geometry and validity if self.openmode in ('r', 'a') and self.headername == None: self._attributes['headername'] = _gen_headername() if geometry != None or validity != None: epylog.warning(self.container.abspath + ": FA.open():\ geometry/validity argument will be ignored with\ this openmode ('r','a').") # open, getting logical unit self._unit = arpifs4py.wfaitou(self.container.abspath, 'OLD', self.headername) self.isopen = True self.empty = False # read info self._attributes['cdiden'] = arpifs4py.wfalsif(self._unit) self._read_geometry() self._read_validity() if self.openmode == 'a': if self.default_compression == None: self._attributes['default_compression'] = FPDict(self._getrunningcompression()) self._setrunningcompression(**self.default_compression) elif self.openmode == 'w': if geometry != None: if not isinstance(geometry, H2DGeometry): raise epygramError("validity must be a H2DGeometry instance.") self._attributes['headername'] = _create_header_from_geometry(geometry, spectral_geometry) if validity != None: if not isinstance(validity, FieldValidity): raise epygramError("validity must be a FieldValidity instance.") self._attributes['validity'] = validity if self.headername != None and self.validity != None: # new FA, with an already existing header and validity # set geometry from existing header self._read_geometry() # open (self._unit) = arpifs4py.wfaitou(self.container.abspath, 'NEW', self.headername) self.isopen = True self.empty = True # set FA date if self.validity.get() != self.validity.getbasis(): self.processtype = 'forecast' self._set_validity() # set CDIDEN arpifs4py.wfautif(self._unit, self.cdiden) if self.default_compression == None: self._attributes['default_compression'] = FPDict(config.FA_default_compression) self._setrunningcompression(**self.default_compression) elif self.headername == None or self.validity == None: # a header need to be created prior to opening: # header definition (then opening) will be done from the # geometry taken in the first field to be written in resource pass
[docs] def close(self): """Closes a FA with ifsaux' FAIRME.""" if self.isopen: try: arpifs4py.wfairme(self._unit, 'KEEP') except Exception: raise IOError("closing " + self.container.abspath) self.isopen = False # Cleanings if self.openmode == 'w' and self.empty: os.remove(self.container.abspath) ################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None): """ Returns a list of the fields from resource whose name match the given seed. Args: \n - *seed*: might be a regular expression, a list of regular expressions or *None*. If *None* (default), returns the list of all fields in resource. - *fieldtype*: optional, among ('H2D', 'Misc'). If provided, filters out the fields not of the given type. """ fieldslist = [] if seed == None: tmplist = self.listfields() for f in tmplist: if fieldtype == None or\ inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, str): tmplist = util.find_re_in_list(seed, self.listfields()) for f in tmplist: if fieldtype == None or\ inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, list): tmplist = [] for s in seed: tmplist += util.find_re_in_list(s, self.listfields()) for f in tmplist: if fieldtype == None or\ inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) if fieldslist == []: raise epygramError("no field matching '" + seed + \ "' was found in resource " + \ self.container.abspath) return fieldslist
[docs] def listfields(self): """ Returns a list containing the FA identifiers of all the fields of the resource. """ return super(FA, self).listfields()
@Resource._openbeforedelayed def _listfields(self): """Actual listfields() method for FA.""" records_number = arpifs4py.wlfinaf(self._unit)[0] arpifs4py.wlfipos(self._unit) # rewind fieldslist = [] for i in range(records_number): fieldname = arpifs4py.wlficas(self._unit, True)[0] if i >= 7 and fieldname != 'DATX-DES-DONNEES': fieldslist.append(fieldname.strip()) # i >= 7: 7 first fields in LFI are the header ("cadre") # 8th field added by P.Marguinaud, DATX-DES-DONNEES, to store dates # with 1-second precision, from cy40t1 onwards. return fieldslist
[docs] def sortfields(self): """ Returns a sorted list of fields with regards to their name and nature, as a dict of lists. """ list3Dparams = [] list3D = [] list2D = [] # final lists list3Dsp = [] list3Dgp = [] list2Dsp = [] list2Dgp = [] listMisc = [] for f in self.listfields(): info = inquire_field_dict(f) # separate H2D from Misc if info['type'] == 'H2D': # separate 3D from 2D if info['grib2_surface_type'] in ('119', '100', '103', '109', '20'): list3D.append(f) list3Dparams.append(filter(lambda x: not x.isdigit(), f[1:])) else: list2D.append(f) else: listMisc.append(f) list3Dparams = list(set(list3Dparams)) # separate gp/sp for f in list2D: if self.fieldencoding(f)['spectral']: list2Dsp.append(f) else: list2Dgp.append(f) # sort 2D list2Dsp.sort() list2Dgp.sort() # sort parameters for p in sorted(list3Dparams): interlist = [] for f in list3D: if p in f: interlist.append(f) # sort by increasing level interlist.sort() # separate gp/sp if self.fieldencoding(interlist[0])['spectral']: list3Dsp.extend(interlist) else: list3Dgp.extend(interlist) outlists = {'3D spectral fields':list3Dsp, '3D gridpoint fields':list3Dgp, '2D spectral fields':list2Dsp, '2D gridpoint fields':list2Dgp, 'Misc-fields':listMisc} return outlists
@Resource._openbeforedelayed
[docs] def fieldencoding(self, fieldname): """ Returns a dict containing info about how the field is encoded: spectral? and compression. Interface to ifsaux' FANION. """ try: (LDCOSP, KNGRIB, KNBITS, KSTRON, KPUILA) = arpifs4py.wfanion(self._unit, fieldname[0:4], 0, fieldname[4:])[1:6] except RuntimeError: raise RuntimeError("error while calling FANION: " + fieldname + \ " type is probably 'Misc' instead of 'H2D':\ please update Field_Dict_FA.csv accordingly\ (cf. config.py for location of field dict).") encoding = {'spectral':LDCOSP, 'KNGRIB':KNGRIB, 'KNBITS':KNBITS, 'KSTRON':KSTRON, 'KPUILA':KPUILA} return encoding
@Resource._openbeforedelayed
[docs] def readfield(self, fieldname, getdata=True, footprints_builder=config.use_footprints_as_builder): """ Reads one field, given its FA name, and returns a Field instance. Interface to Fortran routines from 'ifsaux'. Args: \n - *fieldname*: FA fieldname - *getdata*: if *False*, only metadata are read, the field do not contain data. - *footprints_builder*: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. """ if self.openmode == 'w': raise epygramError("cannot read fields in resource if with\ openmode == 'w'.") # Get field info field_info = inquire_field_dict(fieldname) if footprints_builder: builder = fpx.field else: if field_info['type'] == 'H2D': builder = H2DField elif field_info['type'] == 'Misc': builder = MiscField if field_info['type'] == 'H2D': encoding = self.fieldencoding(fieldname) # Save compression in FA compression = {'KNGRIB':encoding['KNGRIB'], 'KNBPDG':encoding['KNBITS'], 'KNBCSP':encoding['KNBITS'], 'KSTRON':encoding['KSTRON'], 'KPUILA':encoding['KPUILA']} self.fieldscompression[fieldname] = compression # vertical geometry vcoordinate = copy.deepcopy(self.geometry.vcoordinate) vcoordinate.update(field_info) vcoordinate.pop('type') vcoordinate.pop('name') # get level for Sigma/Pressure levels in field name (deeply linked # to FA naming conventions) if vcoordinate['grib2_surface_type'] == '119': # hybrid-pressure vcoordinate['Z'] = int(fieldname[1:4]) elif vcoordinate['grib2_surface_type'] == '100': # isobaric (P) vcoordinate['Z'] = int(fieldname[1:6]) # problem linked to number of digits: if vcoordinate['Z'] == 0: vcoordinate['Z'] = 100000 elif vcoordinate['grib2_surface_type'] == '103': # height try: vcoordinate['Z'] = int(fieldname[1:6]) except ValueError: if 'level' in field_info: vcoordinate['Z'] = field_info['level'] else: vcoordinate['Z'] = 255 elif vcoordinate['grib2_surface_type'] == '109': # PV vcoordinate['Z'] = float(fieldname[1:4]) / 10. # to handle PVU elif vcoordinate['grib2_surface_type'] == 1: # surface vcoordinate.pop('Ai') vcoordinate.pop('Bi') vcoordinate.pop('ABgrid_position') vcoordinate.pop('levels') # Prepare field dimensions spectral = encoding['spectral'] if spectral: truncation = self.spectral_geometry.truncation if 'fourier' in self.spectral_geometry.space: # LAM SPdatasize = arpifs4py.w_etrans_inq(self.geometry.dimensions['X'], self.geometry.dimensions['Y'], self.geometry.dimensions['X_CIzone'], self.geometry.dimensions['Y_CIzone'], truncation['in_X'], truncation['in_Y'], config.KNUMMAXRESOL)[1] elif self.spectral_geometry.space == 'legendre': # Global total_system_memory = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') memory_needed_for_transforms = truncation['max'] ** 3 / 2 * 8 if memory_needed_for_transforms >= config.prevent_swapping_legendre * total_system_memory: raise epygramError('Legendre spectral transforms need ' + \ str(int(float(memory_needed_for_transforms) / 1024 ** 2.)) + \ ' MB memory, while only ' + \ str(int(float(total_system_memory) / 1024 ** 2.)) + \ ' MB is available: SWAPPING prevented !') SPdatasize = arpifs4py.w_trans_inq(self.geometry.dimensions['lat_number'], truncation['max'], len(self.geometry.dimensions['lon_number_by_lat']), numpy.array(self.geometry.dimensions['lon_number_by_lat']), config.KNUMMAXRESOL)[1] SPdatasize *= 2 # complex coefficients datasize = SPdatasize spectral_geometry = self.spectral_geometry else: if self.geometry.rectangular_grid: GPdatasize = self.geometry.dimensions['X'] * self.geometry.dimensions['Y'] else: GPdatasize = sum(self.geometry.dimensions['lon_number_by_lat']) datasize = GPdatasize spectral_geometry = None # Make geometry object kwargs_geom = dict(structure='H2D', name=self.geometry.name, grid=FPDict(self.geometry.grid), dimensions=FPDict(self.geometry.dimensions), vcoordinate=FPDict(vcoordinate), position_on_grid=FPDict(self.geometry.position_on_grid), geoid=config.FA_default_geoid) if self.geometry.projected_geometry: kwargs_geom['projection'] = FPDict(self.geometry.projection) geometry = fpx.geometry(**kwargs_geom) # Get data if requested if getdata: if field_info['type'] == 'Misc': field_length = arpifs4py.wlfinfo(self._unit, fieldname)[0] data = arpifs4py.wfalais(self._unit, fieldname, field_length) if field_info['dimension'] == '0': if field_info['nature'] == 'int': dataOut = data.view('int64')[0] elif field_info['nature'] == 'str': dataInt = data.view('int64') dataOut = "" for num in dataInt: dataOut += chr(num) elif field_info['nature'] == 'bool': dataOut = bool(data.view('int64')[0]) elif field_info['nature'] == 'float': dataOut = data[0] else: raise NotImplementedError("reading of datatype " + \ field_info['nature'] + ".") else: # copy is necessary for garbage collector if field_info['nature'] == 'int': dataOut = numpy.copy(data.view('int64')[:]) elif field_info['nature'] == 'float': dataOut = numpy.copy(data) elif field_info['nature'] == 'str': #FIXME: table of int(str) = table of tables ??? dataOut = numpy.copy(data) elif field_info['nature'] == 'bool': dataOut = numpy.copy(data.view('bool')[:]) else: raise NotImplementedError("reading of datatype " + \ field_info['nature'] + " array.") data = dataOut elif field_info['type'] == 'H2D': data = numpy.array(arpifs4py.wfacile(datasize, self._unit, fieldname[0:4], 0, fieldname[4:], spectral)) # Create field if field_info['type'] == 'H2D': # Create H2D field field = builder(fid=FPDict({self.format:fieldname}), geometry=geometry, validity=self.validity, spectral_geometry=spectral_geometry, processtype=self.processtype) if 'gauss' in self.geometry.name and config.FA_buffered_gauss_grid: field.geometry._bound_gauss_grid = self.geometry.get_lonlat_grid elif field_info['type'] == 'Misc': # Create Misc field field = builder(fid=FPDict({self.format:fieldname})) if getdata: if field_info['type'] == 'H2D' and not field.spectral: data = geometry.reshape_data(data) field.setdata(data) return field
[docs] def readfields(self, requestedfields=None, getdata=True): """ Returns a :class:`epygram.base.FieldSet` containing requested fields read in the resource. Args: \n - *requestedfields*: might be \n - a regular expression (e.g. 'S\*WIND.[U,V].PHYS') - a list of FA fields identifiers with regular expressions (e.g. ['SURFTEMPERATURE', 'S0[10-20]WIND.?.PHYS']) - if not specified, interpretated as all fields that will be found in resource - *getdata*: optional, if *False*, only metadata are read, the fields do not contain data. Default is *True*. """ requestedfields = self.find_fields_in_resource(requestedfields) if requestedfields == []: raise epygramError("unable to find requested fields in resource.") return super(FA, self).readfields(requestedfields, getdata)
[docs] def writefield(self, field, compression=None): """ Write a field in the resource. Args: \n - *field*: a :class:`epygram.base.Field` instance or :class:`epygram.fields.H2DField`. - *compression*: optional, a (possibly partial) dict containing parameters for field compression (in case of a :class:`epygram.fields.H2DField`). Ex: {'KNGRIB': 2, 'KDMOPL': 5, 'KPUILA': 1, 'KSTRON': 10, 'KNBPDG': 24, 'KNBCSP': 24} """ if self.openmode == 'r': raise IOError("cannot write field in a FA with openmode 'r'.") if not self.isopen: if not isinstance(field, H2DField): # FA need a geometry to be open. Maybe this FA has not been # given one at opening. For opening it, either write a H2DField # in, or call its method open(geometry, validity), geometry # being a H2DGeometry, validity being a FieldValidity. raise epygramError("cannot write a this kind of field on a\ non-open FA.") if self.validity == None: self._attributes['validity'] = field.validity self._attributes['headername'] = _create_header_from_geometry(field.geometry, field.spectral_geometry) self.open() if not self.empty and field.fid['FA'] in self.listfields(): epylog.info("there already is a field with the same name in this\ FA: overwrite.") if isinstance(field, MiscField): data = field.data if field.shape in ((1,), ()): if 'int' in field.datatype.name: dataReal = data.view('float64') elif 'str' in field.datatype.name: data = str(data) # ndarray of str -> simple str dataReal = numpy.array([ord(d) for d in data]).view('float64') elif 'bool' in field.datatype.name: dataReal = numpy.array(1 if data else 0).view('float64') elif 'float' in field.datatype.name: dataReal = data else: raise NotImplementedError("writing of datatype " + \ field.datatype.name + ".") else: try: dataReal = numpy.copy(data.view('float64')) except Exception: raise NotImplementedError("writing of datatype " + \ field.datatype.__name__ + \ " array.") arpifs4py.wfaisan(self._unit, field.fid['FA'], dataReal.size, dataReal) elif isinstance(field, H2DField): if [self.geometry.name, self.geometry.grid, self.geometry.dimensions] != \ [field.geometry.name, field.geometry.grid, field.geometry.dimensions]: raise epygramError("gridpoint geometry incompatibility:\ a FA can hold only one geometry.") if field.spectral_geometry != None and\ field.spectral_geometry != self.spectral_geometry: # compatibility check raise epygramError("spectral geometry incompatibility:\ a FA can hold only one geometry.") data = numpy.ma.copy(field.data).flatten() if isinstance(data, numpy.ma.core.MaskedArray): data = numpy.copy(data[data.mask == False].data) if compression != None: modified_compression = True elif self.fieldscompression.has_key(field.fid['FA']): compression = self.fieldscompression[field.fid['FA']] modified_compression = True else: modified_compression = False if modified_compression: self._setrunningcompression(**compression) arpifs4py.wfaienc(self._unit, field.fid['FA'][0:4], 0, field.fid['FA'][4:], len(data), data, field.spectral) if modified_compression: # set back to default self._setrunningcompression(**self.default_compression) if not self.fieldscompression.has_key(field.fid['FA']): self.fieldscompression[field.fid['FA']] = compression if self.empty: self.empty = False
[docs] def writefields(self, fieldset, compression=None): """ Write the fields of the *fieldset* in the resource. Args: \n - *fieldset*: must be a :class:`epygram.base.FieldSet` instance. - *compression*: must be a list of compression dicts (cf. *writefield()* method), of length equal to the length of the *fieldset*, and with the same order. """ if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be of kind FieldSet.") if len(fieldset) != len(compression): raise epygramError("fieldset and compression must have the same\ length.") # Be sure the first field to be written is an H2DField, # for being able to set the header if necessary if not self.isopen and not isinstance(fieldset[0], H2DField): for f in range(len(fieldset)): if isinstance(fieldset[f], H2DField): fieldset.insert(0, fieldset.pop(f)) if compression != None: compression.insert(0, compression.pop(f)) break if compression != None: # loop separated from the above one, # because fieldset is there-above modified for f in range(len(fieldset)): self.writefield(fieldset[f], compression[f]) else: super(FA, self).writefields(fieldset)
[docs] def rename_field(self, fieldname, new_name): """ Renames a field "in place". """ arpifs4py.wlfiren(self._unit, fieldname, new_name)
[docs] def delfield(self, fieldname): """ Deletes a field from file "in place". """ arpifs4py.wlfisup(self._unit, fieldname)
[docs] def extractprofile(self, fid, lon, lat, vertical_coordinate=None, interpolation='nearest', cheap_alt=False): """ Extracts a vertical profile from the FA resource, given its pseudo-fid and the geographic location (*lon*/*lat*) of the profile. Args: \n - *fid* must have syntax: 'K\*PARAMETER', K being the kind of surface (S,P,H,V), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in FA. - *lon* is the longitude of the desired point. - *lat* is the latitude of the desired point. - *vertical_coordinate* defines the requested vertical coordinate of the V2DField (cf. :class:`epygram.geometries.V1DGeometry` coordinate possible values). - *interpolation* defines the interpolation function used to compute the profile at requested lon/lat from the fields grid: - if 'nearest' (default), extracts profile at the horizontal nearest neighboring gridpoint; - if 'linear', computes profile with horizontal linear interpolation; - if 'cubic', computes profile with horizontal cubic interpolation. - *cheap_alt*: if True and *vertical_coordinate* among ('altitude', 'height'), the computation of altitudes is done without taking hydrometeors into account (in R computation) nor NH Pressure departure (Non-Hydrostatic data). """ try: N = len(lon) except Exception: N = 1 lon = numpy.array([lon]) lat = numpy.array([lat]) l = len(fid[2:]) fidlist = self.find_fields_in_resource(seed=fid, fieldtype='H2D') fidlist = filter(lambda x: all([i.isdigit() for i in x[1:-l]]), fidlist) if fidlist == []: raise epygramError("cannot find profile for " + fid + \ " in resource.") fs = self.readfields(fidlist, getdata=False) fs.sort(attribute=['geometry', 'vcoordinate'], key='Z') fidlist = fs.listfields(typefmt='FA') # Build commons field = fs[0] # location for i in range(N): if not field.geometry.point_is_inside_domain_ll(lon[i], lat[i]): raise ValueError("point (" + str(lon[i]) + ", " + \ str(lat[i]) + ") is out of field domain.") if interpolation == 'nearest': true_loc = field.geometry.ij2ll(*field.geometry.nearest_points(lon, lat, 'nearest')) comments = [] for i in range(N): distance = field.geometry.distance((lon[i], lat[i]), (true_loc[0][i], true_loc[1][i])) az = field.geometry.azimuth((lon[i], lat[i]), (true_loc[0][i], true_loc[1][i])) if -22.5 < az <= 22.5: direction = 'N' elif -77.5 < az <= -22.5: direction = 'NW' elif -112.5 < az <= -77.5: direction = 'W' elif -157.5 < az <= -112.5: direction = 'SW' elif -180.5 <= az <= -157.5 or 157.5 < az <= 180.: direction = 'S' elif 22.5 < az <= 77.5: direction = 'NE' elif 77.5 < az <= 112.5: direction = 'E' elif 112.5 < az <= 157.5: direction = 'SE' gridpointstr = "(" + \ '{:.{precision}{type}}'.format(true_loc[0][i], type='F', precision=4) + \ ", " + \ '{:.{precision}{type}}'.format(true_loc[1][i], type='F', precision=4) + \ ")" comment = "Profile @ " + str(int(distance)) + "m " + \ direction + " from " + str((lon[i], lat[i])) + \ "\n" + "( = nearest gridpoint: " + gridpointstr + ")" comments.append(comment) elif interpolation in ('linear', 'cubic'): if interpolation == 'linear': interpstr = 'linearly' elif interpolation == 'cubic': interpstr = 'cubically' comments = [] for i in range(N): comment = "Profile " + interpstr + " interpolated @ " + \ str((lon[i], lat[i])) comments.append(comment) else: raise NotImplementedError(interpolation + "interpolation.") # build vertical geometry fieldlevels_number = len(fidlist) gridsize = fieldlevels_number if fid[0] == 'S': coordinate = 'hybrid_pressure' gridposition = 'flux' position_on_grid = 'mass' grid = {'levels':FPDict({'Ai':tuple(self.geometry.vcoordinate['Ai']), 'Bi':tuple(self.geometry.vcoordinate['Bi']) })} elif fid[0] == 'P': coordinate = 'pressure' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} elif fid[0] == 'H': coordinate = 'height' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} elif fid[0] == 'V': coordinate = 'potential_vortex' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} grid['fieldlevels_number'] = fieldlevels_number grid['gridsize'] = gridsize grid['gridposition'] = gridposition vgeometries = [] for i in range(N): vgeometry = fpx.geometry(structure='V1D', coordinate=coordinate, grid=FPDict(grid), position_on_grid=position_on_grid, hlocation=FPDict(zip(('lon', 'lat'), (lon[i], lat[i])) ) ) vgeometries.append(vgeometry) # profile values = [] for f in fidlist: # field field = self.readfield(f) if field.spectral: field.sp2gp() # read profile value values.append(field.getvalue_ll(lon, lat, interpolation=interpolation)) # build profile as a V1DField profilesSet = FieldSet() for i in range(N): profile = fpx.field(fid=FPDict({'FA':fid}), geometry=vgeometries[i], validity=self.validity, processtype=self.processtype, comment=comments[i]) if N == 1: profile.setdata(values) else: profile.setdata([v[i] for v in values]) profilesSet.append(profile) # preparation for vertical coords conversion if vertical_coordinate not in (None, vgeometries[0].coordinate): if vgeometries[0].coordinate == 'hybrid_pressure' and \ vertical_coordinate == 'pressure': Psurf = self.readfield('SURFPRESSION') if Psurf.spectral: Psurf.sp2gp() ps_transect = numpy.exp(Psurf.getvalue_ll(lon, lat, interpolation=interpolation)) del Psurf elif vgeometries[0].coordinate == 'hybrid_pressure' and \ vertical_coordinate in ('altitude', 'height'): Psurf = self.readfield('SURFPRESSION') if Psurf.spectral: Psurf.sp2gp() ps_transect = numpy.exp(Psurf.getvalue_ll(lon, lat, interpolation=interpolation)) del Psurf if not all(('S001TEMPERATURE' in self.listfields(), 'S001HUMI.SPECIFI' in self.listfields())): raise epygramError("for hybridP2altitude/height\ conversions, Temperature and\ Specific Humidity must be in\ resource.") # T & q if fid != 'S*TEMPERATURE': epylog.info("extract S*TEMPERATURE") profileslist = self.extractprofile('S*TEMPERATURE', lon, lat, interpolation=interpolation) else: profileslist = profilesSet T = numpy.array([p.data for p in profileslist]).transpose() if fid != 'S*HUMI.SPECIFI': epylog.info("extract S*HUMI.SPECIFI") profileslist = self.extractprofile('S*HUMI.SPECIFI', lon, lat, interpolation=interpolation) else: profileslist = profilesSet q = numpy.array([p.data for p in profileslist]).transpose() # other optional fields try: if cheap_alt: raise Exception() if fid != 'S*CLOUD_WATER': epylog.info("extract S*CLOUD_WATER") profileslist = self.extractprofile('S*CLOUD_WATER', lon, lat, interpolation=interpolation) else: profileslist = profilesSet ql = numpy.array([p.data for p in profileslist]).transpose() if fid != 'S*ICE_CRYSTAL': epylog.info("extract S*ICE_CRYSTAL") profileslist = self.extractprofile('S*ICE_CRYSTAL', lon, lat, interpolation=interpolation) else: profileslist = profilesSet qi = numpy.array([p.data for p in profileslist]).transpose() if fid != 'S*SNOW': epylog.info("extract S*SNOW") profileslist = self.extractprofile('S*SNOW', lon, lat, interpolation=interpolation) else: profileslist = profilesSet qs = numpy.array([p.data for p in profileslist]).transpose() if fid != 'S*RAIN': epylog.info("extract S*RAIN") profileslist = self.extractprofile('S*RAIN', lon, lat, interpolation=interpolation) else: profileslist = profilesSet qr = numpy.array([p.data for p in profileslist]).transpose() except Exception: ql = qi = qs = qr = numpy.zeros(q.shape) try: if cheap_alt: raise Exception() if fid != 'S*GRAUPEL': epylog.info("extract S*GRAUPEL") profileslist = self.extractprofile('S*GRAUPEL', lon, lat, interpolation=interpolation) else: profileslist = profilesSet qg = numpy.array([p.data for p in profileslist]).transpose() except Exception: qg = numpy.zeros(q.shape) R = profiles.gfl2R(q, ql, qi, qr, qs, qg) try: if cheap_alt: raise Exception() if fid != 'S*PRESS.DEPART': epylog.info("extract S*PRESS.DEPART") profileslist = self.extractprofile('S*PRESS.DEPART', lon, lat, interpolation=interpolation) else: profileslist = profilesSet Pdep = numpy.array([p.data for p in profileslist]).transpose() except Exception: Pdep = numpy.zeros(numpy.array(T).shape) if vertical_coordinate == 'altitude': Phi_surf = self.readfield('SPECSURFGEOPOTEN') if Phi_surf.spectral: Phi_surf.sp2gp() phisurf_transect = Phi_surf.getvalue_ll(lon, lat, interpolation=interpolation) del Phi_surf else: phisurf_transect = None for i in range(N): # effective vertical coords conversion if vertical_coordinate not in (None, profilesSet[i].geometry.coordinate): if profilesSet[i].geometry.coordinate == 'hybrid_pressure' and \ vertical_coordinate == 'pressure': profilesSet[i].geometry.hybridP2pressure(ps_transect[i], gridposition='mass') elif profilesSet[i].geometry.coordinate == 'hybrid_pressure' and \ vertical_coordinate in ('altitude', 'height'): profilesSet[i].geometry.hybridP2altitude(R[:, i], T[:, i], ps_transect[i], Pdep[:, i], phisurf_transect[i] if phisurf_transect != None else 0.) else: raise NotImplementedError("this vertical coordinate\ conversion.") if N == 1: profile = profilesSet[0] else: profile = profilesSet return profile
@Resource._openbeforedelayed
[docs] def extractsection(self, fid, end1, end2, transect=None, points_number=None, resolution=None, vertical_coordinate=None, interpolation='linear', cheap_alt=False): """ Extracts a vertical section from the FA resource, given its pseudo-fid and the geographic (lon/lat) coordinates of its ends. The section is returned as a V2DField. Args: \n - *fid* must have syntax: 'K\*PARAMETER', K being the kind of surface (S,P,H,V), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in FA. - *end1* must be a tuple (lon, lat). - *end2* must be a tuple (lon, lat). - *transect* is the list of (lon,lat) tuples at which points compising the section are extracted. If None, defaults to linearily spaced positions computed from *points_number*. - *points_number* defines the total number of horizontal points of the section (including ends). If None, defaults to a number computed from the *ends* and the *resolution*. - *resolution* defines the horizontal resolution to be given to the field. If None, defaults to the horizontal resolution of the field. - *vertical_coordinate* defines the requested vertical coordinate of the V2DField (cf. :class:`epygram.geometries.V1DGeometry` coordinate possible values). - *interpolation* defines the interpolation function used to compute the profile points locations from the fields grid: \n - if 'nearest', each horizontal point of the section is taken as the horizontal nearest neighboring gridpoint; - if 'linear' (default), each horizontal point of the section is computed with linear spline interpolation; - if 'cubic', each horizontal point of the section is computed with linear spline interpolation. - *cheap_alt*: if True and *vertical_coordinate* among ('altitude', 'height'), the computation of altitudes is done without taking hydrometeors into account (in R computation) nor NH Pressure departure (Non-Hydrostatic data). """ if transect == None: if resolution != None and points_number != None: raise epygramError("only one of resolution and points_number\ can be given.") distance = self.geometry.distance(end1, end2) if resolution == None and points_number == None: resolution = 0.5 * (self.geometry.resolution_ll(*end1) + \ self.geometry.resolution_ll(*end2)) if resolution > distance: raise epygramError("'ends' are too near: pure interpolation\ between two gridpoints.") elif points_number != None and points_number < 2: raise epygramError("'points_number' must be at least 2.") if resolution != None: points_number = int(numpy.rint(distance / resolution)) + 1 epylog.info('section computed on ' + str(points_number) + \ ' points == ' + str(resolution) + ' m resolution.') if points_number >= 3: transect = self.geometry.linspace(end1, end2, points_number) elif points_number == 2: transect = [end1, end2] else: raise epygramError("cannot make a section with less than\ 2 points.") else: if resolution != None or points_number != None: raise epygramError("if transect is given, resolution and\ points_number must be None.") if len(transect) < 2: raise epygramError("transect must contain at least two points.") profileslist = self.extractprofile(fid, [p[0] for p in transect], [p[1] for p in transect], interpolation=interpolation, vertical_coordinate=vertical_coordinate, cheap_alt=cheap_alt) geometry = fpx.geometry(structure='V2D', grid=FPList([p.geometry for p in profileslist])) geometry.distance = self.geometry.distance geometry.azimuth = self.geometry.azimuth section = fpx.field(fid=FPDict({'section':fid}), geometry=geometry, validity=profileslist[0].validity) section.setdata(numpy.array([p.data for p in profileslist]).transpose()) return section ########### # pre-app # ###########
@Resource._openbeforedelayed
[docs] def what(self, out, fieldscompression=False, sortfields=False): """ Writes in file a summary of the contents of the FA. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *fieldscompression*: **True** if information about fields compression is requested. - *sortfields*: **True** if the fields have to be sorted by type. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \ secondcolumn_width + 1) + '\n' first_H2DField = [f for f in self.listfields() if inquire_field_dict(f)['type'] == 'H2D'][0] firstfield = self.readfield(first_H2DField, getdata=False) if not firstfield.spectral and self.spectral_geometry != None: firstfield._attributes['spectral_geometry'] = self.spectral_geometry v_geometry = self.geometry.vcoordinate listoffields = self.listfields() if sortfields: sortedfields = self.sortfields() def write_formatted(dest, label, value): dest.write('{:<{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_col(dest, label, value): dest.write('{:>{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_fields(dest, label, value, compression=None): if compression == None: dest.write('{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(value), width=10) \ + '\n') else: line = '{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(value), width=10) \ + ':' \ + compression \ + '\n' dest.write(line) out.write("### IDENTIFIER (CDIDEN): " + self.cdiden + "\n") out.write("\n") firstfield.what(out, vertical_geometry=False) out.write("#########################\n") out.write("### VERTICAL GEOMETRY ###\n") out.write("#########################\n") write_formatted(out, "Number of vertical levels", v_geometry['levels_number']) out.write("Hybrid-pressure coord. A coefficients\n") for c in range(len(v_geometry['Ai'])): write_formatted_col(out, str(c + 1), v_geometry['Ai'][c]) out.write(sepline) out.write("Hybrid-pressure coord. B coefficients\n") for c in range(len(v_geometry['Bi'])): write_formatted_col(out, str(c + 1), v_geometry['Bi'][c]) out.write(sepline) out.write("\n") out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") if sortfields: listoffields = [] for k in sorted(sortedfields.keys()): listoffields.append(k) listoffields.append('--------------------') listoffields.extend(sortedfields[k]) listoffields.append('--------------------') numfields = sum([len(v) for v in sortedfields.values()]) else: numfields = len(listoffields) out.write("Number: " + str(numfields) + "\n") if not fieldscompression: write_formatted_fields(out, "Field name", "Spectral") else: params = ['KNGRIB', 'KNBITS', 'KSTRON', 'KPUILA'] width_cp = 8 compressionline = "" for p in params: compressionline += '{:^{width}}'.format(p, width=width_cp) write_formatted_fields(out, "Field name", "Spectral", compressionline) out.write(sepline) for f in listoffields: if inquire_field_dict(f)['type'] == 'H2D': encoding = self.fieldencoding(f) if not fieldscompression: write_formatted_fields(out, f, encoding['spectral']) else: compressionline = "" for p in params: try: compressionline += '{:^{width}}'.format(str(encoding[p]), width=width_cp) except KeyError: compressionline += '{:^{width}}'.format('-', width=width_cp) write_formatted_fields(out, f, encoding['spectral'], compression=compressionline) else: write_formatted_fields(out, f, '-') out.write(sepline) ############## # the FA WAY # ##############
def _read_geometry(self): """ Reads the geometry in the FA header. Interface to Fortran routines from 'ifsaux'. """ (KTYPTR, PSLAPO, PCLOPO, PSLOPO, PCODIL, KTRONC, KNLATI, KNXLON, KNLOPA, KNOZPA, PSINLA, KNIVER, PREFER, PAHYBR, PBHYBR) = arpifs4py.wfacies(self._FAsoftware_cst['JPXPAH'], self._FAsoftware_cst['JPXIND'], self._FAsoftware_cst['JPXGEO'], self._FAsoftware_cst['JPXNIV'], self.headername)[:-1] vcoordinate_read_in_header = {'grib2_surface_type':'119', 'levels_number':KNIVER, 'Ai':FPList([c * PREFER for c in PAHYBR[1:KNIVER + 1]]), 'Bi':FPList([c for c in PBHYBR[1:KNIVER + 1]]), 'ABgrid_position':'flux' } FA.reference_pressure = PREFER rectangular_grid = KTYPTR <= 0 if rectangular_grid: LMAP = int(PCODIL) != -1 # LAM or regular lat/lon projected_geometry = int(PSINLA[1]) != -9 if projected_geometry: # LAM (projection) if LMAP and int(PSINLA[0]) == 0: raise NotImplementedError("'old' FA header.") if LMAP: projection = {'reference_lon':Angle(PSINLA[2], 'radians'), 'reference_lat':Angle(PSINLA[3], 'radians'), 'center_lon':Angle(PSINLA[4], 'radians'), 'center_lat':Angle(PSINLA[5], 'radians') } if int(PSINLA[0]) != 0: grid = {'X_resolution':PSINLA[6], 'Y_resolution':PSINLA[7] } else: grid = {'X_resolution':PSINLA[14], 'Y_resolution':PSINLA[15] } dimensions = {'X':KNXLON, 'Y':KNLATI, 'X_CIzone':KNLOPA[3] - KNLOPA[2] + 1, 'Y_CIzone':KNLOPA[5] - KNLOPA[4] + 1, 'X_Iwidth':KNLOPA[6], 'Y_Iwidth':KNLOPA[7], 'X_Czone':KNLOPA[3] - KNLOPA[2] + 1 - 2 * KNLOPA[6], 'Y_Czone':KNLOPA[5] - KNLOPA[4] + 1 - 2 * KNLOPA[7], } if KNLOPA[1] == 0: #C+I grid['LAMzone'] = 'CI' dimensions['X'] = dimensions['X_CIzone'] dimensions['Y'] = dimensions['Y_CIzone'] elif abs(KNLOPA[1]) == 1: #C+I+E grid['LAMzone'] = 'CIE' dimensions['X_CIoffset'] = KNLOPA[2] - 1 dimensions['Y_CIoffset'] = KNLOPA[4] - 1 if abs(PSINLA[1]) <= config.epsilon: geometryname = 'mercator' elif 1.0 - abs(PSINLA[1]) <= config.epsilon: geometryname = 'polar_stereographic' elif config.epsilon < abs(PSINLA[1]) < 1.0 - config.epsilon: geometryname = 'lambert' spectral_space = 'bi-fourier' spectral_trunc = {'in_X':KTYPTR * -1, 'in_Y':KTRONC, 'shape':'elliptic'} if not LMAP: if dimensions['X'] == 1: spectral_space = 'fourier' spectral_trunc = {'in_Y':KTRONC, 'in_X':KTYPTR * -1} projection = None geometryname = 'academic' elif not projected_geometry: # regular lat/lon projection = None geometryname = 'regular_lonlat' grid = {'center_lon':Angle(PSINLA[4], 'radians'), 'center_lat':Angle(PSINLA[5], 'radians'), 'X_resolution':Angle(PSINLA[6], 'radians'), 'Y_resolution':Angle(PSINLA[7], 'radians') } dimensions = {'X':KNXLON, 'Y':KNLATI } spectral_space = None else: # ARPEGE global projection = None # reconstruction of tables on both hemispheres KNLOPA = KNLOPA[:KNLATI / 2] KNOZPA = KNOZPA[:KNLATI / 2] PSINLA = PSINLA[:KNLATI / 2] lon_number_by_lat = [n for n in KNLOPA] + [KNLOPA[-(n + 1)] for n in range(0, len(KNLOPA))] max_zonal_wavenumber_by_lat = [n for n in KNOZPA] + \ [KNOZPA[-(n + 1)] for n in range(0, len(KNOZPA))] latitudes = [Angle((math.cos(math.asin(sinlat)), sinlat), 'cos_sin') for sinlat in PSINLA] \ + [Angle((math.cos(math.asin(PSINLA[-(n + 1)])), -PSINLA[-(n + 1)]), 'cos_sin') for n in range(0, len(PSINLA))] grid = {'dilatation_coef':PCODIL, 'latitudes':FPList([l for l in latitudes]) } if KTYPTR == 1: geometryname = 'reduced_gauss' elif KTYPTR == 2: geometryname = 'rotated_reduced_gauss' grid['pole_lat'] = Angle((math.cos(math.asin(PSLAPO)), PSLAPO), 'cos_sin') grid['pole_lon'] = Angle((PCLOPO, PSLOPO), 'cos_sin') dimensions = {'max_lon_number':KNXLON, 'lat_number':KNLATI, 'lon_number_by_lat':FPList([n for n in lon_number_by_lat]) } spectral_space = 'legendre' spectral_trunc = {'max':KTRONC, 'shape':'triangular', 'max_zonal_wavenumber_by_lat':FPList([k for k in max_zonal_wavenumber_by_lat]) } kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), vcoordinate=FPDict(vcoordinate_read_in_header), position_on_grid=FPDict({'vertical':'mass', 'horizontal':'center'}), geoid=FPDict(config.FA_default_geoid)) if projection != None: kwargs_geom['projection'] = FPDict(projection) self.geometry = fpx.geometry(**kwargs_geom) if spectral_space != None: self.spectral_geometry = SpectralGeometry(space=spectral_space, truncation=FPDict(spectral_trunc)) else: self.spectral_geometry = None @Resource._openbeforedelayed def _read_validity(self): """ Reads the validity in the FA header. Interface to Fortran routines from 'ifsaux'. """ KDATEF = arpifs4py.wfadiex(self._unit) year = int(KDATEF[0]) month = int(KDATEF[1]) day = int(KDATEF[2]) hour = int(KDATEF[3]) minute = int(KDATEF[4]) second = int(KDATEF[13]) - hour * 3600 - minute * 60 processtype = int(KDATEF[8]) if processtype == 0: self.processtype = 'analysis' elif processtype == 1: self.processtype = 'initialization' elif processtype == 10: self.processtype = 'forecast' term_in_seconds = int(KDATEF[14]) cumulationstart_in_seconds = int(KDATEF[15]) cumulativeduration_in_seconds = term_in_seconds - cumulationstart_in_seconds basis = datetime.datetime(year, month, day, hour, minute, second) term = datetime.timedelta(seconds=term_in_seconds) cumulativeduration = datetime.timedelta(seconds=cumulativeduration_in_seconds) self._attributes['validity'] = FieldValidity(basis=basis, term=term, cumulativeduration=cumulativeduration) @Resource._openbeforedelayed def _set_validity(self, termunit='hours'): """ Sets date, hour and the processtype in the resource. """ if not self.isopen: raise epygramError("_set_validity must be called after FA is open.") basis = self.validity.getbasis() KDATEF = numpy.zeros(22, dtype=numpy.int64) KDATEF[0] = int(basis.year) KDATEF[1] = int(basis.month) KDATEF[2] = int(basis.day) KDATEF[3] = int(basis.hour) KDATEF[4] = int(basis.minute) if termunit == 'hours': KDATEF[5] = 1 KDATEF[6] = self.validity.term('IntHours') KDATEF[9] = self.validity.term('IntHours') - \ self.validity.cumulativeduration('IntHours') elif termunit == 'days': KDATEF[5] = 2 KDATEF[6] = self.validity.term('IntHours') / 24 KDATEF[9] = (self.validity.term('IntHours') - \ self.validity.cumulativeduration('IntHours')) / 24 else: raise NotImplementedError("term unit other than hours/days ?") #KDATEF[7] = 0 if self.processtype == 'analysis': processtype = 0 elif self.processtype == 'initialization': processtype = 1 elif self.processtype == 'forecast': processtype = 10 else: processtype = 255 # unknown processtype KDATEF[8] = processtype #KDATEF[10] = 0 if not config.FA_fandax: arpifs4py.wfandar(self._unit, KDATEF) else: # fandax KDATEF[11] = 1 KDATEF[13] = int(basis.second) + \ int(basis.minute) * 60 + \ int(basis.hour) * 3600 KDATEF[14] = self.validity.term(fmt='IntSeconds') KDATEF[15] = self.validity.term(fmt='IntSeconds') - \ self.validity.cumulativeduration(fmt='IntSeconds') arpifs4py.wfandax(self._unit, KDATEF) @Resource._openbeforedelayed def _getrunningcompression(self): """ Returns the current compression parameters of the FA (at time of writing). Interface to ifsaux' FAVEUR. """ comp = dict() (comp['KNGRIB'], comp['KNBPDG'], comp['KNBCSP'], comp['KSTRON'], comp['KPUILA'], comp['KDMOPL']) = arpifs4py.wfaveur(self._unit) return comp @Resource._openbeforedelayed def _setrunningcompression(self, **kwargs): """ Sets the compression parameters of the FA. Interface to FAGOTE (cf. FAGOTE documentation for significance of arguments). """ if self.openmode == 'r': raise IOError("method _setrunningcompression() can only be called\ if 'openmode' in('w', 'a').") comp = copy.deepcopy(self.default_compression) for k in kwargs.keys(): if k in self.default_compression.keys(): comp[k] = kwargs[k] else: raise epygramError("unknown parameter: " + k + \ " passed as argument.") arpifs4py.wfagote(self._unit, comp['KNGRIB'], comp['KNBPDG'], comp['KNBCSP'], comp['KSTRON'], comp['KPUILA'], comp['KDMOPL'])