Source code for epygram.formats.netCDF

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains classes for netCDF4 resource.
"""

# Public names exported by ``from <this module> import *``.
__all__ = ['netCDF']

import copy
import datetime
import json

import dateutil
import dateutil.parser

import footprints
from footprints import proxy as fpx, FPDict

from epygram import config, epygramError, util
from epygram.base import FieldValidity
from epygram.resources import FileResource
from epygram.fields import H2DField

import netCDF4

# Module-level logger, named after this module.
epylog = footprints.loggers.getLogger(__name__)



class netCDF(FileResource):
    """
    Class implementing all specificities for netCDF (4) resource format.

    The underlying :class:`netCDF4.Dataset` is stored in ``self._nc`` once the
    resource has been opened; ``self.isopen`` tracks whether it is open.
    The *behaviour* attribute is a dict of options (names of the
    longitudes/latitudes variables, names of the H2D dimensions, data
    ordering...) initialised from ``config.netCDF_default_behaviour`` and
    overridden by the values given at construction time.
    """

    _footprint = dict(
        attr=dict(
            format=dict(
                values=set(['netCDF']),
                default='netCDF'),
            behaviour=dict(
                type=FPDict,
                optional=True,
                default=config.netCDF_default_behaviour)
        )
    )

    def __init__(self, *args, **kwargs):
        """
        Constructor. See its footprint for arguments.

        Raises IOError if the resource is opened in mode 'r' or 'a' and
        netCDF4 fails (with a RuntimeError) to open the container.
        """

        self.isopen = False
        super(netCDF, self).__init__(*args, **kwargs)

        if self.openmode in ('r', 'a'):
            try:
                guess = netCDF4.Dataset(self.container.abspath, self.openmode)
            except RuntimeError:
                raise IOError("this resource is not a netCDF one.")
            else:
                # Only close what was actually opened: when Dataset() fails,
                # *guess* is unbound and closing it would mask the IOError.
                guess.close()
        # Complete the user-given behaviour with the configured defaults.
        behaviour = copy.copy(config.netCDF_default_behaviour)
        behaviour.update(self.behaviour)
        self._attributes['behaviour'] = behaviour
        if not self.fmtdelayedopen:
            self.open()

    def open(self, openmode=None):
        """
        Opens a netCDF and initializes some attributes.

        - *openmode*: optional, to open with a specific openmode, eventually
          different from the one specified at initialization.
        """

        super(netCDF, self).open(openmode=openmode)
        self._nc = netCDF4.Dataset(self.container.abspath, self.openmode)
        self.isopen = True

    def close(self):
        """
        Closes a netCDF.

        Safe to call several times: the dataset is only closed if the
        resource is currently flagged as open.
        """

        if self.isopen and hasattr(self, '_nc'):
            self._nc.close()
        self.isopen = False

    def variables_number(self):
        """Return the number of variables in resource."""
        return len(self._variables)

    def _listfields(self):
        """Returns the fid list of the fields inside the resource."""
        # list(): .keys() is a view, not a list, under Python 3.
        return list(self._variables.keys())

    @staticmethod
    def _decode_validity(encoded):
        """
        Decode the JSON string *encoded*, as stored by :meth:`writefield` in
        the 'validity' attribute of a variable, into a dict of arguments for
        FieldValidity.

        'basis' and 'date_time' entries are mandatory keys; when their value
        is null they are dropped, else they are parsed into datetimes.
        'cumulativeduration', if present and not null, is converted from
        seconds to a :class:`datetime.timedelta`.

        Raises KeyError, ValueError or TypeError if *encoded* cannot be
        decoded.
        """
        decoded = json.loads(encoded)
        for key in ('basis', 'date_time'):
            if decoded[key] is None:
                # writefield stores null when the information is missing.
                del decoded[key]
            else:
                decoded[key] = dateutil.parser.parse(decoded[key])
        if decoded.get('cumulativeduration') is not None:
            decoded['cumulativeduration'] = datetime.timedelta(
                seconds=float(decoded['cumulativeduration']))
        return decoded

    @staticmethod
    def _comment_as_metadata(comment):
        """
        Convert a field *comment* into a dict of attributes to be set on a
        netCDF variable.

        A JSON-encoded dict (as built by :meth:`readfield`) is decoded; a
        dict or an iterable of (key, value) pairs is converted with dict();
        anything else is stored as is under the single key 'comment'.
        """
        try:
            decoded = json.loads(comment)
        except (TypeError, ValueError):
            decoded = None
        if isinstance(decoded, dict):
            return decoded
        try:
            return dict(comment)
        except (TypeError, ValueError):
            return {'comment': comment}

    @FileResource._openbeforedelayed
    def readfield(self, fid, getdata=True):
        """
        Reads one field, given its netCDF name, and returns a Field instance.

        - *fid*: netCDF field identifier
        - *getdata*: if *False*, only metadata are read, the field do not
          contain data.

        If the dimensions of the variable are those listed in
        ``behaviour['H2D_dimensions_names']``, a H2D field is built, with an
        'unstructured' geometry whose grid is read from the
        longitudes/latitudes variables. Else, the field is built without
        geometry nor validity, and a warning is logged.

        Attributes of the variable are stored, JSON-encoded, in the comment
        of the field (except 'validity' for H2D fields).

        Raises:

        - epygramError if the resource is opened in mode 'w';
        - AssertionError if *fid* is not in the resource, or if the
          dimensions of the longitudes variable are not found contiguously
          in the dimensions of the variable;
        - NotImplementedError if the variable has a temporal dimension.
        """

        if self.openmode == 'w':
            raise epygramError("cannot read fields in resource if with" +
                               " openmode == 'w'.")
        assert fid in self.listfields(), \
            ' '.join(["field", fid, "not found in resource."])
        variable = self._variables[fid]
        field_kwargs = {'fid': {'netCDF': fid}}

        # geometry
        dimensions = {d: len(self._dimensions[d]) for d in variable.dimensions}
        geometryname = 'unstructured'
        if set(variable.dimensions) == set(self.behaviour['H2D_dimensions_names']):
            # this is a H2D field
            read_as_miscfield = False
            structure = 'H2D'
            lons_var = self._variables[self.behaviour['variable_name_for_longitudes']]
            lats_var = self._variables[self.behaviour['variable_name_for_latitudes']]
            lons = lons_var[:, :]
            lats = lats_var[:, :]
            lons_dim = lons_var.dimensions[:]
            # every contiguous window of the variable's dimensions having
            # the same length as the longitudes' dimensions
            var_dim = [variable.dimensions[i:i + len(lons_dim)]
                       for i in range(0, len(variable.dimensions) - len(lons_dim) + 1)]
            assert lons_dim in var_dim, \
                "lons/lats and variable " + fid + " dimensions mismatch"
            grid = {'longitudes': lons,
                    'latitudes': lats}
        else:
            epylog.warning("unable to assume geometry of field. Read as MiscField.")
            read_as_miscfield = True

        # validity
        # default: empty validity, when none is stored with the variable
        validity = FieldValidity()
        if self.behaviour.get('variable_name_for_validity') in variable.dimensions:
            # temporal dimension
            raise NotImplementedError('temporal dimension of field: not yet !')
        elif 'validity' in variable.ncattrs():
            # validity stored as an attribute of variable (as in writefield !)
            try:
                validity_kwargs = self._decode_validity(variable.validity)
            except (KeyError, ValueError, TypeError):
                epylog.warning("unable to decode validity attribute.")
            else:
                validity = FieldValidity(**validity_kwargs)

        # build field
        if not read_as_miscfield:
            field_kwargs['validity'] = validity
            kwargs_geom = {'structure': structure,
                           'name': geometryname,
                           'grid': grid,
                           'dimensions': dimensions,
                           'vcoordinate': {'structure': 'V',
                                           'typeoffirstfixedsurface': 255,
                                           'levels': [0]},
                           'position_on_horizontal_grid': 'center'}
            field_kwargs['geometry'] = fpx.geometry(**kwargs_geom)
            field_kwargs['structure'] = structure
        # for H2D fields, 'validity' is already decoded: keep it out of the comment
        comment = {a: variable.getncattr(a)
                   for a in variable.ncattrs()
                   if read_as_miscfield or a != 'validity'}
        if comment:
            field_kwargs['comment'] = json.dumps(comment)
        field = fpx.field(**field_kwargs)
        if getdata:
            field.setdata(variable[...])

        return field

    def writefield(self, field, compression=4, metadata=None):
        """
        Write a field in resource. Only H2D fields with at most one validity
        are handled.

        - *field*: the field to be written
        - *compression* ranges from 1 (low compression, fast writing)
          to 9 (high compression, slow writing). 0 is no compression.
        - *metadata* can be filled by any meta-data, that will be stored
          as attribute of the netCDF variable. The given dict is not
          modified. Items from the comment of the field, if any, take
          precedence over it.

        Raises:

        - NotImplementedError if *field* is not a H2DField, or has more than
          one validity;
        - AssertionError if a dimension or the longitudes/latitudes grid
          already present in the resource mismatch those of the field.
        """

        # private copy: never mutate the caller's dict (nor a shared default)
        metadata = {} if metadata is None else dict(metadata)
        vartype = 'f8'
        if not isinstance(field, H2DField):
            raise NotImplementedError("not yet !")

        # validity: encoded before touching the file, so that an unsupported
        # field leaves the resource unchanged
        nvalidities = len(field.validity)
        if nvalidities > 1:
            # TODO: create a 'time' dimension to store several validities
            raise NotImplementedError("not yet !")
        validity = None
        if nvalidities == 1:
            field_validity = field.validity[0]
            validity = {'basis': None, 'date_time': None}
            if field_validity.getbasis() is not None:
                validity['basis'] = field_validity.getbasis().isoformat()
            if field_validity.get() is not None:
                validity['date_time'] = field_validity.get().isoformat()
            if field_validity.cumulativeduration() is not None:
                validity['cumulativeduration'] = \
                    str(field_validity.cumulativeduration().total_seconds())

        transpose = self.behaviour['transpose_data_ordering']

        # dimensions: those of the geometry whose name is one character long
        dims = sorted((k for k in field.geometry.dimensions.keys() if len(k) == 1),
                      reverse=not transpose)
        for k in dims:
            if k not in self._dimensions:
                self._nc.createDimension(k, size=field.geometry.dimensions[k])
            else:
                assert len(self._dimensions[k]) == field.geometry.dimensions[k], \
                    "dimensions mismatch: " + k + ": " + \
                    str(self._dimensions[k]) + " != " + str(field.geometry.dimensions[k])

        # geometry (lons/lats): written once, else checked against existing
        (lons, lats) = field.geometry.get_lonlat_grid()
        if transpose:
            lons = lons.transpose()
            lats = lats.transpose()
        for (behaviour_key, label, values) in (
                ('variable_name_for_longitudes', 'lons', lons),
                ('variable_name_for_latitudes', 'lats', lats)):
            varname = self.behaviour[behaviour_key]
            if varname in self._variables:
                assert values.shape == self._variables[varname].shape, \
                    "dimensions mismatch: " + label + " grid."
            else:
                coordinate_var = self._nc.createVariable(varname, vartype, dims)
                coordinate_var[...] = values

        # create variable
        var = self._nc.createVariable(util.linearize(str(field.fid.get('netCDF', field.fid))),
                                      vartype, dims,
                                      zlib=bool(compression), complevel=compression)

        # set metadata
        if validity is not None:
            var.validity = json.dumps(validity)
        if field.comment is not None:
            metadata.update(self._comment_as_metadata(field.comment))
        for k, v in metadata.items():
            setattr(var, k, v)

        # set data
        data = field.data.transpose() if transpose else field.data
        if 'gauss' in field.geometry.name:
            fill_value = -999999.9
            var.missing_value = fill_value
            data = data.filled(fill_value)
        var[...] = data

    def behave(self, **kwargs):
        """
        Set-up the given arguments in self.behaviour, for the purpose of
        building fields from netCDF.
        """

        self.behaviour.update(kwargs)

    @property
    @FileResource._openbeforedelayed
    def _dimensions(self):
        """The dimensions of the underlying netCDF4 dataset."""
        return self._nc.dimensions

    @property
    @FileResource._openbeforedelayed
    def _variables(self):
        """The variables of the underlying netCDF4 dataset."""
        return self._nc.variables