Source code for epygram.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy
import datetime

import footprints
from footprints import FootprintBase, FPDict

from epygram import epygramError, config, epylog
from epygram.util import RecursiveObject, nicedeco



class Field(RecursiveObject, FootprintBase):
    """
    Generic abstract class implementing a Field, composed of an identifier
    and a data.

    The field identifier *fid* identifies a field, in several format
    denominations.
    E.g. *{FA='SURFTEMPERATURE', GRIB2={'4.2-0-0':0, '4.5':1}}*.

    Arithmetic operators (``+``, ``-``, ``*``, ``/``) are available between a
    field and either a scalar or another field of the same class; they return
    a new field built through ``footprints.proxy.field``.
    """

    _collector = ('field',)
    _abstract = True
    _footprint = dict(
        attr=dict(
            fid=dict(
                type=FPDict),
            data=dict(
                optional=True,
                default=None),
            comment=dict(
                type=str,
                optional=True,
                access='rwx')
        )
    )

    def setdata(self, data):
        """
        Sets or overwrites the field data as a numpy array.

        Mainly useful because a footprints attribute cannot be a numpy
        array...

        Args: \n
        - *data*: a :class:`numpy.ndarray` (stored as is), or anything
          :func:`numpy.array` can convert to an array of float64.
        """
        if not isinstance(data, numpy.ndarray):
            data = numpy.array(data, dtype=numpy.float64)
        # Written straight into the attributes dict rather than through the
        # footprint attribute, cf. the remark above.
        self._attributes['data'] = data

    #############
    # OPERATORS #
    #############

    def _check_operands(self, other):
        """
        Internal method to check compatibility of terms in operations on
        fields.

        Raises:
        - ValueError if *other* is neither an instance of the class of
          *self* nor convertible to a float;
        - epygramError if *other* is a field whose data shape differs from
          the one of *self*.
        """
        if not isinstance(other, self.__class__):
            try:
                # Only the feasibility of the conversion matters here.
                float(other)
            except Exception:
                raise ValueError("operations on "+self.__class__.__name__ +
                                 " must involve either scalars (integer/float) or " +
                                 self.__class__.__name__+".")
        else:
            if numpy.shape(self.data) != numpy.shape(other.data):
                raise epygramError("dimensions mismatch.")

    def _operand_values(self, other):
        """
        Internal helper: checks *other* and returns what must be combined
        with *self.data*, i.e. its data if it is a field, or itself if it is
        a scalar.
        """
        self._check_operands(other)
        if isinstance(other, self.__class__):
            return other.data
        return other

    def _new_from_operation(self, result, symbol, **kwargs):
        """
        Internal helper: builds the field resulting from an operation.

        The new field gets 'fid' = {'op':*symbol*} and *result* as data;
        *kwargs* are passed to the field constructor.
        """
        newfield = footprints.proxy.field(fid=footprints.FPDict({'op': symbol}),
                                          **kwargs)
        newfield.setdata(result)
        return newfield

    def _add(self, other, **kwargs):
        """
        Definition of addition, 'other' being:
        - a scalar (integer/float)
        - another Field of the same subclass.

        Returns a new Field whose data is the resulting operation,
        with 'fid' = {'op':'+'} and null validity.
        """
        return self._new_from_operation(self.data + self._operand_values(other),
                                        '+', **kwargs)

    def _mul(self, other, **kwargs):
        """
        Definition of multiplication, 'other' being:
        - a scalar (integer/float)
        - another Field of the same subclass.

        Returns a new Field whose data is the resulting operation,
        with 'fid' = {'op':'*'} and null validity.
        """
        return self._new_from_operation(self.data * self._operand_values(other),
                                        '*', **kwargs)

    def _sub(self, other, **kwargs):
        """
        Definition of subtraction, 'other' being:
        - a scalar (integer/float)
        - another Field of the same subclass.

        Returns a new Field whose data is the resulting operation,
        with 'fid' = {'op':'-'} and null validity.
        """
        return self._new_from_operation(self.data - self._operand_values(other),
                                        '-', **kwargs)

    def _div(self, other, **kwargs):
        """
        Definition of division, 'other' being:
        - a scalar (integer/float)
        - another Field of the same subclass.

        Returns a new Field whose data is the resulting operation,
        with 'fid' = {'op':'/'} and null validity.
        """
        return self._new_from_operation(self.data / self._operand_values(other),
                                        '/', **kwargs)

    # default behaviors
    def __add__(self, other):
        return self._add(other)

    def __mul__(self, other):
        return self._mul(other)

    def __sub__(self, other):
        return self._sub(other)

    def __div__(self, other):
        return self._div(other)

    def __truediv__(self, other):
        # '/' is dispatched here instead of __div__ under Python 3, or under
        # Python 2 with "from __future__ import division".
        return self._div(other)
