Source code for epygram.resources.MultiValiditiesResource

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class that handle a MultiValiditiesResource.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import copy
import numpy

from footprints import FPList, FPDict, proxy as fpx

from epygram import epygramError
from epygram.util import fmtfid
from epygram.base import Resource, FieldSet, FieldValidityList
from . import open_and_close_resource


class MultiValiditiesResource(Resource):
    """
    Class implementing a MultiValiditiesResource.

    The resource wraps a non-empty list of low-level *resources* sharing the
    same format and open mode. Listing methods merge the answers of every
    low-level resource; reading/extracting methods query every low-level
    resource and join the returned fields into one field holding one
    validity per resource, sorted by validity.
    """

    _collector = ('resource_modificator', 'epyresource')
    _footprint = dict(
        attr=dict(
            resources=dict(
                type=FPList,
                info="List of resources to join."),
            name=dict(
                values=set(['MultiValidities']))
        )
    )

    def __init__(self, *args, **kwargs):
        """
        Build the resource and check the consistency of *resources*.

        :raises epygramError: if *resources* is empty, if a low-level
            resource is not opened with the same mode as this resource, or
            if the low-level resources do not all share the same format.
        """
        # NOTE(review): super() is anchored on Resource (not on this class),
        # so Resource.__init__ itself is skipped - kept as in the original,
        # confirm this is intentional.
        super(Resource, self).__init__(*args, **kwargs)

        if len(self.resources) == 0:
            raise epygramError("There must be at least one resource in *resources*")
        for r in self.resources:
            if r.openmode != self.openmode:
                raise epygramError("All low level resources must be opened using the same mode as this high-level resource.")
        if len(set([r.format for r in self.resources])) != 1:
            raise epygramError("All low level resources must have the same format.")

        self.format = "MultiValidities"
        self.lowLevelFormat = self.resources[0].format
        self.isopen = True  # resource always appear to be open

    @staticmethod
    def _unique_in_order(items):
        """
        Return the items of *items* without duplicates, first occurrence
        first. Relies on equality only, so items need not be hashable.
        """
        unique = []
        for item in items:
            if item not in unique:
                unique.append(item)
        return unique

    @staticmethod
    def _plain_dicts(footprint_kwargs):
        """Return a copy of *footprint_kwargs* with FPDict values cast to dict."""
        return {k: dict(v) if isinstance(v, FPDict) else v
                for k, v in footprint_kwargs.items()}

    @staticmethod
    def _concatenate(arrays):
        """
        Concatenate *arrays* along their first axis.

        The masked-array flavour is used as soon as one of the arrays is
        masked, because numpy.concatenate does not preserve input masks.
        """
        if any(isinstance(a, numpy.ma.masked_array) for a in arrays):
            return numpy.ma.concatenate(arrays, axis=0)
        return numpy.concatenate(arrays, axis=0)

    @staticmethod
    def _levels_differ(levels, other_levels):
        """Tell whether two lists of levels differ by at least one value."""
        try:
            return bool(numpy.any(numpy.array(levels) != numpy.array(other_levels)))
        except ValueError:
            # Recent NumPy raises instead of returning a scalar when the two
            # arrays cannot be compared elementwise (e.g. different lengths).
            return True

    def close(self):
        """Closes all resources."""
        for r in self.resources:
            try:
                r.close()
            except Exception:
                # Best effort: a resource failing to close must not prevent
                # the remaining ones from being closed.
                pass

    def find_fields_in_resource(self, *args, **kwargs):
        """
        Call to find_fields_in_resource on every low-level resource.

        Arguments are forwarded untouched. Returns the merged list of
        results, without duplicates, in order of first appearance.
        """
        found = []
        for r in self.resources:
            with open_and_close_resource(r):
                found.extend(r.find_fields_in_resource(*args, **kwargs))
        return self._unique_in_order(found)

    def listfields(self, *args, **kwargs):
        """
        Call to listfields on every low-level resource.

        Arguments are forwarded untouched. When the keyword *complete* is
        true, each returned fid additionally gets an entry keyed by this
        resource's format. Returns the merged list, without duplicates, in
        order of first appearance.
        """
        complete = kwargs.get('complete', False)
        listed = []
        for r in self.resources:
            with open_and_close_resource(r):
                fidlist = r.listfields(*args, **kwargs)
                if complete:
                    for fid in fidlist:
                        fid[self.format] = fid[fmtfid(r.format, fid)]
                listed.extend(fidlist)
        return self._unique_in_order(listed)

    def sortfields(self, *args, **kwargs):
        """
        Call to sortfields on every low-level resource.

        Arguments are forwarded untouched. Returns a dict in which each key
        maps to the fields found under that key in any resource, without
        duplicates and in order of first appearance.
        """
        merged = {}
        for r in self.resources:
            with open_and_close_resource(r):
                for k, v in r.sortfields(*args, **kwargs).items():
                    merged.setdefault(k, []).extend(v)

        result = {}
        for k, fids in merged.items():
            # Order-preserving de-duplication (fids must be hashable).
            seen = set()
            unique = []
            for fid in fids:
                if fid not in seen:
                    seen.add(fid)
                    unique.append(fid)
            result[k] = unique
        return result

    def readfield(self, *args, **kwargs):
        """
        Reads the field in the different resources and join the validities.

        Arguments are forwarded to the readfield of every low-level resource;
        keyword arguments are also handed to the join, which honours
        *getdata* (default True).
        """
        fieldset = FieldSet()
        for r in self.resources:
            with open_and_close_resource(r):
                fieldset.append(r.readfield(*args, **kwargs))
        return self._join_validities(fieldset, **kwargs)

    def writefield(self, *args, **kwargs):
        """
        Write fields.

        :raises NotImplementedError: always.
        """
        # To implement writefield we need to check that each validity is
        # assigned to only one resource
        raise NotImplementedError("writefield is not impelemented")

    def extractprofile(self, *args, **kwargs):
        """
        Extracts the profiles in the different resources and join the validities.

        Arguments are forwarded to the extractprofile of every low-level
        resource.
        """
        fieldset = FieldSet()
        for r in self.resources:
            with open_and_close_resource(r):
                fieldset.append(r.extractprofile(*args, **kwargs))
        return self._join_validities(fieldset)

    def extractsection(self, *args, **kwargs):
        """
        Extracts the sections in the different resources and join the validities.

        Arguments are forwarded to the extractsection of every low-level
        resource.
        """
        fieldset = FieldSet()
        for r in self.resources:
            with open_and_close_resource(r):
                fieldset.append(r.extractsection(*args, **kwargs))
        return self._join_validities(fieldset)

    @property
    def spectral_geometry(self):
        """
        Returns the spectral_geometry shared by all low-level resources.

        :raises epygramError: if the resources do not all expose the same
            spectral_geometry.
        """
        ref = self.resources[0].spectral_geometry
        if numpy.all([r.spectral_geometry == ref for r in self.resources]):
            return ref
        else:
            raise epygramError("All spectral_geometry are not identical")

    @staticmethod
    def _index_by_validity(fieldset):
        """
        Map the validity of each field of *fieldset* to its index.

        The key is ``validity.get()``; when every field yields the same key
        (and there are several fields), ``validity.getbasis()`` is used
        instead.

        :raises NotImplementedError: if a field holds more than one validity.
        :raises epygramError: if two fields end up with the same key.
        """
        for field in fieldset:
            if len(field.validity) != 1:
                raise NotImplementedError("Join of several multi validities field is not (yet?) implemented")

        validities = {field.validity.get(): i for i, field in enumerate(fieldset)}
        if len(validities) == 1 and len(fieldset) != 1:
            validities = {field.validity.getbasis(): i for i, field in enumerate(fieldset)}
        if len(validities) != len(fieldset):
            raise epygramError("Several resources have returned the same validity")
        return validities

    def _horizontal_geometry(self, field):
        """
        Rebuild the geometry of *field* with a fixed placeholder vertical
        coordinate, so that geometries can be compared regardless of their
        actual vertical part.
        """
        kwargs_geom = copy.deepcopy(field.geometry.footprint_as_dict())
        kwargs_geom['vcoordinate'] = fpx.geometry(structure='V',
                                                  typeoffirstfixedsurface=255,
                                                  levels=[255])
        return fpx.geometry(**self._plain_dicts(kwargs_geom))

    @staticmethod
    def _levelless_vcoordinate(field):
        """
        Rebuild the vertical coordinate of *field* with an empty list of
        levels, so that vertical coordinates can be compared regardless of
        their levels.
        """
        kwargs_vcoord = copy.deepcopy(field.geometry.vcoordinate.footprint_as_dict())
        kwargs_vcoord['grid'] = dict(kwargs_vcoord['grid'])
        kwargs_vcoord['levels'] = []
        return fpx.geometry(**kwargs_vcoord)

    def _joined_geometry(self, fieldset, ordered):
        """
        Check that all fields share the same geometry and build the
        geometry of the joined field.

        :param fieldset: the fields, in resource order.
        :param ordered: the same fields, sorted by validity.
        :raises epygramError: if fields mix spectral and gridpoint, or if
            their geometries / vertical geometries are not compatible.
        """
        reference = fieldset[0]
        geometry = self._horizontal_geometry(reference)
        vcoordinate = self._levelless_vcoordinate(reference)
        levels = reference.geometry.vcoordinate.levels
        spectral = reference.spectral

        join_levels = False
        for other in fieldset[1:]:
            if spectral != other.spectral:
                raise epygramError("All fields must be gridpoint or spectral")
            other_geometry = self._horizontal_geometry(other)
            other_vcoordinate = self._levelless_vcoordinate(other)
            other_levels = other.geometry.vcoordinate.levels
            if geometry != other_geometry:
                raise epygramError("All resources must return fields with the same geometry.")
            if vcoordinate != other_vcoordinate:
                raise epygramError("All resources must return fields with the same vertical geometry.")
            if self._levels_differ(levels, other_levels):
                if len(vcoordinate.grid) > 0:
                    raise epygramError("All resources must return fields with the same vertical geometry.")
                if len(levels) != len(other_levels):
                    raise epygramError("All resources must return fields with the same vertical geometry length.")
                # Levels differ between validities: they have to be joined too.
                join_levels = True

        kwargs_vcoord = copy.deepcopy(reference.geometry.vcoordinate.footprint_as_dict())
        if join_levels:
            if spectral:
                raise epygramError("Not sure how to merge vertical levels when spectral")
            level_arrays = [member.geometry.get_levels(d4=True,
                                                       nb_validities=len(member.validity))
                            for member in ordered]
            joined_levels = self._concatenate(level_arrays)
            kwargs_vcoord['levels'] = list(joined_levels.swapaxes(0, 1).squeeze())
        geometry.vcoordinate = fpx.geometry(**kwargs_vcoord)
        return geometry

    def _joined_field_kwargs(self, fieldset, geometry):
        """
        Check that all fields are of the same kind (processtype excluded)
        and return the footprint arguments for the joined field.

        :return: ``(kwargs_field, same_processtype)`` where *kwargs_field*
            derives from the last field of *fieldset* and *same_processtype*
            tells whether every field has the processtype of the first one.
        :raises epygramError: if a field differs from the first one.
        """
        reference_processtype = fieldset[0].processtype

        kwargs_field = copy.deepcopy(fieldset[0].footprint_as_dict())
        kwargs_field['validity'] = FieldValidityList()
        kwargs_field['geometry'] = geometry
        kwargs_field = self._plain_dicts(kwargs_field)
        reference = fpx.field(**kwargs_field)

        same_processtype = True
        for other in fieldset[1:]:
            kwargs_field = copy.deepcopy(other.footprint_as_dict())
            kwargs_field['validity'] = FieldValidityList()
            kwargs_field['geometry'] = geometry
            same_processtype = (same_processtype and
                                kwargs_field['processtype'] == reference_processtype)
            # we exclude processtype from the comparison
            kwargs_field['processtype'] = reference_processtype
            kwargs_field = self._plain_dicts(kwargs_field)
            if reference != fpx.field(**kwargs_field):
                raise epygramError("All fields must be of the same kind")
        return kwargs_field, same_processtype

    def _join_validities(self, fieldset, **kwargs):
        """
        Join the different fields into a single multi-validities field.

        :param fieldset: fields to join, each holding exactly one validity.
        :param getdata: (keyword, default True) if false, the joined field
            is returned without data and no data is read from the fields.
        :return: a field whose validities (and data along the first axis of
            the ``d4=True`` arrays) follow the sorted validities.
        :raises NotImplementedError: if a field holds several validities.
        :raises epygramError: if two fields share the same validity or if
            the fields are not compatible with each other.
        """
        # Validities
        validities = self._index_by_validity(fieldset)
        ordered = [fieldset[validities[v]] for v in sorted(validities.keys())]

        # Geometries
        geometry = self._joined_geometry(fieldset, ordered)

        # Other metadata
        kwargs_field, same_processtype = self._joined_field_kwargs(fieldset, geometry)

        # Returned field
        fieldvaliditylist = FieldValidityList()
        # presumably a fresh FieldValidityList holds one default item that
        # must be dropped before filling the list - TODO confirm
        fieldvaliditylist.pop()
        for member in ordered:
            fieldvaliditylist.extend(member.validity)

        getdata = kwargs.get('getdata', True)
        if getdata:
            chunks = [member.getdata(d4=True) for member in ordered]
            # A single concatenation; a lone chunk is used as is.
            data = chunks[0] if len(chunks) == 1 else self._concatenate(chunks)

        if not same_processtype:
            kwargs_field['processtype'] = None
        kwargs_field['validity'] = fieldvaliditylist
        fid = kwargs_field['fid']
        fid[self.format] = fid[fmtfid(self.lowLevelFormat, fid)]
        field = fpx.field(**kwargs_field)
        if getdata:
            field.setdata(data)
        return field