"""General helper classes and functions for all products."""
from dataclasses import dataclass
from typing import NamedTuple
import netCDF4
import numpy as np
from numpy import ma
from numpy.typing import NDArray
from cloudnetpy import constants, utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.datasource import DataSource
[docs]
class IceCoefficients(NamedTuple):
"""Coefficients for ice effective radius retrieval."""
K2liquid0: float
ZT: float
T: float
Z: float
c: float
[docs]
@dataclass
class CategoryBits:
droplet: NDArray[np.bool_]
falling: NDArray[np.bool_]
freezing: NDArray[np.bool_]
melting: NDArray[np.bool_]
aerosol: NDArray[np.bool_]
insect: NDArray[np.bool_]
[docs]
@dataclass
class QualityBits:
radar: NDArray[np.bool_]
lidar: NDArray[np.bool_]
clutter: NDArray[np.bool_]
molecular: NDArray[np.bool_]
attenuated_liquid: NDArray[np.bool_]
corrected_liquid: NDArray[np.bool_]
attenuated_rain: NDArray[np.bool_]
corrected_rain: NDArray[np.bool_]
attenuated_melting: NDArray[np.bool_]
corrected_melting: NDArray[np.bool_]
class CategorizeBits:
def __init__(self, categorize_file: str):
self._categorize_file = categorize_file
self.category_bits = self._read_category_bits()
self.quality_bits = self._read_quality_bits()
def _read_category_bits(self) -> CategoryBits:
with netCDF4.Dataset(self._categorize_file) as nc:
bits = nc.variables["category_bits"][:]
return CategoryBits(
droplet=utils.isbit(bits, 0),
falling=utils.isbit(bits, 1),
freezing=utils.isbit(bits, 2),
melting=utils.isbit(bits, 3),
aerosol=utils.isbit(bits, 4),
insect=utils.isbit(bits, 5),
)
def _read_quality_bits(self) -> QualityBits:
with netCDF4.Dataset(self._categorize_file) as nc:
bits = nc.variables["quality_bits"][:]
return QualityBits(
radar=utils.isbit(bits, 0),
lidar=utils.isbit(bits, 1),
clutter=utils.isbit(bits, 2),
molecular=utils.isbit(bits, 3),
attenuated_liquid=utils.isbit(bits, 4),
corrected_liquid=utils.isbit(bits, 5),
attenuated_rain=utils.isbit(bits, 6),
corrected_rain=utils.isbit(bits, 7),
attenuated_melting=utils.isbit(bits, 8),
corrected_melting=utils.isbit(bits, 9),
)
[docs]
class ProductClassification(CategorizeBits):
"""Base class for creating different classifications in the child classes
of various Cloudnet products. Child of CategorizeBits class.
Args:
categorize_file (str): Categorize file name.
Attributes:
is_rain (ndarray): 1D array denoting rainy profiles.
"""
def __init__(self, categorize_file: str):
super().__init__(categorize_file)
self.is_rain = get_is_rain(categorize_file)
[docs]
class IceClassification(ProductClassification):
"""Class storing the information about different ice types.
Child of ProductClassification().
"""
def __init__(self, categorize_file: str):
super().__init__(categorize_file)
self._is_attenuated = self._find_attenuated()
self._is_corrected = self._find_corrected()
self.is_ice = self._find_ice()
self.would_be_ice = self._find_would_be_ice()
self.corrected_ice = self._find_corrected_ice()
self.uncorrected_ice = self._find_uncorrected_ice()
self.ice_above_rain = self._find_ice_above_rain()
self.clear_above_rain = self._find_clear_above_rain()
def _find_clear_above_rain(self) -> np.ndarray:
return (
utils.transpose(self.is_rain) * ~self.is_ice
& self.category_bits.freezing
& ~self.category_bits.melting
)
def _find_attenuated(self) -> np.ndarray:
return (
self.quality_bits.attenuated_liquid
| self.quality_bits.attenuated_rain
| self.quality_bits.attenuated_melting
)
def _find_corrected(self) -> np.ndarray:
return (
self.quality_bits.corrected_liquid
| self.quality_bits.corrected_rain
| self.quality_bits.corrected_melting
)
def _find_ice(self) -> np.ndarray:
return (
self.category_bits.falling
& self.category_bits.freezing
& ~self.category_bits.melting
& ~self.category_bits.insect
)
def _find_would_be_ice(self) -> np.ndarray:
warm_falling = (
self.category_bits.falling
& ~self.category_bits.freezing
& ~self.category_bits.insect
)
return warm_falling | self.category_bits.melting
def _find_corrected_ice(self) -> np.ndarray:
return self.is_ice & self._is_attenuated & self._is_corrected
def _find_uncorrected_ice(self) -> np.ndarray:
uncorrected_melting = (
self.quality_bits.attenuated_melting & ~self.quality_bits.corrected_melting
)
uncorrected_rain = (
self.quality_bits.attenuated_rain & ~self.quality_bits.corrected_rain
)
uncorrected_liquid = (
self.quality_bits.attenuated_liquid & ~self.quality_bits.corrected_liquid
)
return (
self.is_ice
& self._is_attenuated
& (uncorrected_melting | uncorrected_rain | uncorrected_liquid)
)
def _find_ice_above_rain(self) -> np.ndarray:
is_rain = utils.transpose(self.is_rain)
return (self.is_ice * is_rain) == 1
[docs]
class IceSource(DataSource):
"""Base class for different ice products."""
def __init__(self, categorize_file: str, product: str):
super().__init__(categorize_file)
self.radar_frequency = float(self.getvar("radar_frequency"))
self.wl_band = utils.get_wl_band(self.radar_frequency)
self.temperature = _get_temperature(categorize_file)
self.product = product
self.coefficients = self._get_coefficients()
[docs]
def append_icy_data(
self,
ice_classification: IceClassification,
) -> None:
"""Adds the main variable (including ice above rain)."""
data = self._convert_z()
data[~ice_classification.is_ice | ice_classification.uncorrected_ice] = (
ma.masked
)
self.append_data(data, f"{self.product}")
[docs]
def append_status(self, ice_classification: IceClassification) -> None:
"""Adds the status of retrieval."""
data = self.data[self.product][:]
retrieval_status = np.zeros(data.shape, dtype=int)
is_data = ~data.mask
retrieval_status[is_data] = 1
retrieval_status[is_data & ice_classification.corrected_ice] = 3
retrieval_status[~is_data & ice_classification.is_ice] = 4
retrieval_status[ice_classification.uncorrected_ice] = 2
retrieval_status[ice_classification.clear_above_rain] = 6
retrieval_status[ice_classification.would_be_ice & (retrieval_status == 0)] = 7
self.append_data(retrieval_status, f"{self.product}_retrieval_status")
def _get_coefficients(self) -> IceCoefficients:
"""Returns coefficients for ice effective radius retrieval.
References:
Hogan et.al. 2006, https://doi.org/10.1175/JAM2340.1
"""
msg = f"Unsupported band: {self.wl_band}"
if self.product == "ier":
if self.wl_band == "Ka":
return IceCoefficients(0.878, -0.000205, -0.0015, 0.0016, -1.52)
if self.wl_band == "W":
return IceCoefficients(0.669, -0.000296, -0.00193, -0.000, -1.502)
raise ValueError(msg)
if self.wl_band == "Ka":
return IceCoefficients(0.878, 0.000242, -0.0186, 0.0699, -1.63)
if self.wl_band == "W":
return IceCoefficients(0.669, 0.000580, -0.00706, 0.0923, -0.992)
raise ValueError(msg)
def _convert_z(self, z_variable: str = "Z") -> np.ndarray:
"""Calculates temperature weighted z, i.e. ice effective radius [m]."""
if self.product not in ("iwc", "ier"):
msg = f"Invalid product: {self.product}"
raise ValueError(msg)
if z_variable not in ("Z", "Z_sensitivity"):
msg = f"Invalid z_variable: {z_variable}"
raise ValueError(msg)
temperature = (
self.temperature if z_variable == "Z" else ma.mean(self.temperature, axis=0)
)
z_scaled = self.getvar(z_variable) + self._get_z_factor()
g_to_kg = 0.001
m_to_mu = 1e6
scale = (
g_to_kg if self.product == "iwc" else 3 / (2 * constants.RHO_ICE) * m_to_mu
)
return (
10
** (
self.coefficients.ZT * z_scaled * temperature
+ self.coefficients.T * temperature
+ self.coefficients.Z * z_scaled
+ self.coefficients.c
)
* scale
)
def _get_z_factor(self) -> float:
"""Returns empirical scaling factor for radar echo."""
k2 = np.array(self.coefficients.K2liquid0) / 0.93
return float(utils.lin2db(k2))
def get_is_rain(filename: str) -> np.ndarray:
# TODO: Check that this is correct
with netCDF4.Dataset(filename) as nc:
for name in ["rain_detected", "rainfall_rate", "rain_rate"]:
if name in nc.variables:
data = nc.variables[name][:]
data = data != 0
data[data.mask] = True
return np.array(data)
msg = "No rain data found."
raise ValueError(msg)
def read_nc_field(nc_file: str, name: str) -> ma.MaskedArray:
with netCDF4.Dataset(nc_file) as nc:
return nc.variables[name][:]
[docs]
def interpolate_model(cat_file: str, names: str | list) -> dict[str, np.ndarray]:
"""Interpolates 2D model field into dense Cloudnet grid.
Args:
cat_file: Categorize file name.
names: Model variable to be interpolated, e.g. 'temperature' or ['temperature',
'pressure'].
Returns:
dict: Interpolated variables.
"""
def _interp_field(var_name: str) -> np.ndarray:
values = _read_nc_fields(
cat_file,
["model_time", "model_height", var_name, "time", "height"],
)
return utils.interpolate_2d(*values)
names = [names] if isinstance(names, str) else names
return {name: _interp_field(name) for name in names}
def _read_nc_fields(nc_file: str, names: list[str]) -> list[ma.MaskedArray]:
with netCDF4.Dataset(nc_file) as nc:
return [nc.variables[name][:] for name in names]
def _get_temperature(categorize_file: str) -> np.ndarray:
"""Returns interpolated temperatures in Celsius."""
atmosphere = interpolate_model(categorize_file, "temperature")
return atmos_utils.k2c(atmosphere["temperature"])