Source code for epygram.LFA

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__all__ = ['LFA']

import tempfile
import os
import shutil
import numpy

import footprints
from footprints import FPDict 

import LFA4py

import epygram
from epygram import epygramError, config, util, MiscField
from epygram.base import Resource



class LFA(Resource):
    """
    Class implementing all specificities for LFA resource format.
    """

    _footprint = dict(
        attr = dict(
            format = dict(
                values = set(['LFA']),
                default = 'LFA')
        )
    )

    def __init__(self, *args, **kwargs):
        """
        Constructor. See its footprint for arguments.

        The resource is opened right away, unless its *fmtdelayedopen*
        attribute is True.
        """
        self.isopen = False
        super(LFA, self).__init__(*args, **kwargs)
        if not self.fmtdelayedopen:
            self.open()

    def open(self):
        """
        Opens the LFA in Fortran sense.

        The file is accessed through a temporary alias (shortened filename),
        in order to prevent filenames too long for the LFA software:

        - in modes 'r' and 'a', the alias is a symbolic link to the actual
          file;
        - in mode 'w', the alias is the file actually written, which is moved
          to its final path by :meth:`close`.
        """
        if self.openmode in ('r', 'a'):
            # mkstemp() is only used to get a unique name: close its OS-level
            # handle (otherwise leaked) and free the name for the symlink.
            (fd, alias) = tempfile.mkstemp(prefix='LFA.')
            os.close(fd)
            self.container.alias = alias
            os.remove(alias)
            os.symlink(self.container.abspath, alias)
            # open, getting logical unit ('R' or 'A' on the Fortran side)
            self._unit = LFA4py.wlfaouv(alias, self.openmode.upper())
            self.isopen = True
            self.empty = False
        elif self.openmode == 'w':
            (fd, alias) = tempfile.mkstemp(prefix='FA.')
            os.close(fd)
            self.container.alias = alias
            os.remove(alias)
            # open, getting logical unit
            self._unit = LFA4py.wlfaouv(alias, 'W')
            self.isopen = True
            # nothing written yet: see writefield() and close()
            self.empty = True

    def close(self):
        """
        Closes a LFA properly.

        Does nothing if the resource is not open. Otherwise closes the
        logical unit, then cleans the temporary alias: it is removed in modes
        'r' and 'a', and in mode 'w' if nothing has been written; else
        (mode 'w', not empty) it is moved to the actual path of the resource.

        Raises an *IOError* if the closure of the logical unit fails.
        """
        if self.isopen:
            try:
                LFA4py.wlfafer(self._unit)
            except Exception:
                raise IOError("closing "+self.container.abspath)
            self.isopen = False
            # Cleanings
            if self.openmode in ('r', 'a'):
                os.remove(self.container.alias)
            elif self.openmode == 'w':
                if self.empty:
                    os.remove(self.container.alias)
                else:
                    shutil.move(self.container.alias, self.container.abspath)

################
# ABOUT FIELDS #
################

    def find_fields_in_resource(self, seed=None):
        """
        Returns a list of the fields from resource whose name match the given
        *seed*.

        - *seed*: might be a regular expression, a list of regular expressions
          or *None*. If *None* (default), returns the list of all fields in
          resource.

        Raises an *epygramError* if *seed* is of any other type, or if no
        field is found.
        """
        # read the list of fields once only, whatever the number of seeds
        allfields = self.listfields()
        if seed is None:
            fieldslist = allfields
        elif isinstance(seed, str):
            fieldslist = util.find_re_in_list(seed, allfields)
        elif isinstance(seed, list):
            fieldslist = []
            for s in seed:
                fieldslist += util.find_re_in_list(s, allfields)
        else:
            raise epygramError("unknown type for seed: "+str(type(seed)))
        if not fieldslist:
            # str(): *seed* may be a list or None, not only a string
            raise epygramError("no field matching '"+str(seed)+"' was found in resource "+self.container.abspath)

        return fieldslist

    @Resource._openbeforedelayed
    def readfield(self, fieldname, getdata=True):
        """
        Reads a field in resource.

        - *fieldname*: LFA name of the field to be read.
        - *getdata*: if False, the returned field only holds its identifier,
          and nothing is read in the file.

        Returns a *MiscField*, whose fid is ``{'LFA':fieldname}``.

        Raises an *epygramError* if the type of the field, as found in the
        file, is none of real ('R'), integer ('I') or character ('C').
        """
        field = MiscField(fid=FPDict({'LFA':fieldname}))
        if getdata:
            (fieldtype, fieldlength) = LFA4py.wlfacas(self._unit, fieldname)
            if fieldtype[0] == 'R':
                (data, fieldlength) = LFA4py.wlfalecr(self._unit, fieldname, fieldlength)
            elif fieldtype[0] == 'I':
                (data, fieldlength) = LFA4py.wlfaleci(self._unit, fieldname, fieldlength)
            elif fieldtype[0] == 'C':
                separator = config.LFA_default_char_separator
                (datasplit, fieldlength) = LFA4py.wlfalecc(self._unit, fieldname, fieldlength, separator)
                # the pieces are glued back together, then split on separator
                datastring = ''.join(datasplit)
                data = datastring.split(separator)[0:fieldlength]
            else:
                raise epygramError("unknown type '"+str(fieldtype)+"' for LFA field "+str(fieldname))
            field.setdata(numpy.array(data))

        return field

    @Resource._openbeforedelayed
    def writefield(self, field):
        """
        Writes a Field in resource.

        - *field*: a epygram.base.Field, whose fid holds a 'LFA' key and
          whose data is a 1D array of floats, integers or strings.

        Raises an *epygramError* if *field* is not a Field, if its data is
        not 1D or if its data type is not handled.
        """
        #TOBECHECKED: writing not tested
        if not isinstance(field, epygram.base.Field):
            raise epygramError("'field' argument has to be a epygram.base.Field.")

        data = numpy.array(field.data)
        if len(data.shape) != 1:
            raise epygramError("LFA can only hold 1D arrays.")
        # a numpy.dtype cannot be sliced as a string: use its kind code
        # ('f': floating point, 'i': signed integer, 'S'/'U': strings)
        kind = data.dtype.kind
        if kind == 'f':
            LFA4py.wlfaecrr(self._unit, field.fid['LFA'], data)
        elif kind == 'i':
            LFA4py.wlfaecri(self._unit, field.fid['LFA'], data)
        elif kind in ('S', 'U'):
            LFA4py.wlfaecrc(self._unit, field.fid['LFA'], data)
            #TOBECHECKED: do the transfer of string arrays works well in this way ?
        else:
            raise epygramError("LFA can only hold float, int or str arrays.")
        # something has been written: in 'w' mode, close() must then keep the
        # file instead of removing it
        self.empty = False

    def listfields(self):
        """
        Returns a list containing the LFA identifiers of all the fields of the
        resource.
        """
        return super(LFA, self).listfields()

    @Resource._openbeforedelayed
    def _listfields(self):
        """
        Returns a list containing the names of the fields in LFA.
        """
        separator = config.LFA_default_char_separator
        (list_length, fl) = LFA4py.wlfalaft(self._unit, config.LFA_max_num_fields, separator)
        # the pieces are glued back together, then split on separator
        fieldstring = ''.join(fl)
        fieldslist = fieldstring.split(separator)[0:list_length]

        return fieldslist