Source code for epygram.GRIB

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__all__ = ['GRIB']

import datetime
import gribapi

import footprints
from footprints import FPDict

from epygram import config, epygramError, epylog
from epygram import H2DField
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle, RecursiveObject



class GRIBmessage(RecursiveObject, dict):
    """
    Class implementing a GRIB message as an object.
    """
    
    def __init__(self, source):
        """
        *source* being either
          - a dict with keys ('file', 'index_in_file');
            - *filename* being an open file-like object designing the GRIB
              file it is read from,
            - *index_in_file* being the index of the message in file.
          - a binary string GRIB message (Not Implemented Yet)
          - a GRIB sample name (Not Implemented Yet).
        """
        
        self.namespace = None
        super(GRIBmessage, self).__init__()
        if isinstance(source, dict) and set(source.keys()) == set(['file', 'index_in_file']):
            self._meta_gid = gribapi.grib_new_from_file(source['file'], headers_only=True)
            self._filename = source['file'].name
            self._index = source['index_in_file']
        else:
            raise NotImplementedError("not yet.")
    
    def __del__(self):
        try:
            gribapi.grib_release(self._meta_gid)
        except gribapi.GribInternalError:
            pass
    
    def __getitem__(self, item):
        if item not in self.keys():
            if config.GRIB_read_whole_message:
                for namespace in ('ls', 'mars', None):
                    self.readmessage(namespace=namespace)
                    if item in self.keys():
                        break
            else:
                self._readattribute(item)
            if item not in self.keys():
                raise KeyError(item + " not in GRIBmessage keys.")

        return super(GRIBmessage, self).__getitem__(item)
    
    def readmessage(self, namespace=None):
        """
        Reads the meta-data of the message.
        
        Args:\n
        - *namespace*: the namespace of keys to be read, among:\n
          - **None**: to get all keys present in message,
          - ['myKey1', 'myKey2', ...] for any custom namespace,
          - 'ls': to get the same default keys as the grib_ls,
          - 'mars': to get the keys used by MARS.
        """
        
        self.clear()
        self.namespace = namespace
        if namespace in (None, 'ls', 'mars'):
            key_iter = gribapi.grib_keys_iterator_new(self._meta_gid, namespace=namespace)
            namespace = []
            while gribapi.grib_keys_iterator_next(key_iter):
                namespace.append(gribapi.grib_keys_iterator_get_name(key_iter))
        # Explanation: the message accessed via self._meta_gid is "headers_only";
        # if a key is not found or returns an error, we try to access it via a
        # "full" temporary gid, generated on that occasion.
        gid_full = None
        for k in namespace:
            try:
                try:
                    self[k] = gribapi.grib_get(self._meta_gid, k)
                except gribapi.GribInternalError as e:
                    if str(e) == 'Passed array is too small':
                        self[k] = gribapi.grib_get_double_array(self._meta_gid, k)
            except Exception as e:
                if gid_full == None:
                    tempfile = open(self._filename, 'r')
                    for _ in range(1, self._index):
                        gid = gribapi.grib_new_from_file(tempfile, headers_only=True)
                        gribapi.grib_release(gid)
                    gid_full = gribapi.grib_new_from_file(tempfile)
                try:
                    self[k] = gribapi.grib_get(gid_full, k)
                except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi
                    if str(e) == 'Passed array is too small':
                        self[k] = gribapi.grib_get_double_array(gid_full, k)
                    else:
                        gribapi.grib_release(gid_full)
                        raise
        if gid_full != None:
            gribapi.grib_release(gid_full)
    
    def _readattribute(self, attribute):
        # Explanation: the message accessed via self._meta_gid is "headers_only";
        # if a key is not found or returns an error, we try to access it via a
        # "full" temporary gid, generated on that occasion.
        try:
            try:
                self[attribute] = gribapi.grib_get(self._meta_gid, attribute)
            except gribapi.GribInternalError as e:
                if str(e) == 'Passed array is too small':
                    self[attribute] = gribapi.grib_get_double_array(self._meta_gid, attribute)
                else:
                    raise
        except Exception as e:
            tempfile = open(self._filename, 'r')
            for _ in range(1, self._index):
                gid = gribapi.grib_new_from_file(tempfile, headers_only=True)
                gribapi.grib_release(gid)
            gid_full = gribapi.grib_new_from_file(tempfile)
            try:
                self[attribute] = gribapi.grib_get(gid_full, attribute)
            except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi
                if str(e) == 'Passed array is too small':
                    self[attribute] = gribapi.grib_get_double_array(gid_full, attribute)
                else:
                    raise
            finally:
                gribapi.grib_release(gid_full)
    
    def getvalues(self):
        """
        Reads and returns the message values.
        """
        
        if not hasattr(self, '_values'):
            with open(self._filename, 'r') as f:
                for _ in range(1, self._index):
                    gid = gribapi.grib_new_from_file(f, headers_only=True)
                    gribapi.grib_release(gid)
                gid = gribapi.grib_new_from_file(f)
                self._values = gribapi.grib_get_values(gid)
                gribapi.grib_release(gid)
        return self._values
    
    def genfid(self):
        """
        Generates and returns a GRIB-type epygram fid from the message.
        """
        
        fid = {'name':self['name'],
               'shortName':self['shortName'],
               'indicatorOfParameter':self['indicatorOfParameter'],
               'paramId':self['paramId'],
               'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
               'typeOfLevel':self['typeOfLevel'],
               'level':self['level'],
               'table2Version':self['table2Version']}
        
        return fid
    
    def read_geometry(self):
        """
        Returns a :class:`epygram.H2DGeometry.H2DGeometry` object containing the geometry information of the GRIB message.
        """
        
        geoid = config.default_geoid
        vcoordinate = {'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
                       'typeOfLevel':self['typeOfLevel'],
                       'Z':self['level']
                       }
        if self['gridType'] == 'regular_ll':
            geometryname = 'regular_lonlat'
            center_lon = float(self['longitudeOfLastGridPointInDegrees'] + self['longitudeOfFirstGridPointInDegrees']) / 2.
            center_lat = float(self['latitudeOfLastGridPointInDegrees'] + self['latitudeOfFirstGridPointInDegrees']) / 2.
            grid = {'center_lon':Angle(center_lon, 'degrees'),
                    'center_lat':Angle(center_lat, 'degrees'),
                    'X_resolution':Angle(self['iDirectionIncrementInDegrees'], 'degrees'),
                    'Y_resolution':Angle(self['jDirectionIncrementInDegrees'], 'degrees')
                    }
            dimensions = {'X':self['Ni'],
                          'Y':self['Nj']
                          }
            projection = None
        elif self['gridType'] in ('polar_stereographic',):
            geometryname = self['gridType']
            if config.default_projtool == 'pyproj':
                import pyproj
                projtool = pyproj
                projdict = {'lambert':'lcc',
                            'mercator':'merc',
                            'polar_stereographic':'stere'}
            elif config.default_projtool == 'myproj':
                from epygram import myproj
                projtool = myproj
                projdict = {'lambert':'lambert',
                            'mercator':'mercator',
                            'polar_stereographic':'polar_stereographic'}
            proj = projdict[geometryname]
            if self['projectionCentreFlag'] == 0:
                lat_0 = 90.
            else:
                lat_0 = -90.
            if self['centreDescription'] == 'Oslo': # !!! dirty bypass of erroneous GRIBs from OSI-SAF !..
                epylog.warning(' '.join(['for centre:', self['centreDescription'],
                                        ' and polar stereographic projection,\
                                        lat_ts is taken in 3rd position in attribute "pv[]" of GRIB message !']))
                lat_ts = self['pv'][2]
                geoid = {'a':self['pv'][0], 'b':self['pv'][1]}
            p = projtool.Proj(proj=proj,
                              lon_0=float(self['orientationOfTheGridInDegrees']),
                              lat_0=lat_0, lat_ts=lat_ts,
                              **geoid)
            x1, y1 = p(float(self['longitudeOfFirstGridPointInDegrees']),
                       float(self['latitudeOfFirstGridPointInDegrees']))
            xc = x1 + (float(self['Nx'])-1) * self['DxInMetres'] / 2.
            yc = y1 + (float(self['Ny'])-1) * self['DyInMetres'] / 2.
            center_lon, center_lat = p(xc, yc, inverse=True)
            projection = {'reference_lon':Angle(float(self['orientationOfTheGridInDegrees']), 'degrees'),
                          'reference_lat':Angle(lat_0, 'degrees'),
                          'secant_lat':Angle(lat_ts, 'degrees'),
                          'center_lon':Angle(center_lon, 'degrees'),
                          'center_lat':Angle(center_lat, 'degrees')
                          }
            grid = {'X_resolution':float(self['DxInMetres']),
                    'Y_resolution':float(self['DyInMetres']),
                    'LAMzone':None
                    }
            dimensions = {'X':self['Nx'],
                          'Y':self['Ny']
                          }
        else:
            raise NotImplementedError("not yet !")
        # Make geometry object
        kwargs_geom = dict(structure='H2D',
                           name=geometryname,
                           grid=FPDict(grid),
                           dimensions=FPDict(dimensions),
                           vcoordinate=FPDict(vcoordinate),
                           projection=projection,
                           geoid=geoid)
        geometry = footprints.proxy.geometry(**kwargs_geom)
        
        return geometry
    
    def read_validity(self):
        """
        Returns a :class:`epygram.base.FieldValidity` object containing the validity of the GRIB message.
        """
        
        year = int(str(self['dataDate'])[0:4])
        month = int(str(self['dataDate'])[4:6])
        day = int(str(self['dataDate'])[6:8])
        dataTime = '{:0>{width}}'.format(self['dataTime'], width=4)
        hour = int(dataTime[0:2])
        minutes = int(dataTime[2:4])
        basis = datetime.datetime(year, month, day, hour, minutes)
        cum = None
        if self['stepUnits'] == 1:
            timeunitfactor = 3600
        if self['timeRangeIndicator'] in (0, 1, 2, 4):
            term = self['endStep']
            term = datetime.timedelta(seconds=term*timeunitfactor)
            if self['timeRangeIndicator'] in (2, 4):
                cum = self['endStep'] - self['startStep']
                cum = datetime.timedelta(seconds=cum*timeunitfactor)
        else:
            raise NotImplementedError("'timeRangeIndicator' not in (0,2,4): not yet !")
        validity = FieldValidity(basis=basis, term=term, cumulativeduration=cum)
        
        return validity
    
    def asfield(self, getdata=True):
        """
        Returns an :class:`epygram.base.Field` made out from the GRIB message. 
        """
        
        fid = self.genfid()
        geometry = self.read_geometry()
        spectral_geometry = None
        validity = self.read_validity()
        
        processtype = self['generatingProcessIdentifier']
        comment = 'units = ' + self['units']
        field = H2DField(fid=FPDict({'GRIB':FPDict(fid)}),
                         geometry=geometry,
                         validity=validity,
                         spectral_geometry=spectral_geometry,
                         processtype=processtype,
                         comment=comment)
        if getdata:
            data1d = self.getvalues()
            if self['iScansNegatively'] == 0 and \
               self['jScansPositively'] == 0 and \
               self['jPointsAreConsecutive'] == 0:
                data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[::-1,:]
            elif self['iScansNegatively'] == 0 and \
                 self['jScansPositively'] == 1 and \
                 self['jPointsAreConsecutive'] == 0:
                data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[:,:]
            else:
                raise NotImplementedError("not yet !")
            field.setdata(data2d)
        
        return field



