Source code for epygram.formats.TIFFMF

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains the class to handle LFI format.
"""

__all__ = ['TIFFMF']

import datetime
import numpy

from footprints import FPDict, proxy as fpx

from epygram import config, epygramError, util, epylog
from epygram.util import Angle
from epygram.base import Resource, FieldSet, FieldValidity, Field
from epygram.fields import H2DField

from epygram.untied import pyexttiff

[docs]class TIFFMF(Resource): """ Class implementing all specificities for TIFFMF resource format. """ _footprint = dict( attr=dict( format=dict( values=set(['TIFFMF']), default='TIFFMF'), ) ) def __init__(self, *args, **kwargs): """ Constructor. See its footprint for arguments. """ self.isopen = False self.geometry = None self.validity = None self.scan = None super(TIFFMF, self).__init__(*args, **kwargs) if not self.fmtdelayedopen: self.open()
[docs] def open(self): """ Opens a TIFFMF, and initializes some attributes. """ # Opening if self.openmode in ('r'): self.tiff = pyexttiff.TiffFile(self.container.abspath, [(34974,)], method=2) try: self.tiff.IFDs[0].get_value(34974) except: raise epygramError('Tiff file is not a tiffmf file') self.isopen = True else: raise NotImplementedError("TIFFMF is only implmented for reading") # Reading of metadata if self.geometry == None : self._read_sections()
[docs] def close(self): """ Closes a TIFFMF file. """ if self.isopen: self.isopen = False # Cleanings self.tiff.close() del self.tiff ################ # ABOUT FIELDS # ################
[docs] def find_fields_in_resource(self, seed=None): """ Returns a list of the fields from resource whose identifier match the given seed. Args: \n - *seed*: might be a tuple of regular expressions, a list of regular expressions tuples or *None*. If *None* (default), returns the list of all fields in resource. """ fieldslist = [] if seed == None: fieldslist = self.listfields() elif isinstance(seed, str): fieldslist = util.find_re_in_list(seed, self.listfields()) elif isinstance(seed, list): fieldslist = [] for s in seed: fieldslist += util.find_re_in_list(s, self.listfields()) if fieldslist == []: raise epygramError("no field matching '" + str(seed) + "' was found in resource " + self.container.abspath) return fieldslist
[docs] def listfields(self): """ Returns a list containing the TIFFMF identifiers of all the fields of the resource. """ return super(TIFFMF, self).listfields()
@Resource._openbeforedelayed def _listfields(self): """ Actual listfields() method for TIFFMF. """ return [IFD.get_value(270) for IFD in self.tiff.IFDs]
[docs] def sortfields(self): """ Returns a sorted list of fields with regards to their name, as a dict of lists. """ mylist = self.listfields() # sort mylist.sort() outlists = {'Fields':mylist} return outlists
@Resource._openbeforedelayed
[docs] def readfield(self, fieldidentifier, getdata=True): """ Reads one field, given its identifier (tuple (LFI name, level)), and returns a Field instance. Interface to Fortran routines from 'ifsaux'. Args: \n - *fieldidentifier*: (LFI fieldname, level). - *getdata*: optional, if *False*, only metadata are read, the field do not contain data. Default is *True*. """ if self.openmode == 'w': raise epygramError("cannot read fields in resource with openmode == 'w'.") if type(fieldidentifier) != type(""): raise epygramError("fieldidentifier of a TIFFMF field is a string.") if fieldidentifier not in self.listfields(): raise epygramError("fieldidentifier is not found in th TIFFMF file.") geometry = fpx.geometry(**dict(structure='H2D', name=self.geometry.name, grid=FPDict(self.geometry.grid), projection=FPDict(self.geometry.projection), dimensions=FPDict(self.geometry.dimensions), position_on_grid=FPDict(self.geometry.position_on_grid) ) ) field = H2DField(fid=FPDict({self.format:fieldidentifier}), geometry=self.geometry, validity=self.validity.deepcopy(), processtype='observation', comment="") if getdata: if self.scan != 0: raise NotImplementedError("This scan mode is not implemented") for IFD in self.tiff.IFDs: if IFD.get_value(270) == fieldidentifier: data = IFD.get_image()[::-1, :] data = numpy.ma.masked_values(data, 0) field.setdata(data) return field
[docs] def readfields(self, requestedfields=None, getdata=True): """ Returns a :class:`epygram.base.FieldSet` containing requested fields read in the resource. Args: \n - *requestedfields*: might be \n - an expressions with regular expressions (e.g. '*CMS*') - a list of TIFFMF fields identifiers with regular expressions (e.g. ['*TIME*', '*CMS*']) - if not specified, interpretated as all fields that will be found in resource - *getdata*: optional, if *False*, only metadata are read, the fields do not contain data. Default is *True*. """ requestedfields = self.find_fields_in_resource(requestedfields) if requestedfields == []: raise epygramError("unable to find requested fields in resource.") return super(TIFFMF, self).readfields(requestedfields, getdata)
[docs] def writefield(self, field): """ Write a field in the resource. Args: \n - *field*: a :class:`epygram.base.Field` instance or :class:`epygram.H2DField`. """ if not isinstance(field, Field): raise epygramError("*field* must be a Field instance.") raise NotImplementedError("Writing of TIFFMF file is not implemented.")
[docs] def writefields(self, fieldset): """ Write the fields of the *fieldset* in the resource. Args: \n - *fieldset*: must be a :class:`epygram.base.FieldSet` instance. """ if not isinstance(fieldset, FieldSet): raise epygramError("'fieldset' argument must be of kind FieldSet.") if self.openmode == 'r': raise IOError("cannot write field in a LFI with openmode 'r'.") raise NotImplementedError("Writing of TIFFMF file is not implemented.") ########### # pre-app # ###########
@Resource._openbeforedelayed
[docs] def what(self, out, fieldsextrainfo=False, sortfields=False): """ Writes in file a summary of the contents of the TIFFMF. Args: \n - *out*: the output open file-like object (duck-typing: *out*.write() only is needed). - *sortfields*: **True** if the fields have to be sorted. """ firstcolumn_width = 50 secondcolumn_width = 16 sepline = '{:-^{width}}'.format('', width=firstcolumn_width + secondcolumn_width + 1) + '\n' listfields = self.listfields() first_H2DField = listfields[0] firstfield = self.readfield(first_H2DField, getdata=False) def write_formatted(dest, label, value): dest.write('{:<{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_col(dest, label, value): dest.write('{:>{width}}'.format(label, width=firstcolumn_width) \ + ':' \ + '{:>{width}}'.format(str(value), width=secondcolumn_width) \ + '\n') def write_formatted_fields(dest, label): dest.write('{:<{width}}'.format(label, width=20) \ + '\n') firstfield.what(out, vertical_geometry=False) out.write("######################\n") out.write("### LIST OF FIELDS ###\n") out.write("######################\n") if sortfields: sortedfields = self.sortfields() listoffields = [] for k in sorted(sortedfields.keys()): listoffields.append(k) listoffields.append('--------------------') listoffields.extend(sortedfields[k]) listoffields.append('--------------------') numfields = sum([len(v) for v in sortedfields.values()]) else: listoffields = listfields numfields = len(listfields) out.write("Number: " + str(numfields) + "\n") if fieldsextrainfo: write_formatted_fields(out, "Field name") else: write_formatted_fields(out, "Field name") out.write(sepline) done = [] for f in listoffields: if f not in done: if fieldsextrainfo: write_formatted_fields(out, f) else: write_formatted_fields(out, f) done.append(f) out.write(sepline) ############## # the LFI WAY # ##############
@Resource._openbeforedelayed def _read_sections(self): """ Reads sectrion 1 and section 2 in the TIIF tags. """ # MF tags reading IFD = self.tiff.IFDs[0].get_value(34974) typeImage = IFD.get_value(50002) subTypeImage = IFD.get_value(50003) date = IFD.get_value(50006) projection = IFD.get_value(50066) section1 = IFD.get_value(60000).view(dtype=self.tiff.dtypes.int32) headerSection2 = IFD.get_value(60001).view(dtype=self.tiff.dtypes.int32) bodySection2 = IFD.get_value(60002).view(dtype=self.tiff.dtypes.int32) # section decoding (lengthS1, version, centre, process, grid, flag, param, satellite, band, year, month, day, hour, minute, unit, P1, P2, timerange, included, missing, century, D) = section1 (lengthS2, NV, PV_PL, dataRepresenationType) = headerSection2 if projection == 1: #Polar streographic if dataRepresenationType != 5: raise epygramError("Projection does not match.") (Nx, Ny, La1, Lo1, res, LoV, Dx, Dy, pole, scan) = bodySection2 if typeImage == 7 and \ subTypeImage == 13 and \ (version, centre, process) == (1, 211, 220) and \ (param, satellite, band) == (127, 200, 33792) and \ (Nx, Ny) == (8192, 6144) and \ (La1, Lo1, Dx, Dy) == (43759, -76033, 1093, 1093) and \ LoV == 180000: #If tiffmf file is modified to put LoV=0. in the section 2, #R2 is no more able to put the image at the good location. # #So, it seems there is an error somewhere in this module, so we need to #reset LoV to 0. to have the good coordinates. #This must be corrected epylog.warning("LoV reset to 0. for this file.") LoV = 0 else: raise epygramError("Do we need to reset LoV to 0.? If you encounter this " + "error, please check if we need or not to reset " + "this parameter and modify the test above.") if res != 64: raise NotImplementedError('Only res=64 is implemented') Nx = int(Nx) Ny = int(Ny) lat_0 = 90 if pole == 0 else -90 m = (1. + numpy.copysign(1., lat_0) * numpy.sin(numpy.radians(lat_0))) / \ (1. + numpy.copysign(1., lat_0) * numpy.sin(numpy.radians(60))) grid = {'LAMzone':None, 'X_resolution':float(Dx) * m, 'Y_resolution':float(Dy) * m} projection = {'reference_lon':Angle(float(LoV) / 1000., 'degrees'), 'input_lon':Angle(float(Lo1) / 1000., 'degrees'), 'input_lat':Angle(float(La1) / 1000., 'degrees'), 'input_position':'ul', 'rotation': 0., 'reference_lat':Angle(lat_0, 'degrees')} dimensions = {'X':Nx, 'Y':Ny} geometryname = 'polar_stereographic' kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), projection=FPDict(projection), geoid=FPDict(config.default_geoid), position_on_grid=FPDict({'vertical':'mass', 'horizontal':'center'}) ) self.geometry = fpx.geometry(**kwargs_geom) elif projection == 3: #Mercator if dataRepresenationType != 1: raise epygramError("Projection does not match.") (Ni, Nj, La1, Lo1, res, La2, Lo2, Latin, scan, Di, Dj) = bodySection2 raise NotImplementedError("This projection is not implemented (mercator) but could easily be done.") elif projection == 11: #Space view if dataRepresenationType != 90: raise epygramError("Projection does not match.") (Nx, Ny, Lap, Lop, res, dx, dy, Xp, Yp, scan, ort, nr, Xo, Yo) = bodySection2 Lap = Angle(int(Lap) / 1000., 'degrees') Lop = Angle(int(Lop) / 1000., 'degrees') Nx = int(Nx) Ny = int(Ny) rmajor = 6378169.0 rminor = 6356583.8 h = int(nr) * rmajor / 1E6 - rmajor resolx = h * 2 * numpy.arcsin(1E6 / int(nr)) / int(dx) resoly = h * 2 * numpy.arcsin(1E6 / int(nr)) / int(dy) grid = {'LAMzone':None, 'X_resolution':resolx, 'Y_resolution':resoly} projection = {'satellite_lat':Lap, 'satellite_lon':Lop, 'satellite_height':h, 'input_lon':Lop, 'input_lat':Lap, #CAUTION: not center but sub-satellite point! 'input_position':'c', 'reference_lon':Lop, 'rotation': int(ort) / 1000.} dimensions = {'X':Nx, 'Y':Ny} geometryname = 'space_view' kwargs_geom = dict(structure='H2D', name=geometryname, grid=FPDict(grid), dimensions=FPDict(dimensions), projection=FPDict(projection), position_on_grid=FPDict({'vertical':'mass', 'horizontal':'center'}) ) geometry = fpx.geometry(**kwargs_geom) if scan == 0: centerPoint = geometry.ij2ll(int(Xo) - int(Xp) + Nx, int(Yp) - int(Yo)) else: raise NotImplementedError("Polar stereographic projection with scan != 0 is not implemented.") projection['input_lon'] = Angle(centerPoint[0], 'degrees') projection['input_lat'] = Angle(centerPoint[1], 'degrees') kwargs_geom['projection'] = FPDict(projection) self.geometry = fpx.geometry(**kwargs_geom) elif projection == 15: #Cylindric ? raise NotImplementedError("Cylindric projection is not implemented") (Ni, Nj, La1, Lo1, res, La2, Lo2, Di, Dj, scan) = bodySection2 else: # In case we need it: lambert would be (Nx, Ny, La1, Lo1, res, Lon0, Lat0, K0, # X0, Y0, Dx, Dy, pole, scan) = bodySection2 raise NotImplementedError("This projection is not implemented") self.scan = scan # Validity date = datetime.datetime(date[:2].view(dtype=self.tiff.dtypes.uint16), date[2], date[3], date[4], date[5]) date2 = datetime.datetime(year + 100 * (century - 1), month, day, hour, minute) if date != date2: raise epygramError("The two dates encoded in the tiff file must be the same.") self.validity = FieldValidity(basis=date, term=datetime.timedelta(hours=0))