"""Model module, containing the :class:`Model` class."""
import os.path
from os import PathLike
import numpy as np
import numpy.typing as npt
from numpy import ma
from scipy.interpolate import interp1d
from cloudnetpy import utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.categorize.itu import (
calc_gas_specific_attenuation,
calc_liquid_specific_attenuation,
calc_saturation_vapor_pressure,
)
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.constants import G
from cloudnetpy.datasource import DataSource
from cloudnetpy.exceptions import ModelDataError
[docs]
class Model(DataSource):
"""Model class, child of DataSource.
Args:
model_file: File name of the NWP model file.
alt_site: Altitude of the site above mean sea level (m).
options: Dictionary containing optional parameters.
Attributes:
source_type (str): Model type, e.g. 'gdas1' or 'ecwmf'.
model_heights (ndarray): 2-D array of model heights (one for each time
step).
mean_height (ndarray): Mean of *model_heights*.
data_sparse (dict): Model variables in common height grid but without
interpolation in time.
data_dense (dict): Model variables interpolated to Cloudnet's dense
time / height grid.
"""
fields_dense = (
"temperature",
"pressure",
"rh",
"q",
)
fields_sparse = (*fields_dense, "uwind", "vwind")
fields_atten = (
"specific_gas_atten",
"specific_saturated_gas_atten",
"specific_liquid_atten",
)
def __init__(
self, model_file: str | PathLike, alt_site: float, options: dict | None = None
) -> None:
super().__init__(model_file)
self.options = options
self.source_type = _find_model_type(model_file)
self.model_heights = self._get_model_heights(alt_site)
self.mean_height = _calc_mean_height(self.model_heights)
self.height: npt.NDArray
self.data_sparse: dict = {}
self.data_dense: dict = {}
self.alt_site = alt_site
self._append_grid()
[docs]
def interpolate_to_common_height(self) -> None:
"""Interpolates model variables to common height grid."""
def _interpolate_variable(data_in: ma.MaskedArray) -> CloudnetArray:
datai = ma.zeros((len(self.time), len(self.mean_height)))
for ind, (alt, prof) in enumerate(
zip(self.model_heights, data_in, strict=True),
):
if prof.mask.all():
datai[ind, :] = ma.masked
else:
fun = interp1d(alt, prof, fill_value="extrapolate")
datai[ind, :] = fun(self.mean_height)
return CloudnetArray(datai, key, units)
for key in self.fields_sparse:
variable = self.dataset.variables[key]
data = variable[:]
units = variable.units
self.data_sparse[key] = _interpolate_variable(data)
[docs]
def interpolate_to_grid(
self,
time_grid: npt.NDArray,
height_grid: npt.NDArray,
) -> list:
"""Interpolates model variables to Cloudnet's dense time / height grid.
Args:
time_grid: The target time array (fraction hour).
height_grid: The target height array (m).
Returns:
Indices fully masked profiles.
"""
half_height = height_grid - np.diff(height_grid, prepend=self.alt_site) / 2
for key in self.fields_dense + self.fields_atten:
array = self.data_sparse[key][:]
valid_profiles = _find_number_of_valid_profiles(array)
if valid_profiles < 2:
raise ModelDataError
self.data_dense[key] = utils.interpolate_2d_mask(
self.time,
self.mean_height,
array,
time_grid,
half_height if "atten" in key else height_grid,
extrapolate=True,
)
self.height = height_grid
return utils.find_masked_profiles_indices(self.data_dense["temperature"])
[docs]
def calc_wet_bulb(self) -> None:
"""Calculates wet-bulb temperature in dense grid."""
wet_bulb_temp = atmos_utils.calc_wet_bulb_temperature(self.data_dense)
offset = (self.options or {}).get("temperature_offset", 0)
wet_bulb_temp += offset
self.append_data(wet_bulb_temp, "Tw", units="K")
if offset:
self.data["Tw"].temperature_correction_applied = offset
[docs]
def screen_sparse_fields(self) -> None:
"""Removes model fields that we don't want to write in the output."""
fields_to_keep = ("temperature", "pressure", "q", "uwind", "vwind")
self.data_sparse = {key: self.data_sparse[key] for key in fields_to_keep}
def _append_grid(self) -> None:
self.append_data(np.array(self.time), "model_time")
self.append_data(self.mean_height, "model_height")
def _get_model_heights(self, alt_site: float) -> npt.NDArray:
"""Returns model heights for each time step."""
try:
model_heights = self.dataset.variables["height"]
except KeyError as err:
msg = "No 'height' variable in the model file."
raise ModelDataError(msg) from err
surface_altitude = self._get_model_surface_altitude(alt_site)
return self.to_m(model_heights) + surface_altitude
def _get_model_surface_altitude(self, alt_site: float) -> float | npt.NDArray:
"""Returns model surface altitude from geopotential if available.
For sites in complex terrain (e.g. mountains), the model grid cell
surface height can differ significantly from the actual site altitude.
Using the model's own surface height ensures that thermodynamic fields
are placed at their physically correct absolute heights.
Note: Model surface altitude might be higher than the site altitude.
"""
if "sfc_height" in self.dataset.variables:
sfc_height = self.dataset.variables["sfc_height"][:]
return sfc_height[:, np.newaxis]
if "sfc_geopotential" in self.dataset.variables:
geopotential = self.dataset.variables["sfc_geopotential"][:]
return geopotential[:, np.newaxis] / G
return alt_site
def calc_attenuations(self, frequency: float) -> None:
temperature = self.getvar("temperature")
pressure = self.getvar("pressure")
specific_humidity = self.getvar("q")
self.data_sparse["specific_liquid_atten"] = calc_liquid_specific_attenuation(
temperature, frequency
)
vp = atmos_utils.calc_vapor_pressure(pressure, specific_humidity)
svp = calc_saturation_vapor_pressure(temperature)
self.data_sparse["specific_gas_atten"] = calc_gas_specific_attenuation(
pressure, vp, temperature, frequency
)
self.data_sparse["specific_saturated_gas_atten"] = (
calc_gas_specific_attenuation(pressure, svp, temperature, frequency)
)
def _calc_mean_height(model_heights: npt.NDArray) -> npt.NDArray:
mean_height = ma.mean(model_heights, axis=0)
return np.array(mean_height)
def _find_model_type(file_name: str | PathLike) -> str:
"""Finds model type from the model filename."""
possible_keys = ("gdas1", "icon", "ecmwf", "harmonie", "era5", "arpege")
basename = os.path.basename(file_name)
for key in possible_keys:
if key in basename:
return key
msg = f"Unknown model type: {file_name}"
raise ValueError(msg)
def _find_number_of_valid_profiles(array: npt.NDArray) -> int:
mask = ma.getmaskarray(array)
all_masked_profiles = np.all(mask, axis=1)
return int(np.count_nonzero(~all_masked_profiles))