Source code for epygram.spectra

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
spectra:

This module contains:
- a class to handle variance spectrum;
- a function to compute DCT spectrum from a 2D field;
- a function to sort spectra with regards to their name;
- a function to plot a series of spectra.
"""

import numpy
import copy

from epygram.util import RecursiveObject



[docs]class Spectrum(RecursiveObject): """ A spectrum can be seen as a quantification of a signal's variance with regards to scale. If the signal is defined in physical space on N points, its spectral representation will be a squared mean value (wavenumber 0) and variances for N-1 wavenumbers. For details and documentation, see Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional Atmospheric Fields on Limited-Area Domains Using the Discrete Cosine Transform (DCT)' """ def __init__(self, variances, name=None, resolution=None, mean2=None): """ Args: - *variances* being the variances of the spectrum, from wavenumber 1 to N-1. - *name* is an optional name for the spectrum. - *resolution* is an optional resolution for the field represented by the spectrum. It is used to compute the according wavelengths. - *mean2* is the optional mean^2 of the field, i.e. variance of wavenumber 0 of the spectrum. """ self.variances = numpy.array(variances) self.name = name self.resolution = resolution self.mean2 = mean2 @property
[docs] def wavenumbers(self): """Gets the wavenumbers of the spectrum.""" return numpy.arange(1, len(self.variances) + 1)
@property
[docs] def wavelengths(self): """Gets the wavelengths of the spectrum.""" K = len(self.variances) + 1 return numpy.array([2. * self.resolution * K / k for k in self.wavenumbers])
[docs] def write(self, out): """ Writes the spectrum with formatted output in *out*. *out* must be an output open file-like object (*out*.write() only is needed). """ fieldnamelength = 16 length_k = 8 precision = 4 header = '{:<{width}}'.format("# k", width=length_k) header += '{:^{width}}'.format("lambda", width=fieldnamelength + 2) header += '{:^{width}}'.format("variance", width=fieldnamelength + 2) out.write(header + '\n') wn = [0, ] wn.extend(self.wavenumbers) wl = ['-'] wl.extend(self.wavelengths) var = [self.mean2, ] var.extend(self.variances) for k in range(len(var)): line = '{:>{width}}'.format(str(wn[k]), width=length_k) if k == 0: lambdastr = str(wl[k]) else: lambdastr = '{:.{precision}{type}}'.format(wl[k], type='E', precision=precision) if var[k] == None: valstr = '?' else: valstr = '{:.{precision}{type}}'.format(var[k], type='E', precision=precision) line += '{:^{width}}'.format(lambdastr, width=fieldnamelength + 2) line += '{:^{width}}'.format(valstr, width=fieldnamelength + 2) out.write(line + '\n')
[docs]def sort(spectra): """ Sort a list of spectra with regards to their name. """ untied_spectra = copy.copy(spectra) sortedspectra = [] for f in sorted([s.name for s in untied_spectra], reverse=True): for s in range(len(untied_spectra)): if untied_spectra[s].name == f: sortedspectra.append(untied_spectra.pop(s)) break return sortedspectra
[docs]def dctspectrum(x, log=None, verbose=False): """ Function *dctspectrum* takes a 2D-array as argument and returns its 1D DCT ellipse spectrum. For details and documentation, see Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional Atmospheric Fields on Limited-Area Domains Using the Discrete Cosine Transform (DCT).' *log* is an optional logging.Logger instance to which print info in *verbose* case. """ import scipy.fftpack as tfm # compute transform if log != None and verbose: log.info("dctspectrum: compute DCT transform...") norm = 'ortho' # None y = tfm.dct(tfm.dct(x, norm=norm, axis=0), norm=norm, axis=1) # compute spectrum if log != None and verbose: log.info("dctspectrum: compute variance spectrum...") N, M = y.shape N2 = N ** 2 M2 = M ** 2 MN = M * N K = min(M, N) variance = numpy.zeros(K) variance[0] = y[0, 0] ** 2 / MN for j in range(0, N): j2 = float(j) ** 2 for i in range(0, M): var = y[j, i] ** 2 / MN k = numpy.sqrt(float(i) ** 2 / M2 + j2 / N2) * K k_inf = int(numpy.floor(k)) k_sup = k_inf + 1 weightsup = k - k_inf weightinf = 1.0 - weightsup if 0 <= k < 1: variance[1] += weightsup * var if 1 <= k < K - 1: variance[k_inf] += weightinf * var variance[k_sup] += weightsup * var if K - 1 <= k < K: variance[k_inf] += weightinf * var return variance
[docs]def plotspectra(spectra, slopes=[{'exp':-3, 'offset':1, 'label':'-3'}, {'exp':-5. / 3., 'offset':1, 'label':'-5/3'}], zoom=None, unit='SI', title=None): """ To plot a series of spectra. Args:\n - spectra = a Spectrum instance or a list of. - unit: string accepting LaTeX-mathematical syntaxes - slopes = list of dict( - exp=x where x is exposant of a A*k**-x slope - offset=A where A is logscale offset in a A*k**-x slope; a offset=1 is fitted to intercept the first spectra at wavenumber = 2 - label=(optional label) appearing 'k = label' in legend) - zoom = dict(xmin=,xmax=,ymin=,ymax=) - title = string for title """ import matplotlib.pyplot as plt plt.rc('font', family='serif') if isinstance(spectra, Spectrum): spectra = [spectra] # prepare dimensions window = dict() window['ymin'] = min([min(s.variances) for s in spectra]) / 10 window['ymax'] = max([max(s.variances) for s in spectra]) * 10 window['xmax'] = max([max(s.wavelengths) for s in spectra]) * 1.5 window['xmin'] = min([min(s.wavelengths) for s in spectra]) * 0.8 if zoom != None: for k, v in zoom.items(): window[k] = v x1 = window['xmax'] x2 = window['xmin'] # colors and linestyles colors = ['red', 'blue', 'green', 'orange', 'magenta', 'darkolivegreen', 'yellow', 'salmon', 'black'] linestyles = ['-', '--', '-.', ':'] # axes fig, ax = plt.subplots() if title != None : ax.set_title(title) ax.set_yscale('log') ax.set_ylim(window['ymin'], window['ymax']) ax.set_xscale('log') ax.set_xlim(window['xmax'], window['xmin']) ax.grid() ax.set_xlabel('wavelength ($km$)') ax.set_ylabel(r'variance spectrum ($' + unit + '$)') # plot slopes # we take the second wavenumber (of first spectrum) as intercept, because # it is often better fitted with spectrum than the first one x_intercept = spectra[0].wavelengths[1] y_intercept = spectra[0].variances[1] i = 0 for slope in slopes: # a slope is defined by y = A * k**-s and we plot it with # two points y1, y2 try: label = slope['label'] except KeyError: # label = str(Fraction(slope['exp']).limit_denominator(10)) label = str(slope['exp']) # because we plot w/r to wavelength, as opposed to wavenumber s = -slope['exp'] A = y_intercept * x_intercept ** (-s) * slope['offset'] y1 = A * x1 ** s y2 = A * x2 ** s ax.plot([x1, x2], [y1, y2], color='0.7', linestyle=linestyles[i % len(linestyles)], label=r'$k^{' + label + '}$') i += 1 # plot spectra i = 0 for s in spectra: ax.plot(s.wavelengths, s.variances, color=colors[i % len(colors)], linestyle=linestyles[i // len(colors)], label=s.name) i += 1 # legend legend = ax.legend(loc='lower left', shadow=True) for label in legend.get_texts(): label.set_fontsize('medium') return fig