#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
spectra:
This module contains:
- a class to handle variance spectrum;
- a function to compute DCT spectrum from a 2D field;
- a function to sort spectra with regards to their name;
- a function to plot a series of spectra.
"""
import numpy
import copy
from epygram.util import RecursiveObject
[docs]class Spectrum(RecursiveObject):
"""
A spectrum can be seen as a quantification of a signal's variance with
regards to scale.
If the signal is defined in physical space on N points, its spectral
representation will be a squared mean value (wavenumber 0) and variances for
N-1 wavenumbers.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains
Using the Discrete Cosine Transform (DCT)'
"""
def __init__(self, variances, name=None, resolution=None, mean2=None):
"""
Args:
- *variances* being the variances of the spectrum, from wavenumber 1
to N-1.
- *name* is an optional name for the spectrum.
- *resolution* is an optional resolution for the field represented by
the spectrum. It is used to compute the according wavelengths.
- *mean2* is the optional mean^2 of the field, i.e. variance of
wavenumber 0 of the spectrum.
"""
self.variances = numpy.array(variances)
self.name = name
self.resolution = resolution
self.mean2 = mean2
@property
[docs] def wavenumbers(self):
"""Gets the wavenumbers of the spectrum."""
return numpy.arange(1, len(self.variances) + 1)
@property
[docs] def wavelengths(self):
"""Gets the wavelengths of the spectrum."""
K = len(self.variances) + 1
return numpy.array([2. * self.resolution * K / k
for k in self.wavenumbers])
[docs] def write(self, out):
"""
Writes the spectrum with formatted output in *out*.
*out* must be an output open file-like object
(*out*.write() only is needed).
"""
fieldnamelength = 16
length_k = 8
precision = 4
header = '{:<{width}}'.format("# k", width=length_k)
header += '{:^{width}}'.format("lambda", width=fieldnamelength + 2)
header += '{:^{width}}'.format("variance", width=fieldnamelength + 2)
out.write(header + '\n')
wn = [0, ]
wn.extend(self.wavenumbers)
wl = ['-']
wl.extend(self.wavelengths)
var = [self.mean2, ]
var.extend(self.variances)
for k in range(len(var)):
line = '{:>{width}}'.format(str(wn[k]), width=length_k)
if k == 0:
lambdastr = str(wl[k])
else:
lambdastr = '{:.{precision}{type}}'.format(wl[k], type='E',
precision=precision)
if var[k] == None:
valstr = '?'
else:
valstr = '{:.{precision}{type}}'.format(var[k], type='E',
precision=precision)
line += '{:^{width}}'.format(lambdastr, width=fieldnamelength + 2)
line += '{:^{width}}'.format(valstr, width=fieldnamelength + 2)
out.write(line + '\n')
[docs]def sort(spectra):
""" Sort a list of spectra with regards to their name. """
untied_spectra = copy.copy(spectra)
sortedspectra = []
for f in sorted([s.name for s in untied_spectra], reverse=True):
for s in range(len(untied_spectra)):
if untied_spectra[s].name == f:
sortedspectra.append(untied_spectra.pop(s))
break
return sortedspectra
[docs]def dctspectrum(x, log=None, verbose=False):
"""
Function *dctspectrum* takes a 2D-array as argument and returns its 1D
DCT ellipse spectrum.
For details and documentation, see
Denis et al. (2002) : 'Spectral Decomposition of Two-Dimensional
Atmospheric Fields on Limited-Area Domains Using
the Discrete Cosine Transform (DCT).'
*log* is an optional logging.Logger instance to which print info
in *verbose* case.
"""
import scipy.fftpack as tfm
# compute transform
if log != None and verbose:
log.info("dctspectrum: compute DCT transform...")
norm = 'ortho' # None
y = tfm.dct(tfm.dct(x, norm=norm, axis=0), norm=norm, axis=1)
# compute spectrum
if log != None and verbose:
log.info("dctspectrum: compute variance spectrum...")
N, M = y.shape
N2 = N ** 2
M2 = M ** 2
MN = M * N
K = min(M, N)
variance = numpy.zeros(K)
variance[0] = y[0, 0] ** 2 / MN
for j in range(0, N):
j2 = float(j) ** 2
for i in range(0, M):
var = y[j, i] ** 2 / MN
k = numpy.sqrt(float(i) ** 2 / M2 + j2 / N2) * K
k_inf = int(numpy.floor(k))
k_sup = k_inf + 1
weightsup = k - k_inf
weightinf = 1.0 - weightsup
if 0 <= k < 1:
variance[1] += weightsup * var
if 1 <= k < K - 1:
variance[k_inf] += weightinf * var
variance[k_sup] += weightsup * var
if K - 1 <= k < K:
variance[k_inf] += weightinf * var
return variance
[docs]def plotspectra(spectra,
slopes=[{'exp':-3, 'offset':1, 'label':'-3'},
{'exp':-5. / 3., 'offset':1, 'label':'-5/3'}],
zoom=None,
unit='SI',
title=None):
"""
To plot a series of spectra.
Args:\n
- spectra = a Spectrum instance or a list of.
- unit: string accepting LaTeX-mathematical syntaxes
- slopes = list of dict(
- exp=x where x is exposant of a A*k**-x slope
- offset=A where A is logscale offset in a A*k**-x slope;
a offset=1 is fitted to intercept the first spectra at wavenumber = 2
- label=(optional label) appearing 'k = label' in legend)
- zoom = dict(xmin=,xmax=,ymin=,ymax=)
- title = string for title
"""
import matplotlib.pyplot as plt
plt.rc('font', family='serif')
if isinstance(spectra, Spectrum):
spectra = [spectra]
# prepare dimensions
window = dict()
window['ymin'] = min([min(s.variances) for s in spectra]) / 10
window['ymax'] = max([max(s.variances) for s in spectra]) * 10
window['xmax'] = max([max(s.wavelengths) for s in spectra]) * 1.5
window['xmin'] = min([min(s.wavelengths) for s in spectra]) * 0.8
if zoom != None:
for k, v in zoom.items():
window[k] = v
x1 = window['xmax']
x2 = window['xmin']
# colors and linestyles
colors = ['red', 'blue', 'green', 'orange', 'magenta', 'darkolivegreen',
'yellow', 'salmon', 'black']
linestyles = ['-', '--', '-.', ':']
# axes
fig, ax = plt.subplots()
if title != None : ax.set_title(title)
ax.set_yscale('log')
ax.set_ylim(window['ymin'], window['ymax'])
ax.set_xscale('log')
ax.set_xlim(window['xmax'], window['xmin'])
ax.grid()
ax.set_xlabel('wavelength ($km$)')
ax.set_ylabel(r'variance spectrum ($' + unit + '$)')
# plot slopes
# we take the second wavenumber (of first spectrum) as intercept, because
# it is often better fitted with spectrum than the first one
x_intercept = spectra[0].wavelengths[1]
y_intercept = spectra[0].variances[1]
i = 0
for slope in slopes:
# a slope is defined by y = A * k**-s and we plot it with
# two points y1, y2
try:
label = slope['label']
except KeyError:
# label = str(Fraction(slope['exp']).limit_denominator(10))
label = str(slope['exp'])
# because we plot w/r to wavelength, as opposed to wavenumber
s = -slope['exp']
A = y_intercept * x_intercept ** (-s) * slope['offset']
y1 = A * x1 ** s
y2 = A * x2 ** s
ax.plot([x1, x2], [y1, y2], color='0.7',
linestyle=linestyles[i % len(linestyles)],
label=r'$k^{' + label + '}$')
i += 1
# plot spectra
i = 0
for s in spectra:
ax.plot(s.wavelengths, s.variances, color=colors[i % len(colors)],
linestyle=linestyles[i // len(colors)], label=s.name)
i += 1
# legend
legend = ax.legend(loc='lower left', shadow=True)
for label in legend.get_texts():
label.set_fontsize('medium')
return fig