#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains classes for netCDF4 resource.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import copy
import json
import sys
import numpy
from collections import OrderedDict
import datetime
import six
import re
import netCDF4
import footprints
from footprints import proxy as fpx, FPDict, FPList
from bronx.syntax.arrays import stretch_array
from epygram import config, epygramError, util
from epygram.base import FieldValidity, FieldValidityList
from epygram.resources import FileResource
from epygram.util import Angle
__all__ = ['netCDF']
epylog = footprints.loggers.getLogger(__name__)
_typeoffirstfixedsurface_dict = {'altitude':102,
'height':103,
'atmosphere_hybrid_sigma_pressure_coordinate':119,
'atmosphere_hybrid_height_coordinate':118,
'pressure':100}
_typeoffirstfixedsurface_short_dict = _typeoffirstfixedsurface_dict.copy()
_typeoffirstfixedsurface_short_dict.update({'hybrid-pressure':119,
'hybrid-height':118})
_typeoffirstfixedsurface_dict_inv = {v:k for k, v in _typeoffirstfixedsurface_dict.items()}
_typeoffirstfixedsurface_short_dict_inv = {v:k for k, v in _typeoffirstfixedsurface_short_dict.items()}
_proj_dict = {'lambert':'lambert_conformal_conic',
'mercator':'mercator',
'polar_stereographic':'polar_stereographic'}
_proj_dict_inv = {v:k for k, v in _proj_dict.items()}
[docs]class netCDF(FileResource):
"""Class implementing all specificities for netCDF (4) resource format."""
_footprint = dict(
attr=dict(
format=dict(
values=set(['netCDF']),
default='netCDF'),
behaviour=dict(
info="Describes how fields are defined in resource.",
type=FPDict,
optional=True,
default=config.netCDF_default_behaviour)
)
)
def __init__(self, *args, **kwargs):
self.isopen = False
super(netCDF, self).__init__(*args, **kwargs)
if self.openmode in ('r', 'a'):
try:
guess = netCDF4.Dataset(self.container.abspath, self.openmode)
except RuntimeError:
raise IOError("this resource is not a netCDF one.")
else:
guess.close()
behaviour = copy.copy(config.netCDF_default_behaviour)
behaviour.update(self.behaviour)
self._attributes['behaviour'] = behaviour
if not self.fmtdelayedopen:
self.open()
[docs] def open(self, openmode=None):
"""
Opens a netCDF and initializes some attributes.
:param openmode: optional, to open with a specific openmode, eventually
different from the one specified at initialization.
"""
super(netCDF, self).open(openmode=openmode)
self._nc = netCDF4.Dataset(self.container.abspath, self.openmode)
self.isopen = True
if self.openmode == 'w':
self.set_default_global_attributes()
[docs] def close(self):
"""Closes a netCDF."""
if hasattr(self, '_nc') and self._nc._isopen:
self._nc.close()
self.isopen = False
[docs] def variables_number(self):
"""Return the number of variables in resource."""
return len(self._variables)
[docs] def find_fields_in_resource(self, seed=None, generic=False, **_):
"""
Returns a list of the fields from resource whose name match the given
seed.
:param seed: might be a regular expression, a list of regular expressions
or *None*. If *None* (default), returns the list of all fields in
resource.
:param fieldtype: optional, among ('H2D', 'Misc') or a list of these strings.
If provided, filters out the fields not of the given types.
:param generic: if True, returns complete fid's,
union of {'FORMATname':fieldname} and the according generic fid of
the fields.
"""
if seed is None:
fieldslist = self.listfields()
elif isinstance(seed, six.string_types):
fieldslist = util.find_re_in_list(seed, self.listfields())
elif isinstance(seed, list):
fieldslist = []
for s in seed:
fieldslist += util.find_re_in_list(s, self.listfields())
if fieldslist == []:
raise epygramError("no field matching '" + seed +
"' was found in resource " +
self.container.abspath)
if generic:
raise NotImplementedError("not yet.")
return fieldslist
def _listfields(self):
"""Returns the fid list of the fields inside the resource."""
return list(self._variables.keys())
@FileResource._openbeforedelayed
def ncinfo_field(self, fid):
"""
Get info about the field (dimensions and meta-data of the netCDF variable).
:param fid: netCDF field identifier
"""
assert fid in self.listfields(), 'field: ' + fid + ' not in resource.'
dimensions = OrderedDict()
for d in self._variables[fid].dimensions:
dimensions[d] = len(self._dimensions[d])
metadata = {a:getattr(self._variables[fid], a) for a in self._variables[fid].ncattrs()}
return {'dimensions':dimensions,
'metadata':metadata}
@FileResource._openbeforedelayed
def readfield(self, fid,
getdata=True,
only=None,
adhoc_behaviour=None):
"""
Reads one field, given its netCDF name, and returns a Field instance.
:param fid: netCDF field identifier
:param getdata: if *False*, only metadata are read, the field do not
contain data.
:param only: to specify indexes [0 ... n-1] of specific dimensions,
e.g. {'time':5,} to select only the 6th term of time dimension.
:param adhoc_behaviour: to specify "on the fly" a behaviour (usual
dimensions or grids, ...).
"""
# 0. initialization
assert self.openmode != 'w', \
"cannot read fields in resource if with openmode == 'w'."
assert fid in self.listfields(), \
' '.join(["field", fid, "not found in resource."])
only = util.ifNone_emptydict(only)
adhoc_behaviour = util.ifNone_emptydict(adhoc_behaviour)
field_kwargs = {'fid':{'netCDF':fid}}
variable = self._variables[fid]
behaviour = self.behaviour.copy()
behaviour.update(adhoc_behaviour)
# 1.1 identify usual dimensions
all_dimensions_e2n = {}
for d in self._dimensions.keys():
for sd in config.netCDF_standard_dimensions:
if d in config.netCDF_usualnames_for_standard_dimensions[sd]:
all_dimensions_e2n[sd] = d
variable_dimensions = {d:len(self._dimensions[d]) for d in variable.dimensions}
for d in variable_dimensions.keys():
for sd in config.netCDF_standard_dimensions:
# if behaviour is not explicitly given,
# try to find out who is "d" among the standard dimensions
if sd not in behaviour and d in config.netCDF_usualnames_for_standard_dimensions[sd]:
behaviour[sd] = d
dims_dict_n2e = {}
for d in variable_dimensions.keys():
for k in config.netCDF_standard_dimensions:
if d == behaviour.get(k):
dims_dict_n2e[d] = k
dims_dict_e2n = {v:k for k, v in dims_dict_n2e.items()}
# 1.2 try to identify grids
for f in self.listfields():
for sd in config.netCDF_standard_dimensions:
sg = sd.replace('dimension', 'grid')
# if behaviour is not explicitly given,
# try to find out who is "f" among the standard grids
if sg not in behaviour and f in config.netCDF_usualnames_for_standard_dimensions[sd] or \
f == behaviour.get(sd):
behaviour[sg] = f
# 2. time
def get_validity(T_varname):
validity = FieldValidityList()
validity.pop()
if T_varname not in self._listfields():
raise epygramError('unable to find T_grid in variables.')
T = self._variables[T_varname][:]
if len(self._variables[T_varname].dimensions) == 0:
T = [T]
time_unit = getattr(self._variables[T_varname], 'units', '')
if re.match('(hours|seconds|days|minutes)\s+since.+$', time_unit):
T = netCDF4.num2date(T, time_unit).squeeze().reshape((len(T),))
T = [datetime.datetime(*t.timetuple()[:6]) for t in T] # FIXME: not sure of that for dates older than julian/gregorian calendar
basis = netCDF4.num2date(0, time_unit)
if basis.year <= 1582:
epylog.warning('suspicion of inconsistency of julian/gregorian dates')
basis = datetime.datetime(*basis.timetuple()[:6]) # FIXME: not sure of that for dates older than julian/gregorian calendar
for v in T:
validity.append(FieldValidity(date_time=v, basis=basis))
else:
epylog.warning('temporal unit is not CF1.6-compliant, cannot decode.')
for v in T:
validity.append(FieldValidity())
return validity
if 'T_dimension' in dims_dict_e2n:
# field has a time dimension
var_corresponding_to_T_grid = behaviour.get('T_grid', False)
validity = get_validity(var_corresponding_to_T_grid)
if dims_dict_e2n['T_dimension'] in only:
validity = validity[only[dims_dict_e2n['T_dimension']]]
elif any([t in self._listfields()
for t in config.netCDF_usualnames_for_standard_dimensions['T_dimension']]):
# look for a time variable
T_varnames = [t for t in config.netCDF_usualnames_for_standard_dimensions['T_dimension'] if t in self._listfields()]
_v = get_validity(T_varnames[0])
if len(_v) == 1:
validity = _v
else:
validity = FieldValidity()
else:
validity = FieldValidity()
field_kwargs['validity'] = validity
# 3. GEOMETRY
# ===========
kwargs_geom = {}
kwargs_geom['position_on_horizontal_grid'] = 'center'
# 3.1 identify the structure
keys = copy.copy(list(variable_dimensions.keys()))
for k in only.keys():
if k in keys:
keys.remove(k)
else:
raise ValueError("dimension: " + k + " from 'only' not in field variable.")
if 'T_dimension' in dims_dict_e2n and dims_dict_e2n['T_dimension'] not in only:
keys.remove(dims_dict_e2n['T_dimension'])
squeezed_variables = [dims_dict_n2e.get(k)
for k in keys
if variable_dimensions[k] != 1]
H2D = set(squeezed_variables) == set(['X_dimension',
'Y_dimension']) \
or (set(squeezed_variables) == set(['N_dimension']) and
all([d in all_dimensions_e2n for d in ['X_dimension', # flattened grids
'Y_dimension']])) \
or (set(squeezed_variables) == set(['N_dimension']) \
and behaviour.get('H1D_is_H2D_unstructured', False)) # or 2D unstructured grids
D3 = set(squeezed_variables) == set(['X_dimension',
'Y_dimension',
'Z_dimension']) \
or (set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) and
all([d in all_dimensions_e2n for d in ['X_dimension', # flattened grids
'Y_dimension']])) \
or (set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) \
and behaviour.get('H1D_is_H2D_unstructured', False)) # or 2D unstructured grids
V2D = set(squeezed_variables) == set(['N_dimension',
'Z_dimension']) and not D3
H1D = set(squeezed_variables) == set(['N_dimension']) and not H2D
V1D = set(squeezed_variables) == set(['Z_dimension'])
points = set(squeezed_variables) == set([])
if not any([D3, H2D, V2D, H1D, V1D, points]):
for d in set(variable_dimensions.keys()) - set(dims_dict_n2e.keys()):
epylog.error(" ".join(["dimension:",
d,
"has not been identified as a usual",
"(T,Z,Y,X) dimension. Please specify",
"with readfield() argument",
"adhoc_behaviour={'T_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(T_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"]))
raise epygramError(" ".join(["unable to guess structure of field:",
str(list(variable_dimensions.keys())),
"=> refine behaviour dimensions or",
"filter dimensions with 'only'."]))
else:
if D3:
structure = '3D'
elif H2D:
structure = 'H2D'
elif V2D:
structure = 'V2D'
elif H1D:
structure = 'H1D'
raise NotImplementedError('H1D: not yet.') # TODO:
elif V1D:
structure = 'V1D'
elif points:
structure = 'Point'
kwargs_geom['structure'] = structure
# 3.2 vertical geometry (default)
default_kwargs_vcoord = {'structure':'V',
'typeoffirstfixedsurface':255,
'position_on_grid':'mass',
'grid':{'gridlevels': []},
'levels':[0]}
kwargs_vcoord = default_kwargs_vcoord
# 3.3 Specific parts
# 3.3.1 dimensions
dimensions = {}
kwargs_geom['name'] = 'unstructured'
if V2D or H1D:
dimensions['X'] = variable_dimensions[dims_dict_e2n['N_dimension']]
dimensions['Y'] = 1
if V1D or points:
dimensions['X'] = 1
dimensions['Y'] = 1
if D3 or H2D:
if set(['X_dimension', 'Y_dimension']).issubset(set(dims_dict_e2n.keys())):
flattened = False
elif 'N_dimension' in dims_dict_e2n:
flattened = True
else:
raise epygramError('unable to find grid dimensions.')
if not flattened:
dimensions['X'] = variable_dimensions[dims_dict_e2n['X_dimension']]
dimensions['Y'] = variable_dimensions[dims_dict_e2n['Y_dimension']]
else: # flattened
if 'X_dimension' in all_dimensions_e2n \
and 'Y_dimension' in all_dimensions_e2n:
dimensions['X'] = len(self._dimensions[all_dimensions_e2n['X_dimension']])
dimensions['Y'] = len(self._dimensions[all_dimensions_e2n['Y_dimension']])
elif behaviour.get('H1D_is_H2D_unstructured', False):
dimensions['X'] = variable_dimensions[dims_dict_e2n['N_dimension']]
dimensions['Y'] = 1
else:
assert 'X_dimension' in all_dimensions_e2n, \
' '.join(["unable to find X_dimension of field:",
"please specify with readfield() argument",
"adhoc_behaviour={'X_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(X_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"])
assert 'Y_dimension' in all_dimensions_e2n, \
' '.join(["unable to find Y_dimension of field:",
"please specify with readfield() argument",
"adhoc_behaviour={'Y_dimension':'" + d + "'}",
"for instance or",
"my_resource.behave(Y_dimension=" + d + ")",
"or complete",
"config.netCDF_usualnames_for_standard_dimensions",
"in $HOME/.epygram/userconfig.py"])
# 3.3.2 vertical part
if D3 or V1D or V2D:
var_corresponding_to_Z_grid = behaviour.get('Z_grid', dims_dict_e2n['Z_dimension'])
# assert var_corresponding_to_Z_grid in self._variables.keys(), \
# 'unable to find Z_grid in variables.'
levels = None
if var_corresponding_to_Z_grid in self._listfields():
if hasattr(self._variables[var_corresponding_to_Z_grid], 'standard_name') \
and self._variables[var_corresponding_to_Z_grid].standard_name in ('atmosphere_hybrid_sigma_pressure_coordinate',
'atmosphere_hybrid_height_coordinate'):
formula_terms = self._variables[var_corresponding_to_Z_grid].formula_terms.split(' ')
if 'a:' in formula_terms and 'p0:' in formula_terms:
a_name = formula_terms[formula_terms.index('a:') + 1]
p0_name = formula_terms[formula_terms.index('p0:') + 1]
a = self._variables[a_name][:] * self._variables[p0_name][:]
elif 'ap:' in formula_terms:
a_name = formula_terms[formula_terms.index('ap:') + 1]
a = self._variables[a_name][:]
b_name = formula_terms[formula_terms.index('b:') + 1]
b = self._variables[b_name][:]
gridlevels = [(i + 1, {'Ai':a[i], 'Bi':b[i]}) for i in range(len(a))]
levels = [i + 1 for i in range(variable_dimensions[dims_dict_e2n['Z_dimension']])]
if len(gridlevels) == len(levels):
kwargs_vcoord['grid']['ABgrid_position'] = 'mass'
else:
kwargs_vcoord['grid']['ABgrid_position'] = 'flux'
else:
gridlevels = self._variables[var_corresponding_to_Z_grid][:]
if hasattr(self._variables[behaviour['Z_grid']], 'standard_name'):
kwargs_vcoord['typeoffirstfixedsurface'] = _typeoffirstfixedsurface_dict.get(self._variables[behaviour['Z_grid']].standard_name, 255)
elif hasattr(self._variables[behaviour['Z_grid']], 'short_name'):
kwargs_vcoord['typeoffirstfixedsurface'] = _typeoffirstfixedsurface_short_dict.get(self._variables[behaviour['Z_grid']].short_name, 255)
# TODO: complete the reading of variable units to convert
if hasattr(self._variables[behaviour['Z_grid']], 'units'):
if self._variables[behaviour['Z_grid']].units == 'km':
gridlevels = gridlevels * 1000. # get back to metres
else:
gridlevels = list(range(1, variable_dimensions[dims_dict_e2n['Z_dimension']] + 1))
kwargs_vcoord['typeoffirstfixedsurface'] = 255
kwargs_vcoord['grid']['gridlevels'] = [p for p in gridlevels] # footprints purpose
if levels is None:
kwargs_vcoord['levels'] = kwargs_vcoord['grid']['gridlevels'] # could it be else ?
else:
kwargs_vcoord['levels'] = levels
# 3.3.3 horizontal part
# find grid in variables
if H2D or D3:
def find_grid_in_variables():
var_corresponding_to_X_grid = behaviour.get('X_grid', False)
if var_corresponding_to_X_grid not in self._listfields():
epylog.error(" ".join(["unable to find X_grid in variables.",
"Please specify with readfield()",
"argument",
"adhoc_behaviour={'X_grid':'name_of_the_variable'}",
"for instance or",
"my_resource.behave(X_grid='name_of_the_variable')"]))
raise epygramError('unable to find X_grid in variables.')
var_corresponding_to_Y_grid = behaviour.get('Y_grid', False)
if var_corresponding_to_Y_grid not in self._listfields():
epylog.error(" ".join(["unable to find Y_grid in variables.",
"Please specify with readfield()",
"argument",
"adhoc_behaviour={'Y_grid':'name_of_the_variable'}",
"for instance or",
"my_resource.behave(Y_grid='name_of_the_variable')"]))
raise epygramError('unable to find Y_grid in variables.')
else:
if hasattr(self._variables[var_corresponding_to_Y_grid], 'standard_name') and \
self._variables[var_corresponding_to_Y_grid].standard_name == 'projection_y_coordinate' and \
self._variables[var_corresponding_to_X_grid].standard_name == 'projection_x_coordinate':
behaviour['grid_is_lonlat'] = False
elif 'lat' in var_corresponding_to_Y_grid.lower() and \
'lon' in var_corresponding_to_X_grid.lower() and \
'grid_is_lonlat' not in behaviour:
behaviour['grid_is_lonlat'] = True
if len(self._variables[var_corresponding_to_X_grid].dimensions) == 1 and \
len(self._variables[var_corresponding_to_Y_grid].dimensions) == 1:
# case of a flat grid
X = self._variables[var_corresponding_to_X_grid][:]
Y = self._variables[var_corresponding_to_Y_grid][:]
if flattened:
if len(X) == dimensions.get('X') * dimensions.get('Y'):
# case of a H2D field with flattened grid
Xgrid = X.reshape((dimensions['Y'], dimensions['X']))
Ygrid = Y.reshape((dimensions['Y'], dimensions['X']))
elif behaviour.get('H1D_is_H2D_unstructured', False):
# case of a H2D unstructured field
X = self._variables[var_corresponding_to_X_grid][:]
Y = self._variables[var_corresponding_to_Y_grid][:]
Xgrid = X.reshape((1, len(X)))
Ygrid = Y.reshape((1, len(Y)))
else:
raise epygramError('unable to reconstruct 2D grid.')
else:
# case of a regular grid where X is constant on a column
# and Y constant on a row: reconstruct 2D
Xgrid = numpy.ones((Y.size, X.size)) * X
Ygrid = (numpy.ones((Y.size, X.size)).transpose() * Y).transpose()
elif len(self._variables[var_corresponding_to_X_grid].dimensions) == 2 and \
len(self._variables[var_corresponding_to_Y_grid].dimensions) == 2:
Xgrid = self._variables[var_corresponding_to_X_grid][:, :]
Ygrid = self._variables[var_corresponding_to_Y_grid][:, :]
if Ygrid[0, 0] > Ygrid[-1, 0] and not behaviour.get('reverse_Yaxis'):
epylog.warning("Ygrid seems to be reversed; shouldn't behaviour['reverse_Yaxis'] be True ?")
elif behaviour.get('reverse_Yaxis'):
Ygrid = Ygrid[::-1, :]
return Xgrid, Ygrid
# projection or grid
if hasattr(variable, 'grid_mapping') and \
(self._variables[variable.grid_mapping].grid_mapping_name in ('lambert_conformal_conic',
'mercator',
'polar_stereographic',
'latitude_longitude') or
'gauss' in self._variables[variable.grid_mapping].grid_mapping_name.lower()):
# geometry described as "grid_mapping" meta-data
gm = variable.grid_mapping
grid_mapping = self._variables[gm]
if hasattr(grid_mapping, 'ellipsoid'):
kwargs_geom['geoid'] = {'ellps':grid_mapping.ellipsoid}
elif hasattr(grid_mapping, 'earth_radius'):
kwargs_geom['geoid'] = {'a':grid_mapping.earth_radius,
'b':grid_mapping.earth_radius}
elif hasattr(grid_mapping, 'semi_major_axis') and hasattr(grid_mapping, 'semi_minor_axis'):
kwargs_geom['geoid'] = {'a':grid_mapping.semi_major_axis,
'b':grid_mapping.semi_minor_axis}
elif hasattr(grid_mapping, 'semi_major_axis') and hasattr(grid_mapping, 'inverse_flattening'):
kwargs_geom['geoid'] = {'a':grid_mapping.semi_major_axis,
'rf':grid_mapping.inverse_flattening}
else:
kwargs_geom['geoid'] = config.default_geoid
if hasattr(grid_mapping, 'position_on_horizontal_grid'):
kwargs_geom['position_on_horizontal_grid'] = grid_mapping.position_on_horizontal_grid
if grid_mapping.grid_mapping_name in ('lambert_conformal_conic', 'mercator', 'polar_stereographic'):
if (hasattr(self._variables[variable.grid_mapping], 'x_resolution') or
not behaviour.get('grid_is_lonlat', False)):
# if resolution is either in grid_mapping attributes or in the grid itself
kwargs_geom['name'] = _proj_dict_inv[grid_mapping.grid_mapping_name]
if hasattr(grid_mapping, 'x_resolution'):
Xresolution = grid_mapping.x_resolution
Yresolution = grid_mapping.y_resolution
else:
Xgrid, Ygrid = find_grid_in_variables()
if behaviour.get('H1D_is_H2D_unstructured', False):
raise epygramError('unable to retrieve both X_resolution and Y_resolution from a 1D list of points.')
else:
Xresolution = abs(Xgrid[0, 0] - Xgrid[0, 1])
Yresolution = abs(Ygrid[0, 0] - Ygrid[1, 0])
grid = {'X_resolution':Xresolution,
'Y_resolution':Yresolution,
'LAMzone':None}
import pyproj
if kwargs_geom['name'] == 'lambert':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.longitude_of_central_meridian, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
if isinstance(grid_mapping.standard_parallel, numpy.ndarray):
s1, s2 = grid_mapping.standard_parallel
kwargs_geom['projection']['secant_lat1'] = Angle(s1, 'degrees')
kwargs_geom['projection']['secant_lat2'] = Angle(s2, 'degrees')
else:
r = grid_mapping.standard_parallel
kwargs_geom['projection']['reference_lat'] = Angle(r, 'degrees')
s1 = s2 = r
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
reference_lat = grid_mapping.latitude_of_projection_origin
# compute x_0, y_0...
p = pyproj.Proj(proj='lcc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_1=s1, lat_2=s2,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
reference_lat)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='lcc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_1=s1, lat_2=s2,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
elif kwargs_geom['name'] == 'mercator':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.longitude_of_central_meridian, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
kwargs_geom['projection']['reference_lat'] = Angle(0., 'degrees')
if grid_mapping.standard_parallel != 0.:
lat_ts = grid_mapping.standard_parallel
kwargs_geom['projection']['secant_lat'] = Angle(lat_ts, 'degrees')
else:
lat_ts = 0.
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
# compute x_0, y_0...
p = pyproj.Proj(proj='merc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_ts=lat_ts,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
0.)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='merc',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_ts=lat_ts,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
elif kwargs_geom['name'] == 'polar_stereographic':
kwargs_geom['projection'] = {'reference_lon':Angle(grid_mapping.straight_vertical_longitude_from_pole, 'degrees'),
'rotation':Angle(0., 'degrees')}
if hasattr(grid_mapping, 'rotation'):
kwargs_geom['projection']['rotation'] = Angle(grid_mapping.rotation, 'degrees')
kwargs_geom['projection']['reference_lat'] = Angle(grid_mapping.latitude_of_projection_origin, 'degrees')
lat_ts = grid_mapping.standard_parallel
if grid_mapping.standard_parallel != grid_mapping.latitude_of_projection_origin:
kwargs_geom['projection']['secant_lat'] = Angle(lat_ts, 'degrees')
fe = grid_mapping.false_easting
fn = grid_mapping.false_northing
# compute x_0, y_0...
p = pyproj.Proj(proj='stere',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_0=kwargs_geom['projection']['reference_lat'].get('degrees'),
lat_ts=lat_ts,
**kwargs_geom['geoid'])
dx, dy = p(kwargs_geom['projection']['reference_lon'].get('degrees'),
kwargs_geom['projection']['reference_lat'].get('degrees'),)
# ... for getting center coords from false_easting, false_northing
p = pyproj.Proj(proj='stere',
lon_0=kwargs_geom['projection']['reference_lon'].get('degrees'),
lat_0=kwargs_geom['projection']['reference_lat'].get('degrees'),
lat_ts=lat_ts,
x_0=-dx, y_0=-dy,
**kwargs_geom['geoid'])
ll00 = p(-fe, -fn, inverse=True)
del p
grid['input_lon'] = Angle(ll00[0], 'degrees')
grid['input_lat'] = Angle(ll00[1], 'degrees')
grid['input_position'] = (0, 0)
else:
# no resolution available: grid mapping is useless
gm = None
elif 'gauss' in grid_mapping.grid_mapping_name.lower():
if hasattr(grid_mapping, 'latitudes'):
latitudes = self._variables[grid_mapping.latitudes.split(' ')[1]][:]
else:
# NOTE: this is a (good) approximation actually, the true latitudes are the roots of Legendre polynoms
raise NotImplementedError('(re-)computation of Gauss latitudes (not in file metadata).')
grid = {'latitudes':FPList([Angle(l, 'degrees') for l in latitudes]),
'dilatation_coef':1.}
if hasattr(grid_mapping, 'lon_number_by_lat'):
if isinstance(grid_mapping.lon_number_by_lat, six.string_types):
lon_number_by_lat = self._variables[grid_mapping.lon_number_by_lat.split(' ')[1]][:]
else:
kwargs_geom['name'] = 'regular_gauss'
lon_number_by_lat = [dimensions['X'] for _ in range(dimensions['Y'])]
if hasattr(grid_mapping, 'pole_lon'):
kwargs_geom['name'] = 'rotated_reduced_gauss'
grid['pole_lon'] = Angle(grid_mapping.pole_lon, 'degrees')
grid['pole_lat'] = Angle(grid_mapping.pole_lat, 'degrees')
if hasattr(grid_mapping, 'dilatation_coef'):
grid['dilatation_coef'] = grid_mapping.dilatation_coef
else:
kwargs_geom['name'] = 'reduced_gauss'
dimensions = {'max_lon_number':int(max(lon_number_by_lat)),
'lat_number':len(latitudes),
'lon_number_by_lat':FPList([int(n) for n in
lon_number_by_lat])}
elif grid_mapping.grid_mapping_name == 'latitude_longitude':
# try to find out longitude, latitude arrays
for f in config.netCDF_usualnames_for_lonlat_grids['X']:
if f in self.listfields():
behaviour['X_grid'] = f
break
for f in config.netCDF_usualnames_for_lonlat_grids['Y']:
if f in self.listfields():
behaviour['Y_grid'] = f
break
Xgrid, Ygrid = find_grid_in_variables()
grid = {'longitudes':Xgrid,
'latitudes':Ygrid}
if ((hasattr(self._variables[variable.grid_mapping],
'x_resolution') and
hasattr(self._variables[variable.grid_mapping],
'y_resolution') or
hasattr(self._variables[variable.grid_mapping],
'resolution')) and
len(Xgrid.shape) == 2):
# then this is a regular lon lat
kwargs_geom['name'] = 'regular_lonlat'
if hasattr(self._variables[variable.grid_mapping],
'x_resolution'):
x_res = grid_mapping.x_resolution
else:
x_res = grid_mapping.resolution
if hasattr(self._variables[variable.grid_mapping],
'y_resolution'):
y_res = grid_mapping.y_resolution
else:
y_res = grid_mapping.resolution
grid = {'input_lon':Angle(Xgrid[0, 0], 'degrees'),
'input_lat':Angle(Ygrid[0, 0], 'degrees'),
'input_position':(0, 0),
'X_resolution':Angle(x_res, 'degrees'),
'Y_resolution':Angle(y_res, 'degrees')}
else:
raise NotImplementedError('grid_mapping.grid_mapping_name == ' +
grid_mapping.grid_mapping_name)
else:
if hasattr(variable, 'grid_mapping'):
epylog.info('grid_mapping ignored: unknown case')
# grid only in variables
Xgrid, Ygrid = find_grid_in_variables()
if behaviour.get('grid_is_lonlat', False):
grid = {'longitudes':Xgrid,
'latitudes':Ygrid}
else:
# grid is not lon/lat and no other metadata available : Academic
kwargs_geom['name'] = 'academic'
grid = {'LAMzone':None,
'X_resolution':abs(Xgrid[0, 1] - Xgrid[0, 0]),
'Y_resolution':abs(Ygrid[1, 0] - Ygrid[0, 0]),
'input_lat':1.,
'input_lon':1.,
'input_position':(0, 0)}
kwargs_geom['projection'] = {'reference_dX':grid['X_resolution'],
'reference_dY':grid['Y_resolution'],
'rotation': Angle(0, 'degrees')}
elif V1D or V2D or points:
var_corresponding_to_X_grid = behaviour.get('X_grid', False)
if var_corresponding_to_X_grid not in self._listfields():
if points or V1D:
lon = ['_']
else:
raise epygramError('unable to find X_grid in variables.')
else:
lon = self._variables[var_corresponding_to_X_grid][:]
var_corresponding_to_Y_grid = behaviour.get('Y_grid', False)
if var_corresponding_to_Y_grid not in self._listfields():
if points or V1D:
lat = ['_']
else:
raise epygramError('unable to find Y_grid in variables.')
else:
lat = self._variables[var_corresponding_to_Y_grid][:]
grid = {'longitudes':lon,
'latitudes':lat}
# 3.4 build geometry
vcoordinate = fpx.geometry(**kwargs_vcoord)
kwargs_geom['grid'] = grid
kwargs_geom['dimensions'] = dimensions
kwargs_geom['vcoordinate'] = vcoordinate
geometry = fpx.geometry(**kwargs_geom)
# 4. build field
field_kwargs['geometry'] = geometry
field_kwargs['structure'] = kwargs_geom['structure']
comment = {}
for a in variable.ncattrs():
if a != 'validity':
if isinstance(variable.getncattr(a), numpy.float32): # pb with json and float32
comment.update({a:numpy.float64(variable.getncattr(a))})
elif isinstance(variable.getncattr(a), numpy.ndarray): # pb with json and numpy arrays
comment.update({a:numpy.float64(variable.getncattr(a)).tolist()})
else:
comment.update({a:variable.getncattr(a)})
comment = json.dumps(comment)
if comment != '{}':
field_kwargs['comment'] = comment
field = fpx.field(**field_kwargs)
if getdata:
if only:
n = len(variable.dimensions)
buffdata = variable
for k, i in only.items():
d = variable.dimensions.index(k)
buffdata = util.restrain_to_index_i_of_dim_d(buffdata, i, d, n=n)
else:
buffdata = variable[...]
# check there is no leftover unknown dimension
field_dim_num = 1 if len(field.validity) > 1 else 0
if field.structure != 'Point':
field_dim_num += [int(c) for c in field.structure if c.isdigit()][0]
if (H2D or D3) and flattened:
field_dim_num -= 1
assert field_dim_num == len(buffdata.squeeze().shape), \
' '.join(['shape of field and identified usual dimensions',
'do not match: use *only* to filter or',
'*adhoc_behaviour* to identify dimensions'])
# re-shuffle to have data indexes in order (t,z,y,x)
positions = []
shp4D = [1, 1, 1, 1]
if 'T_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['T_dimension'])
positions.append(idx)
shp4D[0] = buffdata.shape[idx]
if 'Z_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['Z_dimension'])
positions.append(idx)
shp4D[1] = buffdata.shape[idx]
if 'Y_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['Y_dimension'])
positions.append(idx)
shp4D[2] = buffdata.shape[idx]
if 'X_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['X_dimension'])
positions.append(idx)
shp4D[3] = buffdata.shape[idx]
elif 'N_dimension' in dims_dict_e2n:
idx = variable.dimensions.index(dims_dict_e2n['N_dimension'])
positions.append(idx)
shp4D[3] = buffdata.shape[idx]
for d in variable.dimensions:
# whatever the order of these, they must have been filtered and dimension 1 (only)
if d not in dims_dict_e2n.values():
positions.append(variable.dimensions.index(d))
shp4D = tuple(shp4D)
buffdata = buffdata.transpose(*positions).squeeze()
if isinstance(buffdata, numpy.ma.masked_array):
data = numpy.ma.zeros(shp4D)
else:
data = numpy.empty(shp4D)
if (H2D or D3) and flattened:
if len(buffdata.shape) == 2:
if D3:
first_dimension = 'Z'
else:
first_dimension = 'T'
else:
first_dimension = None
data = geometry.reshape_data(buffdata, first_dimension=first_dimension, d4=True)
else:
data[...] = buffdata.reshape(data.shape)
if behaviour.get('reverse_Yaxis'):
data[...] = data[:, :, ::-1, :]
field.setdata(data)
return field
[docs] def writefield(self, field,
compression=4,
metadata=None,
adhoc_behaviour=None):
"""
Write a field in resource.
:param field: the :class:`~epygram.base.Field` object to write
:param compression ranges from 1 (low compression, fast writing)
to 9 (high compression, slow writing). 0 is no compression.
:param metadata: dict, can be filled by any meta-data, that will be stored
as attribute of the netCDF variable.
:param adhoc_behaviour: to specify "on the fly" a behaviour (usual
dimensions or grids, ...).
"""
metadata = util.ifNone_emptydict(metadata)
vartype = 'f8'
fill_value = -999999.9
adhoc_behaviour = util.ifNone_emptydict(adhoc_behaviour)
behaviour = self.behaviour.copy()
behaviour.update(adhoc_behaviour)
def check_or_add_dim(d, d_in_field=None, size=None):
if size is None:
if d_in_field is None:
d_in_field = d
size = field.geometry.dimensions[d_in_field]
if d not in self._dimensions:
self._nc.createDimension(d, size=size)
else:
assert len(self._dimensions[d]) == size, \
"dimensions mismatch: " + d + ": " + \
str(self._dimensions[d]) + " != " + str(size)
def check_or_add_variable(varname, vartype,
dimensions=(),
**kwargs):
if six.text_type(varname) not in self._listfields():
var = self._nc.createVariable(varname, vartype,
dimensions=dimensions,
**kwargs)
status = 'created'
else:
assert self._variables[varname].dtype == vartype, \
' '.join(['variable', varname,
'already exist with other type:',
self._variables[varname].dtype])
if isinstance(dimensions, six.string_types):
dimensions = (dimensions,)
assert self._variables[varname].dimensions == tuple(dimensions), \
' '.join(['variable', varname,
'already exist with other dimensions:',
str(self._variables[varname].dimensions)])
var = self._variables[varname]
status = 'match'
return var, status
assert 'netCDF' in field.fid
assert not field.spectral
# 1. dimensions
T = Y = X = G = N = None
dims = []
# time
if len(field.validity) > 1 or behaviour.get('force_a_T_dimension', False):
# default
T = config.netCDF_usualnames_for_standard_dimensions['T_dimension'][0]
# or any existing identified time dimension
T = {'found':v for v in self._dimensions
if (v in config.netCDF_usualnames_for_standard_dimensions['T_dimension'] and
len(self._dimensions[v]) == len(field.validity))}.get('found', T)
# or specified behaviour
T = behaviour.get('T_dimension', T)
check_or_add_dim(T, size=len(field.validity))
# vertical part
# default
Z = config.netCDF_usualnames_for_standard_dimensions['Z_dimension'][0]
# or any existing identified time dimension
Z = {'found':v for v in self._dimensions
if (v in config.netCDF_usualnames_for_standard_dimensions['Z_dimension'] and
len(self._dimensions[v]) == len(field.geometry.vcoordinate.levels))}.get('found', Z)
# or specified behaviour
Z = behaviour.get('Z_dimension', Z)
if 'gridlevels' in field.geometry.vcoordinate.grid:
Z_gridsize = max(len(field.geometry.vcoordinate.grid['gridlevels']), 1)
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
Z_gridsize -= 1
else:
Z_gridsize = len(field.geometry.vcoordinate.levels) # 1
if Z_gridsize > 1:
check_or_add_dim(Z, size=Z_gridsize)
# horizontal
if behaviour.get('flatten_horizontal_grids', False):
_gpn = field.geometry.gridpoints_number
G = 'gridpoints_number'
check_or_add_dim(G, size=_gpn)
if field.geometry.rectangular_grid:
if field.geometry.dimensions['Y'] > 1 and field.geometry.dimensions['X'] > 1:
Y = behaviour.get('Y_dimension',
config.netCDF_usualnames_for_standard_dimensions['Y_dimension'][0])
check_or_add_dim(Y, d_in_field='Y')
X = behaviour.get('X_dimension',
config.netCDF_usualnames_for_standard_dimensions['X_dimension'][0])
check_or_add_dim(X, d_in_field='X')
elif field.geometry.dimensions['X'] > 1:
N = behaviour.get('N_dimension',
config.netCDF_usualnames_for_standard_dimensions['N_dimension'][0])
check_or_add_dim(N, d_in_field='X')
elif 'gauss' in field.geometry.name:
Y = behaviour.get('Y_dimension', 'latitude')
check_or_add_dim(Y, d_in_field='lat_number')
X = behaviour.get('X_dimension', 'longitude')
check_or_add_dim(X, d_in_field='max_lon_number')
else:
raise NotImplementedError("grid not rectangular nor a gauss one.")
# 2. validity
# TODO: deal with unlimited time dimension ?
if field.validity[0] != FieldValidity():
tgrid = config.netCDF_usualnames_for_standard_dimensions['T_dimension'][0]
tgrid = {'found':v for v in self._variables
if v in config.netCDF_usualnames_for_standard_dimensions['T_dimension']}.get('found', tgrid)
tgrid = behaviour.get('T_grid', tgrid)
if len(field.validity) > 1 or behaviour.get('force_a_T_dimension', False):
_, _status = check_or_add_variable(tgrid, float, T)
dims.append(tgrid)
else:
_, _status = check_or_add_variable(tgrid, float)
datetime0 = field.validity[0].getbasis().isoformat(sep=b' ')
datetimes = [int((dt.get() - field.validity[0].getbasis()).total_seconds()) for dt in field.validity]
if _status == 'created':
self._variables[tgrid][:] = datetimes
self._variables[tgrid].units = ' '.join(['seconds', 'since', datetime0])
else:
assert (self._variables[tgrid][:] == datetimes).all(), \
' '.join(['variable', tgrid, 'mismatch.'])
# 3. geometry
# 3.1 vertical part
if len(field.geometry.vcoordinate.levels) > 1:
dims.append(Z)
if Z_gridsize > 1:
zgridname = config.netCDF_usualnames_for_standard_dimensions['Z_dimension'][0]
zgridname = {'found':v for v in self._variables
if v in config.netCDF_usualnames_for_standard_dimensions['Z_dimension']}.get('found', zgridname)
zgridname = behaviour.get('Z_grid', zgridname)
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
ZP1 = Z + '+1'
check_or_add_dim(ZP1, size=Z_gridsize + 1)
zgrid, _status = check_or_add_variable(zgridname, int)
if _status == 'created':
zgrid.standard_name = _typeoffirstfixedsurface_dict_inv[field.geometry.vcoordinate.typeoffirstfixedsurface]
if field.geometry.vcoordinate.typeoffirstfixedsurface == 119:
zgrid.positive = "down"
zgrid.formula_terms = "ap: hybrid_coef_A b: hybrid_coef_B ps: surface_air_pressure"
check_or_add_variable('hybrid_coef_A', vartype, ZP1)
self._variables['hybrid_coef_A'][:] = [iab[1]['Ai'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
check_or_add_variable('hybrid_coef_B', vartype, ZP1)
self._variables['hybrid_coef_B'][:] = [iab[1]['Bi'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
elif field.geometry.vcoordinate.typeoffirstfixedsurface == 118:
# TOBECHECKED:
zgrid.positive = "up"
zgrid.formula_terms = "a: hybrid_coef_A b: hybrid_coef_B orog: orography"
check_or_add_variable('hybrid_coef_A', vartype, ZP1)
self._variables['hybrid_coef_A'][:] = [iab[1]['Ai'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
check_or_add_variable('hybrid_coef_B', vartype, ZP1)
self._variables['hybrid_coef_B'][:] = [iab[1]['Bi'] for iab in field.geometry.vcoordinate.grid['gridlevels']]
else:
epylog.info('assume 118/119 type vertical grid matches.')
else:
if len(numpy.shape(field.geometry.vcoordinate.levels)) > 1:
dims_Z = [d for d in [Z, Y, X, G, N] if d is not None]
else:
dims_Z = Z
zgrid, _status = check_or_add_variable(zgridname, vartype, dims_Z)
u = {102:'m', 103:'m', 100:'hPa'}.get(field.geometry.vcoordinate.typeoffirstfixedsurface, None)
if u is not None:
zgrid.units = u
if _status == 'created':
zgrid[:] = field.geometry.vcoordinate.levels
else:
assert zgrid[:].all() == numpy.array(field.geometry.vcoordinate.levels).all(), \
' '.join(['variable', zgrid, 'mismatch.'])
if _typeoffirstfixedsurface_short_dict_inv.get(field.geometry.vcoordinate.typeoffirstfixedsurface, False):
zgrid.short_name = _typeoffirstfixedsurface_short_dict_inv[field.geometry.vcoordinate.typeoffirstfixedsurface]
# 3.2 grid (lonlat)
dims_lonlat = []
(lons, lats) = field.geometry.get_lonlat_grid()
if behaviour.get('flatten_horizontal_grids'):
dims_lonlat.append(G)
dims.append(G)
lons = stretch_array(lons)
lats = stretch_array(lats)
elif field.geometry.dimensions.get('Y', field.geometry.dimensions.get('lat_number', 0)) > 1: # both Y and X dimensions
dims_lonlat.extend([Y, X])
dims.extend([Y, X])
elif field.geometry.dimensions['X'] > 1: # only X ==> N
dims_lonlat.append(N)
dims.append(N)
# else: pass (single point or profile)
if isinstance(lons, numpy.ma.masked_array):
lons = lons.filled(fill_value)
lats = lats.filled(fill_value)
else:
fill_value = None
try:
_ = float(stretch_array(lons)[0])
except ValueError:
write_lonlat_grid = False
else:
write_lonlat_grid = behaviour.get('write_lonlat_grid', True)
if write_lonlat_grid:
lons_var, _status = check_or_add_variable(behaviour.get('X_grid', 'longitude'),
config.netCDF_default_variables_dtype,
dims_lonlat,
fill_value=fill_value)
lats_var, _status = check_or_add_variable(behaviour.get('Y_grid', 'latitude'),
config.netCDF_default_variables_dtype,
dims_lonlat,
fill_value=fill_value)
if _status == 'match':
epylog.info('assume lons/lats match.')
else:
lons_var[...] = lons[...]
lons_var.units = 'degrees'
lats_var[...] = lats[...]
lats_var.units = 'degrees'
# 3.3 meta-data
def set_ellipsoid(meta):
if 'ellps' in field.geometry.geoid:
self._variables[meta].ellipsoid = field.geometry.geoid['ellps']
elif field.geometry.geoid.get('a', False) == field.geometry.geoid.get('b', True):
self._variables[meta].earth_radius = field.geometry.geoid['a']
elif field.geometry.geoid.get('a', False) and field.geometry.geoid.get('b', False):
self._variables[meta].semi_major_axis = field.geometry.geoid['a']
self._variables[meta].semi_minor_axis = field.geometry.geoid['b']
elif field.geometry.geoid.get('a', False) and field.geometry.geoid.get('rf', False):
self._variables[meta].semi_major_axis = field.geometry.geoid['a']
self._variables[meta].inverse_flattening = field.geometry.geoid['rf']
else:
raise NotImplementedError('this kind of geoid:' + str(field.geometry.geoid))
if field.geometry.dimensions.get('Y', field.geometry.dimensions.get('lat_number', 0)) > 1:
if 'gauss' in field.geometry.name:
# reduced Gauss case
meta = 'Gauss_grid'
_, _status = check_or_add_variable(meta, int)
if _status == 'created':
self._variables[meta].grid_mapping_name = field.geometry.name + "_grid"
set_ellipsoid(meta)
if 'reduced' in field.geometry.name:
self._variables[meta].lon_number_by_lat = 'var: lon_number_by_lat'
check_or_add_variable('lon_number_by_lat', int, Y)
self._variables['lon_number_by_lat'][:] = field.geometry.dimensions['lon_number_by_lat']
self._variables[meta].latitudes = 'var: gauss_latitudes'
check_or_add_variable('gauss_latitudes', float, Y)
self._variables['gauss_latitudes'][:] = [l.get('degrees') for l in field.geometry.grid['latitudes']]
if 'pole_lon' in field.geometry.grid:
self._variables[meta].pole_lon = field.geometry.grid['pole_lon'].get('degrees')
self._variables[meta].pole_lat = field.geometry.grid['pole_lat'].get('degrees')
if 'dilatation_coef' in field.geometry.grid:
self._variables[meta].dilatation_coef = field.geometry.grid['dilatation_coef']
else:
epylog.info('assume Gauss grid parameters match.')
elif field.geometry.projected_geometry:
# projections
if field.geometry.name in ('lambert', 'mercator', 'polar_stereographic'):
meta = 'Projection_parameters'
_, _status = check_or_add_variable(meta, int)
if _status == 'created':
self._variables[meta].grid_mapping_name = _proj_dict[field.geometry.name]
set_ellipsoid(meta)
if field.geometry.position_on_horizontal_grid != 'center':
self._variables[meta].position_on_horizontal_grid = field.geometry.position_on_horizontal_grid
self._variables[meta].x_resolution = field.geometry.grid['X_resolution']
self._variables[meta].y_resolution = field.geometry.grid['Y_resolution']
if field.geometry.grid.get('LAMzone'):
_lon_cen, _lat_cen = field.geometry.ij2ll(float(field.geometry.dimensions['X'] - 1.) / 2.,
float(field.geometry.dimensions['Y'] - 1.) / 2.)
else:
_lon_cen = field.geometry._center_lon.get('degrees')
_lat_cen = field.geometry._center_lat.get('degrees')
if field.geometry.name == 'lambert':
if field.geometry.secant_projection:
std_parallel = [field.geometry.projection['secant_lat1'].get('degrees'),
field.geometry.projection['secant_lat2'].get('degrees')]
latitude_of_projection_origin = (std_parallel[0] + std_parallel[1]) / 2.
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
latitude_of_projection_origin = std_parallel
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'),
latitude_of_projection_origin)
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].longitude_of_central_meridian = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].latitude_of_projection_origin = latitude_of_projection_origin
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
elif field.geometry.name == 'mercator':
if field.geometry.secant_projection:
std_parallel = field.geometry.projection['secant_lat'].get('degrees')
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'), 0.)
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].longitude_of_central_meridian = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
elif field.geometry.name == 'polar_stereographic':
if field.geometry.secant_projection:
std_parallel = field.geometry.projection['secant_lat'].get('degrees')
else:
std_parallel = field.geometry.projection['reference_lat'].get('degrees')
x00, y00 = field.geometry.ij2xy(0, 0)
x0, y0 = field.geometry.ll2xy(field.geometry.projection['reference_lon'].get('degrees'),
field.geometry.projection['reference_lat'].get('degrees'))
(dx, dy) = (x00 - x0, y00 - y0)
self._variables[meta].straight_vertical_longitude_from_pole = field.geometry.projection['reference_lon'].get('degrees')
self._variables[meta].latitude_of_projection_origin = field.geometry.projection['reference_lat'].get('degrees')
self._variables[meta].standard_parallel = std_parallel
self._variables[meta].false_easting = -dx
self._variables[meta].false_northing = -dy
else:
epylog.info('assume projection parameters match.')
else:
raise NotImplementedError('field.geometry.name == ' + field.geometry.name)
else:
meta = False
else:
meta = False
# 4. Variable
varname = field.fid['netCDF'].replace('.', config.netCDF_replace_dot_in_variable_names)
_, _status = check_or_add_variable(varname,
config.netCDF_default_variables_dtype,
dims,
zlib=bool(compression),
complevel=compression,
fill_value=fill_value)
if meta:
self._variables[varname].grid_mapping = meta
if field.geometry.vcoordinate.typeoffirstfixedsurface in (118, 119):
self._variables[varname].vertical_grid = zgridname
data = field.getdata(d4=True)
if isinstance(data, numpy.ma.masked_array):
if 'gauss' in field.geometry.name:
data = field.geometry.fill_maskedvalues(data)
else:
data = data.filled(fill_value)
if behaviour.get('flatten_horizontal_grids'):
data = field.geometry.horizontally_flattened(data)
data = data.squeeze()
if _status == 'match':
epylog.info('overwrite data in variable ' + varname)
self._variables[varname][...] = data
if field.units not in (None, ''):
self._variables[varname].units = field.units
# 5. metadata
for k, v in metadata.items():
self._variables[varname].setncattr(k, v)
[docs] def set_default_global_attributes(self):
"""
Set default global attributes (those from
config.netCDF_default_global_attributes).
"""
self._nc.setncatts(config.netCDF_default_global_attributes)
[docs] def set_global_attributes(self, **attributes):
"""Set the given global attributes."""
self._nc.setncatts(attributes)
[docs] def behave(self, **kwargs):
"""
Set-up the given arguments in self.behaviour, for the purpose of
building fields from netCDF.
"""
self.behaviour.update(kwargs)
[docs] def what(self, out=sys.stdout, **_):
"""Writes in file a summary of the contents of the GRIB."""
# adapted from http://schubert.atmos.colostate.edu/~cslocum/netcdf_example.html
def ncdump(nc, out):
'''
ncdump outputs dimensions, variables and their attribute information.
The information is similar to that of NCAR's ncdump utility.
ncdump requires a valid instance of Dataset.
Parameters
----------
nc : netCDF4.Dataset
A netCDF4 dateset object
verb : Boolean
whether or not nc_attrs, nc_dims, and nc_vars are printed
Returns
-------
nc_attrs : list
A Python list of the NetCDF file global attributes
nc_dims : list
A Python list of the NetCDF file dimensions
nc_vars : list
A Python list of the NetCDF file variables
'''
def outwrite(*items):
items = list(items)
stritem = items.pop(0)
for i in items:
stritem += ' ' + str(i)
out.write(stritem + '\n')
def print_ncattr(key):
"""
Prints the NetCDF file attributes for a given key
Parameters
----------
key : unicode
a valid netCDF4.Dataset.variables key
"""
try:
outwrite("\t\ttype:", repr(nc.variables[key].dtype))
for ncattr in nc.variables[key].ncattrs():
outwrite('\t\t%s:' % ncattr,
repr(nc.variables[key].getncattr(ncattr)))
except KeyError:
outwrite("\t\tWARNING: %s does not contain variable attributes" % key)
# NetCDF global attributes
nc_attrs = nc.ncattrs()
outwrite("NetCDF Global Attributes:")
for nc_attr in nc_attrs:
outwrite('\t%s:' % nc_attr, repr(nc.getncattr(nc_attr)))
nc_dims = [dim for dim in nc.dimensions] # list of nc dimensions
# Dimension shape information.
outwrite("NetCDF dimension information:")
for dim in nc_dims:
outwrite("\tName:", dim)
outwrite("\t\tsize:", len(nc.dimensions[dim]))
# print_ncattr(dim)
# Variable information.
nc_vars = [var for var in nc.variables] # list of nc variables
outwrite("NetCDF variable information:")
for var in nc_vars:
outwrite('\tName:', var)
outwrite("\t\tdimensions:", nc.variables[var].dimensions)
outwrite("\t\tsize:", nc.variables[var].size)
print_ncattr(var)
return nc_attrs, nc_dims, nc_vars
out.write("### FORMAT: " + self.format + "\n")
out.write("\n")
ncdump(self._nc, out)
@property
@FileResource._openbeforedelayed
def _dimensions(self):
return self._nc.dimensions
@property
@FileResource._openbeforedelayed
def _variables(self):
return self._nc.variables