#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
GRIB:
Contains classes for GRIB resource and GRIB individual message,
editions 1 and 2.
"""
__all__ = ['GRIB'] #, 'GRIBmessageFactory']
import datetime
import os
os.putenv('GRIB_SAMPLES_PATH', os.getenv('GRIB_SAMPLES_PATH') + \
':/home/mary/worktmp/GRIB') #TODO: get a better place for sample
import gribapi
from footprints import FPDict, proxy as fpx
from epygram import config, epygramError, epylog
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle, RecursiveObject
from epygram.fields import H2DField
from epygram.geometries import H2DGeometry
[docs]class GRIBmessage(RecursiveObject, dict):
"""
Class implementing a GRIB message as an object.
Abstract class: common patterns to editions 1 and 2.
"""
@property
def grib_edition(self):
"""Returns the edition (1,2) of the GRIB message."""
try:
return int(self.__class__.__name__[4:5])
except ValueError:
raise epygramError("GRIBmessage is an abstract class.")
def __init__(self, source,
centre_id=config.GRIB_default_centre,
process_id=config.GRIB_default_process,
packing=config.GRIB_default_packing):
"""
*source* being either:\n
- a dict with keys ('file', 'index_in_file');
- *file* being an open file-like object designing the GRIB
file it is read from,
- *index_in_file* being the index of the message in file.
- a :class:`epygram.fields.H2DField` instance.
In that case, the *centre_id*, *process_id* and *packing* options
can be forced using the adhoc arguments.
"""
if self.__class__.__name__ == 'GRIBmessage':
raise epygramError("GRIBmessage is an abstract class.\
Use GRIB1message or GRIB2message.")
self.namespace = None
super(GRIBmessage, self).__init__()
if isinstance(source, dict) and\
set(source.keys()) == set(['file', 'index_in_file']):
self.built_from = 'file'
self._gid = gribapi.grib_new_from_file(source['file'],
headers_only=True)
self._filename = source['file'].name
self._index = source['index_in_file']
elif isinstance(source, H2DField):
self.built_from = 'field'
self._build_msg_from_field(source,
centre_id=centre_id,
process_id=process_id,
packing=packing)
#elif isinstance(source, str):
# self._gid = gribapi.grib_new_from_samples(source)
else:
raise NotImplementedError("not yet.")
def __del__(self):
try:
gribapi.grib_release(self._gid)
except gribapi.GribInternalError:
pass
def __getitem__(self, item):
if item not in self.keys():
if config.GRIB_read_whole_message:
for namespace in ('ls', 'mars', None):
self.readmessage(namespace=namespace)
if item in self.keys():
break
else:
self._readattribute(item)
if item not in self.keys():
raise KeyError(item + " not in GRIBmessage keys.")
return super(GRIBmessage, self).__getitem__(item)
def __setitem__(self, key, value):
gribapi.grib_set(self._gid, key, value)
super(GRIBmessage, self).__setitem__(key, value)
[docs] def readmessage(self, namespace=None):
"""
Reads the meta-data of the message.
Args:\n
- *namespace*: the namespace of keys to be read, among:\n
- **None**: to get all keys present in message,
- ['myKey1', 'myKey2', ...] for any custom namespace,
- 'ls': to get the same default keys as the grib_ls,
- 'mars': to get the keys used by MARS.
"""
self.clear()
self.namespace = namespace
if namespace in (None, 'ls', 'mars'):
key_iter = gribapi.grib_keys_iterator_new(self._gid,
namespace=namespace)
namespace = []
while gribapi.grib_keys_iterator_next(key_iter):
namespace.append(gribapi.grib_keys_iterator_get_name(key_iter))
# Explanation: the message accessed via self._gid is "headers_only";
# if a key is not found or returns an error, we try to access it via a
# "full" temporary gid, generated on that occasion.
gid_full = None
for k in namespace:
try:
try:
super(GRIBmessage, self).__setitem__(k,
gribapi.grib_get(self._gid, k))
except gribapi.GribInternalError as e:
if str(e) == 'Passed array is too small':
super(GRIBmessage, self).__setitem__(k,
gribapi.grib_get_double_array(self._gid, k))
except Exception as e:
if self.built_from != 'file':
raise e
if gid_full == None:
tempfile = open(self._filename, 'r')
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(tempfile,
headers_only=True)
gribapi.grib_release(gid)
gid_full = gribapi.grib_new_from_file(tempfile)
try:
super(GRIBmessage, self).__setitem__(k,
gribapi.grib_get(gid_full, k))
except gribapi.GribInternalError as e:
# differenciation not well done... PB in gribapi
if str(e) == 'Passed array is too small':
super(GRIBmessage, self).__setitem__(k,
gribapi.grib_get_double_array(gid_full, k))
else:
gribapi.grib_release(gid_full)
raise
if gid_full != None:
gribapi.grib_release(gid_full)
def _readattribute(self, attribute):
"""Actual access to the attributes."""
# Explanation: the message accessed via self._gid is "headers_only";
# if a key is not found or returns an error, we try to access it via a
# "full" temporary gid, generated on that occasion.
try:
try:
super(GRIBmessage, self).__setitem__(attribute,
gribapi.grib_get(self._gid, attribute))
except gribapi.GribInternalError as e:
if str(e) == 'Passed array is too small':
super(GRIBmessage, self).__setitem__(attribute,
gribapi.grib_get_double_array(self._gid, attribute))
else:
raise
except Exception as e:
if self.built_from != 'file':
raise e
tempfile = open(self._filename, 'r')
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(tempfile, headers_only=True)
gribapi.grib_release(gid)
gid_full = gribapi.grib_new_from_file(tempfile)
try:
super(GRIBmessage, self).__setitem__(attribute,
gribapi.grib_get(gid_full, attribute))
except gribapi.GribInternalError as e:
# differenciation not well done... PB in gribapi
if str(e) == 'Passed array is too small':
super(GRIBmessage, self).__setitem__(attribute,
gribapi.grib_get_double_array(gid_full, attribute))
else:
raise
finally:
gribapi.grib_release(gid_full)
def getvalues(self):
"""Reads and returns the message values."""
if self.built_from != 'file':
raise epygramError("'getvalues()' method should not be called for\
a GRIBmessage built from other than a file.")
if not hasattr(self, '_values'):
if self.built_from != 'file':
self._values = gribapi.grib_get_values(self._gid)
else:
with open(self._filename, 'r') as f:
for _ in range(1, self._index):
gid = gribapi.grib_new_from_file(f, headers_only=True)
gribapi.grib_release(gid)
gid = gribapi.grib_new_from_file(f)
self._values = gribapi.grib_get_values(gid)
gribapi.grib_release(gid)
return self._values
def read_geometry(self, footprints_builder=config.use_footprints_as_builder):
"""
Returns a geometry object containing
the geometry information of the GRIB message.
- *footprints_builder*: if *True*, uses footprints.proxy to build
fields. Defaults to False for performance reasons.
"""
geoid = config.default_geoid
vcoordinate = {'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
'typeOfLevel':self['typeOfLevel'],
'Z':self['level']
}
if self['gridType'] == 'regular_ll':
geometryname = 'regular_lonlat'
center_lon = float(self['longitudeOfLastGridPointInDegrees'] + \
self['longitudeOfFirstGridPointInDegrees']) / 2.
center_lat = float(self['latitudeOfLastGridPointInDegrees'] + \
self['latitudeOfFirstGridPointInDegrees']) / 2.
grid = {'center_lon':Angle(center_lon, 'degrees'),
'center_lat':Angle(center_lat, 'degrees'),
'X_resolution':Angle(self['iDirectionIncrementInDegrees'],
'degrees'),
'Y_resolution':Angle(self['jDirectionIncrementInDegrees'],
'degrees')
}
dimensions = {'X':self['Ni'],
'Y':self['Nj']
}
projection = None
elif self['gridType'] in ('polar_stereographic',):
geometryname = self['gridType']
if config.default_projtool == 'pyproj':
import pyproj
projtool = pyproj
projdict = {'lambert':'lcc',
'mercator':'merc',
'polar_stereographic':'stere'}
elif config.default_projtool == 'myproj':
from epygram import myproj
projtool = myproj
projdict = {'lambert':'lambert',
'mercator':'mercator',
'polar_stereographic':'polar_stereographic'}
proj = projdict[geometryname]
if self['projectionCentreFlag'] == 0:
lat_0 = 90.
else:
lat_0 = -90.
# !!! dirty bypass of erroneous GRIBs from OSI-SAF !..
if self['centreDescription'] == 'Oslo':
epylog.warning(' '.join(['for centre:',
self['centreDescription'],
' and polar stereographic projection,\
lat_ts is taken in 3rd position in\
attribute "pv[]" of GRIB message !']))
lat_ts = self['pv'][2]
geoid = {'a':self['pv'][0], 'b':self['pv'][1]}
p = projtool.Proj(proj=proj,
lon_0=float(self['orientationOfTheGridInDegrees']),
lat_0=lat_0, lat_ts=lat_ts,
**geoid)
x1, y1 = p(float(self['longitudeOfFirstGridPointInDegrees']),
float(self['latitudeOfFirstGridPointInDegrees']))
xc = x1 + (float(self['Nx']) - 1) * self['DxInMetres'] / 2.
yc = y1 + (float(self['Ny']) - 1) * self['DyInMetres'] / 2.
center_lon, center_lat = p(xc, yc, inverse=True)
projection = {'reference_lon':Angle(float(self['orientationOfTheGridInDegrees']), 'degrees'),
'reference_lat':Angle(lat_0, 'degrees'),
'secant_lat':Angle(lat_ts, 'degrees'),
'center_lon':Angle(center_lon, 'degrees'),
'center_lat':Angle(center_lat, 'degrees')
}
grid = {'X_resolution':float(self['DxInMetres']),
'Y_resolution':float(self['DyInMetres']),
'LAMzone':None
}
dimensions = {'X':self['Nx'],
'Y':self['Ny']
}
else:
raise NotImplementedError("not yet !")
# Make geometry object
kwargs_geom = dict(structure='H2D',
name=geometryname,
grid=FPDict(grid),
dimensions=FPDict(dimensions),
vcoordinate=FPDict(vcoordinate),
projection=projection,
position_on_grid=FPDict({'vertical':'mass',
'horizontal':'center'}),
geoid=geoid)
if footprints_builder:
builder = fpx.geometry
else:
builder = H2DGeometry
geometry = builder(**kwargs_geom)
return geometry
def read_validity(self):
"""
Returns a :class:`epygram.base.FieldValidity` object containing the
validity of the GRIB message.
"""
year = int(str(self['dataDate'])[0:4])
month = int(str(self['dataDate'])[4:6])
day = int(str(self['dataDate'])[6:8])
dataTime = '{:0>{width}}'.format(self['dataTime'], width=4)
hour = int(dataTime[0:2])
minutes = int(dataTime[2:4])
basis = datetime.datetime(year, month, day, hour, minutes)
cum = None
if self['stepUnits'] == 1:
timeunitfactor = 3600
if self['timeRangeIndicator'] in (0, 1, 2, 4):
term = self['endStep']
term = datetime.timedelta(seconds=term * timeunitfactor)
if self['timeRangeIndicator'] in (2, 4):
cum = self['endStep'] - self['startStep']
cum = datetime.timedelta(seconds=cum * timeunitfactor)
else:
raise NotImplementedError("'timeRangeIndicator' not in (0,2,4).")
validity = FieldValidity(basis=basis, term=term, cumulativeduration=cum)
return validity
[docs] def asfield(self, getdata=True,
footprints_builder=config.use_footprints_as_builder):
"""
Returns an :class:`epygram.base.Field` made out from the GRIB message.
- *getdata*: if *False*, only metadata are read, the field do not
contain data.
- *footprints_builder*: if *True*, uses footprints.proxy to build
fields. Defaults to False for performance reasons.
"""
if footprints_builder:
builder = fpx.field
else:
builder = H2DField
fid = self.genfid()
geometry = self.read_geometry(footprints_builder=footprints_builder)
spectral_geometry = None
validity = self.read_validity()
processtype = self['generatingProcessIdentifier']
comment = 'units = ' + self['units']
field = builder(fid=FPDict({'GRIB':FPDict(fid)}),
geometry=geometry,
validity=validity,
spectral_geometry=spectral_geometry,
processtype=processtype,
comment=comment)
if getdata:
data1d = self.getvalues()
if self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 0 and \
self['jPointsAreConsecutive'] == 0:
data2d = data1d.reshape((self['Nj'], self['Ni']),
order='C')[::-1, :]
elif self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 1 and \
self['jPointsAreConsecutive'] == 0:
data2d = data1d.reshape((self['Nj'], self['Ni']),
order='C')[:, :]
else:
raise NotImplementedError("not yet !")
field.setdata(data2d)
return field
[docs] def write_to_file(self, ofile):
"""
*ofile* being an open file-like object designing the physical GRIB
file to be written to.
"""
gribapi.grib_write(self._gid, ofile)
[docs]class GRIB1message(GRIBmessage):
"""
Specificities of GRIB1.
"""
[docs] def genfid(self):
"""Generates and returns a GRIB-type epygram fid from the message."""
fid = {'name':self['name'],
'shortName':self['shortName'],
'indicatorOfParameter':self['indicatorOfParameter'],
'paramId':self['paramId'],
'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
'typeOfLevel':self['typeOfLevel'],
'level':self['level'],
'editionNumber':self['editionNumber'],
'table2Version':self['table2Version']}
return fid
def _build_msg_from_field(self, field,
centre_id=config.GRIB_default_centre,
process_id=config.GRIB_default_process,
packing=config.GRIB_default_packing):
"""
Build the GRIB message from the field, using a template.
*field* being a :class:`epygram.base.Field`.
Optional arguments:\n
- *centre_id*: identifier of the originating center
- *process_id*: identifier of the originating process
- *packing*: options of packing and compression in GRIB,
given as a list of tuples (key, value),
e.g. [(complexPacking, 1), ...].
"""
if not isinstance(field, H2DField):
raise NotImplementedError("not yet.")
if field.max() - field.min() < config.epsilon:
sample = 'GRIB' + str(self.grib_edition)
packing = [('complexPacking', 0), ]
else:
#sample = 'GRIDHSTFRANGP0025+0003_t2m'
sample = 'GRIDHSTEUROC25+0006_t2m'
sample_gid = gribapi.grib_new_from_samples(sample)
self._gid = gribapi.grib_clone(sample_gid)
gribapi.grib_release(sample_gid)
# part 1 --- fid
if 'GRIB' not in field.fid.keys():
from epygram.formats import fid_converter
GRIBfid = fid_converter(field.fid,
'GRIB' + \
str(self.grib_edition))
else:
GRIBfid = field.fid['GRIB']
for key in ['table2Version', 'indicatorOfParameter',
'indicatorOfTypeOfLevel', 'level', 'typeOfLevel']:
self[key] = GRIBfid[key]
# part 2 --- context
self['centre'] = centre_id
try:
process_id = int(field.processtype)
except ValueError:
if field.processtype == 'forecast':
process_id = 204
#TODO: other cases
self['generatingProcessIdentifier'] = process_id
# part 3 --- validity
self['dataDate'] = int(field.validity.getbasis(fmt='IntStr')[:8])
self['dataTime'] = int(field.validity.getbasis(fmt='IntStr')[8:12])
term_in_seconds = field.validity.term().total_seconds()
cumulative = field.validity.cumulativeduration()
if cumulative:
startStep_in_seconds = (field.validity.term() - cumulative).total_seconds()
self['timeRangeIndicator'] = 4
if term_in_seconds / 3600. < config.epsilon:
if cumulative:
self['startStep'] = int(startStep_in_seconds / 3600)
self['stepUnits'] = 'h' # hours
self['endStep'] = int(term_in_seconds / 3600)
else:
if cumulative:
self['startStep'] = int(startStep_in_seconds)
self['stepUnits'] = 's' # seconds
self['endStep'] = int(term_in_seconds)
# part 4 --- geometry
self['Ni'] = field.geometry.dimensions['X']
self['Nj'] = field.geometry.dimensions['Y']
corners = field.geometry.gimme_corners_ll()
self['longitudeOfFirstGridPointInDegrees'] = corners['ul'][0]
self['latitudeOfFirstGridPointInDegrees'] = corners['ul'][1]
self['longitudeOfLastGridPointInDegrees'] = corners['lr'][0]
self['latitudeOfLastGridPointInDegrees'] = corners['lr'][1]
if field.geometry.name == 'regular_lonlat':
self['iDirectionIncrementInDegrees'] = field.geometry.grid['X_resolution'].get('degrees')
self['jDirectionIncrementInDegrees'] = field.geometry.grid['Y_resolution'].get('degrees')
else:
raise NotImplementedError("not yet.")
# part 5 --- values
"""print gribapi.grib_get(self._gid, 'N2')
print gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues')
print gribapi.grib_get(self._gid, 'widthOfWidths')
print gribapi.grib_get(self._gid, 'widthOfLengths')
print gribapi.grib_get(self._gid, 'NL')
addpack = [('N2', gribapi.grib_get(self._gid, 'N2')),
('numberOfSecondOrderPackedValues', gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues')),
('widthOfWidths', gribapi.grib_get(self._gid, 'widthOfWidths')),
('widthOfLengths', gribapi.grib_get(self._gid, 'widthOfLengths')),
('NL', gribapi.grib_get(self._gid, 'NL'))]"""
"""self['packingType'] = 'grid_simple'
gribapi.grib_set_missing(self._gid, 'values')
self['bitsPerValue'] = 12"""
if self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 0 and \
self['jPointsAreConsecutive'] == 0:
gribapi.grib_set_values(self._gid,
field.getdata()[::-1, :].flatten(order='C'))
elif self['iScansNegatively'] == 0 and \
self['jScansPositively'] == 1 and \
self['jPointsAreConsecutive'] == 0:
gribapi.grib_set_values(self._gid,
field.getdata()[:, :].flatten(order='C'))
for (k, v) in packing:
self[k] = v
"""print gribapi.grib_get(self._gid, 'N2')
print gribapi.grib_get(self._gid, 'numberOfSecondOrderPackedValues')
print gribapi.grib_get(self._gid, 'widthOfWidths')
print gribapi.grib_get(self._gid, 'widthOfLengths')
print gribapi.grib_get(self._gid, 'NL')
for (k,v) in addpack:
self[k] = v"""
[docs]class GRIB2message(GRIBmessage):
"""
Specificities of GRIB2.
"""
[docs] def genfid(self):
"""
Generates and returns a GRIB-type epygram fid from the message.
"""
raise NotImplementedError("not yet.")
fid = {'name':self['name'],
'shortName':self['shortName'],
'indicatorOfParameter':self['indicatorOfParameter'],
'paramId':self['paramId'],
'indicatorOfTypeOfLevel':self['indicatorOfTypeOfLevel'],
'typeOfLevel':self['typeOfLevel'],
'level':self['level'],
'editionNumber':self['editionNumber'],
'table2Version':self['table2Version']}
return fid
def _build_msg_from_field(self, field):
"""
Build the GRIB message from the field, using a template.
"""
raise NotImplementedError("not yet.")
#TODO:
[docs]class GRIB(Resource):
"""Class implementing all specificities for GRIB resource format."""
_footprint = dict(
attr=dict(
format=dict(
values=set(['GRIB']),
default='GRIB'),
grib_edition=dict(
values=set([1, 2]),
type=int,
optional=True,
info="Only with openmode = 'w'.",
default=config.GRIB_default_edition)
)
)
def __init__(self, *args, **kwargs):
"""Constructor. See its footprint for arguments."""
self.isopen = False
super(GRIB, self).__init__(*args, **kwargs)
if self.openmode in ('r', 'a'):
_file = open(self.container.abspath, 'r')
isgrib = _file.readline()
_file.close()
if isgrib[0:4] != 'GRIB':
raise IOError("this resource is not a GRIB one.")
if not self.fmtdelayedopen:
self.open()
def _GRIBmessageClass(self, source, **kwargs):
"""
Factory of GRIBmessage's with choice of grib edition number
determined by
1/ the 'editionNumber' of the fid of the field given to the constructor
2/ the grib_edition attribute of the GRIB object
"""
if self.grib_edition == 1:
msg = GRIB1message(source, **kwargs)
else:
msg = GRIB2message(source, **kwargs)
return msg
[docs] def open(self, openmode=None):
"""
Opens a GRIB and initializes some attributes.
- *openmode*: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(GRIB, self).open(openmode=openmode)
self._file = open(self.container.abspath, self.openmode)
if self.openmode in ('r', 'a'):
# Find GRIB edition number in the first message
gid = gribapi.grib_new_from_file(self._file, headers_only=True)
self._attributes['grib_edition'] = int(gribapi.grib_get(gid, 'editionNumber'))
gribapi.grib_release(gid)
self._file.close()
# and re-open
self._file = open(self.container.abspath, self.openmode)
self.isopen = True
self._messages = [self._GRIBmessageClass({'file':self._file,
'index_in_file':i})
for i in range(1, self.messages_number + 1)]
elif self.openmode == 'w':
self.isopen = True
self.empty = True
self._messages = []
[docs] def close(self):
"""
Closes a GRIB.
"""
if hasattr(self, '_file'):
self._file.close()
self.isopen = False
@property
@Resource._openbeforedelayed
[docs] def messages_number(self):
"""Counts the number of messages in file."""
return gribapi.grib_count_in_file(self._file)
[docs] def listfields(self, onlykey=None):
"""
Returns a list containing the GRIB identifiers of all the fields of the
resource.
Argument *onlykey* can be specified as a string or a tuple of strings,
so that only specified keys of the fid will returned.
"""
fidlist = super(GRIB, self).listfields()
if onlykey != None:
if isinstance(onlykey, str):
fidlist = [f[onlykey] for f in fidlist]
elif isinstance(onlykey, tuple):
fidlist = [{k:f[k] for k in onlykey} for f in fidlist]
return fidlist
@Resource._openbeforedelayed
def _listfields(self):
"""Returns a list of GRIB-type fid of the fields inside the resource."""
fidlist = []
for msg in self._messages:
fidlist.append(msg.genfid())
return fidlist
[docs] def sortfields(self, sortingkey, onlykey=None):
"""
Returns a sorted list of fields with regards to the given *sortingkey*
of their fid, as a dict of lists.
Argument *onlykey* can be specified as a string or a tuple of strings,
so that only specified keys of the fid will returned.
"""
sortedfields = {}
listoffields = self.listfields()
onlykeylistoffields = self.listfields(onlykey=onlykey)
for f in range(len(listoffields)):
try:
category = listoffields[f][sortingkey]
except KeyError:
category = 'None'
field = onlykeylistoffields[f]
if category in sortedfields.keys():
sortedfields[category].append(field)
else:
sortedfields[category] = [field]
return sortedfields
[docs] def readfield(self, handgrip, getdata=True,
footprints_builder=config.use_footprints_as_builder):
"""
Finds in GRIB the message that corresponds to the *handgrip*,
and returns it as a :class:`epygram.base.Field`.
If several messages meet the requirements, raises error (use
readfields() method instead).
*handgrip* is a dict where you can store all requested GRIB keys...
E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl', 'level':850}
will return the Temperature at 850hPa field.
If *getdata* == **False**, the data is not read, the field consist
in the meta-data only.
If *footprints_builder* == *True*, uses footprints.proxy to build
fields. True decreases performance.
"""
matchingfields = self.readfields(handgrip, getdata=getdata,
footprints_builder=footprints_builder)
if len(matchingfields) != 1:
raise epygramError("several fields found for that *handgrip*;\
please refine.")
return matchingfields[0]
@Resource._openbeforedelayed
[docs] def readfields(self, handgrip, getdata=True,
footprints_builder=config.use_footprints_as_builder):
"""
Finds in GRIB the message(s) that corresponds to the *handgrip*,
and returns it as a :class:`epygram.base.FieldSet` of
:class:`epygram.base.Field`.
*handgrip* is a dict where you can store all requested GRIB keys...
E.g. {'shortName':'t', 'indicatorOfTypeOfLevel':'pl'}
will return all the Temperature fields on Pressure levels.
If *getdata* == **False**, the data is not read, the field(s) consist
in the meta-data only.
If *footprints_builder* == *True*, uses footprints.proxy to build
fields. True decreases performance.
"""
matchingfields = FieldSet()
for msg in self._messages:
ok = True
for key in handgrip.keys():
try:
ok = float(msg[key]) == float(handgrip[key])
except ValueError:
ok = msg[key] == handgrip[key]
except (gribapi.GribInternalError, KeyError):
ok = False
if not ok:
break
if ok:
matchingfields.append(msg.asfield(getdata=getdata,
footprints_builder=footprints_builder))
if len(matchingfields) == 0:
raise epygramError("no field matching the given *handgrip* has been\
found in resource.")
return matchingfields
@Resource._openbeforedelayed
[docs] def writefield(self, field,
force_to_field_grib_edition=False,
centre_id=config.GRIB_default_centre,
process_id=config.GRIB_default_process,
packing=config.GRIB_default_packing):
"""
Writes a Field as a GRIBmessage into the GRIB resource.
Args:\n
- *field*: a :class:`epygram.base.Field` instance
- *centre_id*: identifier of the originating center
- *process_id*: identifier of the originating process
- *packing*: options of packing and compression in GRIB,
given as a list of tuples (key, value),
e.g. [(complexPacking, 1), ...].
"""
if not isinstance(field, H2DField):
raise NotImplementedError("'field' argument other than a H2DField.")
if force_to_field_grib_edition and 'GRIB' in field.fid.keys():
if self.grib_edition != field.fid['GRIB']['editionNumber']:
if self.empty:
self.grib_edition = field.fid['GRIB']['editionNumber']
else:
raise epygramError("cannot force grib edition if the GRIB\
is not empty.")
m = self._GRIBmessageClass(field,
centre_id=centre_id,
process_id=process_id,
packing=packing)
self._messages.append(m)
m.write_to_file(self._file)
if self.empty:
self.empty = False
@Resource._openbeforedelayed
[docs] def what(self, out, mode='one+list', sortby=None):
"""
Writes in file a summary of the contents of the GRIB.
Args: \n
- *out*: the output open file-like object (duck-typing: *out*.write()
only is needed).
- *mode*: among ('one+list', 'fid_list', 'what', 'ls', 'mars'), \n
- 'one+list' = gives the validity/geometry of the first field in
GRIB, plus the list of fid.
- 'fid_list' = gives only the fid of each field in GRIB.
- 'what' = gives the values of the keys from each GRIB message that
are used to generate an **epygram** field from the message (slower).
- 'ls' = gives the values of the 'ls' keys from each GRIB message.
- 'mars' = gives the values of the 'mars' keys from each GRIB message.
- *sortby*: name of the fid key used to sort fields; e.g. 'typeOfLevel';
only for *mode* = 'one+list' or 'fid_list'.
"""
firstcolumn_width = 50
secondcolumn_width = 16
sepline = '{:-^{width}}'.format('', width=firstcolumn_width + \
secondcolumn_width + 1) + '\n'
def write_formatted_dict(dest, fid):
name = fid.pop('name')
dest.write('name: ' + name + '\n')
for k in sorted(fid.keys()):
dest.write(' ' + str(k) + ': ' + str(fid[k]) + '\n')
if mode == 'one+list':
onefield = self._messages[0].asfield(getdata=False)
g0 = onefield.geometry.footprint_as_dict()
g0.pop('vcoordinate')
for m in self._messages:
if m.read_validity().get() != onefield.validity.get():
print m.read_validity()
print onefield.validity
raise epygramError("all fields do not share their validity;\
'one+list' mode (-m) disabled.")
onefield.what(out, vertical_geometry=False,
cumulativeduration=False)
out.write("######################\n")
out.write("### LIST OF FIELDS ###\n")
out.write("######################\n")
listoffields = self.listfields()
out.write("Number: " + str(len(listoffields)) + "\n")
if mode in ('what', 'ls', 'mars'):
out.write(sepline)
for m in self._messages:
if mode == 'what':
_ = m.asfield(getdata=False)
elif mode in ('ls', 'mars'):
m.readmessage(mode)
m._readattribute('name')
m_dict = dict(m)
write_formatted_dict(out, m_dict)
elif sortby != None:
out.write("sorted by: " + sortby + "\n")
out.write(sepline)
sortedfields = self.sortfields(sortby)
for category in sorted(sortedfields.keys()):
out.write(sortby + ": " + str(category) + '\n')
out.write('--------------------' + '\n')
for f in sortedfields[category]:
f.pop(sortby)
write_formatted_dict(out, f)
out.write('--------------------' + '\n')
else:
out.write(sepline)
for f in listoffields:
write_formatted_dict(out, f)
out.write(sepline)