#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy
import datetime
import footprints
from footprints import FootprintBase, FPDict
from epygram import epygramError, config, epylog
from epygram.util import RecursiveObject, nicedeco
[docs]class Field(RecursiveObject, FootprintBase):
"""
Generic abstract class implementing a Field, composed of an identifier and a data.
The field identifier *fid* identifies a field, in several format denominations.
E.g. *{FA='SURFTEMPERATURE', GRIB2={'4.2-0-0':0, '4.5':1}}*.
"""
_collector = ('field',)
_abstract = True
_footprint = dict(
attr = dict(
fid = dict(
type = FPDict),
data = dict(
optional = True,
default=None),
comment = dict(
type = str,
optional = True,
access = 'rwx')
)
)
[docs] def setdata(self, data):
"""
Sets or overwrites the field data as a numpy array.
Mainly useful because a footprints attribute cannot be a numpy array...
"""
if not isinstance(data, numpy.ndarray):
data = numpy.array(data, dtype=numpy.float64)
self._attributes['data'] = data
#############
# OPERATORS #
#############
def _check_operands(self, other):
"""
Internal method to check compatibility of terms in operations on fields.
"""
if not isinstance(other, self.__class__):
try:
other = float(other)
except Exception:
raise ValueError("operations on "+self.__class__.__name__
+ " must involve either scalars (integer/float) or "
+ self.__class__.__name__+".")
else:
if numpy.shape(self.data) != numpy.shape(other.data):
raise epygramError("dimensions mismatch.")
def _add(self, other, **kwargs):
"""
Definition of addition, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'+'} and null validity.
"""
self._check_operands(other)
if isinstance(other, self.__class__): rhs = other.data
else: rhs = other
result = self.data + rhs
newid = {'op':'+'}
newfield = footprints.proxy.field(fid=footprints.FPDict(newid),
**kwargs)
newfield.setdata(result)
return newfield
def _mul(self, other, **kwargs):
"""
Definition of multiplication, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'*'} and null validity.
"""
self._check_operands(other)
if isinstance(other, self.__class__): rhs = other.data
else: rhs = other
result = self.data * rhs
newid = {'op':'*'}
newfield = footprints.proxy.field(fid=footprints.FPDict(newid),
**kwargs)
newfield.setdata(result)
return newfield
def _sub(self, other, **kwargs):
"""
Definition of substraction, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'-'} and null validity.
"""
self._check_operands(other)
if isinstance(other, self.__class__): rhs = other.data
else: rhs = other
result = self.data - rhs
newid = {'op':'-'}
newfield = footprints.proxy.field(fid=footprints.FPDict(newid),
**kwargs)
newfield.setdata(result)
return newfield
def _div(self, other, **kwargs):
"""
Definition of division, 'other' being:
- a scalar (integer/float)
- another Field of the same subclass.
Returns a new Field whose data is the resulting operation,
with 'fid' = {'op':'/'} and null validity.
"""
self._check_operands(other)
if isinstance(other, self.__class__): rhs = other.data
else: rhs = other
result = self.data / rhs
newid = {'op':'/'}
newfield = footprints.proxy.field(fid=footprints.FPDict(newid),
**kwargs)
newfield.setdata(result)
return newfield
# default behaviors
def __add__(self, other):
return self._add(other)
def __mul__(self, other):
return self._mul(other)
def __sub__(self, other):
return self._sub(other)
def __div__(self, other):
return self._div(other)
[docs]class FieldSet(RecursiveObject, list):
"""
Handles a set of Fields, in the manner of Python's builtin list,
with some extra features, especially ensuring its components all are Fields.
Constructor optional argument *fields* has to be either a :class:`Field`
or an iterable of.
"""
def __init__(self, fields=()):
"""
Constructor.
Checks that optional 'fields' argument is actually iterable and
contains Field instances, or is a single Field.
"""
if fields == ():
pass
elif isinstance(fields, Field):
fields = (fields,)
else:
try:
for item in fields:
if not isinstance(item, Field):
raise epygramError("A FieldSet can only be made out of Field instances.")
except TypeError:
raise epygramError("'fields' argument must be either a Field instance or an iterable of fields.")
except Exception as e:
raise e
super(FieldSet, self).__init__(fields)
def __setitem__(self, position, field):
if not isinstance(field, Field):
raise epygramError("A FieldSet can contain only Field instances.")
super(FieldSet, self).__setitem__(position, field)
def __setslice__(self, pos1, pos2, fieldset):
if not isinstance(fieldset, FieldSet):
raise epygramError("'fieldset' argument must be of kind FieldSet.")
super(FieldSet, self).__setslice__(pos1, pos2, fieldset)
[docs] def append(self, field):
"""
Checks that *field* is a :class:`Field` instance before appending it.
"""
if not isinstance(field, Field):
raise epygramError("A FieldSet can contain only Field instances.")
super(FieldSet, self).append(field)
[docs] def index(self, fid):
"""
Returns the index of the first field of the FieldSet matching *fid*,
'fid' being a simple dict: *{typefmt:identifier}*.
"""
if not isinstance(fid, dict) or len(fid) > 1:
raise ValueError("'fid' must be a simple {typefmt:identifier}")
typefmt, identifier = fid.items()[0]
idx = None
for f in range(0, len(self)):
if self[f].fid[typefmt] == identifier:
idx = f
break
return idx
[docs] def extend(self, fieldset):
"""
Checks that *fieldset* is a :class:`FieldSet` instance before extending with it.
"""
if not isinstance(fieldset, FieldSet):
raise epygramError("'fieldset' argument must be of kind FieldSet.")
super(FieldSet, self).extend(fieldset)
[docs] def insert(self, position, field):
"""
Checks that *field* is a :class:`Field` instance before inserting it.
"""
if not isinstance(field, Field):
raise epygramError("A FieldSet can contain only Field instances.")
super(FieldSet, self).insert(position, field)
[docs] def remove(self, fid):
"""
Removes from the FieldSet the first field matching *fid*,
'fid' being a simple dict: {typefmt:identifier}
"""
try:
idx = self.index(fid)
del self[idx]
except Exception:
pass
[docs] def sort(self, attribute, key=None, reverse=False):
"""
Sorts the fields of the FieldSet by the increasing criterion.
If attribute is a string, sorting will be done according to *field.attribute[key]*
or *field.attribute* (if *key==None*).
If attribute is a list *[a1, a2...]*, sorting will be done according to *field.a1.a2[key]*
or *field.a1.a2* (if *key==None*).
If *reverse* is *True*, sorts by decreasing order.
"""
if isinstance(attribute, str):
if key == None:
cmpfct = lambda x,y: cmp(x._attributes[attribute],
y._attributes[attribute])
else:
cmpfct = lambda x,y: cmp(x._attributes[attribute][key],
y._attributes[attribute][key])
elif isinstance(attribute, list):
a = attribute
if isinstance(self[0]._attributes[a[0]], FootprintBase):
if len(attribute) == 2:
if key == None:
cmpfct = lambda x,y: cmp(x._attributes[a[0]]._attributes[a[1]],
y._attributes[a[0]]._attributes[a[1]])
else:
cmpfct = lambda x,y: cmp(x._attributes[a[0]]._attributes[a[1]][key],
y._attributes[a[0]]._attributes[a[1]][key])
elif len(attribute) == 3:
if key == None:
cmpfct = lambda x,y: cmp(x._attributes[a[0]]._attributes[a[1]].__dict__[a[2]],
y._attributes[a[0]]._attributes[a[1]].__dict__[a[2]])
else:
cmpfct = lambda x,y: cmp(x._attributes[a[0]]._attributes[a[1]].__dict__[a[2]][key],
y._attributes[a[0]]._attributes[a[1]].__dict__[a[2]][key])
else:
raise NotImplementedError("len(attribute) > 3.")
else:
if len(attribute) == 2:
if key == None:
cmpfct = lambda x,y: cmp(x._attributes[a[0]].__dict__[a[1]],
y._attributes[a[0]].__dict__[a[1]])
else:
cmpfct = lambda x,y: cmp(x._attributes[a[0]].__dict__[a[1]][key],
y._attributes[a[0]].__dict__[a[1]][key])
elif len(attribute) == 3:
if key == None:
cmpfct = lambda x,y: cmp(x._attributes[a[0]].__dict__[a[1]].__dict__[a[2]],
y._attributes[a[0]].__dict__[a[1]].__dict__[a[2]])
else:
cmpfct = lambda x,y: cmp(x._attributes[a[0]].__dict__[a[1]].__dict__[a[2]][key],
y._attributes[a[0]].__dict__[a[1]].__dict__[a[2]][key])
else:
raise NotImplementedError("len(attribute) > 3.")
else:
raise TypeError("attribute must be a string or list of string.")
super(FieldSet, self).sort(cmp=cmpfct, reverse=reverse)
[docs] def listfields(self, typefmt=None):
"""
Returns a list of the identifiers of the FieldSet.
If *typefmt* is supplied, the list contains only *fid[typefmt]*, and not whole fid.
"""
fieldslist = []
for field in self:
if typefmt != None:
fieldslist.append(field.fid[typefmt])
else:
fieldslist.append(field.fid)
return fieldslist
[docs] def filter(self, by, criteria):
"""
Not Implemented Yet.
Returns a new FieldSet filtered according to criteria specified in argument.
Args:
- by: the kind of filter; on what to filter ?
- criteria: how to filter on that ?
Available filters, examples:
- by='id', criteria={typefmt:identifier} will return only fields whose id[typefmt] match value...
"""
#TODO: id=, fieldtype=, spectral, (...)
raise NotImplementedError("not yet...")
[docs]class Resource(RecursiveObject, FootprintBase):
"""
Generic abstract class implementing a Resource.
"""
_abstract = True
_collector = ('dataformat',)
_footprint = dict(
attr = dict(
format = dict(
optional = True,
info = "Format of the resource."),
filename = dict(
info = "File name (absolute or relative) of the resource."),
openmode = dict(
values = set(['r', 'read', 'w', 'write', 'a', 'append']),
remap = dict(
read = 'r',
write = 'w',
append = 'a'),
info = "Opening mode."),
fmtdelayedopen = dict(
optional = True,
default = False,
type = bool,
info = "Opening of the resource delayed (not at time of construction).")
)
)
@nicedeco
def _openbeforedelayed(mtd):
def nowopen(self, *args, **kwargs):
if self.fmtdelayedopen and not self.isopen:
self.open()
return mtd(self, *args, **kwargs)
return nowopen
_openbeforedelayed = staticmethod(_openbeforedelayed)
def __init__(self, *args, **kwargs):
"""
Constructor. See its footprint for arguments.
"""
import os
super(Resource, self).__init__(*args, **kwargs)
self.container = footprints.proxy.container(filename=self.filename)
if self.openmode in ('r', 'a') and not os.path.exists(self.container.abspath):
raise IOError(self.container.abspath+" does not exist.")
# protection against unhappy overwrites...
if config.protect_unhappy_writes and \
os.path.exists(self.container.abspath) and self.openmode == 'w':
overwrite = raw_input(self.container.abspath+" will be overwritten: do you want to continue (y/n) ? ") == 'y'
if not overwrite:
raise epygramError(self.container.abspath+" already exists.")
def __del__(self):
"""
Destructor.
Closes the resource properly.
"""
try: self.close()
except Exception as e:
epylog.warning("Exception catched in epygram.base.Resource.__del__(): " + str(e))
[docs] def open(self):
"""
Opens the resource properly.
"""
pass
[docs] def close(self):
"""
Closes the file properly.
"""
pass
[docs] def readfields(self, requestedfields, getdata=True):
"""
Returns a :class:`FieldSet` containing requested fields read in the resource.
Args:
- *requestedfields*: a field identifier of the resource format, or a list of.
- *getdata*: optional, if *False*, only metadata are read, the fields do not contain data.
Default is *True*.
"""
fieldset = FieldSet()
if isinstance(requestedfields, list):
for f in requestedfields:
fieldset.append(self.readfield(f, getdata=getdata))
else:
fieldset.append(self.readfield(requestedfields, getdata=getdata))
return fieldset
[docs] def writefields(self, fieldset):
"""
Write the fields of the 'fieldset' in the resource;
*fieldset* must be a :class:`FieldSet` instance.
"""
if not isinstance(fieldset, FieldSet):
raise epygramError("'fieldset' argument must be a FieldSet instance.")
for field in fieldset:
self.writefield(field)
[docs] def listfields(self):
"""
Returns a list containing the identifiers (in the resource format)
of all the fields of the resource.
(Generic wrapper with buffering if openmode == 'r'.)
"""
if self.openmode == 'r' and not hasattr(self, '_bufferedlistfields'):
self._bufferedlistfields = []
elif self.openmode in ('w', 'a'):
# list of fields subject to evolution; no buffering
self._bufferedlistfields = None
if not self._bufferedlistfields:
fieldslist = self._listfields()
if self._bufferedlistfields != None:
self._bufferedlistfields = fieldslist # save
else:
# immutable and already read
fieldslist = self._bufferedlistfields
return fieldslist
def _listfields(self):
"""
Actual listfields() method (virtual).
"""
pass
[docs]class FieldValidity(RecursiveObject):
"""
This class handles the temporal validity of a meteorological field:
its date and time of validity (*date_time*), as well as the validity of its origin
(*basis*, i.e. for a forecast field for instance, the beginning of the forecast)
and its *term*.
An additional optional *cumulativeduration* parameter can define
the duration for which cumulative fields (e.g. precipitation) are valid.
Constructor arguments: cf. *set()* method.
"""
def __init__(self, date_time=None, basis=None, term=None, cumulativeduration=None):
"""
Constructor.
Args:
- date_time: has to be of type datetime.datetime;
- basis: has to be of type datetime.datetime;
- term: has to be of type datetime.timedelta.
- cumulativeduration: has to be of type datetime.timedelta.
"""
self._basis = None
self._date_time = None
self._cumulativeduration = None
kwargs = dict(date_time=date_time,
basis=basis,
term=term,
cumulativeduration=cumulativeduration)
if not (date_time==None and basis==None and term==None):
self.set(**kwargs)
[docs] def term(self, fmt=None):
"""
This method returns the term as the difference between date and time
of validity and basis.
By default, it is returned as a :class:`datetime.timedelta`;
otherwise, *fmt* argument can specify the desired return format.
Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for now...
"""
if fmt == None:
out = self._date_time - self._basis
elif fmt == 'IntHours':
term = self._date_time - self._basis
out = int(term.total_seconds()/3600)
elif fmt == 'IntSeconds':
term = self._date_time - self._basis
out = int(term.total_seconds())
else:
raise NotImplementedError("fmt="+fmt+" option for "+self.__class__.__name__+".term().")
return out
[docs] def cumulativeduration(self, fmt=None):
"""
This method returns the cumulative duration,
i.e. the duration for which cumulative fields (e.g. precipitation) are valid.
By default, it is returned as a :class:`datetime.timedelta`;
otherwise, *fmt* argument can specify the desired return format.
Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for now...
"""
if fmt == None:
out = self._cumulativeduration
elif fmt == 'IntHours':
out = int(self._cumulativeduration.total_seconds()/3600)
elif fmt == 'IntSeconds':
out = int(self._cumulativeduration.total_seconds())
else:
raise NotImplementedError("fmt="+fmt+" option for "+self.__class__.__name__+".cumulativeduration().")
return out
[docs] def get(self, fmt=None):
"""
Returns the date and time of validity.
By default, as a :class:`datetime.datetime`;
otherwise, *fmt* argument can specify the desired return format.
Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july 31th at 10h, 48m, 12s).
And that's all for now...
"""
if fmt == None:
out = self._date_time
elif fmt == 'IntStr':
out = '{:0>{width}}'.format(str(self._date_time.year), width=4) \
+ '{:0>{width}}'.format(str(self._date_time.month), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.day), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.hour), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.minute), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.second), width=2)
else:
raise NotImplementedError("fmt="+fmt+" option for "+self.__class__.__name__+".get().")
return out
[docs] def getbasis(self, fmt=None):
"""
Returns the date and time of origin (basis).
By default, as a :class:`datetime.datetime`;
otherwise, *fmt* argument can specify the desired return format.
Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july 31th at 10h, 48m, 12s).
And that's all for now...
"""
if fmt == None:
out = self._basis
elif fmt == 'IntStr':
out = '{:^{width}}'.format(str(self._date_time.year), width=4) \
+ '{:0>{width}}'.format(str(self._date_time.month), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.day), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.hour), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.minute), width=2) \
+ '{:0>{width}}'.format(str(self._date_time.second), width=2)
else:
raise NotImplementedError("fmt="+fmt+" option for "+self.__class__.__name__+".getbasis().")
return out
[docs] def set(self, date_time=None, basis=None, term=None, cumulativeduration=None):
"""
Sets validity and basis according to arguments.
A consistency check is done if the three arguments are provided (which is useless anyway).
Args: \n
- *date_time*: has to be of type :class:`datetime.datetime`;
- *basis*: has to be of type :class:`datetime.datetime`;
- *term*: has to be of type :class:`datetime.timedelta`;
- *cumulativeduration*: has to be of type :class:`datetime.timedelta`.
"""
if isinstance(date_time, datetime.datetime):
self._date_time = date_time
elif date_time != None:
raise epygramError("argument 'date_time' must be of type datetime.datime")
if isinstance(basis, datetime.datetime):
self._basis = basis
elif basis != None:
raise epygramError("argument 'basis' must be of type datetime.datime")
if term != None and not isinstance(term, datetime.timedelta):
raise epygramError("argument 'term' must be of type datetime.timedelta")
if cumulativeduration != None and not isinstance(cumulativeduration, datetime.timedelta):
raise epygramError("argument 'cumulativeduration' must be of type datetime.timedelta")
if isinstance(term, datetime.timedelta):
if date_time != None and basis != None and term != None \
and date_time - basis != term:
raise epygramError("inconsistency between 'term', 'basis' and 'date_time' arguments.")
if self._date_time == None:
if self._basis == None:
raise epygramError("cannot set 'term' without 'basis' nor 'date_time'.")
else:
self._date_time = self._basis + term
else:
if self._basis == None:
self._basis = self._date_time - term
else:
self._date_time = self._basis + term
if cumulativeduration != None:
self._cumulativeduration = cumulativeduration