#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package **vgrid**:
Contains handling of Hybrid-Pressure vertical grid generation and plotting.
"""
from __future__ import print_function, absolute_import, unicode_literals, division
import io
import os
import re
import six
from collections import OrderedDict
import itertools
try:
import bokeh
from bokeh import models as bkm
from bokeh import io as bkio
from bokeh import plotting as bkp
from bokeh import core as bkc
from bokeh.palettes import Category10
except ImportError:
raise ImportError('Seems like bokeh is not installed: install locally with "pip install --user bokeh" !')
from bronx.datagrip import namelist
from epygram.geometries.VGeometry import VGeometry
from epygram.profiles import hybridP2fluxpressure, flux2masspressures
MKVGRID_BINARY = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'vertical_discretization',
'mkvgrid.x')
[docs]class HybridPressureVGrid(object):
"""Handle and represent information about a Hybrid-Pressure Vertical Grid."""
_re_grid_labels = '\s+'.join(
['(ILa)',
'(A)', '(B)',
'(Sigma)', '(1 - Sigma)', '(Rap Si-Hy)',
'(Prehyd\(lbar\))', '(Prehyd\(l\))', '(\[D Prehyd\]\(l\))',
'(Alti\(l\))', '(Alti\(lbar\))', '(\[D Alti\]\(l\))'])
colors = itertools.cycle(Category10[10])
complete_tooltips = [('level', "@level"),
('p', "@p{0.2f} hPa"),
('z', "@z{0.2f} m"),
('top', "@psup{0.2f} hPa | @zsup{0.2f} m"),
('bottom', "@pinf{0.2f} hPa | @zinf{0.2f} m"),
('thickness', "@pthickness{0.2f} hPa | @zthickness{0.2f} m")]
@classmethod
def next_color(cls):
return bkc.properties.value(next(cls.colors))
def __init__(self, source,
vertical_mean=None,
reference_pressure=101325.,
ptop=0.,
vgrid_name=None):
"""
Build the grid object from a **source**, which may be
- an infosup file from ``mkvgrid.x``
- an epygram Hybrid-Pressure VGeometry; in which case the below
arguments must/may be provided:
Case of an epygram Hybrid-Pressure VGeometry **source**:
:param vertical_mean: mandatory, among ('geometric', 'arithmetic', 'LAPRXPK=False')
:param reference_pressure: at the surface, for building a standard atmosphere
:param ptop: pressure at the top of the model (upper boundary of the upper layer)
:param vgrid_name: name of the grid, for plot/saving purpose
"""
if isinstance(source, six.string_types):
self.name = os.path.basename(source).replace('.infosup', '')
self._parse_infosup(source)
elif isinstance(source, VGeometry):
assert vertical_mean is not None, \
"must provide a **vertical_mean** among ('geometric', 'arithmetic', 'LAPRXPK=False')"
self._from_epygram(source,
vertical_mean=vertical_mean,
reference_pressure=reference_pressure,
ptop=ptop,
vgrid_name=vgrid_name)
else:
raise NotImplementedError('construction from else that source:' + str(type(source)))
def _parse_infosup(self, infosup_file):
"""Parse an infosup file from ``mkvgrid.x`` program."""
# read
with io.open(infosup_file, 'r') as f:
lines = [line.strip('\n').strip() for line in f.readlines()]
lines = [line for line in lines if line != '']
# dimensions
self.dimensions = dict()
for line in lines[0:5]:
k,v = line.split('=')
k = k.strip('*').strip()
self.dimensions[k] = int(v)
# domains
self.domains = list()
cursor = 5
for line in lines[cursor:cursor + self.ndoms]:
k,v = line.split('=')
k = k.strip('*').strip()
k = k.split()[-1]
self.domains.append((k, int(v)))
self.domains = self.domains[::-1]
cursor += self.ndoms
# domains altitudes and heights
self.domains_altitudes = dict()
self.domains_widths = dict()
for line in lines[cursor:cursor + self.ndoms]:
alt, height = line.split(';')
k,v = alt.split('=')
k = k.strip('*').strip()
self.domains_altitudes[k] = float(v)
k,v = height.split('=')
k = k.strip()
self.domains_widths[k] = float(v)
cursor += self.ndoms
alt, height = lines[cursor].split(';')
k,v = alt.split(':')
k = k.strip('*').strip()
self.domains_altitudes[k] = float(v)
k,v = height.split('=')
k = k.strip()
self.domains_widths[k] = float(v)
cursor += 1
# parameters
self.params = dict()
for line in lines[cursor:cursor + 5]:
k,v = line.split('=')
k = k.strip('*').strip()
try:
self.params[k] = float(v)
except ValueError:
self.params[k] = True if v.strip() == 'T' else False
cursor += 5
# levels
self.levels = dict()
match = re.match(self._re_grid_labels, lines[cursor])
if match:
labels = match.groups()
else:
raise SyntaxError('Labels of levels table !')
cursor += 2
for line in lines[cursor:cursor + self.nlevels]:
line = line.split()
i, vals = int(line[0]), [float(v) for v in line[1:]]
self.levels[i] = dict(zip(labels[1:], vals))
def _from_epygram(self, vgeometry,
vertical_mean,
reference_pressure=101325.,
ptop=0.,
vgrid_name=None):
"""
Convert an epygram Hybrid-Pressure VGeometry object.
:param vertical_mean: among ('geometric', 'arithmetic', 'LAPRXPK=False')
:param reference_pressure: at the surface, for building a standard atmosphere
:param ptop: pressure at the top of the model (upper boundary of the upper layer)
:param vgrid_name: name of the grid, for plot/saving purpose
"""
from . import standard_atmosphere
assert vgeometry.typeoffirstfixedsurface == 119
A = [level[1]['Ai'] for level in vgeometry.grid['gridlevels'][1:]]
B = [level[1]['Bi'] for level in vgeometry.grid['gridlevels'][1:]]
# compute standard atmosphere at levels
p_lbar = hybridP2fluxpressure(A, B, reference_pressure).squeeze()
p_l = flux2masspressures(p_lbar, vertical_mean, Ptop=ptop)
a_l = standard_atmosphere.altitude_at(p_l)
a_lbar = standard_atmosphere.altitude_at(p_lbar)
# dimensions
self.dimensions = {'Number of levels':len(p_l)}
# parameters
self.params = {
'LLAPRXPK':vertical_mean != 'LAPRXPK=False',
'Reference pressure at 0m: ZVP00PR':reference_pressure}
if vgrid_name is not None:
self.name = vgrid_name
else:
self.name = 'from_field_L' + str(self.nlevels)
# levels
self.levels = dict()
for i in range(1, self.nlevels + 1):
if i == 1:
D_a = 2 * (a_lbar[0] - a_lbar[1])
else:
D_a = (a_l[i - 2] - a_lbar[i - 2]) + (a_l[i - 1] - a_lbar[i - 1])
self.levels[i] = {
'A':A[i - 1],
'B':B[i - 1],
'Alti(l)':a_l[i - 1],
'Alti(lbar)':a_lbar[i - 1],
'[D Alti](l)':D_a,
'Prehyd(l)':p_l[i - 1],
'Prehyd(lbar)':p_lbar[i - 1]}
def _as_bokeh_ColumnDataSource(self):
"""Convert to a bokeh ColumnDataSource object."""
return bkm.ColumnDataSource(self.grid)
@property
def nlevels(self):
return self.dimensions['Number of levels']
@property
def ndoms(self):
return self.dimensions['Number of vertical domains']
def unit(self, yaxis):
return {'p':'hPa', 'z':'m', 'l':'#'}[yaxis]
@property
def grid(self):
"""
Get grid height and pressure of full(mass) levels and frontiers.
Pressures are given in hPa.
"""
grid = {'z':[self.levels[i]['Alti(l)'] for i in range(1, self.nlevels + 1)],
'zinf':[self.levels[i]['Alti(lbar)'] for i in range(1, self.nlevels + 1)],
'zsup':([self.levels[1]['Alti(l)'] + self.levels[1]['[D Alti](l)'] / 2.] + # TOBECHECKED:
[self.levels[i - 1]['Alti(lbar)'] for i in range(2, self.nlevels + 1)]),
'p':[self.levels[i]['Prehyd(l)'] / 100 for i in range(1, self.nlevels + 1)],
'pinf':[self.levels[i]['Prehyd(lbar)'] / 100 for i in range(1, self.nlevels + 1)],
'psup':[0.] + [self.levels[i - 1]['Prehyd(lbar)'] / 100 for i in range(2, self.nlevels + 1)],
'level':range(1, self.nlevels + 1),
}
for y in ('p', 'z'):
grid[y + 'thickness'] = [abs(grid[y + 'sup'][i] - grid[y + 'inf'][i])
for i in range(self.nlevels)]
return grid
[docs] def domains_lower_limit(self):
"""
Get the lower limit of each domain in pressure, height and level number.
Pressures are given in hPa.
Lower level is "last included level + 0.5".
"""
domains_lower_limit = OrderedDict()
i = 0
for d, d_nlev in self.domains: # these are ordered from top to bottom
i += d_nlev
d_lower_pressure = self.levels[i]['Prehyd(lbar)'] / 100
d_lower_height = self.levels[i]['Alti(lbar)']
domains_lower_limit[d] = {'z':d_lower_height,
'p':d_lower_pressure,
'level':float(i) + .5}
return domains_lower_limit
[docs] def domains_upper_limit(self):
"""
Get the upper limit of each domain in pressure, height and level number.
Pressures are given in hPa.
Lower level is "last included level - 0.5".
"""
domains_upper_limit = OrderedDict()
domains_upper_limit[self.domains[0][0]] = {'z':self.grid['z'][0] + self.grid['zthickness'][0] / 2.,
'p':self.grid['p'][0] + self.grid['pthickness'][0] / 2.,
'level':0.5}
domains = list(self.domains_lower_limit().keys())
limits = list(self.domains_lower_limit().values())
for i in range(1, len(domains)):
# the upper limit of the below domain is the lower limit of the above domain
domains_upper_limit[domains[i]] = limits[i - 1]
return domains_upper_limit
[docs] def get_AB(self):
"""Get the lists of A & B coefficients of the N+1 flux levels."""
return {'A':[0.] + [self.levels[i]['A'] for i in range(1, self.nlevels + 1)],
'B':[0.] + [self.levels[i]['B'] for i in range(1, self.nlevels + 1)]}
[docs] def set_domains(self, domain_names, domain_levels_numbers):
"""
Set a series of domains.
:param domain_names: list of the name of each domain,
ordered from top to bottom
:param domain_levels_numbers: list of the number of levels in each domain,
ordered from top to bottom
"""
assert len(domain_names) == len(domain_levels_numbers)
assert sum(domain_levels_numbers) == self.nlevels
self.dimensions['Number of vertical domains'] = len(domain_names)
self.domains = list()
for i in range(len(domain_names)):
self.domains.append((domain_names[i], domain_levels_numbers[i]))
# applicative methods
[docs] def write_AB_to_namelist(self):
"""Write A and B coefficients to namelist(s)."""
AB = self.get_AB()
A = AB['A']
B = AB['B']
NAM = namelist.NamelistBlock('NAMFPG')
NAM['FPVALH(0)'] = A
NAM['FPVBH(0)'] = B
with io.open('.'.join([self.name, NAM.name]), 'w') as o:
o.write(NAM.dumps(sorting=namelist.FIRST_ORDER_SORTING))
NAM = namelist.NamelistBlock('NAMVV1')
NAM['DVALH(0)'] = A
NAM['DVBH(0)'] = B
with io.open('.'.join([self.name, NAM.name]), 'w') as o:
o.write(NAM.dumps(sorting=namelist.FIRST_ORDER_SORTING))
[docs] def back_to_mkvgrid_namelist(self, namel_filename=None):
"""Generate back a mkvgrid.x namelist."""
if namel_filename is None:
namel_filename = '.'.join([self.name, 'mkvgrid', 'nam'])
# NAM_DIM
assert hasattr(self, 'domains')
NAM = namelist.NamelistBlock('NAM_DIM')
NAM['JPN'] = self.nlevels
NAM['JPNPRES'] = len([b for b in self.get_AB()['B'][1:] if b == 0.])
NAM['JPNSIGM'] = len([b for b in self.get_AB()['B'][1:] if b == 1.])
NAM['JPDOM'] = self.ndoms
with io.open(namel_filename, 'w') as o:
o.write(NAM.dumps())
# NAM_REF
NAM = namelist.NamelistBlock('NAM_REF')
NAM['ZP1'] = self.grid['p'][0] * 100
NAM['ZVP00'] = self.grid['pinf'][-1] * 100
NAM['ZVP00PR'] = self.params['Reference pressure at 0m: ZVP00PR']
NAM['ZALT_BOT'] = self.grid['z'][-1]
with io.open(namel_filename, 'a') as o:
o.write(NAM.dumps())
# NAM_DOM
NAM = namelist.NamelistBlock('NAM_DOM')
for i in range(1, self.ndoms):
dom, nlev = self.domains[self.ndoms - i]
NAM['ZALT_DOM({})'.format(i)] = self.domains_upper_limit()[dom]['z']
upper_level = int(self.domains_upper_limit()[dom]['level'] + 0.5)
NAM['ZDALT_DOM({})'.format(i)] = self.grid['zthickness'][upper_level - 1]
NAM['IT_DOM({})'.format(i)] = nlev
NAM['CLNAM_DOM({})'.format(i)] = dom
NAM['ZDALT_DOM({})'.format(self.ndoms)] = self.grid['zinf'][0] - self.grid['zinf'][1]
dom, nlev = self.domains[0]
NAM['IT_DOM({})'.format(self.ndoms)] = nlev
NAM['CLNAM_DOM({})'.format(self.ndoms)] = dom
with io.open(namel_filename, 'a') as o:
o.write(NAM.dumps(sorting=namelist.FIRST_ORDER_SORTING))
# NAM_PARAM
NAM = namelist.NamelistBlock('NAM_PARAM')
if 'LLAPRXPK' in self.params:
NAM['LLAPRXPK'] = self.params['LLAPRXPK']
with io.open(namel_filename, 'a') as o:
o.write(NAM.dumps())
return namel_filename
[docs] def bokeh_plot_y_vs_x(self, yaxis, xaxis,
over=None,
hover_attachment='right',
bokeh_kwargs={},
fig_kwargs={},
legend_location='top_right',
plot_domains=None,
add_domains_options={}):
"""
Plot grid and return the figure.
:param yaxis: 'level', 'p' (pressure), 'z' (height) or 'pthickness'//'zthickness'
:param xaxis: 'level', 'p' (pressure), 'z' (height) or 'pthickness'//'zthickness'
:param over: an eventual existing bokeh figure on which to plot
:param hover_attachment: position of the hover
:param bokeh_kwargs: passed to bokeh's inner plot functions
:param fig_kwargs: to be passed to bokeh's figure()
:param plot_domains: plot a horizontal bar separating each vertical domain (True/False/None)
:param add_domains_options: options to be passed to bokeh_add_domains() method
"""
import uuid
thisid = str(uuid.uuid4())
cds = self._as_bokeh_ColumnDataSource()
# options
axes_domain = dict(xaxis=xaxis, yaxis=yaxis)
kwargs = dict(source=cds,
name=thisid)
if yaxis in ('z', 'p') and xaxis in ('z', 'p'):
# p/z or z/p
method = 'vbar'
kwargs.update(x=xaxis,
top=yaxis + 'sup',
bottom=yaxis + 'inf',
width=xaxis + 'thickness')
elif yaxis in ('z', 'p'):
# z or p as y: thickness as bar
kwargs.update(x=xaxis)
if xaxis == 'level':
method = 'vbar'
kwargs.update(top=yaxis + 'sup',
bottom=yaxis + 'inf',
width=1.)
else:
method = 'circle'
kwargs.update(y=yaxis,
line_color='black',
size=9)
elif xaxis in ('z', 'p'):
# z or p as x: thickness as bar
kwargs.update(y=yaxis)
if yaxis == 'level':
method = 'hbar'
kwargs.update(left=xaxis + ('inf' if xaxis == 'z' else 'sup'),
right=xaxis + ('sup' if xaxis == 'z' else 'inf'),
height=1.)
else:
method = 'circle'
kwargs.update(x=xaxis,
line_color='black',
size=9)
else:
method = 'circle'
kwargs.update(x=xaxis,
y=yaxis,
line_color='black',
size=9)
color = bokeh_kwargs.get('color', self.next_color())
legend = bokeh_kwargs.get('legend_label', self.name)
kwargs.update(color=color,
legend_label=legend)
kwargs.update(bokeh_kwargs)
# figure
if over is None:
def axes(axis):
return ({'z':'Altitude',
'p':'Pressure',
'level':'Level',
'zthickness':'Thickness',
'pthickness':'Thickness'}[axis] +
' ({})'
).format(self.unit(axis[0]))
fig = bkp.figure(sizing_mode='stretch_both',
title="Vertical Coordinate Grid",
**fig_kwargs)
fig.xaxis.axis_label = axes(xaxis)
fig.yaxis.axis_label = axes(yaxis)
# reverse pressure axis
if yaxis == 'p':
extra = max(cds.data[yaxis + 'inf']) * 0.05
fig.y_range = bkm.Range1d(max(cds.data[yaxis + 'inf']) + extra,
0. - extra)
elif yaxis == 'level':
extra = float(self.nlevels) * 0.05
fig.y_range = bkm.Range1d(self.nlevels + extra,
1. - extra)
else:
fig = over
if not any([isinstance(t, bkm.tools.CrosshairTool) for t in fig.tools]):
fig.add_tools(bkm.tools.CrosshairTool())
# actual plot command
getattr(fig, method)(**kwargs)
# hover
tooltips = self.complete_tooltips
if 'legend_label' in kwargs:
tooltips = [('GRID', kwargs['legend_label'])] + tooltips
fig.add_tools(bkm.tools.HoverTool(tooltips=tooltips,
mode={'circle':'mouse',
'vbar':'hline',
'hbar':'vline'}[method],
attachment=hover_attachment,
names=[thisid]))
if plot_domains is None:
if hasattr(self, 'domains'):
plot_domains = True
else:
plot_domains = False
if plot_domains:
if 'thickness' in yaxis:
axes_domain['yaxis'] = None
if 'thickness' in xaxis:
axes_domain['xaxis'] = None
if hasattr(self, 'domains'):
kwargs = dict(line_color=color, line_width=2)
kwargs.update(axes_domain)
kwargs.update(add_domains_options)
self.bokeh_add_domains(fig, **kwargs)
else:
print("WARNING: **plot_domains** is True while grid has no domains information; **plot_domains** ignored.")
fig.legend.location = legend_location
return fig
[docs] def bokeh_add_domains(self, fig, yaxis=None, xaxis=None,
**span_options):
"""
Add domains limits to figure.
:param fig: the figure on which to plot.
:param yaxis: 'level', 'p' (pressure), 'z' (height) or 'pthickness'//'zthickness'
:param xaxis: 'level', 'p' (pressure), 'z' (height) or 'pthickness'//'zthickness'
Other options are passed to bokeh's Span to plot domains limits.
"""
dll = self.domains_lower_limit()
doms = []
border_line_color = span_options.get('line_color', 'black')
border_line_width = span_options.get('line_width', 2)
for d, lim in dll.items():
if yaxis is not None:
# horizontal lines
if yaxis in lim:
doms.append(bkm.Span(location=lim[yaxis], dimension='width',
**span_options))
fig.add_layout(bkm.Label(x=1, y=lim[yaxis], text=d,
text_align='left',x_units='screen',
border_line_color=border_line_color,
border_line_width=border_line_width))
if xaxis is not None:
# vertical lines
if xaxis in lim:
text_align = {'level':'right', 'z':'left', 'p':'right'}[xaxis]
doms.append(bkm.Span(location=lim[xaxis], dimension='height',
**span_options))
fig.add_layout(bkm.Label(y=0, x=lim[xaxis], text=d,
text_align=text_align, y_units='screen',
border_line_color=border_line_color,
border_line_width=border_line_width))
fig.renderers.extend(doms)
return fig
[docs] def bokeh_fig_to_html(self, fig, htmlname=None, show_figure=True):
"""
Outputs figure to html file, and optionally display it.
:param htmlname: name of html file; if None, name of file
is auto-generated from self.name
:param show_figure: to show the plot in a WebBrowser tab
"""
if htmlname is None:
htmlname = self.name + '.html'
bkio.output_file(htmlname, title='VGrid')
if show_figure:
bkio.show(fig)
else:
bkio.save(fig)