Source code for epygram.FA

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__all__ = ['FA']

import datetime
import os
import tempfile
import shutil
import copy
import numpy
import math
import re

import footprints
from footprints import FPDict, FPList

import arpifs4py

from epygram import config, epygramError, util, epylog, profiles
from epygram import H2DGeometry, SpectralGeometry, H2DField, MiscField
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle


    
def inquire_field_dict(fieldname):
    """
    Returns the info contained in the FA _field_dict for the requested field. 
    """
    
    matching_field = None
    for fd in FA._field_dict:
        dictitem = fd['name']
        pattern = re.subn('\.', r'\.', dictitem)[0] # protect '.'
        pattern = pattern.replace('?', '.') # change unix '?' to python '.' (any char)
        pattern = pattern.replace('*', '.*') # change unix '*' to python '.*' (several any char)
        pattern += '(?!.)'
        if re.match(pattern, fieldname):
            matching_field = fd
            break
    
    if matching_field == None:
        epylog.info("field '"+fieldname+"' is not referenced in Field_Dict_FA. Assume its type being a MiscField.")
        matching_field = {'name':fieldname, 'type':'Misc', 'nature':'float', 'dimension':'1'} 
    
    return copy.deepcopy(matching_field)

def _gen_headername():
    """
    Generates a random headername for the FA software.
    """
    import uuid
    
    return str(uuid.uuid4()).replace('-','')[0:16]

def _create_header_from_geometry(geometry, spectral_geometry=None):
    """
    Create a header and returns its name, from a H2DGeometry and (preferably) a SpectralGeometry.
    Args:
    - geometry: a H2DGeometry instance, from which the header is set.
    - spectral_geometry: optional, a SpectralGeometry instance, from which truncation is set in header.
                         If not provided, in LAM case the X/Y truncations are computed from field dimension,
                         as linear grid. In global case, an error is raised.
    Contains a call to arpifs4py.wfacade (wrapper for FACADE routine).
    """
    
    if not isinstance(geometry, H2DGeometry):
        raise epygramError("geometry must be a H2DGeometry instance.")
    if spectral_geometry != None and not isinstance(spectral_geometry, SpectralGeometry):
        raise epygramError("spectral_geometry must be a SpectralGeometry instance.")
    
    headername = _gen_headername()
    CDNOMC = headername
    
    JPXPAH = FA._FAsoftware_cst['JPXPAH']
    JPXIND = FA._FAsoftware_cst['JPXIND']
    if geometry.rectangular_grid:
        if spectral_geometry != None:
            truncation_in_X = spectral_geometry.truncation['in_X']
            truncation_in_Y = spectral_geometry.truncation['in_Y']
        else:
            truncation_in_X = numpy.floor((geometry.dimensions['X'] - 1)/2) # default: linear truncation...
            truncation_in_Y = numpy.floor((geometry.dimensions['Y'] - 1)/2)
            
        # scalars
        if geometry.name == 'regular_lonlat':
            KTYPTR = -11
        else:
            KTYPTR = -1 * truncation_in_X
        if geometry.name in ('lambert', 'polar_stereographic'):
            PSLAPO = geometry.projection['center_lon'].get('radians') - geometry.projection['reference_lon'].get('radians')
        else:
            PSLAPO = 0.0
        PCLOPO = 0.0
        PSLOPO = 0.0
        if geometry.name == 'academic':
            PCODIL = -1.0
        else:
            PCODIL = 0.0
        if geometry.name == 'regular_lonlat':
            KTRONC = 11
        else:
            KTRONC = truncation_in_Y
        KNLATI = geometry.dimensions['Y']
        KNXLON = geometry.dimensions['X']
        
        # KNLOPA
        KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64)
        KNLOPA[0] = max(0, min(11, truncation_in_X, truncation_in_Y) - 1)
        if geometry.name == 'regular_lonlat':
            corners = geometry.gimme_corners_ij()
            KNLOPA[1] = 0
            KNLOPA[2] = 1 + corners['ll'][0]
            KNLOPA[3] = 1 + corners['ur'][0]
            KNLOPA[4] = 1 + corners['ll'][1]
            KNLOPA[5] = 1 + corners['ur'][1]
            KNLOPA[6] = 8
            KNLOPA[7] = 8
        else:
            corners = geometry.gimme_corners_ij(subzone='CI')
            if geometry.grid['LAMzone'] == 'CIE':
                KNLOPA[1] = 1
            KNLOPA[2] = 1 + corners['ll'][0]
            KNLOPA[3] = 1 + corners['ur'][0]
            KNLOPA[4] = 1 + corners['ll'][1]
            KNLOPA[5] = 1 + corners['ur'][1]
            KNLOPA[6] = geometry.dimensions['X_Iwidth']
            KNLOPA[7] = geometry.dimensions['Y_Iwidth']
        
        KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64)
        
        # PSINLA
        PSINLA = numpy.zeros(max((1+geometry.dimensions['Y'])/2, 18))
        PSINLA[0] = -1
        if geometry.name == 'regular_lonlat':
            PSINLA[1] = -9
            PSINLA[2] = geometry.grid['center_lon'].get('radians')
            PSINLA[3] = geometry.grid['center_lat'].get('radians')
            PSINLA[4] = geometry.grid['center_lon'].get('radians')
            PSINLA[5] = geometry.grid['center_lat'].get('radians')
            PSINLA[6] = geometry.grid['X_resolution'].get('radians')
            PSINLA[7] = geometry.grid['Y_resolution'].get('radians')
            PSINLA[8] = geometry.grid['X_resolution'].get('radians') * geometry.dimensions['X']
            PSINLA[9] = geometry.grid['Y_resolution'].get('radians') * geometry.dimensions['Y']
            PSINLA[10] = 0.0
            PSINLA[11] = 0.0
        elif geometry.projected_geometry:
            if geometry.secant_projection:
                raise epygramError("cannot write secant projected geometries in FA.")
            PSINLA[1] = geometry.projection['reference_lat'].get('cos_sin')[1]
            PSINLA[6] = geometry.grid['X_resolution']
            PSINLA[7] = geometry.grid['Y_resolution']
            PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X']
            PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y']
            PSINLA[10] = 2 * math.pi / PSINLA[8]
            PSINLA[11] = 2 * math.pi / PSINLA[9]
            PSINLA[2] = geometry.projection['reference_lon'].get('radians')
            PSINLA[3] = geometry.projection['reference_lat'].get('radians')
            PSINLA[4] = geometry.projection['center_lon'].get('radians')
            PSINLA[5] = geometry.projection['center_lat'].get('radians')
        if geometry.name != 'academic':
            PSINLA[12] = Angle(geometry.ij2ll(*corners['ll'])[0], 'degrees').get('radians')
            PSINLA[13] = Angle(geometry.ij2ll(*corners['ll'])[1], 'degrees').get('radians')
            PSINLA[14] = Angle(geometry.ij2ll(*corners['ur'])[0], 'degrees').get('radians')
            PSINLA[15] = Angle(geometry.ij2ll(*corners['ur'])[1], 'degrees').get('radians')
        else:
            PSINLA[6] = geometry.grid['X_resolution']
            PSINLA[7] = geometry.grid['Y_resolution']
            PSINLA[8] = geometry.grid['X_resolution'] * geometry.dimensions['X']
            PSINLA[9] = geometry.grid['Y_resolution'] * geometry.dimensions['Y']
            PSINLA[10] = 2 * math.pi / PSINLA[8]
            PSINLA[11] = 2 * math.pi / PSINLA[9]
        
    else: # global
        if geometry.name == 'reduced_gauss':
            KTYPTR = 1
            PSLAPO = 0. # as observed in files; why not 1. ?
            PCLOPO = 0. # as observed in files; why not 1. ?
            PSLOPO = 0.
        elif geometry.name == 'rotated_reduced_gauss':
            KTYPTR = 2
            PSLAPO = geometry.grid['pole_lat'].get('cos_sin')[1]
            PCLOPO = geometry.grid['pole_lon'].get('cos_sin')[0]
            PSLOPO = geometry.grid['pole_lon'].get('cos_sin')[1]
        PCODIL = geometry.grid['dilatation_coef']
        if spectral_geometry != None:
            KTRONC = spectral_geometry.truncation['max']
        else:
            KTRONC = (geometry.dimensions['max_lon_number'] - 1) / 2 # default: linear truncation...
        KNLATI = geometry.dimensions['lat_number']
        KNXLON = geometry.dimensions['max_lon_number']
        KNLOPA = numpy.zeros(JPXPAH, dtype=numpy.int64)
        KNLOPA[0:KNLATI/2] = geometry.dimensions['lon_number_by_lat'][0:KNLATI/2]
        KNOZPA = numpy.zeros(JPXIND, dtype=numpy.int64)
        if spectral_geometry != None:
            KNOZPA[0:KNLATI/2] = spectral_geometry.truncation['max_zonal_wavenumber_by_lat'][0:KNLATI/2]
        else:
            KNOZPA[0:KNLATI/2] = arpifs4py.w_trans_inq(geometry.dimensions['lat_number'], KTRONC,
                                                       len(geometry.dimensions['lon_number_by_lat']),
                                                       numpy.array(geometry.dimensions['lon_number_by_lat']),
                                                       config.KNUMMAXRESOL)[2][:]
        PSINLA = numpy.zeros((1+geometry.dimensions['lat_number'])/2, dtype=numpy.float64)
        PSINLA[0:KNLATI/2] = numpy.array([s.get('cos_sin')[1] for s in geometry.grid['latitudes'][0:KNLATI/2]])
    
    # vertical geometry
    try: KNIVER = geometry.vcoordinate['levels_number']
    except KeyError: KNIVER = 1
    PREFER = FA.reference_pressure
    try: PAHYBR = numpy.array([0.] + geometry.vcoordinate['Ai']) / FA.reference_pressure
    except KeyError: PAHYBR = numpy.array([0., 0.])
    try: PBHYBR = numpy.array([0.] + geometry.vcoordinate['Bi'])
    except KeyError: PBHYBR = numpy.array([0., 1.])
    LDGARD = True

    arpifs4py.wfacade(CDNOMC,
                      KTYPTR, PSLAPO, PCLOPO, PSLOPO,
                      PCODIL, KTRONC,
                      KNLATI, KNXLON,
                      len(KNLOPA), KNLOPA, len(KNOZPA), KNOZPA, len(PSINLA), PSINLA,
                      KNIVER, PREFER, PAHYBR, PBHYBR,
                      LDGARD)
    
    return headername

