import numpy as np
import numpy.typing as npt
from numpy import ma
from scipy import ndimage
from cloudnetpy import utils
from cloudnetpy.categorize import (
atmos_utils,
droplet,
falling,
freezing,
insects,
melting,
)
from cloudnetpy.categorize.attenuations import RadarAttenuation
from cloudnetpy.categorize.containers import (
ClassData,
ClassificationResult,
Observations,
)
from cloudnetpy.constants import T0
from cloudnetpy.products.product_tools import CategoryBits, QualityBits
[docs]
def classify_measurements(data: Observations) -> ClassificationResult:
"""Classifies radar/lidar observations.
This function classifies atmospheric scatterers from the input data.
The input data needs to be averaged or interpolated to the common
time / height grid before calling this function.
Args:
data: A :class:`Observations` instance.
Returns:
A :class:`ClassificationResult` instance.
References:
The Cloudnet classification scheme is based on methodology proposed by
Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ and its
proprietary Matlab implementation.
Notes:
Some individual classification methods are changed in this Python
implementation compared to the original Cloudnet methodology.
Especially methods classifying insects, melting layer and liquid droplets.
"""
bits = CategoryBits(
droplet=np.array([], dtype=bool),
falling=np.array([], dtype=bool),
freezing=np.array([], dtype=bool),
melting=np.array([], dtype=bool),
aerosol=np.array([], dtype=bool),
insect=np.array([], dtype=bool),
)
obs = ClassData(data)
bits.melting = melting.find_melting_layer(obs)
bits.freezing = freezing.find_freezing_region(obs, bits.melting)
liquid_from_lidar = droplet.find_liquid(obs)
if obs.lv0_files is not None and len(obs.lv0_files) > 0:
if "rpg-fmcw-94" not in obs.radar_type.lower():
msg = "VoodooNet is only implemented for RPG-FMCW-94 radar."
raise NotImplementedError(msg)
import voodoonet # noqa: PLC0415
options = voodoonet.VoodooOptions(progress_bar=False)
dumb_date = obs.date.isoformat().split("-")
target_time = voodoonet.utils.decimal_hour2unix(dumb_date, obs.time)
liquid_prob = voodoonet.infer(
list(obs.lv0_files), target_time=target_time, options=options
)
liquid_from_radar = liquid_prob > 0.55
liquid_from_radar = _remove_false_radar_liquid(
liquid_from_radar,
liquid_from_lidar,
)
liquid_from_radar[~bits.freezing] = 0
is_liquid = liquid_from_radar | liquid_from_lidar
else:
is_liquid = liquid_from_lidar
liquid_prob = None
bits.droplet = droplet.correct_liquid_top(obs, is_liquid, bits.freezing, limit=500)
bits.insect, insect_prob = insects.find_insects(obs, bits.melting, bits.droplet)
bits.falling = falling.find_falling_hydrometeors(obs, bits.droplet, bits.insect)
for _ in range(5):
_fix_undetected_melting_layer(bits, obs.lwp)
_filter_insects(bits)
bits.aerosol = _find_aerosols(obs, bits)
_fix_super_cold_liquid(obs, bits)
_reclassify_ice_with_high_ldr(obs, bits)
_filter_insects_in_falling(bits)
_filter_falling(bits, obs)
is_clutter = _remove_clutter_in_liquid(obs.is_clutter, bits)
return ClassificationResult(
category_bits=bits,
is_rain=obs.is_rain,
is_clutter=is_clutter,
insect_prob=insect_prob,
liquid_prob=liquid_prob,
)
def fetch_quality(
data: Observations,
classification: ClassificationResult,
attenuations: RadarAttenuation,
) -> QualityBits:
return QualityBits(
radar=~data.radar.data["Z"][:].mask,
lidar=~data.lidar.data["beta"][:].mask,
clutter=classification.is_clutter,
molecular=np.zeros(data.radar.data["Z"][:].shape, dtype=bool),
attenuated_liquid=attenuations.liquid.attenuated,
corrected_liquid=attenuations.liquid.attenuated
& ~attenuations.liquid.uncorrected,
attenuated_rain=attenuations.rain.attenuated,
corrected_rain=attenuations.rain.attenuated & ~attenuations.rain.uncorrected,
attenuated_melting=attenuations.melting.attenuated,
corrected_melting=attenuations.melting.attenuated
& ~attenuations.melting.uncorrected,
)
def _reclassify_ice_with_high_ldr(obs: ClassData, bits: CategoryBits) -> None:
"""Reclassifies ice pixels with high LDR as insects.
Ice particles typically have LDR below -13 dB. Higher values in the
freezing region (excluding the melting layer) indicate insects, which
can be present in temperatures down to about -10 C.
"""
if not hasattr(obs, "ldr"):
return
ldr_limit = -13
temp_limit = T0 - 10
is_ice = bits.falling & bits.freezing & ~bits.droplet & ~bits.melting
high_ldr = ~ma.getmaskarray(obs.ldr) & (obs.ldr > ldr_limit)
warm_enough = obs.tw > temp_limit
above_melting = utils.ffill(bits.melting)
reclassify = is_ice & high_ldr & warm_enough & ~above_melting
bits.insect[reclassify] = True
bits.falling[reclassify] = False
has_lidar = ~obs.beta.mask
bits.aerosol[reclassify & has_lidar] = True
def _filter_insects_in_falling(
bits: CategoryBits,
min_falling_size: int = 100,
dilation: int = 2,
) -> None:
"""Reclassifies insect pixels in freezing regions near large falling ice.
Insect pixels in freezing temperatures near sufficiently large falling
hydrometeor regions are almost certainly false positives. Real insects
cannot survive in freezing conditions, so the freezing constraint
protects legitimate warm-region insects from being reclassified.
Args:
bits: A :class:`CategoryBits` instance.
min_falling_size: Minimum size of nearby falling regions required
to trigger reclassification.
dilation: Number of pixels to dilate around falling regions.
"""
structure = ndimage.generate_binary_structure(2, 1)
falling_labels, n_falling = ndimage.label(bits.falling, structure=structure)
if n_falling == 0:
return
falling_ids = np.arange(1, n_falling + 1)
falling_sizes = ndimage.sum(bits.falling, falling_labels, falling_ids)
large_falling = np.isin(
falling_labels, falling_ids[falling_sizes >= min_falling_size]
)
near_large_falling = ndimage.binary_dilation(
large_falling, structure, iterations=dilation
)
reclassify = bits.insect & bits.freezing & near_large_falling
bits.falling[reclassify] = True
bits.insect[reclassify] = False
bits.aerosol[reclassify] = False
def _fix_super_cold_liquid(obs: ClassData, bits: CategoryBits) -> None:
"""Supercooled liquid droplets do not exist in atmosphere below around -38 C."""
t_limit = T0 - 38
super_cold_liquid = np.where((obs.tw < t_limit) & bits.droplet)
bits.droplet[super_cold_liquid] = False
bits.falling[super_cold_liquid] = True
def _remove_false_radar_liquid(
liquid_from_radar: npt.NDArray,
liquid_from_lidar: npt.NDArray,
) -> npt.NDArray[np.bool_]:
"""Removes radar-liquid below lidar-detected liquid bases."""
lidar_liquid_bases = atmos_utils.find_cloud_bases(liquid_from_lidar)
for prof, base in zip(*np.where(lidar_liquid_bases), strict=True):
liquid_from_radar[prof, 0:base] = 0
return liquid_from_radar
def _find_aerosols(
obs: ClassData,
bits: CategoryBits,
) -> npt.NDArray[np.bool_]:
"""Estimates aerosols from lidar backscattering.
Aerosols are lidar signals that are: a) not falling, b) not liquid droplets.
Args:
obs: A :class:`ClassData` instance.
bits: A :class:`CategoryBits instance.
Returns:
2-D boolean array containing aerosols.
"""
is_beta = ~obs.beta.mask
return is_beta & ~bits.falling & ~bits.droplet
def _fix_undetected_melting_layer(
bits: CategoryBits,
lwp: npt.NDArray,
lwp_threshold: float = 0.005,
) -> None:
drizzle_and_falling = _find_drizzle_and_falling(bits)
transition = ma.diff(drizzle_and_falling, axis=1) == -1
masked = ma.getmaskarray(lwp)
allow = masked | (np.asarray(lwp) > lwp_threshold)
bits.melting[:, 1:][transition & allow[:, np.newaxis]] = True
def _find_drizzle_and_falling(bits: CategoryBits) -> npt.NDArray:
"""Classifies pixels as falling, drizzle and others.
Args:
bits: A :class:`CategoryBits instance.
Returns:
2D array where values are 1 (falling, drizzle, supercooled liquids),
2 (drizzle), and masked (all others).
"""
falling_dry = bits.falling & ~bits.droplet
supercooled_liquids = bits.droplet & bits.freezing
drizzle = falling_dry & ~bits.freezing
drizzle_and_falling = falling_dry.astype(int) + drizzle.astype(int)
drizzle_and_falling = ma.copy(drizzle_and_falling)
drizzle_and_falling[supercooled_liquids] = 1
drizzle_and_falling[drizzle_and_falling == 0] = ma.masked
return drizzle_and_falling
def _filter_insects(bits: CategoryBits) -> None:
is_melting_layer = bits.melting
is_insects = bits.insect
is_falling = bits.falling
# Remove above melting layer
above_melting = utils.ffill(is_melting_layer)
ind = np.where(is_insects & above_melting)
is_falling[ind] = True
is_insects[ind] = False
# remove around melting layer:
original_insects = np.copy(is_insects)
n_gates = 5
for x, y in zip(*np.where(is_melting_layer), strict=True):
try:
# change insects to drizzle below melting layer pixel
ind1 = np.arange(y - n_gates, y)
ind11 = np.where(original_insects[x, ind1])[0]
n_drizzle = sum(is_falling[x, :y])
if n_drizzle > 5:
is_falling[x, ind1[ind11]] = True
is_insects[x, ind1[ind11]] = False
else:
continue
# change insects on the right and left of melting layer pixel to drizzle
ind1 = np.arange(x - n_gates, x + n_gates + 1)
ind11 = np.where(original_insects[ind1, y])[0]
is_falling[ind1[ind11], y - 1 : y + 2] = True
is_insects[ind1[ind11], y - 1 : y + 2] = False
except IndexError:
continue
bits.falling = is_falling
bits.insect = is_insects
def _remove_clutter_in_liquid(
is_clutter: npt.NDArray,
bits: CategoryBits,
) -> npt.NDArray:
"""Removes clutter that is inside liquid or falling hydrometeor layers.
Clutter detection based on near-zero velocity can produce false positives
inside cloud and drizzle layers. This function removes clutter flags from
pixels that are adjacent to droplet or falling hydrometeor pixels, and
reclassifies them as falling hydrometeors.
"""
is_clutter = is_clutter.copy()
liquid_or_falling = bits.droplet | bits.falling
structure = ndimage.generate_binary_structure(2, 1)
near_liquid = ndimage.binary_dilation(liquid_or_falling, structure)
false_clutter = is_clutter & near_liquid
is_clutter[false_clutter] = False
bits.falling[false_clutter] = True
return is_clutter
def _filter_falling(bits: CategoryBits, obs: ClassData) -> None:
# filter falling ice speckle noise
is_beta = ~obs.beta.mask
is_freezing = bits.freezing
is_falling = bits.falling
filtered_out = is_falling & ~np.asarray(
utils.remove_small_objects(
is_falling,
max_size=10,
connectivity=1,
),
dtype=bool,
)
is_falling[filtered_out] = False
# In warm conditions, these are likely insects
bits.insect[filtered_out & ~is_freezing] = True
bits.aerosol[filtered_out & ~is_freezing & is_beta] = True
# In cold conditions, classify as aerosol if lidar signal is present
bits.aerosol[filtered_out & is_freezing & is_beta] = True
bits.falling = is_falling