class FieldSet(RecursiveObject, list):
    """
    Handles a set of Fields, in the manner of Python's builtin list, with
    some extra features, especially ensuring its components all are Fields.

    Constructor optional argument *fields* has to be either a :class:`Field`
    or an iterable of.
    """

    def __init__(self, fields=()):
        """
        Constructor. Checks that optional 'fields' argument is actually
        iterable and contains Field instances, or is a single Field.

        Raises an epygramError if *fields* is not iterable or contains
        anything else than :class:`Field` instances.
        """
        if isinstance(fields, Field):
            fields = [fields]
        else:
            try:
                # Materialized once, so that a one-shot iterable (generator)
                # is not exhausted by the check before filling the list.
                fields = list(fields)
            except TypeError:
                raise epygramError("'fields' argument must be either a Field instance or an iterable of fields.")
            for item in fields:
                if not isinstance(item, Field):
                    raise epygramError("A FieldSet can only be made out of Field instances.")
        super(FieldSet, self).__init__(fields)

    def __setitem__(self, position, field):
        """
        Checks the assigned value before setting it: a :class:`FieldSet` if
        *position* is a slice, a :class:`Field` otherwise.
        """
        if isinstance(position, slice):
            # Slice assignments come here for extended slices in Python 2,
            # and for every slice in Python 3.
            if not isinstance(field, FieldSet):
                raise epygramError("'fieldset' argument must be of kind FieldSet.")
        elif not isinstance(field, Field):
            raise epygramError("A FieldSet can contain only Field instances.")
        super(FieldSet, self).__setitem__(position, field)

    def __setslice__(self, pos1, pos2, fieldset):
        """
        Checks that *fieldset* is a :class:`FieldSet` instance before
        assigning it to the slice (Python 2 simple slices only).
        """
        if not isinstance(fieldset, FieldSet):
            raise epygramError("'fieldset' argument must be of kind FieldSet.")
        super(FieldSet, self).__setslice__(pos1, pos2, fieldset)

    def append(self, field):
        """
        Checks that *field* is a :class:`Field` instance before appending it.
        """
        if not isinstance(field, Field):
            raise epygramError("A FieldSet can contain only Field instances.")
        super(FieldSet, self).append(field)

    def index(self, fid):
        """
        Returns the index of the first field of the FieldSet matching *fid*,
        'fid' being a simple dict: *{typefmt:identifier}*.

        Returns *None* if no field matches. Fields whose fid has no
        *typefmt* entry are considered as not matching.

        Raises a ValueError if *fid* is not a dict with exactly one item.
        """
        if not isinstance(fid, dict) or len(fid) != 1:
            raise ValueError("'fid' must be a simple {typefmt:identifier}")
        typefmt, identifier = list(fid.items())[0]
        for idx, field in enumerate(self):
            if typefmt in field.fid and field.fid[typefmt] == identifier:
                return idx
        return None

    def extend(self, fieldset):
        """
        Checks that *fieldset* is a :class:`FieldSet` instance before
        extending with it.
        """
        if not isinstance(fieldset, FieldSet):
            raise epygramError("'fieldset' argument must be of kind FieldSet.")
        super(FieldSet, self).extend(fieldset)

    def insert(self, position, field):
        """
        Checks that *field* is a :class:`Field` instance before inserting it.
        """
        if not isinstance(field, Field):
            raise epygramError("A FieldSet can contain only Field instances.")
        super(FieldSet, self).insert(position, field)

    def remove(self, fid):
        """
        Removes from the FieldSet the first field matching *fid*, 'fid'
        being a simple dict: {typefmt:identifier}

        Does nothing if no field matches, or if *fid* is not a valid simple
        dict.
        """
        try:
            idx = self.index(fid)
        except ValueError:
            # Malformed fid: nothing can match, removal stays a silent no-op.
            return
        if idx is not None:
            del self[idx]

    def sort(self, attribute, key=None, reverse=False):
        """
        Sorts the fields of the FieldSet by the increasing criterion.

        If attribute is a string, sorting will be done according to
        *field.attribute[key]* or *field.attribute* (if *key==None*).

        If attribute is a list *[a1, a2...]* (of length 2 or 3), sorting will
        be done according to *field.a1.a2[key]* or *field.a1.a2*
        (if *key==None*).

        If *reverse* is *True*, sorts by decreasing order.

        Raises:
        - TypeError if *attribute* is neither a string nor a list;
        - NotImplementedError if *attribute* is a list of length other than
          2 or 3.
        """
        if isinstance(attribute, str):
            path = [attribute]
        elif isinstance(attribute, list):
            if len(attribute) not in (2, 3):
                raise NotImplementedError("only lists of 2 or 3 attributes are supported.")
            path = list(attribute)
        else:
            raise TypeError("attribute must be a string or list of string.")
        if not self:
            # Nothing to sort; also avoids probing self[0] below.
            return

        # The way to reach the second level is decided on the first field
        # and applied to all of them: through the footprint attributes if
        # the first level is a footprints object, through __dict__ otherwise.
        through_footprint = (len(path) > 1 and
                             isinstance(self[0]._attributes[path[0]], FootprintBase))

        def criterion(field):
            """Value of the sorting criterion for *field*."""
            value = field._attributes[path[0]]
            if len(path) > 1:
                if through_footprint:
                    value = value._attributes[path[1]]
                else:
                    value = value.__dict__[path[1]]
            if len(path) > 2:
                value = value.__dict__[path[2]]
            if key is not None:
                value = value[key]
            return value

        super(FieldSet, self).sort(key=criterion, reverse=reverse)

    def listfields(self, typefmt=None):
        """
        Returns a list of the identifiers of the FieldSet.
        If *typefmt* is supplied, the list contains only *fid[typefmt]*,
        and not whole fid.
        """
        if typefmt is not None:
            return [field.fid[typefmt] for field in self]
        return [field.fid for field in self]

    def filter(self, by, criteria):
        """
        Not Implemented Yet.

        Returns a new FieldSet filtered according to criteria specified in
        argument.

        Args:
        - by: the kind of filter; on what to filter ?
        - criteria: how to filter on that ?

        Available filters, examples:
        - by='id', criteria={typefmt:identifier} will return only fields
          whose id[typefmt] match value...
        """
        #TODO: id=, fieldtype=, spectral, (...)
        raise NotImplementedError("not yet...")
