Source code for epygram.formats.GRIB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
GRIB:

Contains classes for GRIB resource and GRIB individual message,
editions 1 and 2.
"""

__all__ = ['GRIB']  #, 'GRIBmessageFactory']

import datetime
import os
os.putenv('GRIB_SAMPLES_PATH', os.getenv('GRIB_SAMPLES_PATH') + \
          ':/home/mary/worktmp/GRIB')  #TODO: get a better place for sample
import gribapi

from footprints import FPDict, proxy as fpx

from epygram import config, epygramError, epylog
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle, RecursiveObject
from epygram.fields import H2DField
from epygram.geometries import H2DGeometry



[docs]class GRIBmessage(RecursiveObject, dict): """ Class implementing a GRIB message as an object. Abstract class: common patterns to editions 1 and 2. """ @property def grib_edition(self): """Returns the edition (1,2) of the GRIB message.""" try: return int(self.__class__.__name__[4:5]) except ValueError: raise epygramError("GRIBmessage is an abstract class.") def __init__(self, source, centre_id=config.GRIB_default_centre, process_id=config.GRIB_default_process, packing=config.GRIB_default_packing): """ *source* being either:\n - a dict with keys ('file', 'index_in_file'); - *file* being an open file-like object designing the GRIB file it is read from, - *index_in_file* being the index of the message in file. - a :class:`epygram.fields.H2DField` instance. In that case, the *centre_id*, *process_id* and *packing* options can be forced using the adhoc arguments. """ if self.__class__.__name__ == 'GRIBmessage': raise epygramError("GRIBmessage is an abstract class.\ Use GRIB1message or GRIB2message.") self.namespace = None super(GRIBmessage, self).__init__() if isinstance(source, dict) and\ set(source.keys()) == set(['file', 'index_in_file']): self.built_from = 'file' self._gid = gribapi.grib_new_from_file(source['file'], headers_only=True) self._filename = source['file'].name self._index = source['index_in_file'] elif isinstance(source, H2DField): self.built_from = 'field' self._build_msg_from_field(source, centre_id=centre_id, process_id=process_id, packing=packing) #elif isinstance(source, str): # self._gid = gribapi.grib_new_from_samples(source) else: raise NotImplementedError("not yet.") def __del__(self): try: gribapi.grib_release(self._gid) except gribapi.GribInternalError: pass def __getitem__(self, item): if item not in self.keys(): if config.GRIB_read_whole_message: for namespace in ('ls', 'mars', None): self.readmessage(namespace=namespace) if item in self.keys(): break else: self._readattribute(item) if item not in self.keys(): raise KeyError(item + " not in GRIBmessage keys.") return super(GRIBmessage, self).__getitem__(item) def __setitem__(self, key, value): gribapi.grib_set(self._gid, key, value) super(GRIBmessage, self).__setitem__(key, value)
[docs] def readmessage(self, namespace=None): """ Reads the meta-data of the message. Args:\n - *namespace*: the namespace of keys to be read, among:\n - **None**: to get all keys present in message, - ['myKey1', 'myKey2', ...] for any custom namespace, - 'ls': to get the same default keys as the grib_ls, - 'mars': to get the keys used by MARS. """ self.clear() self.namespace = namespace if namespace in (None, 'ls', 'mars'): key_iter = gribapi.grib_keys_iterator_new(self._gid, namespace=namespace) namespace = [] while gribapi.grib_keys_iterator_next(key_iter): namespace.append(gribapi.grib_keys_iterator_get_name(key_iter)) # Explanation: the message accessed via self._gid is "headers_only"; # if a key is not found or returns an error, we try to access it via a # "full" temporary gid, generated on that occasion. gid_full = None for k in namespace: try: try: super(GRIBmessage, self).__setitem__(k, gribapi.grib_get(self._gid, k)) except gribapi.GribInternalError as e: if str(e) == 'Passed array is too small': super(GRIBmessage, self).__setitem__(k, gribapi.grib_get_double_array(self._gid, k)) except Exception as e: if self.built_from != 'file': raise e if gid_full == None: tempfile = open(self._filename, 'r') for _ in range(1, self._index): gid = gribapi.grib_new_from_file(tempfile, headers_only=True) gribapi.grib_release(gid) gid_full = gribapi.grib_new_from_file(tempfile) try: super(GRIBmessage, self).__setitem__(k, gribapi.grib_get(gid_full, k)) except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi if str(e) == 'Passed array is too small': super(GRIBmessage, self).__setitem__(k, gribapi.grib_get_double_array(gid_full, k)) else: gribapi.grib_release(gid_full) raise if gid_full != None: gribapi.grib_release(gid_full)
def _readattribute(self, attribute): """Actual access to the attributes.""" # Explanation: the message accessed via self._gid is "headers_only"; # if a key is not found or returns an error, we try to access it via a # "full" temporary gid, generated on that occasion. try: try: super(GRIBmessage, self).__setitem__(attribute, gribapi.grib_get(self._gid, attribute)) except gribapi.GribInternalError as e: if str(e) == 'Passed array is too small': super(GRIBmessage, self).__setitem__(attribute, gribapi.grib_get_double_array(self._gid, attribute)) else: raise except Exception as e: if self.built_from != 'file': raise e tempfile = open(self._filename, 'r') for _ in range(1, self._index): gid = gribapi.grib_new_from_file(tempfile, headers_only=True) gribapi.grib_release(gid) gid_full = gribapi.grib_new_from_file(tempfile) try: super(GRIBmessage, self).__setitem__(attribute, gribapi.grib_get(gid_full, attribute)) except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi if str(e) == 'Passed array is too small': super(GRIBmessage, self).__setitem__(attribute, gribapi.grib_get_double_array(gid_full, attribute)) else: raise finally: gribapi.grib_release(gid_full) def getvalues(self): """Reads and returns the message values.""" if self.built_from != 'file': raise epygramError("'getvalues()' method should not be called for\ a GRIBmessage built from other than a file.") if not hasattr(self, '_values'): if self.built_from != 'file': self._values = gribapi.grib_get_values(self._gid) else: with open(self._filename, 'r') as f: for _ in range(1, self._index): gid = gribapi.grib_new_from_file(f, headers_only=True) gribapi.grib_release(gid) gid = gribapi.grib_new_from_file(f) self._values = gribapi.grib_get_values(gid) gribapi.grib_release(gid) return self._values def read_geometry(self, footprints_builder=config.use_footprints_as_builder): """ Returns a geometry object containing the geometry information of the GRIB message. - *footprints_builder*: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. """ geoid = config.default_geoid vcoordinate = {'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'], 'typeOfLevel':self['typeOfLevel'], 'Z':self['level'] } if self['gridType'] == 'regular_ll': geometryname = 'regular_lonlat' center_lon = float(self['longitudeOfLastGridPointInDegrees'] + \ self['longitudeOfFirstGridPointInDegrees']) / 2. center_lat = float(self['latitudeOfLastGridPointInDegrees'] + \ self['latitudeOfFirstGridPointInDegrees']) / 2. grid = {'center_lon':Angle(center_lon, 'degrees'), 'center_lat':Angle(center_lat, 'degrees'), 'X_resolution':Angle(self['iDirectionIncrementInDegrees'], 'degrees'), 'Y_resolution':Angle(self['jDirectionIncrementInDegrees'], 'degrees') } dimensions = {'X':self['Ni'], 'Y':self['Nj'] } projection = None elif self['gridType'] in ('polar_stereographic',): geometryname = self['gridType'] if config.default_projtool == 'pyproj': import pyproj projtool = pyproj projdict = {'lambert':'lcc', 'mercator':'merc', 'polar_stereographic':'stere'} elif config.default_projtool == 'myproj': from epygram import myproj projtool = myproj projdict = {'lambert':'lambert', 'mercator':'mercator', 'polar_stereographic':'polar_stereographic'} proj = projdict[geometryname] if self['projectionCentreFlag'] == 0: lat_0 = 90. else: lat_0 = -90. # !!! dirty bypass of erroneous GRIBs from OSI-SAF !.. if self['centreDescription'] == 'Oslo': epylog.warning(' '.join(['for centre:', self['centreDescription'], ' and polar stereographic projection,\ lat_ts is taken in 3rd position in\ attribute "pv[]" of GRIB message !'])) lat_ts = self['pv'][2] geoid = {'a':self['pv'][0], 'b':self['pv'][1]} p = projtool.Proj(proj=proj, lon_0=float(self['orientationOfTheGridInDegrees']), lat_0=lat_0, lat_ts=lat_ts, **geoid) x1, y1 = p(float(self['longitudeOfFirstGridPointInDegrees']), float(self['latitudeOfFirstGridPointInDegrees'])) xc = x1 + (float(self['Nx']) - 1) * self['DxInMetres'] / 2. yc = y1 + (float(self['Ny']) - 1) * self['DyInMetres'] / 2. center_lon, center_lat = p(xc, yc, inverse=True) projection = {'reference_lon':Angle(float(self['orientationOfTheGridInDegrees']), 'degrees'), 'reference_lat':Angle(lat_0, 'degrees'), 'secant_lat':Angle(lat_ts, 'degrees'), 'center_lon':Angle(center_lon, 'degrees'), 'center_lat':Angle(center_lat, 'degrees') } grid = {'X_resolution':float(self['DxInMetres']), 'Y_resolution':float(self['DyInMetres']), 'LAMzone':None } dimensions = {'X':self['Nx'], 'Y':self['Ny'] } else: raise NotImplementedError("not yet !") # Make geometry object kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), vcoordinate=FPDict(vcoordinate), projection=projection, position_on_grid=FPDict({'vertical':'mass', 'horizontal':'center'}), geoid=geoid) if footprints_builder: builder = fpx.geometry else: builder = H2DGeometry geometry = builder(**kwargs_geom) return geometry def read_validity(self): """ Returns a :class:`epygram.base.FieldValidity` object containing the validity of the GRIB message. """ year = int(str(self['dataDate'])[0:4]) month = int(str(self['dataDate'])[4:6]) day = int(str(self['dataDate'])[6:8]) dataTime = '{:0>{width}}'.format(self['dataTime'], width=4) hour = int(dataTime[0:2]) minutes = int(dataTime[2:4]) basis = datetime.datetime(year, month, day, hour, minutes) cum = None if self['stepUnits'] == 1: timeunitfactor = 3600 if self['timeRangeIndicator'] in (0, 1, 2, 4): term = self['endStep'] term = datetime.timedelta(seconds=term * timeunitfactor) if self['timeRangeIndicator'] in (2, 4): cum = self['endStep'] - self['startStep'] cum = datetime.timedelta(seconds=cum * timeunitfactor) else: raise NotImplementedError("'timeRangeIndicator' not in (0,2,4).") validity = FieldValidity(basis=basis, term=term, cumulativeduration=cum) return validity
[docs] def asfield(self, getdata=True, footprints_builder=config.use_footprints_as_builder): """ Returns an :class:`epygram.base.Field` made out from the GRIB message. - *getdata*: if *False*, only metadata are read, the field do not contain data. - *footprints_builder*: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. """ if footprints_builder: builder = fpx.field else: builder = H2DField fid = self.genfid() geometry = self.read_geometry(footprints_builder=footprints_builder) spectral_geometry = None validity = self.read_validity() processtype = self['generatingProcessIdentifier'] comment = 'units = ' + self['units'] field = builder(fid=FPDict({'GRIB':FPDict(fid)}), geometry=geometry, validity=validity, spectral_geometry=spectral_geometry, processtype=processtype, comment=comment) if getdata: data1d = self.getvalues() if self['iScansNegatively'] == 0 and \ self['jScansPositively'] == 0 and \ self['jPointsAreConsecutive'] == 0: data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[::-1, :] elif self['iScansNegatively'] == 0 and \ self['jScansPositively'] == 1 and \ self['jPointsAreConsecutive'] == 0: data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[:, :] else: raise NotImplementedError("not yet !") field.setdata(data2d) return field
[docs] def write_to_file(self, ofile): """ *ofile* being an open file-like object designing the physical GRIB file to be written to. """ gribapi.grib_write(self._gid, ofile)
[docs]class GRIB1message(GRIBmessage): """ Specificities of GRIB1. """
[docs] def genfid(self): """Generates and returns a GRIB-type epygram fid from the message.""" fid = {'name':self['name'], 'shortName':self['shortName'], 'indicatorOfParameter':self['indicatorOfParameter'], 'paramId':self['paramId'], 'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'], 'typeOfLevel':self['typeOfLevel'], 'level':self['level'], 'editionNumber':self['editionNumber'], 'table2Version':self['table2Version']} return fid
def _build_msg_from_field(self, field, centre_id=config.GRIB_default_centre, process_id=config.GRIB_default_process, packing=config.GRIB_default_packing): """ Build the GRIB message from the field, using a template. *field* being a :class:`epygram.base.Field`. Optional arguments:\n - *centre_id*: identifier of the originating center - *process_id*: identifier of the originating process - *packing*: options of packing and compression in GRIB, given as a list of tuples (key, value), e.g. [(complexPacking, 1), ...]. """ if not isinstance(field, H2DField): raise NotImplementedError("not yet.") if field.max() - field.min() < config.epsilon: sample = 'GRIB' + str(self.grib_edition) packing = [('complexPacking', 0), ] else: #sample = 'GRIDHSTFRANGP0025+0003_t2m' sample = 'GRIDHSTEUROC25+0006_t2m' sample_gid = gribapi.grib_new_from_samples(sample) self._gid = gribapi.grib_clone(sample_gid) gribapi.grib_release(sample_gid) # part 1 --- fid if 'GRIB' not in field.fid.keys(): from epygram.formats import fid_converter GRIBfid = fid_converter(field.fid, 'GRIB' + \ str(self.grib_edition)) else: GRIBfid = field.fid['GRIB'] for key in ['table2Version', 'indicatorOfParameter', 'indicatorOfTypeOfLevel', 'level', 'typeOfLevel']: self[key] = GRIBfid[key] # part 2 --- context self['centre'] = centre_id try: process_id = int(field.processtype) except ValueError: if field.processtype == 'forecast': process_id = 204 #TODO: other cases self['generatingProcessIdentifier'] = process_id # part 3 --- validity self['dataDate'] = int(field.validity.getbasis(fmt='IntStr')[:8]) self['dataTime'] = int(field.validity.getbasis(fmt='IntStr')[8:12]) term_in_seconds = field.validity.term().total_seconds() cumulative = field.validity.cumulativeduration() if cumulative: startStep_in_seconds = (field.validity.term() - cumulative).total_seconds() self['timeRangeIndicator'] = 4 if term_in_seconds / 3600. < config.epsilon: if cumulative: self['startStep'] = int(startStep_in_seconds / 3600) self['stepUnits'] = 'h' # hours self['endStep'] = int(term_in_seconds / 3600) else: if cumulative: self['startStep'] = int(startStep_in_seconds) self['stepUnits'] = 's' # seconds self['endStep'] = int(term_in_seconds) # part 4 --- geometry self['Ni'] = field.geometry.dimensions['X'] self['Nj'] = field.geometry.dimensions['Y'] corners = field.geometry.gimme_corners_ll() self['longitudeOfFirstGridPointInDegrees'] = corners['ul'][0] self['latitudeOfFirstGridPointInDegrees'] = corners['ul'][1] self['longitudeOfLastGridPointInDegrees'] = corners['lr'][0] self['latitudeOfLastGridPointInDegrees'] = corners['lr'][1] if field.geometry.name == 'regular_lonlat': self['iDirectionIncrementInDegrees'] = field.geometry.grid['X_resolution'].get('degrees') self['jDirectionIncrementInDegrees'] = field.geometry.grid['Y_resolution'].get('degrees') else: raise NotImplementedError("not yet.") # part 5 --- values """print gribapi.grib_get(self._gid, 'N2') print gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues') print gribapi.grib_get(self._gid, 'widthOfWidths') print gribapi.grib_get(self._gid, 'widthOfLengths') print gribapi.grib_get(self._gid, 'NL') addpack = [('N2', gribapi.grib_get(self._gid, 'N2')), ('numberOfSecondOrderPackedValues', gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues')), ('widthOfWidths', gribapi.grib_get(self._gid, 'widthOfWidths')), ('widthOfLengths', gribapi.grib_get(self._gid, 'widthOfLengths')), ('NL', gribapi.grib_get(self._gid, 'NL'))]""" """self['packingType'] = 'grid_simple' gribapi.grib_set_missing(self._gid, 'values') self['bitsPerValue'] = 12""" if self['iScansNegatively'] == 0 and \ self['jScansPositively'] == 0 and \ self['jPointsAreConsecutive'] == 0: gribapi.grib_set_values(self._gid, field.getdata()[::-1, :].flatten(order='C')) elif self['iScansNegatively'] == 0 and \ self['jScansPositively'] == 1 and \ self['jPointsAreConsecutive'] == 0: gribapi.grib_set_values(self._gid, field.getdata()[:, :].flatten(order='C')) for (k, v) in packing: self[k] = v """print gribapi.grib_get(self._gid, 'N2') print gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues') print gribapi.grib_get(self._gid, 'widthOfWidths') print gribapi.grib_get(self._gid, 'widthOfLengths') print gribapi.grib_get(self._gid, 'NL') for (k,v) in addpack: self[k] = v"""
[docs]class GRIB2message(GRIBmessage): """ Specificities of GRIB2. """
[docs] def genfid(self): """ Generates and returns a GRIB-type epygram fid from the message. """ raise NotImplementedError("not yet.") fid = {'name':self['name'], 'shortName':self['shortName'], 'indicatorOfParameter':self['indicatorOfParameter'], 'paramId':self['paramId'], 'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'], 'typeOfLevel':self['typeOfLevel'], 'level':self['level'], 'editionNumber':self['editionNumber'], 'table2Version':self['table2Version']} return fid
def _build_msg_from_field(self, field): """ Build the GRIB message from the field, using a template. """ raise NotImplementedError("not yet.") #TODO:
[docs]class GRIB(Resource): """Class implementing all specificities for GRIB resource format.""" _footprint = dict( attr=dict( format=dict( values=set(['GRIB']), default='GRIB'), grib_edition=dict( values=set([1, 2]), type=int, optional=True, info="Only with openmode = 'w'.", default=config.GRIB_default_edition) ) ) def __init__(self, *args, **kwargs): """Constructor. See its footprint for arguments.""" self.isopen = False super(GRIB, self).__init__(*args, **kwargs) if self.openmode in ('r', 'a'): _file = open(self.container.abspath, 'r') isgrib = _file.readline() _file.close() if isgrib[0:4] != 'GRIB': raise IOError("this resource is not a GRIB one.") if not self.fmtdelayedopen: self.open() def _GRIBmessageClass(self, source, **kwargs): """ Factory of GRIBmessage's with choice of grib edition number determined by 1/ the 'editionNumber' of the fid of the field given to the constructor 2/ the grib_edition attribute of the GRIB object """ if self.grib_edition == 1: msg = GRIB1message(source, **kwargs) else: msg = GRIB2message(source, **kwargs) return msg
[docs] def open(self, openmode=None): """ Opens a GRIB and initializes some attributes. - *openmode*: optional, to open with a specific openmode, eventually different from the one specified at initialization. """ super(GRIB, self).open(openmode=openmode) self._file = open(self.container.abspath, self.openmode) if self.openmode in ('r', 'a'): # Find GRIB edition number in the first message gid = gribapi.grib_new_from_file(self._file, headers_only=True) self._attributes['grib_edition'] = int(gribapi.grib_get(gid, 'editionNumber')) gribapi.grib_release(gid) self._file.close() # and re-open self._file = open(self.container.abspath, self.openmode) self.isopen = True self._messages = [self._GRIBmessageClass({'file':self._file, 'index_in_file':i}) for i in range(1, self.messages_number + 1)] elif self.openmode == 'w': self.isopen = True self.empty = True self._messages = []
[docs] def close(self): """ Closes a GRIB. """ if hasattr(self, '_file'): self._file.close() self.isopen = False
@property @Resource._openbeforedelayed
[docs] def messages_number(self): """Counts the number of messages in file.""" return gribapi.grib_count_in_file(self._file)
[docs] def listfields(self, onlykey=None): """ Returns a list containing the GRIB identifiers of all the fields of the resource. Argument *onlykey* can be specified as a string or a tuple of strings, so that only specified keys of the fid will returned. """ fidlist = super(GRIB, self).listfields() if onlykey != None: if isinstance(onlykey, str): fidlist = [f[onlykey] for f in fidlist] elif isinstance(onlykey, tuple): fidlist = [{k:f[k] for k in onlykey} for f in fidlist] return fidlist
@Resource._openbeforedelayed def _listfields(self): """Returns a list of GRIB-type fid of the fields inside the resource.""" fidlist = [] for msg in self._messages: fidlist.append(msg.genfid()) return fidlist
[docs] def sortfields(self, sortingkey, onlykey=None): """ Returns a sorted list of fields with regards to the given *sortingkey* of their fid, as a dict of lists. Argument *onlykey* can be specified as a string or a tuple of strings, so that only specified keys of the fid will returned. """ sortedfields = {} listoffields = self.listfields() onlykeylistoffields = self.listfields(onlykey=onlykey) for f in range(len(listoffields)): try: category = listoffields[f][sortingkey] except KeyError: category = 'None' field = onlykeylistoffields[f] if category in sortedfields.keys(): sortedfields[category].append(field) else: sortedfields[category] = [field] return sortedfields
[docs] def readfield(self, handgrip, getdata=True, footprints_builder=config.use_footprints_as_builder): """ Finds in GRIB the message that corresponds to the *handgrip*, and returns it as a :class:`epygram.base.Field`. If several messages meet the requirements, raises error (use readfields() method instead). *handgrip* is a dict where you can store all requested GRIB keys... E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl', 'level':850} will return the Temperature at 850hPa field. If *getdata* == **False**, the data is not read, the field consist in the meta-data only. If *footprints_builder* == *True*, uses footprints.proxy to build fields. True decreases performance. """ matchingfields = self.readfields(handgrip, getdata=getdata, footprints_builder=footprints_builder) if len(matchingfields) != 1: raise epygramError("several fields found for that *handgrip*;\ please refine.") return matchingfields[0]
@Resource._openbeforedelayed
[docs] def readfields(self, handgrip, getdata=True, footprints_builder=config.use_footprints_as_builder): """ Finds in GRIB the message(s) that corresponds to the *handgrip*, and returns it as a :class:`epygram.base.FieldSet` of :class:`epygram.base.Field`. *handgrip* is a dict where you can store all requested GRIB keys... E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl'} will return all the Temperature fields on Pressure levels. If *getdata* == **False**, the data is not read, the field(s) consist in the meta-data only. If *footprints_builder* == *True*, uses footprints.proxy to build fields. True decreases performance. """ matchingfields = FieldSet() for msg in self._messages: ok = True for key in handgrip.keys(): try: ok = float(msg[key]) == float(handgrip[key]) except ValueError: ok = msg[key] == handgrip[key] except (gribapi.GribInternalError, KeyError): ok = False if not ok: break if ok: matchingfields.append(msg.asfield(getdata=getdata, footprints_builder=footprints_builder)) if len(matchingfields) == 0: raise epygramError("no field matching the given *handgrip* has been\ found in resource.") return matchingfields
@Resource._openbeforedelayed
[docs] def writefield(self, field, force_to_field_grib_edition=False, centre_id=config.GRIB_default_centre, process_id=config.GRIB_default_process, packing=config.GRIB_default_packing): """ Writes a Field as a GRIBmessage into the GRIB resource. Args:\n - *field*: a :class:`epygram.base.Field` instance - *centre_id*: identifier of the originating center - *process_id*: identifier of the originating process - *packing*: options of packing and compression in GRIB, given as a list of tuples (key, value), e.g. [(complexPacking, 1), ...]. """ if not isinstance(field, H2DField): raise NotImplementedError("'field' argument other than a H2DField.") if force_to_field_grib_edition and 'GRIB' in field.fid.keys(): if self.grib_edition != field.fid['GRIB']['editionNumber']: if self.empty: self.grib_edition = field.fid['GRIB']['editionNumber'] else: raise epygramError("cannot force grib edition if the GRIB\ is not empty.") m = self._GRIBmessageClass(field, centre_id=centre_id, process_id=process_id, packing=packing) self._messages.append(m) m.write_to_file(self._file) if self.empty: self.empty = False
@Resource._openbeforedelayed
[docs] def what(self, out, mode='one+list', sortby=None): """ Writes in file a summary of the contents of the GRIB. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *mode*: among ('one+list', 'fid_list', 'what', 'ls', 'mars'), \n - 'one+list' = gives the validity/geometry of the first field in GRIB, plus the list of fid. - 'fid_list' = gives only the fid of each field in GRIB. - 'what' = gives the values of the keys from each GRIB message that are used to generate an **epygram** field from the message (slower). - 'ls' = gives the values of the 'ls' keys from each GRIB message. - 'mars' = gives the values of the 'mars' keys from each GRIB message. - *sortby*: name of the fid key used to sort fields; e.g. 'typeOfLevel'; only for *mode* = 'one+list' or 'fid_list'. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \ secondcolumn_width + 1) + '\n' def write_formatted_dict(dest, fid): name = fid.pop('name') dest.write('name: ' + name + '\n') for k in sorted(fid.keys()): dest.write(' ' + str(k) + ': ' + str(fid[k]) + '\n') if mode == 'one+list': onefield = self._messages[0].asfield(getdata=False) g0 = onefield.geometry.footprint_as_dict() g0.pop('vcoordinate') for m in self._messages: if m.read_validity().get() != onefield.validity.get(): print m.read_validity() print onefield.validity raise epygramError("all fields do not share their validity;\ 'one+list' mode (-m) disabled.") onefield.what(out, vertical_geometry=False, cumulativeduration=False) out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") listoffields = self.listfields() out.write("Number: " + str(len(listoffields)) + "\n") if mode in ('what', 'ls', 'mars'): out.write(sepline) for m in self._messages: if mode == 'what': _ = m.asfield(getdata=False) elif mode in ('ls', 'mars'): m.readmessage(mode) m._readattribute('name') m_dict = dict(m) write_formatted_dict(out, m_dict) elif sortby != None: out.write("sorted by: " + sortby + "\n") out.write(sepline) sortedfields = self.sortfields(sortby) for category in sorted(sortedfields.keys()): out.write(sortby + ": " + str(category) + '\n') out.write('--------------------' + '\n') for f in sortedfields[category]: f.pop(sortby) write_formatted_dict(out, f) out.write('--------------------' + '\n') else: out.write(sepline) for f in listoffields: write_formatted_dict(out, f) out.write(sepline)