[docs]class GRIB(Resource): """ Class implementing all specificities for GRIB resource format. """ _footprint = dict( attr = dict( format = dict( values = set(['GRIB']), default = 'GRIB') ) ) def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False super(GRIB, self).__init__(*args, **kwargs) _file = open(self.container.abspath, 'r') isgrib = _file.readline() _file.close() if isgrib[0:4] != 'GRIB': raise IOError("this resource is not a GRIB one.") if not self.fmtdelayedopen: self.open()
[docs] def open(self): """ Opens a GRIB and initializes some attributes. """ if self.openmode in ('r', 'a'): self._file = open(self.container.abspath, 'r') self.isopen = True self._messages = [GRIBmessage({'file':self._file, 'index_in_file':i}) for i in range(1, self.messages_number+1)] elif self.openmode == 'w': raise NotImplementedError("not yet !")
[docs] def close(self): """ Closes a GRIB. """ if hasattr(self, '_file'): self._file.close() self.isopen = False
@property @Resource._openbeforedelayed def messages_number(self): return gribapi.grib_count_in_file(self._file)
[docs] def listfields(self, onlykey=None): """ Returns a list containing the GRIB identifiers of all the fields of the resource. Argument *onlykey* can be specified as a string or a tuple of strings, so that only specified keys of the fid will returned. """ fidlist = super(GRIB, self).listfields() if onlykey != None: if isinstance(onlykey, str): fidlist = [f[onlykey] for f in fidlist] elif isinstance(onlykey, tuple): fidlist = [{k:f[k] for k in onlykey} for f in fidlist] return fidlist
@Resource._openbeforedelayed def _listfields(self): """ Returns a list of GRIB-type fid of the fields inside the resource. """ fidlist = [] for msg in self._messages: fidlist.append(msg.genfid()) return fidlist
[docs] def sortfields(self, sortingkey, onlykey=None): """ Returns a sorted list of fields with regards to the given *sortingkey* of their fid, as a dict of lists. Argument *onlykey* can be specified as a string or a tuple of strings, so that only specified keys of the fid will returned. """ sortedfields = {} listoffields = self.listfields() onlykeylistoffields = self.listfields(onlykey=onlykey) for f in range(len(listoffields)): try: category = listoffields[f][sortingkey] except KeyError: category = 'None' field = onlykeylistoffields[f] if category in sortedfields.keys(): sortedfields[category].append(field) else: sortedfields[category] = [field] return sortedfields
[docs] def readfield(self, handgrip, getdata=True): """ Finds in GRIB the message that corresponds to the *handgrip*, and returns it as a :class:`epygram.base.Field`. If several messages meet the requirements, returns the first one found out. *handgrip* is a dict where you can store all requested GRIB keys... E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl', 'level':850} will return the Temperature at 850hPa field. If *getdata* == **False**, the data is not read, the field consist in the meta-data only. """ matchingfields = self.readfields(handgrip, getdata=getdata) if len(matchingfields) != 1: #epylog.warning("GRIB.readfield(): several fields found for that *handgrip*.") raise epygramError("several fields found for that *handgrip*; please refine.") return matchingfields[0]
@Resource._openbeforedelayed
[docs] def readfields(self, handgrip, getdata=True): """ Finds in GRIB the message(s) that corresponds to the *handgrip*, and returns it as a :class:`epygram.base.FieldSet` of :class:`epygram.base.Field`. *handgrip* is a dict where you can store all requested GRIB keys... E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl'} will return all the Temperature fields on Pressure levels. If *getdata* == **False**, the data is not read, the field(s) consist in the meta-data only. """ matchingfields = FieldSet() for msg in self._messages: ok = True for key in handgrip.keys(): try: ok = float(msg[key]) == float(handgrip[key]) except ValueError: ok = msg[key] == handgrip[key] except (gribapi.GribInternalError, KeyError): ok = False if not ok: break if ok: matchingfields.append(msg.asfield(getdata=getdata)) if len(matchingfields) == 0: raise epygramError("no field matching the given *handgrip* has been found in resource.") return matchingfields
@Resource._openbeforedelayed
[docs] def what(self, out, mode='one+list', sortby=None): """ Writes in file a summary of the contents of the GRIB. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *mode*: among ('one+list', 'fid_list', 'what', 'ls', 'mars'), \n - 'one+list' = gives the validity/geometry of the first field in GRIB, plus the list of fid. - 'fid_list' = gives only the fid of each field in GRIB. - 'what' = gives the values of the keys from each GRIB message that are used to generate an **epygram** field from the message (slower). - 'ls' = gives the values of the 'ls' keys from each GRIB message. - 'mars' = gives the values of the 'mars' keys from each GRIB message. - *sortby*: name of the fid key used to sort fields; e.g. 'typeOfLevel'; only for *mode* = 'one+list' or 'fid_list'. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n' def write_formatted_dict(dest, fid): name = fid.pop('name') dest.write('name: ' + name + '\n') for k in sorted(fid.keys()): dest.write(' ' + str(k) + ': ' + str(fid[k]) + '\n') if mode == 'one+list': onefield = self._messages[0].asfield(getdata=False) g0 = onefield.geometry.footprint_as_dict() g0.pop('vcoordinate') for m in self._messages: """ g = m.read_geometry().footprint_as_dict() g.pop('vcoordinate') if g != g0: print g print g0 raise epygramError("all fields do not share their geometry; 'one+list' mode (-m) disabled.") """ if m.read_validity().get() != onefield.validity.get(): print m.read_validity() print onefield.validity raise epygramError("all fields do not share their validity; 'one+list' mode (-m) disabled.") onefield.what(out, vertical_geometry=False, cumulativeduration=False) out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") listoffields = self.listfields() out.write("Number: " + str(len(listoffields)) + "\n") if mode in ('what', 'ls', 'mars'): out.write(sepline) for m in self._messages: if mode == 'what': _ = m.asfield(getdata=False) elif mode in ('ls', 'mars'): m.readmessage(mode) m._readattribute('name') m_dict = dict(m) write_formatted_dict(out, m_dict) elif sortby != None: out.write("sorted by: " + sortby + "\n") out.write(sepline) sortedfields = self.sortfields(sortby) for category in sorted(sortedfields.keys()): out.write(sortby + ": " + str(category) + '\n') out.write('--------------------' + '\n') for f in sortedfields[category]: f.pop(sortby) write_formatted_dict(out, f) out.write('--------------------' + '\n') else: out.write(sepline) for f in listoffields: write_formatted_dict(out, f) out.write(sepline)