class Resource(RecursiveObject, FootprintBase):
    """
    Generic abstract class implementing a Resource.
    """

    _abstract = True
    _collector = ('dataformat',)
    _footprint = dict(
        attr=dict(
            format=dict(
                optional=True,
                info="Format of the resource."),
            filename=dict(
                info="File name (absolute or relative) of the resource."),
            openmode=dict(
                values=set(['r', 'read', 'w', 'write', 'a', 'append']),
                remap=dict(
                    read='r',
                    write='w',
                    append='a'),
                info="Opening mode."),
            fmtdelayedopen=dict(
                optional=True,
                default=False,
                type=bool,
                info="Opening of the resource delayed (not at time of construction).")
        )
    )

    @staticmethod
    @nicedeco
    def _openbeforedelayed(mtd):
        """
        Decorator for methods: opens the resource before calling the method,
        if its opening has been delayed and it is not open yet.
        """
        def nowopen(self, *args, **kwargs):
            if self.fmtdelayedopen and not self.isopen:
                self.open()
            return mtd(self, *args, **kwargs)
        return nowopen

    def __init__(self, *args, **kwargs):
        """
        Constructor. See its footprint for arguments.

        Raises:
        - IOError if the resource is to be read or appended and its file
          does not exist;
        - epygramError if overwriting an existing file is declined at the
          interactive prompt (only asked when
          *config.protect_unhappy_writes* is set).
        """
        import os

        super(Resource, self).__init__(*args, **kwargs)
        self.container = footprints.proxy.container(filename=self.filename)
        mode = self.openmode

        if mode in ('r', 'a'):
            if not os.path.exists(self.container.abspath):
                raise IOError(self.container.abspath+" does not exist.")

        # protection against unhappy overwrites...
        if config.protect_unhappy_writes:
            if os.path.exists(self.container.abspath) and mode == 'w':
                answer = raw_input(self.container.abspath+" will be overwritten: do you want to continue (y/n) ? ")
                if answer != 'y':
                    raise epygramError(self.container.abspath+" already exists.")

    def __del__(self):
        """
        Destructor. Closes the resource properly; any exception raised while
        closing is logged as a warning instead of being propagated.
        """
        try:
            self.close()
        except Exception as e:
            epylog.warning("Exception catched in epygram.base.Resource.__del__(): " + str(e))

    def open(self):
        """
        Opens the resource properly. Does nothing in this generic class.
        """
        pass

    def close(self):
        """
        Closes the file properly. Does nothing in this generic class.
        """
        pass

    def readfields(self, requestedfields, getdata=True):
        """
        Returns a :class:`FieldSet` containing requested fields read in the
        resource, each one being read through *self.readfield()*.

        Args: \n
        - *requestedfields*: a field identifier of the resource format, or a
          list of.
        - *getdata*: optional, if *False*, only metadata are read, the fields
          do not contain data. Default is *True*.
        """
        if isinstance(requestedfields, list):
            requested = requestedfields
        else:
            requested = [requestedfields]
        fieldset = FieldSet()
        for fid in requested:
            fieldset.append(self.readfield(fid, getdata=getdata))
        return fieldset

    def writefields(self, fieldset):
        """
        Write the fields of the 'fieldset' in the resource, each one being
        written through *self.writefield()*;
        *fieldset* must be a :class:`FieldSet` instance.
        """
        if isinstance(fieldset, FieldSet):
            for field in fieldset:
                self.writefield(field)
        else:
            raise epygramError("'fieldset' argument must be a FieldSet instance.")

    def listfields(self):
        """
        Returns a list containing the identifiers (in the resource format)
        of all the fields of the resource.

        (Generic wrapper with buffering if openmode == 'r'.)
        """
        mode = self.openmode
        if mode in ('w', 'a'):
            # list of fields subject to evolution; no buffering
            self._bufferedlistfields = None
        elif mode == 'r' and not hasattr(self, '_bufferedlistfields'):
            self._bufferedlistfields = []

        buffered = self._bufferedlistfields
        if buffered:
            # immutable and already read
            return buffered
        fieldslist = self._listfields()
        if buffered is not None:
            # save
            self._bufferedlistfields = fieldslist
        return fieldslist

    def _listfields(self):
        """
        Actual listfields() method (virtual).
        """
        pass
