#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class to handle LFA format.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import os
import numpy
import sys
import six
from arpifs4py import wlfa
import epygram
from epygram import epygramError, config, util
from epygram.resources import FileResource
from epygram.fields import MiscField
# Public API of this module: only the LFA resource class is exported.
__all__ = ['LFA']
class LFA(FileResource):
    """Class implementing all specificities for LFA resource format."""

    _footprint = dict(
        attr=dict(
            format=dict(
                values=set(['LFA']),
                default='LFA')
        )
    )

    def __init__(self, *args, **kwargs):
        """
        Build the resource, check that the container actually is a LFA file
        and open it right away unless delayed opening has been requested.

        :raises IOError: if the container is not recognized as a LFA file.
        """
        # Must be set before the parent constructor runs, so that the
        # attribute always exists (e.g. for close()).
        self.isopen = False
        super(LFA, self).__init__(*args, **kwargs)
        if not wlfa.wlfatest(self.container.abspath):
            raise IOError("This resource is not a LFA one.")
        if not self.fmtdelayedopen:
            self.open()

    def open(self, openmode=None):
        """
        Opens the LFA in Fortran sense.

        :param openmode: optional, to open with a specific openmode, possibly
          different from the one specified at initialization.
        """
        super(LFA, self).open(openmode=openmode)
        # Python openmode -> opening mode expected by the LFA library.
        lfa_modes = {'r': 'R', 'a': 'A', 'w': 'W'}
        if self.openmode in lfa_modes:
            # open, getting logical unit
            self._unit = wlfa.wlfaouv(self.container.abspath,
                                      lfa_modes[self.openmode])
            self.isopen = True
            # A file opened for writing starts empty; it is removed at
            # closing time if it is still flagged as empty.
            self.empty = self.openmode == 'w'

    def close(self):
        """
        Closes a LFA properly.

        :raises IOError: if the underlying closing routine fails; the
          original exception is chained as the cause.
        """
        if self.isopen:
            try:
                wlfa.wlfafer(self._unit)
            except Exception as err:
                # Keep the original failure attached to the IOError.
                six.raise_from(IOError("closing " + self.container.abspath),
                               err)
            self.isopen = False
            # Cleanings: do not leave behind a file opened for writing in
            # which nothing was flagged as written.
            if self.openmode == 'w' and self.empty:
                os.remove(self.container.abspath)

    ################
    # ABOUT FIELDS #
    ################

    def find_fields_in_resource(self, seed=None):
        """
        Returns a list of the fields from resource whose name match the given
        *seed*.

        :param seed: might be a regular expression, a list of regular
          expressions or *None*. If *None* (default), returns the list of all
          fields in resource.
        :raises epygramError: if *seed* is of an unsupported type, or if no
          field matches.
        """
        if seed is None:
            fieldslist = self.listfields()
        elif isinstance(seed, six.string_types):
            fieldslist = util.find_re_in_list(seed, self.listfields())
        elif isinstance(seed, (list, tuple)):
            # List the resource once, then match each expression against it.
            available = self.listfields()
            fieldslist = []
            for s in seed:
                fieldslist += util.find_re_in_list(s, available)
        else:
            raise epygramError("unknown type for seed: " + str(type(seed)))
        if fieldslist == []:
            # format() rather than '+': *seed* may be a list or None.
            raise epygramError("no field matching '{}' was found in resource {}"
                               .format(seed, self.container.abspath))
        return fieldslist

    @FileResource._openbeforedelayed
    def readfield(self, fieldname, getdata=True):
        """
        Reads a field in resource.

        :param fieldname: name of the field to be read
        :param getdata: if False, do not read the field data, only metadata.
        :raises epygramError: if the type of the field is none of real ('R'),
          integer ('I') or character ('C').
        """
        field = MiscField(fid={'LFA': fieldname})
        if getdata:
            (fieldtype, fieldlength) = wlfa.wlfacas(self._unit, fieldname)
            # The first character of the type selects the reading routine.
            if fieldtype[0] == 'R':
                (data, fieldlength) = wlfa.wlfalecr(self._unit, fieldname,
                                                    fieldlength)
            elif fieldtype[0] == 'I':
                (data, fieldlength) = wlfa.wlfaleci(self._unit, fieldname,
                                                    fieldlength)
            elif fieldtype[0] == 'C':
                (data, fieldlength) = wlfa.wlfalecc(self._unit, fieldname,
                                                    fieldlength,
                                                    config.LFA_maxstrlen)
                # Keep only the strings actually read, without padding.
                data = [item.strip() for item in data[:fieldlength]]
            else:
                raise epygramError("unknown LFA field type: " + str(fieldtype))
            field.setdata(numpy.array(data))
        return field

    @FileResource._openbeforedelayed
    def writefield(self, field):
        """
        Writes a Field in resource.

        Writing is currently disabled: this method unconditionally raises.

        :param field: the field to be written.
        :raises epygramError: always, the routine being untested.
        """
        # DEAD-END: we should not need to write with this condemned format
        raise epygramError("writefield routine has not been tested..." +
                           " If you need to, you might face problems...")
        # NOTE: the code below is deliberately unreachable; it is kept as an
        # untested reference implementation, should the guard above ever be
        # removed.
        if not isinstance(field, epygram.base.Field):
            raise epygramError("'field' argument has to be a" +
                               " epygram.base.Field.")
        data = numpy.array(field.getdata())
        if len(data.shape) != 1:
            raise epygramError("LFA can only hold 1D arrays.")
        # A numpy dtype cannot be sliced: test the name of the dtype instead.
        dtype_name = data.dtype.name
        if dtype_name.startswith('float'):
            wlfa.wlfaecrr(self._unit, field.fid['LFA'], data)
        elif dtype_name.startswith('int'):
            wlfa.wlfaecri(self._unit, field.fid['LFA'], data)
        elif dtype_name.startswith('str'):
            wlfa.wlfaecrc(self._unit, field.fid['LFA'], data)
        else:
            raise epygramError("LFA can only hold float, int or str arrays.")

    def listfields(self):
        """
        Returns a list containing the LFA identifiers of all the fields of
        the resource.
        """
        return super(LFA, self).listfields()

    @FileResource._openbeforedelayed
    def _listfields(self):
        """
        Returns a list containing the names of the fields in LFA.
        """
        (list_length, fieldslist) = wlfa.wlfalaft(self._unit,
                                                  config.LFA_max_num_fields,
                                                  config.LFA_maxstrlen)
        # Only the first *list_length* entries are read; names are stripped
        # of surrounding whitespace.
        return [name.strip() for name in fieldslist[:list_length]]

    @FileResource._openbeforedelayed
    def what(self, out=sys.stdout,
             sortfields=False,
             **_):
        """
        Writes in file a summary of the contents of the LFA.

        :param out: the output open file-like object
        :param sortfields: **True** if the names of the fields have to be
          sorted before being written.

        Any other keyword argument is accepted and ignored.
        """
        firstcolumn_width = 50
        secondcolumn_width = 16
        sepline = '{:-^{width}}'.format('', width=firstcolumn_width +
                                        secondcolumn_width + 1) + '\n'

        listoffields = self.listfields()
        if sortfields:
            # sorted() builds a new list: the one returned by listfields()
            # is left untouched.
            listoffields = sorted(listoffields)

        # Write out
        out.write("### FORMAT: " + self.format + "\n")
        out.write("\n")
        out.write("######################\n")
        out.write("### LIST OF FIELDS ###\n")
        out.write("######################\n")
        numfields = len(listoffields)
        out.write("Number: " + str(numfields) + "\n")
        out.write(sepline)
        for f in listoffields:
            out.write(f + "\n")
        out.write(sepline)