[docs]class FA(Resource): """ Class implementing all specificities for FA resource format. """ _footprint = dict( attr = dict( format = dict( values = set(['FA']), default = 'FA'), headername = dict( optional = True, info = "With openmode == 'w', name of an existing header, for the new FA to use its geometry."), validity = dict( type = FieldValidity, optional = True, info = "With openmode == 'w', describes the temporal validity of the resource."), default_compression = dict( type = FPDict, optional = True, info = "Default compression for writing fields in resource."), cdiden = dict( optional = True, default = 'unknown', access = 'rwx', info = "With openmode == 'w', identifies the FA by a keyword, usually the model abbreviation."), processtype = dict( optional = True, default = 'analysis', access = 'rwx', info = "With openmode == 'w', identifies the processus that produced the resource.") ) ) # the Field Dictionary gathers info about fields nature CSV_field_dictionaries = config.CSV_field_dictionaries_FA # syntax: _field_dict = [{'name':'fieldname1', 'type':'...', ...}, {'name':'fieldname2', 'type':'...', ...}, ...] _field_dict = [] # reference pressure coefficient for converting hybrid A coefficients in FA reference_pressure = config.FA_default_reference_pressure @classmethod def _FAsoft_init(cls): """ Initialize the FA software maximum dimensions. """ cls._FAsoftware_cst = dict(zip(('JPXPAH','JPXIND','JPXGEO', 'JPXNIV'), arpifs4py.get_facst() )) @classmethod def _read_field_dict(cls, fd_abspath): """ Reads the CSV fields dictionary of the format. """ import io import csv field_dict = [] with io.open(fd_abspath, 'r') as f: delimiter = str(f.readline()[0]) file_priority = str(f.readline()[0:-1]) field_table = csv.reader(f, delimiter=delimiter) for field in field_table: # syntax example of field description: # name:FIELDNAME;param:value;... fd = {} for f in range(0,len(field)): fd[field[f].split(':')[0]] = field[f].split(':')[1] field_dict.append(fd) if file_priority == 'main': cls._field_dict = field_dict elif file_priority == 'underwrite': for fd in field_dict: found = False for cfd in cls._field_dict: found = fd['name'] == cfd['name'] if not found: cls._field_dict.append(fd) elif file_priority == 'overwrite': for cfd in cls._field_dict: found = False for fd in field_dict: found = fd['name'] == cfd['name'] if not found: field_dict.append(cfd) cls._field_dict = field_dict def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False # At creation of the first FA, initialize FA._field_dict if self._field_dict == []: self._read_field_dict(self.CSV_field_dictionaries['default']) if os.path.exists(self.CSV_field_dictionaries['user']): self._read_field_dict(self.CSV_field_dictionaries['user']) super(FA, self).__init__(*args, **kwargs) # Initialization of FA software (if necessary): if not hasattr(self, '_FAsoftware_cst'): self._FAsoft_init() self.fieldscompression = {} if self.openmode in ('r', 'a') and self.headername == None: self._attributes['headername'] = _gen_headername() if not self.fmtdelayedopen: self.open()
[docs] def open(self, geometry=None, spectral_geometry=None, validity=None): """ Opens a FA with ifsaux' FAITOU, and initializes some attributes. Actually, as FAITOU needs an existing header with 'w' *openmode*, opening file in 'w' *openmode* will require else an existing header to which the resource is linked, or to create a header from a geometry via *_create_header_from_geometry()* routine. This explains the eventual need for *geometry/spectral_geometry/validity* in *open()* method. If neither headername nor geometry are available, resource is not opened: the *open()* will be called again at first writing of a field in resource. Args: \n - *geometry*: optional, must be a :class:`epygram.H2DGeometry` instance. - *spectral_geometry*: optional, must be a :class:`epygram.SpectralGeometry` instance. - *validity*: optional, must be a :class:`epygram.base.FieldValidity` instance. """ if self.openmode in ('r', 'a'): # FA already exists, including geometry and validity if geometry != None or validity != None: epylog.warning(self.container.abspath+": FA.open(): geometry/validity argument will be ignored with this openmode ('r','a').") # open, getting logical unit # use alias (shortened filename), in order to prevent filenames too long for the FA software self.container.alias = tempfile.mkstemp(prefix='FA.')[1] os.remove(self.container.alias) os.symlink(self.container.abspath, self.container.alias) self._unit = arpifs4py.wfaitou(self.container.alias, 'OLD', self.headername) self.isopen = True self.empty = False # read info self._attributes['cdiden'] = arpifs4py.wfalsif(self._unit) self._read_geometry() self._read_validity() if self.openmode == 'a': if self.default_compression == None: self._attributes['default_compression'] = FPDict(self._getrunningcompression()) #config.FA_default_compression) self._setrunningcompression(**self.default_compression) elif self.openmode == 'w': if geometry != None: if not isinstance(geometry, H2DGeometry): raise epygramError("validity must be a H2DGeometry instance.") self._attributes['headername'] = _create_header_from_geometry(geometry, spectral_geometry) if validity != None: if not isinstance(validity, FieldValidity): raise epygramError("validity must be a FieldValidity instance.") self._attributes['validity'] = validity if self.headername != None and self.validity != None: # new FA, with an already existing header and validity initialized # set geometry from existing header self._read_geometry() # open self.container.alias = tempfile.mkstemp(prefix='FA.')[1] os.remove(self.container.alias) (self._unit) = arpifs4py.wfaitou(self.container.alias, 'NEW', self.headername) self.isopen = True self.empty = True # set FA date if self.validity.get() != self.validity.getbasis(): self.processtype = 'forecast' self._set_validity() # set CDIDEN arpifs4py.wfautif(self._unit, self.cdiden) if self.default_compression == None: self._attributes['default_compression'] = FPDict(config.FA_default_compression) self._setrunningcompression(**self.default_compression) elif self.headername == None or self.validity == None: # a header need to be created prior to opening: # header definition (then opening) will be done from the geometry taken in the first field to be written in resource pass
[docs] def close(self): """ Closes a FA with ifsaux' FAIRME. """ if self.isopen: try: arpifs4py.wfairme(self._unit, 'KEEP') except Exception: raise IOError("closing " + self.container.abspath) self.isopen = False # Cleanings if self.openmode in ('r', 'a'): os.remove(self.container.alias) elif self.openmode == 'w': if self.empty: os.remove(self.container.alias) else: shutil.move(self.container.alias, self.container.abspath) ################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None, fieldtype=None): """ Returns a list of the fields from resource whose name match the given seed. Args: \n - *seed*: might be a regular expression, a list of regular expressions or *None*. If *None* (default), returns the list of all fields in resource. - *fieldtype*: optional, among ('H2D', 'Misc'). If provided, filters out the fields not of the given type. """ fieldslist = [] if seed == None: tmplist = self.listfields() for f in tmplist: if fieldtype == None or inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, str): tmplist = util.find_re_in_list(seed, self.listfields()) for f in tmplist: if fieldtype == None or inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) elif isinstance(seed, list): tmplist = [] for s in seed: tmplist += util.find_re_in_list(s, self.listfields()) for f in tmplist: if fieldtype == None or inquire_field_dict(f)['type'] == fieldtype: fieldslist.append(f) if fieldslist == []: raise epygramError("no field matching '"+seed+"' was found in resource "+self.container.abspath) return fieldslist
[docs] def listfields(self): """ Returns a list containing the FA identifiers of all the fields of the resource. """ return super(FA, self).listfields()
@Resource._openbeforedelayed def _listfields(self): """ Actual listfields() method for FA. """ records_number = arpifs4py.wlfinaf(self._unit)[0] arpifs4py.wlfipos(self._unit) # rewind fieldslist = [] for i in range(records_number): fieldname = arpifs4py.wlficas(self._unit, True)[0] if i >= 7 and fieldname != 'DATX-DES-DONNEES': fieldslist.append(fieldname.strip()) # i >= 7: 7 first fields in LFI are the header ("cadre") # 8th field added by P.Marguinaud, DATX-DES-DONNEES, to store dates with 1-second precision, from cy40t1 onwards return fieldslist
[docs] def sortfields(self): """ Returns a sorted list of fields with regards to their name and nature, as a dict of lists. """ list3Dparams = [] list3D = [] list2D = [] # final lists list3Dsp = [] list3Dgp = [] list2Dsp = [] list2Dgp = [] listMisc = [] for f in self.listfields(): info = inquire_field_dict(f) # separate H2D from Misc if info['type'] == 'H2D': # separate 3D from 2D if info['grib2_surface_type'] in ('119','100','103','109','20'): list3D.append(f) list3Dparams.append(filter(lambda x: not x.isdigit(), f[1:])) else: list2D.append(f) else: listMisc.append(f) list3Dparams = list(set(list3Dparams)) # separate gp/sp for f in list2D: if self.fieldencoding(f)['spectral']: list2Dsp.append(f) else: list2Dgp.append(f) # sort 2D list2Dsp.sort() list2Dgp.sort() # sort parameters for p in sorted(list3Dparams): interlist = [] for f in list3D: if p in f: interlist.append(f) # sort by increasing level interlist.sort() # separate gp/sp if self.fieldencoding(interlist[0])['spectral']: list3Dsp.extend(interlist) else: list3Dgp.extend(interlist) outlists = {'3D spectral fields':list3Dsp, '3D gridpoint fields':list3Dgp, '2D spectral fields':list2Dsp, '2D gridpoint fields':list2Dgp, 'Misc-fields':listMisc} return outlists
@Resource._openbeforedelayed
[docs] def fieldencoding(self, fieldname): """ Returns a dict containing info about how the field is encoded: spectral? and compression. Interface to ifsaux' FANION. """ try: (LDCOSP, KNGRIB, KNBITS, KSTRON, KPUILA) = arpifs4py.wfanion(self._unit, fieldname[0:4], 0, fieldname[4:])[1:6] except RuntimeError: raise RuntimeError("error while calling FANION: "+fieldname+" type is probably 'Misc' instead of 'H2D': \ please update Field_Dict_FA.csv accordingly (cf. config.py for location of field dict).") encoding = {'spectral':LDCOSP, 'KNGRIB':KNGRIB, 'KNBITS':KNBITS, 'KSTRON':KSTRON, 'KPUILA':KPUILA} return encoding
@Resource._openbeforedelayed
[docs] def readfield(self, fieldname, getdata=True): """ Reads one field, given its FA name, and returns a Field instance. Interface to Fortran routines from 'ifsaux'. Args: \n - *fieldname*: FA fieldname - *getdata*: optional, if *False*, only metadata are read, the field do not contain data. Default is *True*. """ if self.openmode == 'w': raise epygramError("cannot read fields in resource if with openmode == 'w'.") # Check field in FA #if not fieldname in self.listfields(): # raise epygramError("field "+fieldname+" is not in resource.") # Get field info field_info = inquire_field_dict(fieldname) if field_info['type'] == 'H2D': encoding = self.fieldencoding(fieldname) # Save compression in FA compression = {'KNGRIB':encoding['KNGRIB'], 'KNBPDG':encoding['KNBITS'], 'KNBCSP':encoding['KNBITS'], 'KSTRON':encoding['KSTRON'], 'KPUILA':encoding['KPUILA']} self.fieldscompression[fieldname] = compression # vertical geometry vcoordinate = copy.deepcopy(self.geometry.vcoordinate) vcoordinate.update(field_info) vcoordinate.pop('type') vcoordinate.pop('name') # get level for Sigma/Pressure levels in field name (deeply linked to FA naming conventions) if vcoordinate['grib2_surface_type'] == '119': # hybrid pressure vcoordinate['Z'] = int(fieldname[1:4]) elif vcoordinate['grib2_surface_type'] == '100': # pressure (isobaric) vcoordinate['Z'] = int(fieldname[1:6]) # problem linked to number of digits: if vcoordinate['Z'] == 0: vcoordinate['Z'] = 100000 elif vcoordinate['grib2_surface_type'] == '103': # height vcoordinate['Z'] = int(fieldname[1:6]) elif vcoordinate['grib2_surface_type'] == '109': # PV vcoordinate['Z'] = float(fieldname[1:4])/10. # to handle PVU elif vcoordinate['grib2_surface_type'] == 1: # surface vcoordinate.pop('Ai') vcoordinate.pop('Bi') vcoordinate.pop('ABgrid_position') vcoordinate.pop('levels') # Prepare field dimensions spectral = encoding['spectral'] if spectral: truncation = self.spectral_geometry.truncation if 'fourier' in self.spectral_geometry.space: # LAM SPdatasize = arpifs4py.w_etrans_inq(self.geometry.dimensions['X'], self.geometry.dimensions['Y'], self.geometry.dimensions['X_CIzone'], self.geometry.dimensions['Y_CIzone'], truncation['in_X'], truncation['in_Y'], config.KNUMMAXRESOL)[1] elif self.spectral_geometry.space == 'legendre': # Global total_system_memory = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') memory_needed_for_transforms = truncation['max']**3 / 2 * 8 if memory_needed_for_transforms >= config.prevent_swapping_legendre * total_system_memory: raise epygramError('Legendre spectral transforms need ' + \ str(int( float(memory_needed_for_transforms)/1024**2.)) + \ ' MB memory, while only ' + \ str(int( float(total_system_memory)/1024**2.)) + \ ' MB is available: SWAPPING prevented !') SPdatasize = arpifs4py.w_trans_inq(self.geometry.dimensions['lat_number'], truncation['max'], len(self.geometry.dimensions['lon_number_by_lat']), numpy.array(self.geometry.dimensions['lon_number_by_lat']), config.KNUMMAXRESOL)[1] SPdatasize *= 2 # complex coefficients datasize = SPdatasize spectral_geometry = self.spectral_geometry else: if self.geometry.rectangular_grid: GPdatasize = self.geometry.dimensions['X'] * self.geometry.dimensions['Y'] else: GPdatasize = sum(self.geometry.dimensions['lon_number_by_lat']) datasize = GPdatasize spectral_geometry = None # Make geometry object kwargs_geom = dict(structure='H2D', name=self.geometry.name, grid=FPDict(self.geometry.grid), dimensions=FPDict(self.geometry.dimensions), vcoordinate=FPDict(vcoordinate), geoid=config.FA_default_geoid) if self.geometry.projected_geometry: kwargs_geom['projection'] = FPDict(self.geometry.projection) geometry = footprints.proxy.geometry(**kwargs_geom) # Get data if requested if getdata: if field_info['type'] == 'Misc': field_length = arpifs4py.wlfinfo(self._unit, fieldname)[0] data = arpifs4py.wfalais(self._unit, fieldname, field_length) if field_info['dimension'] == '0': if field_info['nature'] == 'int': dataOut = data.view('int64')[0] elif field_info['nature'] == 'str': dataInt = data.view('int64') dataOut = "" for num in dataInt: dataOut += chr(num) elif field_info['nature'] == 'bool': dataOut = bool(data.view('int64')[0]) elif field_info['nature'] == 'float': dataOut = data[0] else: raise NotImplementedError("reading of datatype "+field_info['nature']+".") else: if field_info['nature'] == 'int': dataOut = numpy.copy(data.view('int64')[:]) # copy is necessary for garbage collector elif field_info['nature'] == 'float': dataOut = numpy.copy(data) elif field_info['nature'] == 'str': #FIXME: table of int(str) = table of tables ??? dataOut = numpy.copy(data) elif field_info['nature'] == 'bool': dataOut = numpy.copy(data.view('bool')[:]) else: raise NotImplementedError("reading of datatype "+field_info['nature']+" array.") data = dataOut elif field_info['type'] == 'H2D': data = numpy.array(arpifs4py.wfacile(datasize, self._unit, fieldname[0:4], 0, fieldname[4:], spectral)) # Create field if field_info['type'] == 'H2D': # Create H2D field field = H2DField(fid=FPDict({self.format:fieldname}), geometry=geometry, validity=self.validity, spectral_geometry=spectral_geometry, processtype=self.processtype) if 'gauss' in self.geometry.name and config.FA_buffered_gauss_grid: field.geometry._bound_gauss_grid = self.geometry.get_lonlat_grid elif field_info['type'] == 'Misc': # Create Misc field field = MiscField(fid=FPDict({self.format:fieldname})) if getdata: if field_info['type'] == 'H2D' and not field.spectral: data = geometry.reshape_data(data) field.setdata(data) return field
[docs] def readfields(self, requestedfields=None, getdata=True): """ Returns a :class:`epygram.base.FieldSet` containing requested fields read in the resource. Args: \n - *requestedfields*: might be \n - a regular expression (e.g. 'S\*WIND.[U,V].PHYS') - a list of FA fields identifiers with regular expressions (e.g. ['SURFTEMPERATURE', 'S0[10-20]WIND.?.PHYS']) - if not specified, interpretated as all fields that will be found in resource - *getdata*: optional, if *False*, only metadata are read, the fields do not contain data. Default is *True*. """ requestedfields = self.find_fields_in_resource(requestedfields) if requestedfields == []: raise epygramError("unable to find requested fields in resource.") return super(FA, self).readfields(requestedfields, getdata)
[docs] def writefield(self, field, compression=None): """ Write a field in the resource. Args: \n - *field*: a :class:`epygram.base.Field` instance or :class:`epygram.H2DField`. - *compression*: optional, a (possibly partial) dict containing parameters for field compression (in case of a :class:`epygram.H2DField`). Ex: {'KNGRIB': 2, 'KDMOPL': 5, 'KPUILA': 1, 'KSTRON': 10, 'KNBPDG': 24, 'KNBCSP': 24} """ if self.openmode == 'r': raise IOError("cannot write field in a FA with openmode 'r'.") if not self.isopen: if not isinstance(field, H2DField): raise epygramError("cannot write a this kind of field on a non-open FA.\n \ FA need a geometry to be open. Maybe this FA has not been given one at opening.\n \ For opening it, either write a H2DField in, or call its method open(geometry, validity),\n \ geometry being a H2DGeometry, validity being a FieldValidity.") if self.validity == None: self._attributes['validity'] = field.validity self._attributes['headername'] = _create_header_from_geometry(field.geometry, field.spectral_geometry) self.open() if not self.empty and field.fid['FA'] in self.listfields(): raise epygramError("there already is a field with the same name in this FA.") if isinstance(field, MiscField): data = field.data if field.shape in ((1,), ()): if 'int' in field.datatype.name: dataReal = data.view('float64') elif 'str' in field.datatype.name: data = str(data) # ndarray of str -> simple str dataReal = numpy.array([ord(d) for d in data]).view('float64') elif 'bool' in field.datatype.name: dataReal = numpy.array(1 if data else 0).view('float64') elif 'float' in field.datatype.name: dataReal = data else: raise NotImplementedError("writing of datatype "+field.datatype.name+".") else: try: dataReal = numpy.copy(data.view('float64')) except Exception: raise NotImplementedError("writing of datatype "+field.datatype.__name__+" array.") arpifs4py.wfaisan(self._unit, field.fid['FA'], dataReal.size, dataReal) elif isinstance(field, H2DField): if [self.geometry.name, self.geometry.grid, self.geometry.dimensions] != \ [field.geometry.name, field.geometry.grid, field.geometry.dimensions]: raise epygramError("gridpoint geometry incompatibility: a FA can hold only one geometry.") if field.spectral_geometry != None and field.spectral_geometry != self.spectral_geometry: # compatibility check raise epygramError("spectral geometry incompatibility: a FA can hold only one geometry.") data = numpy.ma.copy(field.data).flatten() if isinstance(data, numpy.ma.core.MaskedArray): data=numpy.copy(data[data.mask==False].data) if compression != None: modified_compression = True elif self.fieldscompression.has_key(field.fid['FA']): compression = self.fieldscompression[field.fid['FA']] modified_compression = True else: modified_compression = False if modified_compression: self._setrunningcompression(**compression) arpifs4py.wfaienc(self._unit, field.fid['FA'][0:4], 0, field.fid['FA'][4:], len(data), data, field.spectral) if modified_compression: self._setrunningcompression(**self.default_compression) # set back to default if not self.fieldscompression.has_key(field.fid['FA']): self.fieldscompression[field.fid['FA']] = compression if self.empty: self.empty = False
[docs] def writefields(self, fieldset, compression=None): """ Write the fields of the *fieldset* in the resource. Args: \n - *fieldset*: must be a :class:`epygram.base.FieldSet` instance. - *compression*: must be a list of compression dicts (cf. *writefield()* method), of length equal to the length of the *fieldset*, and with the same order. """ if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be of kind FieldSet.") if len(fieldset) != len(compression): raise epygramError("fieldset and compression must have the same length.") # Be sure the first field to be written is an H2DField, # for being able to set the header if necessary if not self.isopen and not isinstance(fieldset[0], H2DField): for f in range(len(fieldset)): if isinstance(fieldset[f], H2DField): fieldset.insert(0, fieldset.pop(f)) if compression != None: compression.insert(0, compression.pop(f)) break if compression != None: # loop separated from the above one, because fieldset is there-above modified for f in range(len(fieldset)): self.writefield(fieldset[f], compression[f]) else: super(FA, self).writefields(fieldset)
[docs] def extractprofile(self, fid, lon, lat, interpolation='nearest'): """ Extracts a vertical profile from the FA resource, given its pseudo-fid and the geographic location (*lon*/*lat*) of the profile. Args: \n - *fid* must have syntax: 'K\*PARAMETER', K being the kind of surface (S,P,H,V), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in FA. - *lon* is the longitude of the desired point. - *lat* is the latitude of the desired point. - *interpolation* defines the interpolation function used to compute the profile at requested lon/lat from the fields grid: - if 'nearest' (default), extracts profile at the horizontal nearest neighboring gridpoint; - if 'linear', computes profile with horizontal linear spline interpolation; - if 'cubic', computes profile with horizontal cubic spline interpolation. Warning: 'nearest' *interpolation* requires the :mod:`pyproj` module. """ try: N = len(lon) except Exception: N = 1 lon = numpy.array([lon]) lat = numpy.array([lat]) l = len(fid[2:]) fidlist = self.find_fields_in_resource(seed=fid, fieldtype='H2D') fidlist = filter(lambda x: all([i.isdigit() for i in x[1:-l]]), fidlist) if fidlist == []: raise epygramError("cannot find profile for" + fid + " in resource.") fs = self.readfields(fidlist, getdata=False) fs.sort(attribute=['geometry','vcoordinate'], key='Z') fidlist = fs.listfields(typefmt='FA') # Build commons field = fs[0] # location if field.geometry.rectangular_grid: for i in range(N): if not field.geometry.point_is_inside_domain(lon[i], lat[i]): raise ValueError("point ("+str(lon[i])+", " + \ str(lat[i])+") is out of field domain.") if interpolation == 'nearest': from pyproj import Geod g = Geod(ellps='sphere') (asked_i, asked_j) = field.geometry.ll2ij(lon, lat) gp_i = numpy.rint(asked_i).astype('int') gp_j = numpy.rint(asked_j).astype('int') gridpoint_ll = field.geometry.ij2ll(gp_i, gp_j) true_loc = gridpoint_ll arc = g.inv(lon, lat, gridpoint_ll[0], gridpoint_ll[1]) az = arc[0] distance = arc[2] comments = [] for i in range(N): if -22.5 < az[i] <= 22.5: direction = 'N' elif -77.5 < az[i] <= -22.5: direction = 'NW' elif -112.5 < az[i] <= -77.5: direction = 'W' elif -157.5 < az[i] <= -112.5: direction = 'SW' elif -180.5 <= az[i] <= -157.5 or 157.5 < az[i] <= 180.: direction = 'S' elif 22.5 < az[i] <= 77.5: direction = 'NE' elif 77.5 < az[i] <= 112.5: direction = 'E' elif 112.5 < az[i] <= 157.5: direction = 'SE' gridpointstr = "(" + '{:.{precision}{type}}'.format(gridpoint_ll[0][i], type='F', precision=4) + ", " + \ '{:.{precision}{type}}'.format(gridpoint_ll[1][i], type='F', precision=4) + ")" comment = "Profile @ " + str(int(distance[i])) + "m " + direction + \ " from " + str((lon[i], lat[i])) + "\n" + \ "( = nearest gridpoint: " + gridpointstr + ")" comments.append(comment) elif interpolation in ('linear', 'cubic'): true_loc = (lon, lat) if interpolation == 'linear': interpstr = 'linearly' elif interpolation == 'cubic': interpstr = 'cubically' comments = [] for i in range(N): comment = "Profile " + interpstr + " interpolated @ " + str((lon[i], lat[i])) comments.append(comment) else: raise NotImplementedError(interpolation + "interpolation.") # build vertical geometry fieldlevels_number = len(fidlist) gridsize = fieldlevels_number if fid[0] == 'S': coordinate = 'hybrid_pressure' gridposition = 'flux' position_on_grid = 'mass' grid = {'levels':FPDict({'Ai':tuple(self.geometry.vcoordinate['Ai']), 'Bi':tuple(self.geometry.vcoordinate['Bi'])})} elif fid[0] == 'P': coordinate = 'pressure' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} elif fid[0] == 'H': coordinate = 'height' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} elif fid[0] == 'V': coordinate = 'potential_vortex' gridposition = None position_on_grid = None grid = {'levels':tuple([fld.geometry.vcoordinate['Z'] for fld in fs])} grid['fieldlevels_number'] = fieldlevels_number grid['gridsize'] = gridsize grid['gridposition'] = gridposition vgeometries = [] for i in range(N): vgeometry = footprints.proxy.geometry(structure='V1D', coordinate=coordinate, grid=FPDict(grid), position_on_grid=position_on_grid, hlocation=FPDict(zip(('lon', 'lat'), (true_loc[0][i], true_loc[1][i]))) ) vgeometries.append(vgeometry) # profile values = [] for f in fidlist: # field field = self.readfield(f) if field.spectral: field.sp2gp() # read profile value values.append(field.getvalue_ll(lon, lat, interpolation=interpolation)) # build profile as a V1DField profiles = FieldSet() for i in range(N): profile = footprints.proxy.field(fid=FPDict({'FA':fid}), geometry=vgeometries[i], validity=self.validity, processtype=self.processtype, comment=comments[i]) if N == 1: profile.setdata(values) else: profile.setdata([v[i] for v in values]) profiles.append(profile) if N == 1: profile = profiles[0] else: profile = profiles return profile
@Resource._openbeforedelayed
[docs] def extractsection(self, fid, end1, end2, points_number=None, resolution=None, vertical_coordinate=None, interpolation='linear', cheap_alt=False): """ Extracts a vertical section from the FA resource, given its pseudo-fid and the geographic (lon/lat) coordinates of its ends. The section is returned as a V2DField. Args: \n - *fid* must have syntax: 'K\*PARAMETER', K being the kind of surface (S,P,H,V), \* being a true star character, and PARAMETER being the name of the parameter requested, as named in FA. - *end1* must be a tuple (lon, lat). - *end2* must be a tuple (lon, lat). - *points_number* defines the total number of horizontal points of the section (including ends). If None, defaults to a number computed from the *ends* and the *resolution*. - *resolution* defines the horizontal resolution to be given to the field. If None, defaults to the horizontal resolution of the field. - *vertical_coordinate* defines the requested vertical coordinate of the V2DField (cf. :class:`epygram.V1DGeometry` coordinate possible values). - *interpolation* defines the interpolation function used to compute the profile points locations from the fields grid: \n - if 'nearest', each horizontal point of the section is taken as the horizontal nearest neighboring gridpoint; - if 'linear' (default), each horizontal point of the section is computed with linear spline interpolation; - if 'cubic', each horizontal point of the section is computed with linear spline interpolation. - *cheap_alt*: if True and *vertical_coordinate* among ('altitude', 'height'), the computation of altitudes is done without taking hydrometeors into account (in R computation) nor NH Pressure departure (Non-Hydrostatic data). Warning: requires the :mod:`pyproj` module. """ import pyproj g = pyproj.Geod(ellps='sphere') if resolution == None and points_number == None: (i1, j1) = self.geometry.ll2ij(*end1) inear = i1 - 1 if i1 > 1 else i1 + 1 resolution = g.inv(end1[0], end1[1], *self.geometry.ij2ll(inear, j1))[2] if resolution > g.inv(end1[0], end1[1], end2[0], end2[1])[2]: raise epygramError("'ends' are too near: pure interpolation between two gridpoints.") elif points_number != None and points_number < 2: raise epygramError("'points_number' must be at least 2.") if resolution != None: distance = g.inv(end1[0], end1[1], end2[0], end2[1])[2] points_number = int(numpy.ceil(distance / resolution)) + 1 epylog.info('section computed on ' + str(points_number) + \ ' points == ' + str(resolution) + ' m resolution.') if points_number >=3: transect = g.npts(end1[0], end1[1], end2[0], end2[1], points_number-2) elif points_number == 2: transect = [] else: raise epygramError("cannot make a section with less than 2 points.") transect.insert(0,end1) transect.append(end2) profileslist = self.extractprofile(fid, [p[0] for p in transect], [p[1] for p in transect], interpolation=interpolation) geometry = footprints.proxy.geometry(structure='V2D', grid=FPList([p.geometry for p in profileslist])) section = footprints.proxy.field(fid=FPDict({'section':fid}), geometry=geometry, validity=profileslist[0].validity) section.setdata(numpy.array([p.data for p in profileslist]).transpose()) del profileslist # vertical coords conversion if vertical_coordinate not in (None, section.geometry.coordinate): if section.geometry.coordinate == 'hybrid_pressure' and \ vertical_coordinate == 'pressure': Psurf = self.readfield('SURFPRESSION') if Psurf.spectral: Psurf.sp2gp() ps_transect = numpy.exp([Psurf.getvalue_ll(p.hlocation['lon'], p.hlocation['lat'], interpolation=interpolation) for p in section.geometry.grid]) del Psurf section.geometry.hybrid2pressure(ps_transect, gridposition='mass') elif section.geometry.coordinate == 'hybrid_pressure' and \ vertical_coordinate in ('altitude', 'height'): Psurf = self.readfield('SURFPRESSION') if Psurf.spectral: Psurf.sp2gp() ps_transect = numpy.exp([Psurf.getvalue_ll(p.hlocation['lon'], p.hlocation['lat'], interpolation=interpolation) for p in section.geometry.grid]) del Psurf if not all(('S001TEMPERATURE' in self.listfields(), 'S001HUMI.SPECIFI' in self.listfields())): raise epygramError("for hybrid2altitude/height conversions, \ Temperature and Specific Humidity must be in resource.") # T & q if fid != 'S*TEMPERATURE': epylog.info("extract S*TEMPERATURE") T = self.extractsection('S*TEMPERATURE', end1, end2, points_number=points_number, interpolation=interpolation).data else: T = section.data if fid != 'S*HUMI.SPECIFI': epylog.info("extract S*HUMI.SPECIFI") q = self.extractsection('S*HUMI.SPECIFI', end1, end2, points_number=points_number, interpolation=interpolation).data else: q = section.data # other optional fields try: if cheap_alt: raise Exception() epylog.info("extract S*CLOUD_WATER") ql = self.extractsection('S*CLOUD_WATER', end1, end2, points_number=points_number, interpolation=interpolation).data epylog.info("extract S*ICE_CRYSTAL") qi = self.extractsection('S*ICE_CRYSTAL', end1, end2, points_number=points_number, interpolation=interpolation).data epylog.info("extract S*SNOW") qs = self.extractsection('S*SNOW', end1, end2, points_number=points_number, interpolation=interpolation).data epylog.info("extract S*RAIN") qr = self.extractsection('S*RAIN', end1, end2, points_number=points_number, interpolation=interpolation).data except Exception: ql = qi = qs = qr = numpy.zeros(q.shape) try: if cheap_alt: raise Exception() epylog.info("extract S*GRAUPEL") qg = self.extractsection('S*GRAUPEL', end1, end2, points_number=points_number, interpolation=interpolation).data except Exception: qg = numpy.zeros(q.shape) R = profiles.gfl2R(q, ql, qi, qr, qs, qg) try: if cheap_alt: raise Exception() epylog.info("extract S*PRESS.DEPART") Pdep = self.extractsection('S*PRESS.DEPART', end1, end2, points_number=points_number, interpolation=interpolation).data except Exception: Pdep = numpy.zeros(T.shape) if vertical_coordinate == 'altitude': Phi_surf = self.readfield('SPECSURFGEOPOTEN') if Phi_surf.spectral: Phi_surf.sp2gp() phisurf_transect = [Phi_surf.getvalue_ll(p.hlocation['lon'], p.hlocation['lat'], interpolation=interpolation) for p in section.geometry.grid] del Phi_surf else: phisurf_transect = 0. section.geometry.hybrid2altitude(R, T, ps_transect, Pdep, phisurf_transect) else: raise NotImplementedError("this vertical coordinate conversion.") return section ########### # pre-app # ###########
@Resource._openbeforedelayed
[docs] def what(self, out, fieldscompression=False, sortfields=False): """ Writes in file a summary of the contents of the FA. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *fieldscompression*: **True** if information about fields compression is requested. - *sortfields*: **True** if the fields have to be sorted by type. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n' first_H2DField = [f for f in self.listfields() if inquire_field_dict(f)['type'] == 'H2D'][0] firstfield = self.readfield(first_H2DField, getdata=False) if not firstfield.spectral and self.spectral_geometry != None: firstfield._attributes['spectral_geometry'] = self.spectral_geometry v_geometry = self.geometry.vcoordinate listoffields = self.listfields() if sortfields: sortedfields = self.sortfields() def write_formatted(dest, label, value): dest.write('{:<{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_col(dest, label, value): dest.write('{:>{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_fields(dest, label, value, compression=None): if compression == None: dest.write('{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(value), width=10) \ + '\n') else: line = '{:<{width}}'.format(label, width=20) \ + ':' \ + '{:^{width}}'.format(str(value), width=10) \ + ':' \ + compression \ + '\n' dest.write(line) out.write("### IDENTIFIER (CDIDEN): " + self.cdiden + "\n") out.write("\n") firstfield.what(out, vertical_geometry=False) out.write("#########################\n") out.write("### VERTICAL GEOMETRY ###\n") out.write("#########################\n") write_formatted(out, "Number of vertical levels", v_geometry['levels_number']) out.write("Hybrid-pressure coord. A coefficients\n") for c in range(len(v_geometry['Ai'])): write_formatted_col(out, str(c+1), v_geometry['Ai'][c]) out.write(sepline) out.write("Hybrid-pressure coord. B coefficients\n") for c in range(len(v_geometry['Bi'])): write_formatted_col(out, str(c+1), v_geometry['Bi'][c]) out.write(sepline) out.write("\n") out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") if sortfields: listoffields = [] for k in sorted(sortedfields.keys()): listoffields.append(k) listoffields.append('--------------------') listoffields.extend(sortedfields[k]) listoffields.append('--------------------') numfields = sum([len(v) for v in sortedfields.values()]) else: numfields = len(listoffields) out.write("Number: " + str(numfields) + "\n") if not fieldscompression: write_formatted_fields(out, "Field name", "Spectral") else: params = ['KNGRIB', 'KNBITS', 'KSTRON', 'KPUILA'] width_cp = 8 compressionline = "" for p in params: compressionline += '{:^{width}}'.format(p, width=width_cp) write_formatted_fields(out, "Field name", "Spectral", compressionline) out.write(sepline) for f in listoffields: if inquire_field_dict(f)['type'] == 'H2D': encoding = self.fieldencoding(f) if not fieldscompression: write_formatted_fields(out, f, encoding['spectral']) else: compressionline = "" for p in params: try: compressionline += '{:^{width}}'.format(str(encoding[p]), width=width_cp) except KeyError: compressionline +='{:^{width}}'.format('-', width=width_cp) write_formatted_fields(out, f, encoding['spectral'], compression=compressionline) else: write_formatted_fields(out, f, '-') out.write(sepline) ############## # the FA WAY # ##############
@Resource._openbeforedelayed def _read_geometry(self): """ Reads the geometry in the FA header. Interface to Fortran routines from 'ifsaux'. """ (KTYPTR, PSLAPO, PCLOPO, PSLOPO, PCODIL, KTRONC, KNLATI, KNXLON, KNLOPA, KNOZPA, PSINLA, KNIVER, PREFER, PAHYBR, PBHYBR) = arpifs4py.wfacies(self._FAsoftware_cst['JPXPAH'], self._FAsoftware_cst['JPXIND'], self._FAsoftware_cst['JPXGEO'], self._FAsoftware_cst['JPXNIV'], self.headername)[:-1] vcoordinate_read_in_header = {'grib2_surface_type':'119', 'levels_number':KNIVER, 'Ai':FPList([c*PREFER for c in PAHYBR[1:KNIVER+1]]), # conversion for footprints purpose... :-/ 'Bi':FPList([c for c in PBHYBR[1:KNIVER+1]]), 'ABgrid_position':'flux' } FA.reference_pressure = PREFER rectangular_grid = KTYPTR <= 0 if rectangular_grid: LMAP = int(PCODIL) != -1 # LAM or regular lat/lon projected_geometry = int(PSINLA[1]) != -9 if projected_geometry: # LAM (projection) if LMAP and int(PSINLA[0]) == 0: raise NotImplementedError("'old' FA header.") if LMAP: projection = {'reference_lon':Angle(PSINLA[2], 'radians'), 'reference_lat':Angle(PSINLA[3], 'radians'), 'center_lon':Angle(PSINLA[4], 'radians'), 'center_lat':Angle(PSINLA[5], 'radians') } if int(PSINLA[0]) != 0: grid = {'X_resolution':PSINLA[6], 'Y_resolution':PSINLA[7] } else: grid = {'X_resolution':PSINLA[14], 'Y_resolution':PSINLA[15] } dimensions = {'X':KNXLON, 'Y':KNLATI, 'X_CIzone':KNLOPA[3] - KNLOPA[2] + 1, 'Y_CIzone':KNLOPA[5] - KNLOPA[4] + 1, 'X_Iwidth':KNLOPA[6], 'Y_Iwidth':KNLOPA[7], 'X_Czone':KNLOPA[3] - KNLOPA[2] + 1 - 2 * KNLOPA[6], 'Y_Czone':KNLOPA[5] - KNLOPA[4] + 1 - 2 * KNLOPA[7], } if KNLOPA[1] == 0: #C+I grid['LAMzone'] = 'CI' dimensions['X'] = dimensions['X_CIzone'] dimensions['Y'] = dimensions['Y_CIzone'] elif abs(KNLOPA[1]) == 1: #C+I+E grid['LAMzone'] = 'CIE' dimensions['X_CIoffset'] = KNLOPA[2] - 1 dimensions['Y_CIoffset'] = KNLOPA[4] - 1 if abs(PSINLA[1]) <= config.epsilon: geometryname = 'mercator' elif 1.0 - abs(PSINLA[1]) <= config.epsilon: geometryname = 'polar_stereographic' elif config.epsilon < abs(PSINLA[1]) < 1.0 - config.epsilon: geometryname = 'lambert' spectral_space = 'bi-fourier' spectral_trunc = {'in_X':KTYPTR * -1, 'in_Y':KTRONC, 'shape':'elliptic'} if not LMAP: if dimensions['X'] == 1: spectral_space = 'fourier' spectral_trunc = {'in_Y':KTRONC, 'in_X':KTYPTR * -1} projection = None geometryname = 'academic' elif not projected_geometry: # regular lat/lon projection = None geometryname = 'regular_lonlat' grid = {'center_lon':Angle(PSINLA[4], 'radians'), 'center_lat':Angle(PSINLA[5], 'radians'), 'X_resolution':Angle(PSINLA[6], 'radians'), 'Y_resolution':Angle(PSINLA[7], 'radians') } dimensions = {'X':KNXLON, 'Y':KNLATI } spectral_space = None else: # ARPEGE global projection = None # reconstruction of tables on both hemispheres KNLOPA = KNLOPA[:KNLATI/2] KNOZPA = KNOZPA[:KNLATI/2] PSINLA = PSINLA[:KNLATI/2] lon_number_by_lat = [n for n in KNLOPA]+[KNLOPA[-(n+1)] for n in range(0,len(KNLOPA))] max_zonal_wavenumber_by_lat = [n for n in KNOZPA]+[KNOZPA[-(n+1)] for n in range(0,len(KNOZPA))] latitudes = [Angle((math.cos(math.asin(sinlat)), sinlat), 'cos_sin') for sinlat in PSINLA] \ +[Angle((math.cos(math.asin(PSINLA[-(n+1)])), -PSINLA[-(n+1)]), 'cos_sin') for n in range(0,len(PSINLA))] grid = {'dilatation_coef':PCODIL, 'latitudes':FPList([l for l in latitudes]) } if KTYPTR == 1: geometryname = 'reduced_gauss' elif KTYPTR == 2: geometryname = 'rotated_reduced_gauss' grid['pole_lat'] = Angle((math.cos(math.asin(PSLAPO)), PSLAPO), 'cos_sin') grid['pole_lon'] = Angle((PCLOPO, PSLOPO), 'cos_sin') dimensions = {'max_lon_number':KNXLON, 'lat_number':KNLATI, 'lon_number_by_lat':FPList([n for n in lon_number_by_lat]) } spectral_space = 'legendre' spectral_trunc = {'max':KTRONC, 'shape':'triangular', 'max_zonal_wavenumber_by_lat':FPList([k for k in max_zonal_wavenumber_by_lat]) } kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), vcoordinate=FPDict(vcoordinate_read_in_header), geoid=FPDict(config.FA_default_geoid)) if projection != None: kwargs_geom['projection'] = FPDict(projection) self.geometry = footprints.proxy.geometry(**kwargs_geom) if spectral_space != None: self.spectral_geometry = SpectralGeometry(space=spectral_space, truncation=FPDict(spectral_trunc)) else: self.spectral_geometry = None @Resource._openbeforedelayed def _read_validity(self): """ Reads the validity in the FA header. Interface to Fortran routines from 'ifsaux'. """ KDATEF = arpifs4py.wfadiex(self._unit) year = int(KDATEF[0]) month = int(KDATEF[1]) day = int(KDATEF[2]) hour = int(KDATEF[3]) minute = int(KDATEF[4]) second = int(KDATEF[13]) - hour*3600 - minute*60 processtype = int(KDATEF[8]) if processtype == 0: self.processtype = 'analysis' elif processtype == 1: self.processtype = 'initialization' elif processtype == 10: self.processtype = 'forecast' term_in_seconds = int(KDATEF[14]) cumulationstart_in_seconds = int(KDATEF[15]) cumulativeduration_in_seconds = term_in_seconds - cumulationstart_in_seconds basis = datetime.datetime(year, month, day, hour, minute, second) term = datetime.timedelta(seconds=term_in_seconds) cumulativeduration = datetime.timedelta(seconds=cumulativeduration_in_seconds) self._attributes['validity'] = FieldValidity(basis=basis, term=term, cumulativeduration=cumulativeduration) @Resource._openbeforedelayed def _set_validity(self, termunit='hours'): """ Sets date, hour and the processtype in the resource. """ if not self.isopen: raise epygramError("_set_validity must be called after FA is open.") basis = self.validity.getbasis() KDATEF = numpy.zeros(22, dtype=numpy.int64) KDATEF[0] = int(basis.year) KDATEF[1] = int(basis.month) KDATEF[2] = int(basis.day) KDATEF[3] = int(basis.hour) KDATEF[4] = int(basis.minute) if termunit == 'hours': KDATEF[5] = 1 KDATEF[6] = self.validity.term('IntHours') KDATEF[9] = self.validity.term('IntHours') - self.validity.cumulativeduration('IntHours') elif termunit == 'days': KDATEF[5] = 2 KDATEF[6] = self.validity.term('IntHours') / 24 KDATEF[9] = (self.validity.term('IntHours') - self.validity.cumulativeduration('IntHours')) / 24 else: raise NotImplementedError("term unit other than hours/days ?") #KDATEF[7] = 0 if self.processtype == 'analysis': processtype = 0 elif self.processtype == 'initialization': processtype = 1 elif self.processtype == 'forecast': processtype = 10 else: processtype = 255 # unknown processtype KDATEF[8] = processtype #KDATEF[10] = 0 if not config.FA_fandax: arpifs4py.wfandar(self._unit, KDATEF) else: # fandax KDATEF[11] = 1 KDATEF[13] = int(basis.second) + int(basis.hour)*3600 + int(basis.minute)*60 KDATEF[14] = self.validity.term(fmt='IntSeconds') KDATEF[15] = self.validity.term(fmt='IntSeconds') - self.validity.cumulativeduration(fmt='IntSeconds') arpifs4py.wfandax(self._unit, KDATEF) @Resource._openbeforedelayed def _getrunningcompression(self): """ Returns the current compression parameters of the FA (at time of writing). Interface to ifsaux' FAVEUR. """ comp = dict() (comp['KNGRIB'], comp['KNBPDG'], comp['KNBCSP'], comp['KSTRON'], comp['KPUILA'], comp['KDMOPL']) = arpifs4py.wfaveur(self._unit) return comp @Resource._openbeforedelayed def _setrunningcompression(self, **kwargs): """ Sets the compression parameters of the FA. Interface to FAGOTE (cf. FAGOTE documentation for significance of arguments). """ if self.openmode == 'r': raise IOError("method _setrunningcompression() can only be called if 'openmode' in('w', 'a').") comp = copy.deepcopy(self.default_compression) for k in kwargs.keys(): if k in self.default_compression.keys(): comp[k] = kwargs[k] else: raise epygramError("unknown parameter: "+k+" passed as argument.") arpifs4py.wfagote(self._unit, comp['KNGRIB'], comp['KNBPDG'], comp['KNBCSP'], comp['KSTRON'], comp['KPUILA'], comp['KDMOPL'])