class FieldValidity(RecursiveObject):
    """
    This class handles the temporal validity of a meteorological field: its
    date and time of validity (*date_time*), as well as the validity of its
    origin (*basis*, i.e. for a forecast field for instance, the beginning of
    the forecast) and its *term*.

    An additional optional *cumulativeduration* parameter can define the
    duration for which cumulative fields (e.g. precipitation) are valid.

    Constructor arguments: cf. *set()* method.
    """

    def __init__(self, date_time=None, basis=None, term=None,
                 cumulativeduration=None):
        """
        Constructor.

        Args: \n
        - date_time: has to be of type datetime.datetime;
        - basis: has to be of type datetime.datetime;
        - term: has to be of type datetime.timedelta.
        - cumulativeduration: has to be of type datetime.timedelta.
        """
        self._basis = None
        self._date_time = None
        self._cumulativeduration = None
        # set() ignores the arguments left to None, so it can be called
        # unconditionally; this way a lone 'cumulativeduration' is kept too.
        self.set(date_time=date_time,
                 basis=basis,
                 term=term,
                 cumulativeduration=cumulativeduration)

    @staticmethod
    def _intstr(date_time):
        """
        Internal helper: formats a :class:`datetime.datetime` as a
        'YYYYmmddHHMMSS' string, each component being zero-padded.
        """
        return '{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}'.format(date_time.year,
                                                             date_time.month,
                                                             date_time.day,
                                                             date_time.hour,
                                                             date_time.minute,
                                                             date_time.second)

    def term(self, fmt=None):
        """
        This method returns the term as the difference between date and time
        of validity and basis.

        By default, it is returned as a :class:`datetime.timedelta`;
        otherwise, *fmt* argument can specify the desired return format.
        Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for
        now...

        Raises a NotImplementedError for any other *fmt*.
        """
        if fmt not in (None, 'IntHours', 'IntSeconds'):
            raise NotImplementedError("fmt="+str(fmt)+" option for "+self.__class__.__name__+".term().")
        term = self._date_time - self._basis
        if fmt == 'IntHours':
            return int(term.total_seconds()/3600)
        if fmt == 'IntSeconds':
            return int(term.total_seconds())
        return term

    def cumulativeduration(self, fmt=None):
        """
        This method returns the cumulative duration, i.e. the duration for
        which cumulative fields (e.g. precipitation) are valid.

        By default, it is returned as a :class:`datetime.timedelta`;
        otherwise, *fmt* argument can specify the desired return format.
        Coded versions of *fmt*: 'IntHours', 'IntSeconds', and that's all for
        now...

        Raises a NotImplementedError for any other *fmt*.
        """
        if fmt is None:
            return self._cumulativeduration
        if fmt == 'IntHours':
            return int(self._cumulativeduration.total_seconds()/3600)
        if fmt == 'IntSeconds':
            return int(self._cumulativeduration.total_seconds())
        raise NotImplementedError("fmt="+str(fmt)+" option for "+self.__class__.__name__+".cumulativeduration().")

    def get(self, fmt=None):
        """
        Returns the date and time of validity.

        By default, as a :class:`datetime.datetime`; otherwise, *fmt*
        argument can specify the desired return format.
        Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july
        31th at 10h, 48m, 12s). And that's all for now...

        Raises a NotImplementedError for any other *fmt*.
        """
        if fmt is None:
            return self._date_time
        if fmt == 'IntStr':
            return self._intstr(self._date_time)
        raise NotImplementedError("fmt="+str(fmt)+" option for "+self.__class__.__name__+".get().")

    def getbasis(self, fmt=None):
        """
        Returns the date and time of origin (basis).

        By default, as a :class:`datetime.datetime`; otherwise, *fmt*
        argument can specify the desired return format.
        Coded versions of *fmt*: 'IntStr' (e.g. '20140731104812' = 2014 july
        31th at 10h, 48m, 12s). And that's all for now...

        Raises a NotImplementedError for any other *fmt*.
        """
        if fmt is None:
            return self._basis
        if fmt == 'IntStr':
            # The basis itself is formatted here, not the date of validity.
            return self._intstr(self._basis)
        raise NotImplementedError("fmt="+str(fmt)+" option for "+self.__class__.__name__+".getbasis().")

    def set(self, date_time=None, basis=None, term=None,
            cumulativeduration=None):
        """
        Sets validity and basis according to arguments.
        A consistency check is done if the three arguments are provided
        (which is useless anyway).

        All arguments are checked before anything is modified, so that a
        failing call leaves the object unchanged.

        When *term* is provided, the missing one of date/basis is deduced
        from it, in this order of priority:
        - *date_time* given alone: basis = *date_time* - *term*;
        - *basis* given alone: date and time = *basis* + *term*;
        - none given: date and time = stored basis + *term*, or else
          basis = stored date and time - *term*.

        Args: \n
        - *date_time*: has to be of type :class:`datetime.datetime`;
        - *basis*: has to be of type :class:`datetime.datetime`;
        - *term*: has to be of type :class:`datetime.timedelta`;
        - *cumulativeduration*: has to be of type
          :class:`datetime.timedelta`.

        Raises an epygramError if an argument has not the required type, if
        the three of *date_time*, *basis* and *term* are inconsistent, or if
        *term* is given while neither a basis nor a date and time is known.
        """
        # 1. types
        if date_time is not None and not isinstance(date_time, datetime.datetime):
            raise epygramError("argument 'date_time' must be of type datetime.datetime")
        if basis is not None and not isinstance(basis, datetime.datetime):
            raise epygramError("argument 'basis' must be of type datetime.datetime")
        if term is not None and not isinstance(term, datetime.timedelta):
            raise epygramError("argument 'term' must be of type datetime.timedelta")
        if cumulativeduration is not None and not isinstance(cumulativeduration, datetime.timedelta):
            raise epygramError("argument 'cumulativeduration' must be of type datetime.timedelta")

        # 2. new values, computed apart from the object
        new_date_time = self._date_time if date_time is None else date_time
        new_basis = self._basis if basis is None else basis
        if term is not None:
            if date_time is not None and basis is not None:
                if date_time - basis != term:
                    raise epygramError("inconsistency between 'term', 'basis' and 'date_time' arguments.")
            elif date_time is not None:
                # An explicitly given date and time prevails on a basis that
                # may have been stored by a previous call.
                new_basis = date_time - term
            elif new_basis is not None:
                new_date_time = new_basis + term
            elif new_date_time is not None:
                new_basis = new_date_time - term
            else:
                raise epygramError("cannot set 'term' without 'basis' nor 'date_time'.")

        # 3. everything is valid: actual update
        self._date_time = new_date_time
        self._basis = new_basis
        if cumulativeduration is not None:
            self._cumulativeduration = cumulativeduration