#!/usr/bin/env python
# -*- coding: utf-8 -*-
__all__ = ['GRIB']
import datetime
import gribapi
import footprints
from footprints import FPDict
from epygram import config, epygramError, epylog
from epygram import H2DField
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle, RecursiveObject
class GRIBmessage(RecursiveObject, dict):
"""
Class implementing a GRIB message as an object.
"""
def __init__(self, source):
"""
*source* being either
- a dict with keys ('file', 'index_in_file');
- *filename* being an open file-like object designing the GRIB
file it is read from,
- *index_in_file* being the index of the message in file.
- a binary string GRIB message (Not Implemented Yet)
- a GRIB sample name (Not Implemented Yet).
"""
self.namespace = None
super(GRIBmessage, self).__init__()
if isinstance(source, dict) and set(source.keys()) == set(['file', 'index_in_file']):
self._meta_gid = gribapi.grib_new_from_file(source['file'], headers_only=True)
self._filename = source['file'].name
self._index = source['index_in_file']
else:
raise NotImplementedError("not yet.")
def __del__(self):
try:
gribapi.grib_release(self._meta_gid)
except gribapi.GribInternalError:
pass
def __getitem__(self, item):
if item not in self.keys():
if config.GRIB_read_whole_message:
for namespace in ('ls', 'mars', None):
self.readmessage(namespace=namespace)
if item in self.keys():
break
else:
self._readattribute(item)
if item not in self.keys():
raise KeyError(item + " not in GRIBmessage keys.")
return super(GRIBmessage, self).__getitem__(item)
def readmessage(self, namespace=None):
"""
Reads the meta-data of the message.
Args:\n
- *namespace*: the namespace of keys to be read, among:\n
- **None**: to get all keys present in message,
- ['myKey1', 'myKey2', ...] for any custom namespace,
- 'ls': to get the same default keys as the grib_ls,
- 'mars': to get the keys used by MARS.
"""
self.clear()
self.namespace = namespace
if namespace in (None, 'ls', 'mars'):
key_iter = gribapi.grib_keys_iterator_new(self._meta_gid, namespace=namespace)
namespace = []
while gribapi.grib_keys_iterator_next(key_iter):
namespace.append(gribapi.grib_keys_iterator_get_name(key_iter))
# Explanation: the message accessed via self._meta_gid is "headers_only";
# if a key is not found or returns an error, we try to access it via a
# "full" temporary gid, generated on that occasion.
gid_full = None
for k in namespace:
try:
try:
self[k] = gribapi.grib_get(self._meta_gid, k)
except gribapi.GribInternalError as e:
if str(e) == 'Passed array is too small':
self[k] = gribapi.grib_get_double_array(self._meta_gid, k)
except Exception as e:
if gid_full == None:
tempfile = open(self._filename, 'r')
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(tempfile, headers_only=True)
gribapi.grib_release(gid)
gid_full = gribapi.grib_new_from_file(tempfile)
try:
self[k] = gribapi.grib_get(gid_full, k)
except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi
if str(e) == 'Passed array is too small':
self[k] = gribapi.grib_get_double_array(gid_full, k)
else:
gribapi.grib_release(gid_full)
raise
if gid_full != None:
gribapi.grib_release(gid_full)
def _readattribute(self, attribute):
# Explanation: the message accessed via self._meta_gid is "headers_only";
# if a key is not found or returns an error, we try to access it via a
# "full" temporary gid, generated on that occasion.
try:
try:
self[attribute] = gribapi.grib_get(self._meta_gid, attribute)
except gribapi.GribInternalError as e:
if str(e) == 'Passed array is too small':
self[attribute] = gribapi.grib_get_double_array(self._meta_gid, attribute)
else:
raise
except Exception as e:
tempfile = open(self._filename, 'r')
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(tempfile, headers_only=True)
gribapi.grib_release(gid)
gid_full = gribapi.grib_new_from_file(tempfile)
try:
self[attribute] = gribapi.grib_get(gid_full, attribute)
except gribapi.GribInternalError as e: # differenciation not well done... PB in gribapi
if str(e) == 'Passed array is too small':
self[attribute] = gribapi.grib_get_double_array(gid_full, attribute)
else:
raise
finally:
gribapi.grib_release(gid_full)
def getvalues(self):
"""
Reads and returns the message values.
"""
if not hasattr(self, '_values'):
with open(self._filename, 'r') as f:
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(f, headers_only=True)
gribapi.grib_release(gid)
gid = gribapi.grib_new_from_file(f)
self._values = gribapi.grib_get_values(gid)
gribapi.grib_release(gid)
return self._values
def genfid(self):
"""
Generates and returns a GRIB-type epygram fid from the message.
"""
fid = {'name':self['name'],
'shortName':self['shortName'],
'indicatorOfParameter':self['indicatorOfParameter'],
'paramId':self['paramId'],
'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
'typeOfLevel':self['typeOfLevel'],
'level':self['level'],
'table2Version':self['table2Version']}
return fid
def read_geometry(self):
"""
Returns a :class:`epygram.H2DGeometry.H2DGeometry` object containing the geometry information of the GRIB message.
"""
geoid = config.default_geoid
vcoordinate = {'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
'typeOfLevel':self['typeOfLevel'],
'Z':self['level']
}
if self['gridType'] == 'regular_ll':
geometryname = 'regular_lonlat'
center_lon = float(self['longitudeOfLastGridPointInDegrees'] + self['longitudeOfFirstGridPointInDegrees']) / 2.
center_lat = float(self['latitudeOfLastGridPointInDegrees'] + self['latitudeOfFirstGridPointInDegrees']) / 2.
grid = {'center_lon':Angle(center_lon, 'degrees'),
'center_lat':Angle(center_lat, 'degrees'),
'X_resolution':Angle(self['iDirectionIncrementInDegrees'], 'degrees'),
'Y_resolution':Angle(self['jDirectionIncrementInDegrees'], 'degrees')
}
dimensions = {'X':self['Ni'],
'Y':self['Nj']
}
projection = None
elif self['gridType'] in ('polar_stereographic',):
geometryname = self['gridType']
if config.default_projtool == 'pyproj':
import pyproj
projtool = pyproj
projdict = {'lambert':'lcc',
'mercator':'merc',
'polar_stereographic':'stere'}
elif config.default_projtool == 'myproj':
from epygram import myproj
projtool = myproj
projdict = {'lambert':'lambert',
'mercator':'mercator',
'polar_stereographic':'polar_stereographic'}
proj = projdict[geometryname]
if self['projectionCentreFlag'] == 0:
lat_0 = 90.
else:
lat_0 = -90.
if self['centreDescription'] == 'Oslo': # !!! dirty bypass of erroneous GRIBs from OSI-SAF !..
epylog.warning(' '.join(['for centre:', self['centreDescription'],
' and polar stereographic projection,\
lat_ts is taken in 3rd position in attribute "pv[]" of GRIB message !']))
lat_ts = self['pv'][2]
geoid = {'a':self['pv'][0], 'b':self['pv'][1]}
p = projtool.Proj(proj=proj,
lon_0=float(self['orientationOfTheGridInDegrees']),
lat_0=lat_0, lat_ts=lat_ts,
**geoid)
x1, y1 = p(float(self['longitudeOfFirstGridPointInDegrees']),
float(self['latitudeOfFirstGridPointInDegrees']))
xc = x1 + (float(self['Nx'])-1) * self['DxInMetres'] / 2.
yc = y1 + (float(self['Ny'])-1) * self['DyInMetres'] / 2.
center_lon, center_lat = p(xc, yc, inverse=True)
projection = {'reference_lon':Angle(float(self['orientationOfTheGridInDegrees']), 'degrees'),
'reference_lat':Angle(lat_0, 'degrees'),
'secant_lat':Angle(lat_ts, 'degrees'),
'center_lon':Angle(center_lon, 'degrees'),
'center_lat':Angle(center_lat, 'degrees')
}
grid = {'X_resolution':float(self['DxInMetres']),
'Y_resolution':float(self['DyInMetres']),
'LAMzone':None
}
dimensions = {'X':self['Nx'],
'Y':self['Ny']
}
else:
raise NotImplementedError("not yet !")
# Make geometry object
kwargs_geom = dict(structure='H2D',
name=geometryname,
grid=FPDict(grid),
dimensions=FPDict(dimensions),
vcoordinate=FPDict(vcoordinate),
projection=projection,
geoid=geoid)
geometry = footprints.proxy.geometry(**kwargs_geom)
return geometry
def read_validity(self):
"""
Returns a :class:`epygram.base.FieldValidity` object containing the validity of the GRIB message.
"""
year = int(str(self['dataDate'])[0:4])
month = int(str(self['dataDate'])[4:6])
day = int(str(self['dataDate'])[6:8])
dataTime = '{:0>{width}}'.format(self['dataTime'], width=4)
hour = int(dataTime[0:2])
minutes = int(dataTime[2:4])
basis = datetime.datetime(year, month, day, hour, minutes)
cum = None
if self['stepUnits'] == 1:
timeunitfactor = 3600
if self['timeRangeIndicator'] in (0, 1, 2, 4):
term = self['endStep']
term = datetime.timedelta(seconds=term*timeunitfactor)
if self['timeRangeIndicator'] in (2, 4):
cum = self['endStep'] - self['startStep']
cum = datetime.timedelta(seconds=cum*timeunitfactor)
else:
raise NotImplementedError("'timeRangeIndicator' not in (0,2,4): not yet !")
validity = FieldValidity(basis=basis, term=term, cumulativeduration=cum)
return validity
def asfield(self, getdata=True):
"""
Returns an :class:`epygram.base.Field` made out from the GRIB message.
"""
fid = self.genfid()
geometry = self.read_geometry()
spectral_geometry = None
validity = self.read_validity()
processtype = self['generatingProcessIdentifier']
comment = 'units = ' + self['units']
field = H2DField(fid=FPDict({'GRIB':FPDict(fid)}),
geometry=geometry,
validity=validity,
spectral_geometry=spectral_geometry,
processtype=processtype,
comment=comment)
if getdata:
data1d = self.getvalues()
if self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 0 and \
self['jPointsAreConsecutive'] == 0:
data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[::-1,:]
elif self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 1 and \
self['jPointsAreConsecutive'] == 0:
data2d = data1d.reshape((self['Nj'], self['Ni']), order='C')[:,:]
else:
raise NotImplementedError("not yet !")
field.setdata(data2d)
return field
[docs]class GRIB(Resource):
"""
Class implementing all specificities for GRIB resource format.
"""
_footprint = dict(
attr = dict(
format = dict(
values = set(['GRIB']),
default = 'GRIB')
)
)
def __init__(self, *args, **kwargs):
"""
Constructor. See its footprint for arguments.
"""
self.isopen = False
super(GRIB, self).__init__(*args, **kwargs)
_file = open(self.container.abspath, 'r')
isgrib = _file.readline()
_file.close()
if isgrib[0:4] != 'GRIB':
raise IOError("this resource is not a GRIB one.")
if not self.fmtdelayedopen:
self.open()
[docs] def open(self):
"""
Opens a GRIB and initializes some attributes.
"""
if self.openmode in ('r', 'a'):
self._file = open(self.container.abspath, 'r')
self.isopen = True
self._messages = [GRIBmessage({'file':self._file,
'index_in_file':i})
for i in range(1, self.messages_number+1)]
elif self.openmode == 'w':
raise NotImplementedError("not yet !")
[docs] def close(self):
"""
Closes a GRIB.
"""
if hasattr(self, '_file'):
self._file.close()
self.isopen = False
@property
@Resource._openbeforedelayed
def messages_number(self):
return gribapi.grib_count_in_file(self._file)
[docs] def listfields(self, onlykey=None):
"""
Returns a list containing the GRIB identifiers of all the fields of the resource.
Argument *onlykey* can be specified as a string or a tuple of strings,
so that only specified keys of the fid will returned.
"""
fidlist = super(GRIB, self).listfields()
if onlykey != None:
if isinstance(onlykey, str):
fidlist = [f[onlykey] for f in fidlist]
elif isinstance(onlykey, tuple):
fidlist = [{k:f[k] for k in onlykey} for f in fidlist]
return fidlist
@Resource._openbeforedelayed
def _listfields(self):
"""
Returns a list of GRIB-type fid of the fields inside the resource.
"""
fidlist = []
for msg in self._messages:
fidlist.append(msg.genfid())
return fidlist
[docs] def sortfields(self, sortingkey, onlykey=None):
"""
Returns a sorted list of fields with regards to the given *sortingkey*
of their fid, as a dict of lists.
Argument *onlykey* can be specified as a string or a tuple of strings,
so that only specified keys of the fid will returned.
"""
sortedfields = {}
listoffields = self.listfields()
onlykeylistoffields = self.listfields(onlykey=onlykey)
for f in range(len(listoffields)):
try:
category = listoffields[f][sortingkey]
except KeyError:
category = 'None'
field = onlykeylistoffields[f]
if category in sortedfields.keys():
sortedfields[category].append(field)
else:
sortedfields[category] = [field]
return sortedfields
[docs] def readfield(self, handgrip, getdata=True):
"""
Finds in GRIB the message that corresponds to the *handgrip*,
and returns it as a :class:`epygram.base.Field`.
If several messages meet the requirements, returns the first one
found out.
*handgrip* is a dict where you can store all requested GRIB keys...
E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl', 'level':850}
will return the Temperature at 850hPa field.
If *getdata* == **False**, the data is not read, the field consist
in the meta-data only.
"""
matchingfields = self.readfields(handgrip, getdata=getdata)
if len(matchingfields) != 1:
#epylog.warning("GRIB.readfield(): several fields found for that *handgrip*.")
raise epygramError("several fields found for that *handgrip*; please refine.")
return matchingfields[0]
@Resource._openbeforedelayed
[docs] def readfields(self, handgrip, getdata=True):
"""
Finds in GRIB the message(s) that corresponds to the *handgrip*,
and returns it as a :class:`epygram.base.FieldSet` of
:class:`epygram.base.Field`.
*handgrip* is a dict where you can store all requested GRIB keys...
E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl'}
will return all the Temperature fields on Pressure levels.
If *getdata* == **False**, the data is not read, the field(s) consist
in the meta-data only.
"""
matchingfields = FieldSet()
for msg in self._messages:
ok = True
for key in handgrip.keys():
try:
ok = float(msg[key]) == float(handgrip[key])
except ValueError:
ok = msg[key] == handgrip[key]
except (gribapi.GribInternalError, KeyError):
ok = False
if not ok:
break
if ok:
matchingfields.append(msg.asfield(getdata=getdata))
if len(matchingfields) == 0:
raise epygramError("no field matching the given *handgrip* has been found in resource.")
return matchingfields
@Resource._openbeforedelayed
[docs] def what(self, out, mode='one+list', sortby=None):
"""
Writes in file a summary of the contents of the GRIB.
Args: \n
- *out*: the output open file-like object (duck-typing: *out*.write()
only is needed).
- *mode*: among ('one+list', 'fid_list', 'what', 'ls', 'mars'), \n
- 'one+list' = gives the validity/geometry of the first field in
GRIB, plus the list of fid.
- 'fid_list' = gives only the fid of each field in GRIB.
- 'what' = gives the values of the keys from each GRIB message that
are used to generate an **epygram** field from the message (slower).
- 'ls' = gives the values of the 'ls' keys from each GRIB message.
- 'mars' = gives the values of the 'mars' keys from each GRIB message.
- *sortby*: name of the fid key used to sort fields; e.g. 'typeOfLevel';
only for *mode* = 'one+list' or 'fid_list'.
"""
firstcolumn_width = 50
secondcolumn_width = 16
sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n'
def write_formatted_dict(dest, fid):
name = fid.pop('name')
dest.write('name: ' + name + '\n')
for k in sorted(fid.keys()):
dest.write(' ' + str(k) + ': ' + str(fid[k]) + '\n')
if mode == 'one+list':
onefield = self._messages[0].asfield(getdata=False)
g0 = onefield.geometry.footprint_as_dict()
g0.pop('vcoordinate')
for m in self._messages:
"""
g = m.read_geometry().footprint_as_dict()
g.pop('vcoordinate')
if g != g0:
print g
print g0
raise epygramError("all fields do not share their geometry; 'one+list' mode (-m) disabled.")
"""
if m.read_validity().get() != onefield.validity.get():
print m.read_validity()
print onefield.validity
raise epygramError("all fields do not share their validity; 'one+list' mode (-m) disabled.")
onefield.what(out, vertical_geometry=False, cumulativeduration=False)
out.write("######################\n")
out.write("### LIST OF FIELDS ###\n")
out.write("######################\n")
listoffields = self.listfields()
out.write("Number: " + str(len(listoffields)) + "\n")
if mode in ('what', 'ls', 'mars'):
out.write(sepline)
for m in self._messages:
if mode == 'what':
_ = m.asfield(getdata=False)
elif mode in ('ls', 'mars'):
m.readmessage(mode)
m._readattribute('name')
m_dict = dict(m)
write_formatted_dict(out, m_dict)
elif sortby != None:
out.write("sorted by: " + sortby + "\n")
out.write(sepline)
sortedfields = self.sortfields(sortby)
for category in sorted(sortedfields.keys()):
out.write(sortby + ": " + str(category) + '\n')
out.write('--------------------' + '\n')
for f in sortedfields[category]:
f.pop(sortby)
write_formatted_dict(out, f)
out.write('--------------------' + '\n')
else:
out.write(sepline)
for f in listoffields:
write_formatted_dict(out, f)
out.write(sepline)