Source code for epygram.formats.GeoPoints

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
GeoPoints:

Contains the class for GeoPoints format.
"""

__all__ = ['GeoPoints']

import datetime
import numpy

from footprints import FPDict, FPList, proxy as fpx

from epygram import config, epygramError, epylog
from epygram.base import Resource, FieldSet, FieldValidity
from epygram.util import Angle
from epygram.geometries import PointGeometry
from epygram.fields import H2DField, PointField



[docs]class GeoPoints(Resource): """ Class implementing all specificities for GeoPoints resource format. Format: <beginning of file> #GEO #PARAMETER = any name or character string #LAT LON DATE TIME VALUE #DATA 32.2671 -12.5501 20140417 21 2.269112E+02 ... <till end of file> The set of keys describing each point is defined in its 'structure' attribute. """ _footprint = dict( attr=dict( format=dict( values=set(['GeoPoints']), default='GeoPoints'), parameter=dict( optional=True, info="The name of the parameter whose data is in the *value*\ field."), structure=dict( type=FPList, optional=True, info="The structure of the geopoints, i.e. the set of keys\ describing each point.") ) ) def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False super(GeoPoints, self).__init__(*args, **kwargs) if not self.fmtdelayedopen: self.open()
[docs] def open(self, parameter=None, structure=None, openmode=None): """ Opens a GeoPoint. Args:\n - *parameter*: name of the parameter in header (openmode='w'). - *structure*: list of all the items to be written for each point\ (openmode='w'). - *openmode*: optional, to open with a specific openmode, eventually\ different from the one specified at initialization. """ super(GeoPoints, self).open(openmode=openmode) if self.openmode in ('r', 'a'): self._file = open(self.container.abspath, 'r') isgeo = self._file.readline() if isgeo[0:4] != '#GEO': raise IOError("this resource is not a GeoPoints one.") parameter = self._file.readline() self._attributes['parameter'] = parameter.split('=')[1][:-1].strip() structure = self._file.readline() self._attributes['structure'] = structure[1:].split() self._file.readline() if self.openmode == 'a': self._file.close() self._file = open(self.container.abspath, self.openmode) self.isopen = True self.empty = False elif self.openmode == 'w': self.empty = True if parameter != None and self.parameter == None: self._attributes['parameter'] = parameter if structure != None and self.structure == None: self._attributes['structure'] = structure if self.parameter != None and self.structure != None: self._file = open(self.container.abspath, self.openmode) self._file.write('#GEO\n') self._file.write('#PARAMETER = ' + self.parameter + '\n') self._file.write('#') for i in self.structure: self._file.write(i + ' ') self._file.write('\n') self._file.write('#DATA\n') self.isopen = True else: epylog.debug("cannot open GeoPoints in 'w' mode without\ attributes *parameter* and *structure* set.\ Not open yet.")
[docs] def close(self): """ Closes a GeoPoints. """ if hasattr(self, '_file'): self._file.close() self.isopen = False ################ # ABOUT POINTS # ################
[docs] def countpoints(self): """ Counts and returns the number of points in the resource. """ if self.isopen: self._file.close() self.isopen = False wasopen = True n = sum(1 for _ in open(self.container.abspath)) - 4 if wasopen: if self.openmode == 'w' and not self.empty: self._file = open(self.container.abspath, 'a') self.isopen = True else: self._file = open(self.container.abspath, self.openmode) self.isopen = True if self.openmode == 'r': for _ in range(4): self._file.readline() return n
@Resource._openbeforedelayed
[docs] def readfield(self, as_dict=False, footprints_builder=False): """ Reads the GeoPoints, and returns:\n - a FieldSet of PointField if not *as_dict*, - a dict of the GeoPoints structure (self.structure) {'lon':[...], 'lat':[...], 'value':[...], ...} if *as_dict*. - *footprints_builder*: if *True*, uses footprints.proxy to build fields. Defaults to False for performance reasons. """ if self.openmode in ('w', 'a'): raise epygramError("cannot read fields in resource if with\ openmode == 'w' or 'a'.") elif not self.isopen: self.open() if footprints_builder: field_builder = fpx.field geom_builder = fpx.geometry else: field_builder = PointField geom_builder = PointGeometry if not as_dict: toreturn = FieldSet() else: toreturn = {} for i in self.structure: toreturn[i.lower()] = [] while True: pointvals = self._file.readline().split() if pointvals == []: break point = dict(zip(self.structure, pointvals)) if not as_dict: hlocation = {'lon':Angle(float(point['LON']), unit='degrees'), 'lat':Angle(float(point['LAT']), unit='degrees')} geometry = geom_builder(structure='Point', hlocation=FPDict(hlocation)) if 'DATE' in self.structure and 'TIME' in self.structure: dt = datetime.datetime(int(point['DATE'][0:4]), int(point['DATE'][4:6]), int(point['DATE'][6:8]), int(point['TIME'])) validity = FieldValidity(date_time=dt) pointfield = field_builder(fid=FPDict({'GeoPoints':self.parameter}), geometry=geometry, validity=validity) pointfield.setdata(float(point['VALUE'])) toreturn.append(pointfield) else: for i in self.structure: if i in ('LEVEL', 'DATE', 'TIME'): toreturn[i.lower()].append(int(point[i])) else: toreturn[i.lower()].append(float(point[i])) self.close() return toreturn
[docs] def writefield(self, field, parameter='UNKNOWN', order='C', llprecision=config.GeoPoints_lonlat_precision, precision=config.GeoPoints_precision, col_width=config.GeoPoints_col_width, subzone=None): """ Write a field in the resource. Args: \n - *field*: a :class:`epygram.fields.H2DField` or a :class:`epygram.base.FieldSet` of :class:`epygram.fields.PointField`, or a dict {'lon':[...], 'lat':[...], 'value':[...], ...}. The keys of field that are not in the *structure* are ignored. The keys of the structure that are not in the field are filled by zeros. At least 'lon' and 'lat' are necessary. - *parameter*: the name of the parameter, to be given if *field* is a dict and if it has not been given at opening. - *order*: optional, for a rectangular H2DField, whether to flatten 2D arrays in 'C' (row-major) or 'F' (Fortran, column-major) order. - *llprecision*: precision (number of digits) in write for lon/lat. - *precision*: precision (number of digits) in write for other floats. - *subzone*: optional, among ('C', 'CI'). Only if *field* is a LAM :class:`epygram.fields.H2DField`, extracts the points of the resp. C or C+I zone. Default is no subzone, i.e. the whole field. """ if self.openmode == 'r': raise IOError("cannot write field in a GeoPoints with openmode 'r'.") if not self.isopen: structure = ['LAT', 'LON'] if isinstance(field, H2DField): parameter = field.fid[field.fid.keys()[0]] if field.validity.get() != None: structure.extend(['DATE', 'TIME']) elif isinstance(field, FieldSet) or isinstance(field, PointField): if isinstance(field, PointField): field = FieldSet([field]) parameter = field[0].fid[field[0].fid.keys()[0]] if field[0].validity.get() != None: structure.extend(['DATE', 'TIME']) elif isinstance(field, dict): if not 'lat' in field.keys(): raise epygramError("'lat' must be in field.keys().") if not 'lon' in field.keys(): raise epygramError("'lon' must be in field.keys().") if not 'value' in field.keys(): raise epygramError("'value' must be in field.keys().") for i in field.keys(): if i.upper() not in ('LAT', 'LON', 'VALUE'): structure.append(i.upper()) structure.append('VALUE') self.open(parameter=str(parameter), structure=structure) if isinstance(field, H2DField): (lons, lats) = field.geometry.get_lonlat_grid(subzone=subzone) if field.geometry.rectangular_grid: lons = lons.flatten(order=order) lats = lats.flatten(order=order) values = field.getdata(subzone=subzone).flatten(order=order) else: lons = lons.compressed() lats = lats.compressed() values = field.getdata(subzone=subzone).compressed() if 'DATE' in self.structure or 'TIME' in self.structure: date = field.validity.get('IntStr') if 'TIME' in self.structure: hour = int(date[8:10]) if 'DATE' in self.structure: date = int(date[0:8]) if 'LEVEL' in self.structure: try: level = field.geometry.vcoordinate['Z'] except KeyError: level = 0 writebuffer = {'LON':lons, 'LAT':lats, 'VALUE':values} for i in self.structure: if i == 'DATE': writebuffer[i] = [date] * len(values) elif i == 'TIME': writebuffer[i] = [hour] * len(values) elif i == 'LEVEL': writebuffer[i] = [level] * len(values) # others to be implemented here elif isinstance(field, FieldSet) or isinstance(field, PointField): if isinstance(field, PointField): field = FieldSet([field]) writebuffer = {'LON':[], 'LAT':[], 'VALUE':[], 'DATE':[], 'TIME':[]} for pt in field: if not isinstance(pt, PointField): raise NotImplementedError("write a FieldSet of fields that\ are not PointField.") writebuffer['LON'].append(pt.geometry.hlocation['lon'].get('degrees')) writebuffer['LAT'].append(pt.geometry.hlocation['lat'].get('degrees')) writebuffer['VALUE'].append(float(pt.data)) if 'DATE' in self.structure or 'TIME' in self.structure: date = pt.validity.get('IntStr') if 'TIME' in self.structure: writebuffer['TIME'].append(int(date[8:10])) if 'DATE' in self.structure: writebuffer['DATE'].append(int(date[0:8])) # others to be implemented here elif isinstance(field, dict): writebuffer = {k.upper():v for k, v in field.items()} else: raise NotImplementedError("*field* of that type.") for i in self.structure: if i not in writebuffer.keys(): writebuffer[i] = [0] * len(writebuffer['VALUE']) substruct = self.structure[2:] for pt in range(len(writebuffer['VALUE'])): pointstr = ' {: .{precision}{type}} '.format(writebuffer['LAT'][pt], type='F', precision=llprecision)\ + ' {: .{precision}{type}} '.format(writebuffer['LON'][pt], type='F', precision=llprecision) for i in substruct: x = writebuffer[i][pt] if type(x) in (float, numpy.float64): x = ' {: .{precision}{type}} '.format(x, type='E', precision=precision) else: x = '{:^{width}}'.format(x, width=col_width) pointstr += x self._file.write(pointstr + "\n") if self.empty: self.empty = False ########### # pre-app # ###########
@Resource._openbeforedelayed
[docs] def what(self, out): """ Writes in file-like a summary of the contents of the GeoPoints. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). """ points_number = self.countpoints() out.write("#####################" + "\n") out.write("# GeoPoints Summary #" + "\n") out.write("#####################" + "\n") out.write("Parameter (described by VALUE): " + self.parameter + "\n") out.write("Number of points : " + str(points_number) + "\n") out.write("Data/meta for each point : " + str(self.structure) + "\n")