"""Misc. plotting routines for Cloudnet products."""
import os.path
import re
import textwrap
from dataclasses import dataclass
from datetime import date
from typing import Any
import matplotlib.pyplot as plt
import netCDF4
import numpy as np
from matplotlib import rcParams
from matplotlib.axes import Axes
from matplotlib.colorbar import Colorbar
from matplotlib.colors import ListedColormap
from matplotlib.pyplot import Figure
from matplotlib.ticker import AutoMinorLocator
from matplotlib.transforms import Affine2D, Bbox
from mpl_toolkits.axes_grid1 import make_axes_locatable
from numpy import ma, ndarray
from scipy.ndimage import uniform_filter
from cloudnetpy import constants as con
from cloudnetpy.categorize.atmos_utils import calc_altitude
from cloudnetpy.exceptions import PlottingError
from cloudnetpy.instruments.ceilometer import calc_sigma_units
from cloudnetpy.plotting.plot_meta import ATTRIBUTES, PlotMeta
[docs]
@dataclass
class PlotParameters:
"""Class representing the parameters for plotting.
Attributes:
dpi: The resolution of the plot in dots per inch.
max_y: Maximum y-axis value (km) in 2D time / height plots.
title: Whether to display the title of the plot.
subtitle: Whether to display the subtitle of the plot.
mark_data_gaps: Whether to mark data gaps in the plot.
grid: Whether to display grid lines in the plot.
edge_tick_labels: Whether to display tick labels on the edges of the plot.
show_sources: Whether to display the sources of plotted data (i.e.
instruments and model).
footer_text: The text to display in the footer of the plot.
plot_meta: Additional metadata for the plot.
raise_on_empty: Whether to raise an error if no data is found for a
plotted variable.
minor_ticks: Whether to display minor ticks on the x-axis.
plot_above_ground: Whether to plot above ground instead of above mean sea level.
"""
dpi: float = 120
max_y: int = 12
title: bool = True
subtitle: bool = True
mark_data_gaps: bool = True
grid: bool = False
edge_tick_labels: bool = False
show_sources: bool = False
footer_text: str | None = None
plot_meta: PlotMeta | None = None
raise_on_empty: bool = False
minor_ticks: bool = False
plot_above_ground: bool = False
[docs]
class Dimensions:
"""Dimensions of a generated figure in pixels. Elements such as the figure
title, labels, colorbar and legend are excluded from the margins.
Attributes:
width (int): Figure width in pixels.
height (int): Figure height in pixels.
margin_top (int): Space between top edge of image and plotted data in pixels.
margin_right (int): Space between right edge of image and plotted data
in pixels.
margin_bottom (int): Space between bottom edge of image and plotted
data in pixels.
margin_left (int): Space between left edge of image and plotted data in pixels.
"""
def __init__(self, fig, axes, pad_inches: float | None = None):
if pad_inches is None:
pad_inches = rcParams["savefig.pad_inches"]
tightbbox = (
fig.get_tightbbox(fig.canvas.get_renderer())
.padded(pad_inches)
.transformed(Affine2D().scale(fig.dpi))
)
self.width = int(tightbbox.width)
self.height = int(tightbbox.height)
x0, y0, x1, y1 = (
Bbox.union([ax.get_window_extent() for ax in axes])
.translated(-tightbbox.x0, -tightbbox.y0)
.extents
)
self.margin_top = int(self.height - round(y1))
self.margin_right = int(self.width - round(x1) - 1)
self.margin_bottom = int(round(y0) - 1)
self.margin_left = int(round(x0))
class FigureData:
def __init__(
self,
file: netCDF4.Dataset,
requested_variables: list[str],
options: PlotParameters,
):
self.file = file
self.variables, self.indices = self._get_valid_variables_and_indices(
requested_variables
)
self.options = options
self.height = self._get_height()
self.time = self.file.variables["time"][:]
self.time_including_gaps = np.array([])
def initialize_figure(self) -> tuple[Figure, list[Axes]]:
n_subplots = len(self)
fig, axes = plt.subplots(
n_subplots,
1,
figsize=(16, 4 + (n_subplots - 1) * 4.8),
dpi=self.options.dpi,
sharex=True,
)
fig.subplots_adjust(left=0.06, right=0.73)
axes_list = [axes] if isinstance(axes, Axes) else axes.tolist()
return fig, axes_list
def add_subtitle(self, fig: Figure) -> None:
fig.suptitle(
self._get_subtitle_text(),
fontsize=13,
y=0.885,
x=0.07,
horizontalalignment="left",
verticalalignment="bottom",
)
def _get_subtitle_text(self) -> str:
measurement_date = date(
int(self.file.year), int(self.file.month), int(self.file.day)
)
site_name = self.file.location.replace("-", " ")
return f"{site_name}, {measurement_date.strftime('%d %b %Y').lstrip('0')}"
def _get_valid_variables_and_indices(
self, requested_variables: list[str]
) -> tuple[list[netCDF4.Variable], list[int | None]]:
valid_variables = []
variable_indices = []
for variable_name in requested_variables:
if variable_name.startswith(("tb_", "irt_")):
parts = variable_name.split("_")
extracted_name = parts[0]
extracted_ind = int(parts[1])
else:
extracted_name = variable_name
extracted_ind = None
if extracted_name in self.file.variables:
valid_variables.append(self.file.variables[extracted_name])
variable_indices.append(extracted_ind)
if not valid_variables:
msg = f"None of the variables {requested_variables} found in the file."
raise PlottingError(msg)
return valid_variables, variable_indices
def _get_height(self) -> ndarray | None:
m2km = 1e-3
m2mm = 1e3
file_type = getattr(self.file, "cloudnet_file_type", "")
if file_type == "model":
height = ma.mean(self.file.variables["height"][:], axis=0) # height AGL
if not self.options.plot_above_ground:
site_alt = self._calc_ground_altitude()
height += site_alt
return height * m2km
if "height" in self.file.variables:
height = self.file.variables["height"][:] # height AMSL
if self.options.plot_above_ground:
if "altitude" not in self.file.variables:
msg = "No altitude information in the file."
raise ValueError(msg)
height -= self.file.variables["altitude"][:]
return height * m2km
if "range" in self.file.variables:
return self.file.variables["range"][:] * m2km
if "diameter" in self.file.variables:
return self.file.variables["diameter"][:] * m2mm
return None
def _calc_ground_altitude(self) -> float:
if (
"sfc_geopotential" in self.file.variables
and "gdas1" not in self.file.source.lower() # uncertain unit in gdas1
):
return np.mean(self.file.variables["sfc_geopotential"][:]) / con.G
pressure = ma.mean(self.file.variables["pressure"][:, 0])
temperature = ma.mean(self.file.variables["temperature"][:, 0])
return calc_altitude(temperature, pressure)
def is_mwrpy_product(self) -> bool:
cloudnet_file_type = getattr(self.file, "cloudnet_file_type", "")
return cloudnet_file_type in ("mwr-single", "mwr-multi")
def __len__(self) -> int:
return len(self.variables)
class SubPlot:
def __init__(
self,
ax: Axes,
variable: netCDF4.Variable,
options: PlotParameters,
file_type: str | None,
):
self.ax = ax
self.variable = variable
self.options = options
self.file_type = file_type
self.plot_meta = self._read_plot_meta()
def set_xax(self) -> None:
resolution = 4
x_tick_labels = [
f"{int(i):02d}:00"
if (24 >= i >= 0 if self.options.edge_tick_labels else 24 > i > 0)
else ""
for i in np.arange(0, 24.01, resolution)
]
self.ax.set_xticks(np.arange(0, 25, resolution, dtype=int))
self.ax.set_xticklabels(x_tick_labels, fontsize=12)
if self.options.minor_ticks:
self.ax.xaxis.set_minor_locator(AutoMinorLocator(4))
self.ax.tick_params(which="minor", length=2.5)
self.ax.set_xlim(0, 24)
def set_yax(
self,
ylabel: str | None = None,
y_limits: tuple[float, float] | None = None,
) -> None:
height_str = (
"Height (km AGL)" if self.options.plot_above_ground else "Height (km AMSL)"
)
label = ylabel if ylabel is not None else height_str
self.ax.set_ylabel(label, fontsize=13)
if y_limits is not None:
self.ax.set_ylim(*y_limits)
def add_title(self, ind: int | None) -> None:
title = self.variable.long_name
if self.variable.name == "tb" and ind is not None:
title += f" (channel {ind + 1})"
self.ax.set_title(title, fontsize=14)
def add_grid(self) -> None:
zorder = _get_zorder("grid")
self.ax.xaxis.set_minor_locator(AutoMinorLocator(4))
self.ax.grid(which="major", axis="x", color="k", lw=0.1, zorder=zorder)
self.ax.grid(which="minor", axis="x", lw=0.1, color="k", ls=":", zorder=zorder)
self.ax.grid(which="major", axis="y", lw=0.1, color="k", ls=":", zorder=zorder)
def add_sources(self, figure_data: FigureData) -> None:
source = getattr(self.variable, "source", None) or (
figure_data.file.source if "source" in figure_data.file.ncattrs() else None
)
if source is not None:
source_word = "sources" if "\n" in source else "source"
text = f"Data {source_word}:\n{source}"
self.ax.text(
0.012,
0.96,
text,
ha="left",
va="top",
fontsize=7,
transform=self.ax.transAxes,
bbox={
"facecolor": "white",
"alpha": 0.8,
"edgecolor": "grey",
"boxstyle": "round",
"linewidth": 0.5,
},
)
def set_xlabel(self) -> None:
self.ax.set_xlabel("Time (UTC)", fontsize=13)
def show_footer(self, fig: Figure, ax: Axes) -> None:
if isinstance(self.options.footer_text, str):
n = 50
if len(self.options.footer_text) > n:
wrapped_text = textwrap.fill(self.options.footer_text, n)
self.options.footer_text = "\n".join(wrapped_text.splitlines())
n_lines = self.options.footer_text.count("\n") + 1
y0 = ax.get_position().y0
y1 = ax.get_position().y1
y = (y1 - y0) * (n_lines * 0.06 + 0.1)
fig.text(
0.06,
y0 - y,
self.options.footer_text,
fontsize=11,
ha="left",
va="bottom",
)
def _read_plot_meta(self) -> PlotMeta:
if self.options.plot_meta is not None:
plot_meta = self.options.plot_meta
else:
fallback = ATTRIBUTES["fallback"].get(self.variable.name, PlotMeta())
file_attributes = ATTRIBUTES.get(self.file_type or "", {})
plot_meta = file_attributes.get(self.variable.name, fallback)
if plot_meta.clabel is None:
plot_meta = plot_meta._replace(clabel=_reformat_units(self.variable.units))
return plot_meta
class Plot:
def __init__(self, sub_plot: SubPlot):
self.sub_plot = sub_plot
self._data = sub_plot.variable[:]
self._data_orig = self._data.copy()
self._plot_meta = sub_plot.plot_meta
self._is_log = sub_plot.plot_meta.log_scale
self._ax = sub_plot.ax
def _convert_units(self) -> str:
multiply, add = "multiply", "add"
units_conversion = {
"rainfall_rate": (multiply, 3600000, "mm h$^{-1}$"),
"snowfall_rate": (multiply, 3600000, "mm h$^{-1}$"),
"air_pressure": (multiply, 0.01, "hPa"),
"relative_humidity": (multiply, 100, "%"),
"rainfall_amount": (multiply, 1000, "mm"),
"air_temperature": (add, -273.15, "\u00b0C"),
"r_accum_RT": (multiply, 1000, "mm"),
"r_accum_NRT": (multiply, 1000, "mm"),
}
conversion_method, conversion, units = units_conversion.get(
self.sub_plot.variable.name, (multiply, 1, None)
)
if conversion_method == multiply:
self._data *= conversion
self._data_orig *= conversion
elif conversion_method == add:
self._data += conversion
self._data_orig += conversion
if units is not None:
self._plot_meta = self._plot_meta._replace(clabel=units)
return units
units = getattr(self.sub_plot.variable, "units", "")
return _reformat_units(units)
def _get_y_limits(self) -> tuple[float, float]:
return 0, self.sub_plot.options.max_y
def _init_colorbar(self, plot) -> Colorbar:
divider = make_axes_locatable(self._ax)
cax = divider.append_axes("right", size="1%", pad=0.25)
return plt.colorbar(plot, fraction=1.0, ax=self._ax, cax=cax)
def _fill_between_data_gaps(self, figure_data: FigureData) -> None:
gap_times = list(set(figure_data.time_including_gaps) - set(figure_data.time))
gap_times.sort()
batches = [gap_times[i : i + 2] for i in range(0, len(gap_times), 2)]
for batch in batches:
self._ax.fill_between(
batch,
*self._get_y_limits(),
hatch="//",
facecolor="whitesmoke",
edgecolor="lightgrey",
label="_nolegend_",
zorder=_get_zorder("data_gap"),
)
def _mark_gaps(self, figure_data: FigureData) -> None:
time = figure_data.time
data = self._data
if time[0] < 0 or time[-1] > 24:
msg = "Time values outside the range 0-24."
raise ValueError(msg)
max_gap_fraction_hour = _get_max_gap_in_minutes(figure_data) / 60
if getattr(figure_data.file, "cloudnet_file_type", "") == "model":
time, data = screen_completely_masked_profiles(time, data)
gap_indices = np.where(np.diff(time) > max_gap_fraction_hour)[0]
if not ma.is_masked(data):
mask_new = np.zeros(data.shape)
elif ma.all(data.mask) is ma.masked:
mask_new = np.ones(data.shape)
else:
mask_new = np.copy(data.mask)
data_new = ma.copy(data)
time_new = np.copy(time)
if self._data.ndim == 2:
temp_array = np.zeros((2, data.shape[1]))
temp_mask = np.ones((2, data.shape[1]))
else:
temp_array = np.zeros(2)
temp_mask = np.ones(2)
time_delta = 0.001
for ind in np.sort(gap_indices)[::-1]:
ind_gap = ind + 1
data_new = np.insert(data_new, ind_gap, temp_array, axis=0)
mask_new = np.insert(mask_new, ind_gap, temp_mask, axis=0)
time_new = np.insert(time_new, ind_gap, time[ind_gap] - time_delta)
time_new = np.insert(time_new, ind_gap, time[ind_gap - 1] + time_delta)
if (time[0] - 0) > max_gap_fraction_hour:
data_new = np.insert(data_new, 0, temp_array, axis=0)
mask_new = np.insert(mask_new, 0, temp_mask, axis=0)
time_new = np.insert(time_new, 0, time[0] - time_delta)
time_new = np.insert(time_new, 0, time_delta)
if (24 - time[-1]) > max_gap_fraction_hour:
ind_gap = mask_new.shape[0]
data_new = np.insert(data_new, ind_gap, temp_array, axis=0)
mask_new = np.insert(mask_new, ind_gap, temp_mask, axis=0)
time_new = np.insert(time_new, ind_gap, 24 - time_delta)
time_new = np.insert(time_new, ind_gap, time[-1] + time_delta)
data_new.mask = mask_new
self._data = data_new
figure_data.time_including_gaps = time_new
def _read_flagged_data(self, figure_data: FigureData) -> ndarray:
flag_names = [
f"{self.sub_plot.variable.name}_quality_flag",
"temperature_quality_flag",
]
if self.sub_plot.variable.name != "irt":
flag_names.append("quality_flag")
for flag_name in flag_names:
if flag_name in figure_data.file.variables:
return figure_data.file.variables[flag_name][:] > 0
return np.array([])
class Plot2D(Plot):
def plot(self, figure_data: FigureData):
self._convert_units()
self._mark_gaps(figure_data)
if self.sub_plot.variable.name == "cloud_fraction":
self._data[self._data == 0] = ma.masked
if any(
key in self.sub_plot.variable.name for key in ("status", "classification")
):
self._plot_segment_data(figure_data)
else:
self._plot_mesh_data(figure_data)
if figure_data.options.mark_data_gaps:
self._fill_between_data_gaps(figure_data)
if figure_data.is_mwrpy_product():
self._fill_flagged_data(figure_data)
def _fill_flagged_data(self, figure_data: FigureData) -> None:
flags = self._read_flagged_data(figure_data)
batches = find_batches_of_ones(flags)
for batch in batches:
if batch[0] == batch[1]:
continue
time_batch = figure_data.time[batch[0]], figure_data.time[batch[1]]
self._ax.fill_between(
time_batch,
*self._get_y_limits(),
facecolor="white",
alpha=0.7,
label="_nolegend_",
zorder=_get_zorder("flags"),
)
def _plot_segment_data(self, figure_data: FigureData) -> None:
def _hide_segments() -> tuple[list, list]:
if self._plot_meta.clabel is None:
msg = "Missing clabel"
raise ValueError(msg)
labels = [x[0] for x in self._plot_meta.clabel]
colors = [x[1] for x in self._plot_meta.clabel]
segments_to_hide = np.char.startswith(labels, "_")
indices = np.where(segments_to_hide)[0]
for ind in np.flip(indices):
del labels[ind], colors[ind]
self._data[self._data == ind] = ma.masked
self._data[self._data > ind] -= 1
return colors, labels
cbar, clabel = _hide_segments()
alt = self._screen_data_by_max_y(figure_data)
image = self._ax.pcolorfast(
figure_data.time_including_gaps,
alt,
self._data.T[:-1, :-1],
cmap=ListedColormap(cbar),
vmin=-0.5,
vmax=len(cbar) - 0.5,
zorder=_get_zorder("data"),
)
colorbar = self._init_colorbar(image)
colorbar.set_ticks(np.arange(len(clabel)).tolist())
colorbar.ax.set_yticklabels(clabel, fontsize=13)
def _plot_mesh_data(self, figure_data: FigureData) -> None:
if self._plot_meta.plot_range is None:
vmin, vmax = self._data.min(), self._data.max()
else:
vmin, vmax = self._plot_meta.plot_range
if self._is_log:
self._data, vmin, vmax = lin2log(self._data, vmin, vmax)
alt = self._screen_data_by_max_y(figure_data)
if (duration := self._plot_meta.time_smoothing_duration) > 0:
sigma_units = calc_sigma_units(
figure_data.time, alt * 1e3, sigma_minutes=duration, sigma_metres=0
)
valid_time_ind = ~np.all(self._data.mask, axis=1)
smoothed_data = uniform_filter(self._data[valid_time_ind, :], sigma_units)
self._data[valid_time_ind, :] = smoothed_data
if self._data.mask.all() and figure_data.options.raise_on_empty:
msg = "All data is masked"
raise PlottingError(msg)
pcolor_kwargs = {
"cmap": plt.get_cmap(str(self._plot_meta.cmap)),
"vmin": vmin,
"vmax": vmax,
"zorder": _get_zorder("data"),
}
image: Any
if getattr(figure_data.file, "cloudnet_file_type", "") == "model":
image = self._ax.pcolor(
figure_data.time_including_gaps,
alt,
self._data.T,
**pcolor_kwargs,
shading="nearest",
)
else:
image = self._ax.pcolorfast(
figure_data.time_including_gaps,
alt,
self._data.T[:-1, :-1],
**pcolor_kwargs,
)
cbar = self._init_colorbar(image)
cbar.set_label(str(self._plot_meta.clabel), fontsize=13)
if self._is_log:
cbar.set_ticks(np.arange(vmin, vmax + 1).tolist())
tick_labels = get_log_cbar_tick_labels(vmin, vmax)
cbar.ax.set_yticklabels(tick_labels)
if self._plot_meta.contour:
self._plot_contour(
figure_data,
alt,
levels=np.linspace(vmin, vmax, num=10),
colors="black",
linewidths=0.5,
)
if self.sub_plot.variable.name == "Tw":
self._plot_contour(
figure_data,
alt,
levels=np.array([con.T0]),
colors="gray",
linewidths=1.25,
linestyles="dashed",
)
def _plot_contour(self, figure_data: FigureData, alt: np.ndarray, **options):
time_length = len(figure_data.time_including_gaps)
step = max(1, time_length // 200)
ind_time = np.arange(0, time_length, step)
self._ax.contour(
figure_data.time_including_gaps[ind_time],
alt,
self._data[ind_time, :].T,
**options,
zorder=_get_zorder("contour"),
)
def _screen_data_by_max_y(self, figure_data: FigureData) -> ndarray:
if figure_data.height is None:
msg = "No height information in the file."
raise ValueError(msg)
if self._data.ndim < 2:
msg = "Data has to be 2D."
raise PlottingError(msg)
alt = figure_data.height
if figure_data.options.max_y is None:
return alt
ind = int((np.argmax(alt > figure_data.options.max_y) or len(alt)) + 1)
self._data = self._data[:, :ind]
return alt[:ind]
class Plot1D(Plot):
def plot(self, figure_data: FigureData, hacky_freq_ind: int | None = None) -> None:
units = self._convert_units()
self._mark_gaps(figure_data)
self._ax.plot(
figure_data.time_including_gaps,
self._data,
label="_nolegend_",
**self._get_plot_options(),
zorder=_get_zorder("data"),
)
if self._plot_meta.moving_average:
self._plot_moving_average(figure_data, hacky_freq_ind)
if self._plot_meta.zero_line:
self._ax.axhline(0, color="black", alpha=0.5, label="_nolegend_")
self._fill_between_data_gaps(figure_data)
self.sub_plot.set_yax(ylabel=units, y_limits=self._get_y_limits())
pos = self._ax.get_position()
self._ax.set_position((pos.x0, pos.y0, pos.width * 0.965, pos.height))
if figure_data.is_mwrpy_product():
flags = self._read_flagged_data(figure_data)
if np.any(flags):
self._plot_flag_data(figure_data.time[flags], self._data_orig[flags])
self._add_legend()
def plot_tb(self, figure_data: FigureData, freq_ind: int) -> None:
if len(self._data.shape) != 2 or freq_ind >= self._data.shape[1]:
msg = "Frequency index not found in data"
raise PlottingError(msg)
self._data = self._data[:, freq_ind]
self._data[np.isnan(self._data)] = ma.masked
if self._data.mask.all() and figure_data.options.raise_on_empty:
msg = "All data is masked"
raise PlottingError(msg)
self._data_orig = self._data_orig[:, freq_ind]
if self.sub_plot.variable.name == "tb":
is_bad_zenith = self._get_bad_zenith_profiles(figure_data)
self._data[is_bad_zenith] = ma.masked
self._data_orig[is_bad_zenith] = ma.masked
flags = self._read_flagged_data(figure_data)[:, freq_ind]
flags[is_bad_zenith] = False
if np.any(flags):
self._plot_flag_data(figure_data.time[flags], self._data_orig[flags])
self._add_legend()
self._show_frequency(figure_data, freq_ind)
self.plot(figure_data, freq_ind)
def _show_frequency(self, figure_data: FigureData, freq_ind: int) -> None:
if self.sub_plot.variable.name == "tb":
label = "Freq"
value = figure_data.file.variables["frequency"][freq_ind]
unit = "GHz"
elif "ir_wavelength" in figure_data.file.variables:
label = "WL"
variable = figure_data.file.variables["ir_wavelength"]
# `ir_wavelength` is scalar in old files
value = variable[:] if len(variable.shape) == 0 else variable[freq_ind]
value /= 1e-6 # m to μm
unit = "μm"
else:
return
self._ax.text(
0.0,
-0.13,
f"{label}: {value:.2f} {unit}",
transform=self._ax.transAxes,
fontsize=12,
color="dimgrey",
bbox={
"facecolor": "white",
"linewidth": 0,
"boxstyle": "round",
},
)
def _plot_flag_data(self, time: ndarray, values: ndarray) -> None:
self._ax.plot(
time,
values,
color="salmon",
marker=".",
lw=0,
markersize=3,
zorder=_get_zorder("flags"),
)
def _add_legend(self) -> None:
self._ax.legend(
["Flagged data"],
markerscale=3,
numpoints=1,
frameon=False,
)
def _get_y_limits(self) -> tuple[float, float]:
percent_gap = 0.05
fallback = (-percent_gap, percent_gap)
if ma.all(self._data.mask):
return fallback
min_data = self._data.min()
max_data = self._data.max()
range_val = max_data - min_data
gap = percent_gap * range_val
min_y = min_data - gap
max_y = max_data + gap
if min_y == 0 and max_y == 0:
return fallback
return min_y, max_y
def _get_plot_options(self) -> dict:
default_options = {
"color": "lightblue",
"lw": 0,
"marker": ".",
"markersize": 3,
}
custom_options = {
"tb": {
"color": "lightblue",
}
}
variable_name = self.sub_plot.variable.name
if variable_name in custom_options:
default_options.update(custom_options[variable_name])
return default_options
def _plot_moving_average(
self, figure_data: FigureData, hacky_freq_ind: int | None = None
) -> None:
time = figure_data.time.copy()
data = self._data_orig.copy()
if figure_data.is_mwrpy_product() or self.sub_plot.variable.name in (
"tb",
"irt",
):
flags = self._read_flagged_data(figure_data)
else:
flags = np.array([])
if hacky_freq_ind is not None and np.any(flags):
flags = flags[:, hacky_freq_ind]
is_invalid = ma.getmaskarray(data)
if np.any(flags):
is_invalid |= flags
is_wind_direction = self.sub_plot.variable.name == "wind_direction"
if is_wind_direction:
wind_speed = figure_data.file["wind_speed"]
data = np.stack([wind_speed, data], axis=1)
block_ind = np.where(np.diff(is_invalid))[0] + 1
valid_time_blocks = np.split(time, block_ind)[is_invalid[0] :: 2]
valid_data_blocks = np.split(data, block_ind)[is_invalid[0] :: 2]
for time1, data1 in zip(valid_time_blocks, valid_data_blocks, strict=False):
if is_wind_direction:
sma = self._calculate_average_wind_direction(
data1[:, 0], data1[:, 1], time1, window=15
)
else:
sma = self._calculate_moving_average(data1, time1, window=5)
gap_time = _get_max_gap_in_minutes(figure_data)
gaps = self._find_time_gap_indices(time1, max_gap_min=gap_time) + 1
for time2, data2 in zip(
np.split(time1, gaps), np.split(sma, gaps), strict=False
):
self._ax.plot(
time2,
data2,
color="slateblue",
lw=2,
label="_nolegend_",
zorder=_get_zorder("mean_curve"),
)
@staticmethod
def _get_line_width(time: ndarray) -> float:
line_width = np.median(np.diff(time)) * 1000
return min(max(line_width, 0.25), 0.9)
@staticmethod
def _get_bad_zenith_profiles(figure_data: FigureData) -> ndarray:
zenith_limit = 5
valid_pointing_status = 0
if "pointing_flag" in figure_data.file.variables:
pointing_flag = figure_data.file.variables["pointing_flag"][:]
zenith_angle = figure_data.file.variables["zenith_angle"][:]
is_bad_zenith = np.abs(zenith_angle) > zenith_limit
is_bad_pointing = pointing_flag != valid_pointing_status
return is_bad_zenith | is_bad_pointing
return np.zeros_like(figure_data.time, dtype=bool)
@staticmethod
def _find_time_gap_indices(time: ndarray, max_gap_min: float) -> ndarray:
gap_decimal_hour = max_gap_min / 60
return np.where(np.diff(time) > gap_decimal_hour)[0]
@staticmethod
def _calculate_moving_average(
data: ndarray, time: ndarray, window: float = 5
) -> ndarray:
if len(data) == 0:
return np.array([])
if len(data) == 1:
return data
time_delta_hours = np.median(np.diff(time))
window_size = int(window / 60 / time_delta_hours)
if window_size < 1:
window_size = 1
if window_size % 2 == 0:
window_size += 1
weights = np.repeat(1 / window_size, window_size)
padded_data = np.pad(data, window_size // 2, mode="edge")
return np.convolve(padded_data, weights, "valid")
@classmethod
def _calculate_average_wind_direction(
cls,
wind_speed: ndarray,
wind_direction: ndarray,
time: ndarray,
window: float = 5,
) -> ndarray:
angle = np.deg2rad(wind_direction)
u = wind_speed * np.cos(angle)
v = wind_speed * np.sin(angle)
avg_u = cls._calculate_moving_average(u, time, window)
avg_v = cls._calculate_moving_average(v, time, window)
data = np.rad2deg(np.arctan2(avg_v, avg_u)) % 360
wrap = np.where(np.abs(np.diff(data)) > 300)[0]
data[wrap] = np.nan
return data
def lin2log(*args) -> list:
return [ma.log10(x) for x in args]
def get_log_cbar_tick_labels(value_min: float, value_max: float) -> list[str]:
return [f"10$^{{{int(i)}}}$" for i in np.arange(value_min, value_max + 1)]
def _reformat_units(unit: str) -> str:
if unit == "1":
return ""
return re.sub(r"(-\d+)", r"$^{\1}$", unit).replace("mu ", "$\\mu$")
def _get_max_gap_in_minutes(figure_data: FigureData) -> float:
source = getattr(figure_data.file, "source", "").lower()
file_type = getattr(figure_data.file, "cloudnet_file_type", "")
max_allowed_gap = {
"model": 181 if "gdas1" in source or "ecmwf open" in source else 61,
"mwr-multi": 35,
"weather-station": 12,
"doppler-lidar-wind": 75,
"doppler-lidar": 75,
"radar": 5,
}
return max_allowed_gap.get(file_type, 10)
def _get_zorder(name: str) -> int:
zorder = {
"contour": 2,
"data_gap": 2,
"flags": 2,
}
return zorder.get(name, -1)
def find_batches_of_ones(array: ndarray) -> list[tuple[int, int]]:
"""Find batches of ones in a binary array."""
starts = np.where(np.diff(np.hstack(([0], array))) == 1)[0]
stops = np.where(np.diff(np.hstack((array, [0]))) == -1)[0]
return list(zip(starts, stops, strict=True))
def screen_completely_masked_profiles(time: ndarray, data: ma.MaskedArray) -> tuple:
if not ma.is_masked(data):
return time, data
good_ind = np.where(np.any(~data.mask, axis=1))[0]
if len(good_ind) == 0:
msg = "All values masked in the file."
raise PlottingError(msg)
good_ind = np.append(good_ind, good_ind[-1] + 1)
good_ind = np.clip(good_ind, 0, len(time) - 1)
return time[good_ind], data[good_ind, :]
def plot_2d(
data: ma.MaskedArray,
cmap: str = "viridis",
ncolors: int = 50,
clim: tuple | None = None,
ylim: tuple | None = None,
xlim: tuple | None = None,
*,
cbar: bool = True,
) -> None:
"""Simple plot of 2d variable."""
plt.close()
if cbar:
color_map = plt.get_cmap(cmap, ncolors)
plt.imshow(
ma.masked_equal(data, 0).T,
aspect="auto",
origin="lower",
cmap=color_map,
)
plt.colorbar()
else:
plt.imshow(ma.masked_equal(data, 0).T, aspect="auto", origin="lower")
if clim:
plt.clim(clim[0], clim[1])
if ylim is not None:
plt.ylim(ylim)
if xlim is not None:
plt.xlim(xlim)
plt.show()