Source code for epygram.formats.LFA

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class to handle LFA format.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import os
import numpy
import sys
import six

from arpifs4py import wlfa

import epygram
from epygram import epygramError, config, util
from epygram.resources import FileResource
from epygram.fields import MiscField

__all__ = ['LFA']


[docs]class LFA(FileResource): """Class implementing all specificities for LFA resource format.""" _footprint = dict( attr=dict( format=dict( values=set(['LFA']), default='LFA') ) ) def __init__(self, *args, **kwargs): self.isopen = False super(LFA, self).__init__(*args, **kwargs) if not wlfa.wlfatest(self.container.abspath): raise IOError("This resource is not a LFA one.") if not self.fmtdelayedopen: self.open()
[docs] def open(self, openmode=None): """ Opens the LFA in Fortran sense. :param openmode: optional, to open with a specific openmode, eventually different from the one specified at initialization. """ super(LFA, self).open(openmode=openmode) if self.openmode in ('r', 'a'): # open, getting logical unit if self.openmode == 'r': self._unit = wlfa.wlfaouv(self.container.abspath, 'R') elif self.openmode == 'a': self._unit = wlfa.wlfaouv(self.container.abspath, 'A') self.isopen = True self.empty = False elif self.openmode == 'w': # open self._unit = wlfa.wlfaouv(self.container.abspath, 'W') self.isopen = True self.empty = True
[docs] def close(self): """Closes a LFA properly.""" if self.isopen: try: wlfa.wlfafer(self._unit) except Exception: raise IOError("closing " + self.container.abspath) self.isopen = False # Cleanings if self.openmode == 'w' and self.empty: os.remove(self.container.abspath)
################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None): """ Returns a list of the fields from resource whose name match the given *seed*. :param seed: might be a regular expression, a list of regular expressions or *None*. If *None* (default), returns the list of all fields in resource. """ if seed is None: fieldslist = self.listfields() elif isinstance(seed, six.string_types): fieldslist = util.find_re_in_list(seed, self.listfields()) elif isinstance(seed, list): fieldslist = [] for s in seed: fieldslist += util.find_re_in_list(s, self.listfields()) if fieldslist == []: raise epygramError("no field matching '" + seed + "' was found in resource " + self.container.abspath) return fieldslist
@FileResource._openbeforedelayed def readfield(self, fieldname, getdata=True): """ Reads a field in resource. :param fieldname: name of the field to be read :param getdata: if False, do not read the field data, only metadata. """ field = MiscField(fid={'LFA':fieldname}) if getdata: (fieldtype, fieldlength) = wlfa.wlfacas(self._unit, fieldname) if fieldtype[0] == 'R': (data, fieldlength) = wlfa.wlfalecr(self._unit, fieldname, fieldlength) elif fieldtype[0] == 'I': (data, fieldlength) = wlfa.wlfaleci(self._unit, fieldname, fieldlength) elif fieldtype[0] == 'C': (data, fieldlength) = wlfa.wlfalecc(self._unit, fieldname, fieldlength, config.LFA_maxstrlen) data = [data[i].strip() for i in range(fieldlength)] field.setdata(numpy.array(data)) return field @FileResource._openbeforedelayed def writefield(self, field): """Writes a Field in resource.""" # DEAD-END: we should not need to write with this comdemned format raise epygramError("writefield routine has not been tested..." + " If you need to, you might face problems...") if not isinstance(field, epygram.base.Field): raise epygramError("'field' argument has to be a" + " epygram.base.Field.") data = numpy.array(field.getdata()) if len(data.shape) != 1: raise epygramError("LFA can only hold 1D arrays.") if data.dtype[0:5] == 'float': wlfa.wlfaecrr(self._unit, field.fid['LFA'], data) elif data.dtype[0:3] == 'int': wlfa.wlfaecri(self._unit, field.fid['LFA'], data) elif data.dtype[0:3] == 'str': wlfa.wlfaecrc(self._unit, field.fid['LFA'], data) else: raise epygramError("LFA can only hold float, int or str arrays.")
[docs] def listfields(self): """ Returns a list containing the LFA identifiers of all the fields of the resource. """ return super(LFA, self).listfields()
@FileResource._openbeforedelayed def _listfields(self): """ Returns a list containing the names of the fields in LFA. """ (list_length, fieldslist) = wlfa.wlfalaft(self._unit, config.LFA_max_num_fields, config.LFA_maxstrlen) fieldslist = [fieldslist[i].strip() for i in range(list_length)] return fieldslist @FileResource._openbeforedelayed def what(self, out=sys.stdout, sortfields=False, **_): """ Writes in file a summary of the contents of the LFA. :param out: the output open file-like object :param sortfields: **True** if the fields have to be sorted by type. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n' listoffields = self.listfields() if sortfields: listoffields.sort() # Write out out.write("### FORMAT: " + self.format + "\n") out.write("\n") out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") numfields = len(listoffields) out.write("Number: " + str(numfields) + "\n") out.write(sepline) for f in listoffields: out.write(f + "\n") out.write(sepline)