Source code for epygram.formats.GeoPoints

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class for GeoPoints format.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import datetime
import numpy
import sys

import footprints
from footprints import FPList, FPDict, proxy as fpx

from epygram import config, epygramError
from epygram.base import FieldSet, FieldValidity
from epygram.resources import FileResource
from epygram.geometries import PointGeometry, VGeometry
from epygram.geometries.H2DGeometry import H2DUnstructuredGeometry
from epygram.fields import H2DField, PointField, D3Field

__all__ = ['GeoPoints']

epylog = footprints.loggers.getLogger(__name__)


[docs]class GeoPoints(FileResource): """ Class implementing all specificities for GeoPoints resource format. Format: <beginning of file> #GEO #PARAMETER = any name or character string #LAT LON DATE TIME VALUE #DATA 32.2671 -12.5501 20140417 21 2.269112E+02 ... <till end of file> The set of keys describing each point is defined in its 'columns' attribute. """ _footprint = dict( attr=dict( format=dict( values=set(['GeoPoints']), default='GeoPoints'), parameter=dict( optional=True, info="The name of the parameter whose data is in the" + " *value* field."), columns=dict( type=FPList, optional=True, info="The columns of the geopoints, i.e. the set of" + " keys describing each point."), other_attributes=dict( type=FPDict, optional=True, info="other key:value pairs in header.", default=FPDict({})), ) ) def __init__(self, *args, **kwargs): self.isopen = False super(GeoPoints, self).__init__(*args, **kwargs) if not self.fmtdelayedopen: self.open()
[docs] def open(self, parameter=None, columns=None, openmode=None, other_attributes=None): """ Opens a GeoPoint. :param parameter: name of the parameter in header (openmode='w'). :param columns: list of all the items to be written for each point (openmode='w'). :param openmode: optional, to open with a specific openmode, eventually different from the one specified at initialization. :param other_attributes: other key:value pairs to be written in header (openmode='w'). """ super(GeoPoints, self).open(openmode=openmode) if self.openmode in ('r', 'a'): self._file = open(self.container.abspath, 'r') # check format is GeoPoints isgeo = self._file.readline() if isgeo[0:4] != '#GEO': raise IOError("this resource is not a GeoPoints one.") # read header header = {} self.headerlength = 1 headerline = self._file.readline() while headerline[0:5] != '#DATA': self.headerlength += 1 if '=' in headerline: metadata = headerline[:-1].replace('#', '').split('=') header[metadata[0].strip()] = metadata[1].strip() elif 'FORMAT' in headerline: metadata = headerline[:-1].replace('#', '').split() header[metadata[0].strip()] = metadata[1].strip() else: # description of columns self._attributes['columns'] = headerline[1:-1].split() headerline = self._file.readline() self.headerlength += 1 for meta in header.keys(): if meta in ['PARAMETER', ]: self._attributes[meta.lower()] = header.pop(meta) else: self.other_attributes[meta] = header.pop(meta) # initializations self.isopen = True self.empty = False elif self.openmode == 'w': # object initializations self.empty = True if parameter is not None and self.parameter is None: self._attributes['parameter'] = parameter if columns is not None and self.columns is None: self._attributes['columns'] = columns if other_attributes is not None and self.other_attributes is None: self._attributes['other_attributes'] = other_attributes if len(self.other_attributes) > 0: if 'FORMAT' in self.other_attributes.keys(): if columns is not None: epylog.warning("Double specification: columns overwritten by FORMAT.") if self.other_attributes['FORMAT'] == 'XYV': self._attributes['columns'] = ['LON', 'LAT', 'VALUE'] else: raise NotImplementedError("Not yet. Cf. https://software.ecmwf.int/wiki/display/METV/Geopoints for implementing.") # header initializations if self.parameter is not None and self.columns is not None: self._file = open(self.container.abspath, self.openmode) self._file.write('#GEO\n') for k, v in self.other_attributes.items(): self._file.write('#' + k + ' = ' + str(v) + '\n') self._file.write('#PARAMETER = ' + self.parameter + '\n') self._file.write('#') for i in self.columns: self._file.write(i + ' ') self._file.write('\n') self._file.write('#DATA\n') self.isopen = True else: epylog.debug("cannot open GeoPoints in 'w' mode without" + " attributes *parameter* and *columns* set." + " Not open yet.")
[docs] def close(self): """ Closes a GeoPoints. """ if hasattr(self, '_file'): self._file.close() self.isopen = False
[docs] def listfields(self, **kwargs): """ Returns a list containing the LFI identifiers of all the fields of the resource. """ return super(GeoPoints, self).listfields(**kwargs)
@FileResource._openbeforedelayed def _listfields(self, complete=False): """ Actual listfields() method for GeoPoints. :param complete: - if True method returns a list of {'GeoPoints':GeoPoints_fid, 'generic':generic_fid} - if False method return a list of GeoPoints_fid """ if complete: return [{'GeoPoints':self.parameter, 'generic':{}}] else: return [self.parameter] ################ # ABOUT POINTS # ################
[docs] def countpoints(self): """ Counts and returns the number of points in the resource. """ if self.isopen: self._file.close() self.isopen = False wasopen = True n = sum(1 for _ in open(self.container.abspath)) - self.headerlength if wasopen: if self.openmode == 'w' and not self.empty: self._file = open(self.container.abspath, 'a') self.isopen = True else: self._file = open(self.container.abspath, self.openmode) self.isopen = True if self.openmode == 'r': for _ in range(self.headerlength): self._file.readline() return n
@FileResource._openbeforedelayed def readfield(self, parameter='*', footprints_builder=False, as_points=False): """ Reads the GeoPoints. :param parameter: is the name of the parameter to read (it exists only for compatibility with other formats) :param footprints_builder: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. :param *as_points*: if *True*, returns a collection of points instead of a single field. """ if self.openmode in ('w', 'a'): raise epygramError("cannot read fields in resource if with" + " openmode == 'w' or 'a'.") elif not self.isopen: self.open() if parameter not in ['*', self.parameter]: raise epygramError("This file does not contain the requested parameter.") # Rewind self._file.close() self._file = open(self.container.abspath, self.openmode) for _ in range(self.headerlength): self._file.readline() specialValues_list = {'DATE':[], 'TIME':[], 'LEVEL':[]} # To contain the date, time and date values specialValues_cst = {'DATE':True, 'TIME':True, 'LEVEL':True} # To know if date, time and level are constant or not specialValues_prev = {} # Previous values of date, time and leve to detect changes in values values = [] latitudes = [] longitudes = [] nb = 0 for line in iter(self._file): pointvals = line.split() point = dict(zip(self.columns, pointvals)) # We store date, time and levels as a list of one value if parameter is constant # or as a multi-values list if the parameter varies for item in ['DATE', 'TIME', 'LEVEL']: if item in self.columns: if nb == 0: specialValues_prev[item] = point[item] if specialValues_cst[item] and specialValues_prev[item] != point[item]: specialValues_cst[item] = False specialValues_list[item] = specialValues_list[item] * nb if nb == 0 or not specialValues_cst[item]: specialValues_list[item].append(point[item]) values.append(float(point['VALUE'])) latitudes.append(float(point['LAT'])) longitudes.append(float(point['LON'])) nb += 1 field_structure = 'Point' if nb == 1 else 'H2D' if footprints_builder: field_builder = fpx.field geom_builder = fpx.geometry vcoord_builder = fpx.geometry else: if field_structure == 'Point': field_builder = PointField geom_builder = PointGeometry else: field_builder = H2DField geom_builder = H2DUnstructuredGeometry vcoord_builder = VGeometry if 'LEVEL' in self.columns: specialValues_list['LEVEL'] = [float(level) for level in specialValues_list['LEVEL']] vcoordinate = vcoord_builder(structure='V', typeoffirstfixedsurface=255, levels=[255] if 'LEVEL' not in self.columns else specialValues_list['LEVEL']) geometry = geom_builder(structure=field_structure, name='unstructured', dimensions={'X':len(values), 'Y':1}, vcoordinate=vcoordinate, grid={'longitudes':longitudes, 'latitudes':latitudes}, position_on_horizontal_grid='center' ) if 'DATE' in self.columns and 'TIME' in self.columns: if specialValues_cst['DATE'] and not specialValues_cst['TIME']: specialValues_cst['DATE'] = False specialValues_list['DATE'] = specialValues_list['DATE'] * nb if specialValues_cst['TIME'] and not specialValues_cst['DATE']: specialValues_cst['TIME'] = False specialValues_list['TIME'] = specialValues_list['TIME'] * nb if 'DATE' in self.columns: for i in range(len(specialValues_list['DATE'])): date = specialValues_list['DATE'][i] specialValues_list['DATE'][i] = datetime.datetime(int(date[0:4]), int(date[4:6]), int(date[6:8])) if 'TIME' in self.columns: for i in range(len(specialValues_list['TIME'])): time = specialValues_list['TIME'][i] if len(time) in (1, 2): specialValues_list['TIME'][i] = datetime.time(int(time)) else: specialValues_list['TIME'][i] = datetime.time(int(time[0:2]), int(time[2:4])) if 'DATE' in self.columns or 'TIME' in self.columns: dates = [] if 'DATE' in self.columns and 'TIME' in self.columns: for i in range(len(specialValues_list['DATE'])): delta = datetime.timedelta(seconds=specialValues_list['TIME'][i].hour * 3600 + specialValues_list['TIME'][i].minute * 60 + specialValues_list['TIME'][i].second) dates.append(specialValues_list['DATE'][i] + delta) elif 'DATE' in self.columns: dates = specialValues_list['DATE'] else: dates = specialValues_list['TIME'] if specialValues_cst['DATE']: validity = FieldValidity(date_time=dates[0]) else: validity = FieldValidity(date_time=dates) else: validity = FieldValidity() field = field_builder(structure=field_structure, fid={self.format:self.parameter}, geometry=geometry, validity=validity) field.setdata(numpy.array(values).reshape(1, len(values))) if as_points: field = field.as_points() return field
[docs] def writefield(self, field, parameter='UNKNOWN', fidkey_for_parameter=None, order='C', llprecision=config.GeoPoints_lonlat_precision, precision=config.GeoPoints_precision, col_width=config.GeoPoints_col_width, subzone=None): """ Write a field in the resource. :param field: a :class:`epygram.fields.D3Field` or a :class:`epygram.base.FieldSet` of :class:`epygram.fields.PointField`, or a dict {'lon':[...], 'lat':[...], 'value':[...], ...}. The keys of field that are not in the *columns* are ignored. The keys of the columns that are not in the field are filled by zeros. At least 'lon' and 'lat' are necessary. :param parameter: the name of the parameter, to be given if *field* is a dict and if it has not been given at opening. :param order: optional, for a rectangular D3Field, whether to flatten 2D arrays in 'C' (row-major) or 'F' (Fortran, column-major) order. :param llprecision: precision (number of digits) in write for lon/lat. :param precision: precision (number of digits) in write for other floats. :param subzone: optional, among ('C', 'CI'). Only if *field* is a LAM :class:`epygram.fields.H2DField`, extracts the points of the resp. C or C+I zone. Default is no subzone, i.e. the whole field. """ if self.openmode == 'r': raise IOError("cannot write field in a GeoPoints with openmode 'r'.") if not self.isopen: open_kwargs = {} columns = ['LAT', 'LON', 'LEVEL'] if isinstance(field, D3Field): if parameter == 'UNKNOWN': if fidkey_for_parameter is None: open_kwargs['parameter'] = str(field.fid.get(self.format, field.fid)) else: open_kwargs['parameter'] = str(field.fid[fidkey_for_parameter]) if field.validity.get() is not None: columns.extend(['DATE', 'TIME']) elif isinstance(field, FieldSet) or isinstance(field, PointField): if isinstance(field, PointField): field = FieldSet([field]) if parameter == 'UNKNOWN': if fidkey_for_parameter is None: open_kwargs['parameter'] = str(field[0].fid.get(self.format, field[0].fid)) else: open_kwargs['parameter'] = str(field[0].fid[fidkey_for_parameter]) if field[0].validity.get() is not None: columns.extend(['DATE', 'TIME']) elif isinstance(field, dict): if 'lat' not in field: raise epygramError("'lat' must be in field.keys().") if 'lon' not in field: raise epygramError("'lon' must be in field.keys().") if 'value' not in field: raise epygramError("'value' must be in field.keys().") for i in field.keys(): if i.upper() not in ('LAT', 'LON', 'VALUE'): columns.append(i.upper()) columns.append('VALUE') if self.columns is None: open_kwargs['columns'] = columns self.open(**open_kwargs) if isinstance(field, D3Field): (lons, lats) = field.geometry.get_lonlat_grid(nb_validities=len(field.validity), subzone=subzone, d4=True) if field.geometry.rectangular_grid: lons = lons.flatten(order=order) lats = lats.flatten(order=order) values = field.getdata(subzone=subzone, d4=True).flatten(order=order) if 'LEVEL' in self.columns: levels = field.geometry.get_levels(nb_validities=len(field.validity), subzone=subzone, d4=True).flatten(order=order) else: lons = lons.compressed() lats = lats.compressed() values = field.getdata(subzone=subzone, d4=True).compressed() if 'LEVEL' in self.columns: levels = field.geometry.get_levels(nb_validities=len(field.validity), subzone=subzone, d4=True).flatten(order=order) if 'DATE' in self.columns or 'TIME' in self.columns: date = field.validity.get(fmt='IntStr') if 'TIME' in self.columns: hour = date[8:10] if 'DATE' in self.columns: date = date[0:8] writebuffer = {'LON':lons, 'LAT':lats, 'VALUE':values} if 'LEVEL' in self.columns: # level = field.geometry.vcoordinate.get('level', 0) writebuffer['LEVEL'] = levels for i in self.columns: if i == 'DATE': writebuffer[i] = [date] * len(values) elif i == 'TIME': writebuffer[i] = [hour] * len(values) # elif i == 'LEVEL': # writebuffer[i] = [level] * len(values) # others to be implemented here elif isinstance(field, FieldSet) or isinstance(field, PointField): if isinstance(field, PointField): field = FieldSet([field]) writebuffer = {'LON':[], 'LAT':[], 'VALUE':[], 'DATE':[], 'TIME':[]} for pt in field: if not isinstance(pt, PointField): raise NotImplementedError("write a FieldSet of fields" + " that are not PointField.") writebuffer['LON'].append(pt.geometry.hlocation['lon'].get('degrees')) writebuffer['LAT'].append(pt.geometry.hlocation['lat'].get('degrees')) writebuffer['VALUE'].append(float(pt.getdata())) if 'DATE' in self.columns or 'TIME' in self.columns: date = pt.validity.get(fmt='IntStr') if 'TIME' in self.columns: writebuffer['TIME'].append(date[8:10]) if 'DATE' in self.columns: writebuffer['DATE'].append(date[0:8]) # others to be implemented here elif isinstance(field, dict): writebuffer = {k.upper():v for k, v in field.items()} else: raise NotImplementedError("*field* of that type.") for i in self.columns: if i not in writebuffer: writebuffer[i] = [0] * len(writebuffer['VALUE']) substruct = self.columns[2:] # WRITE for pt in range(len(writebuffer['VALUE'])): pointstr = ' {: .{precision}{type}} '.format(writebuffer['LAT'][pt], type='F', precision=llprecision) + \ ' {: .{precision}{type}} '.format(writebuffer['LON'][pt], type='F', precision=llprecision) for i in substruct: x = writebuffer[i][pt] if type(x) in (float, numpy.float64): x = ' {: .{precision}{type}} '.format(x, type='E', precision=precision) else: x = '{:^{width}}'.format(x, width=col_width) pointstr += x self._file.write(pointstr + "\n") if self.empty: self.empty = False
########### # pre-app # ########### @FileResource._openbeforedelayed def what(self, out=sys.stdout, **_): """ Writes in file-like a summary of the contents of the GeoPoints. :param out: the output open file-like object. """ points_number = self.countpoints() out.write("### FORMAT: " + self.format + "\n") out.write("\n") out.write("#####################" + "\n") out.write("# GeoPoints Summary #" + "\n") out.write("#####################" + "\n") out.write("Parameter (described by VALUE): " + self.parameter + "\n") out.write("Number of points : " + str(points_number) + "\n") out.write("Data/meta for each point : " + str(self.columns) + "\n") if 'structure' in self.footprint_attributes: out.write("Structure of Points as a Field: " + str(self.structure) + "\n")