Source code for categorize.classify

import numpy as np
import skimage
from numpy import ma
from numpy.typing import NDArray

from cloudnetpy import utils
from cloudnetpy.categorize import (
    atmos_utils,
    droplet,
    falling,
    freezing,
    insects,
    melting,
)
from cloudnetpy.categorize.attenuations import RadarAttenuation
from cloudnetpy.categorize.containers import (
    ClassData,
    ClassificationResult,
    Observations,
)
from cloudnetpy.constants import T0
from cloudnetpy.products.product_tools import CategoryBits, QualityBits


[docs] def classify_measurements(data: Observations) -> ClassificationResult: """Classifies radar/lidar observations. This function classifies atmospheric scatterers from the input data. The input data needs to be averaged or interpolated to the common time / height grid before calling this function. Args: data: A :class:`Observations` instance. Returns: A :class:`ClassificationResult` instance. References: The Cloudnet classification scheme is based on methodology proposed by Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ and its proprietary Matlab implementation. Notes: Some individual classification methods are changed in this Python implementation compared to the original Cloudnet methodology. Especially methods classifying insects, melting layer and liquid droplets. """ bits = CategoryBits( droplet=np.array([], dtype=bool), falling=np.array([], dtype=bool), freezing=np.array([], dtype=bool), melting=np.array([], dtype=bool), aerosol=np.array([], dtype=bool), insect=np.array([], dtype=bool), ) obs = ClassData(data) bits.melting = melting.find_melting_layer(obs) bits.freezing = freezing.find_freezing_region(obs, bits.melting) liquid_from_lidar = droplet.find_liquid(obs) if obs.lv0_files is not None and len(obs.lv0_files) > 0: if "rpg-fmcw-94" not in obs.radar_type.lower(): msg = "VoodooNet is only implemented for RPG-FMCW-94 radar." raise NotImplementedError(msg) import voodoonet from voodoonet.utils import VoodooOptions options = VoodooOptions(progress_bar=False) target_time = voodoonet.utils.decimal_hour2unix(obs.date, obs.time) liquid_prob = voodoonet.infer( obs.lv0_files, target_time=target_time, options=options ) liquid_from_radar = liquid_prob > 0.55 liquid_from_radar = _remove_false_radar_liquid( liquid_from_radar, liquid_from_lidar, ) liquid_from_radar[~bits.freezing] = 0 is_liquid = liquid_from_radar | liquid_from_lidar else: is_liquid = liquid_from_lidar liquid_prob = None bits.droplet = droplet.correct_liquid_top(obs, is_liquid, bits.freezing, limit=500) bits.insect, insect_prob = insects.find_insects(obs, bits.melting, bits.droplet) bits.falling = falling.find_falling_hydrometeors(obs, bits.droplet, bits.insect) filtered_ice = _filter_falling(bits) for _ in range(5): _fix_undetected_melting_layer(bits) _filter_insects(bits) bits.aerosol = _find_aerosols(obs, bits) bits.aerosol[filtered_ice] = False _fix_super_cold_liquid(obs, bits) return ClassificationResult( category_bits=bits, is_rain=obs.is_rain, is_clutter=obs.is_clutter, insect_prob=insect_prob, liquid_prob=liquid_prob, )
def fetch_quality( data: Observations, classification: ClassificationResult, attenuations: RadarAttenuation, ) -> QualityBits: return QualityBits( radar=~data.radar.data["Z"][:].mask, lidar=~data.lidar.data["beta"][:].mask, clutter=classification.is_clutter, molecular=np.zeros(data.radar.data["Z"][:].shape, dtype=bool), attenuated_liquid=attenuations.liquid.attenuated, corrected_liquid=attenuations.liquid.attenuated & ~attenuations.liquid.uncorrected, attenuated_rain=attenuations.rain.attenuated, corrected_rain=attenuations.rain.attenuated & ~attenuations.rain.uncorrected, attenuated_melting=attenuations.melting.attenuated, corrected_melting=attenuations.melting.attenuated & ~attenuations.melting.uncorrected, ) def _fix_super_cold_liquid(obs: ClassData, bits: CategoryBits) -> None: """Supercooled liquid droplets do not exist in atmosphere below around -38 C.""" t_limit = T0 - 38 super_cold_liquid = np.where((obs.tw < t_limit) & bits.droplet) bits.droplet[super_cold_liquid] = False bits.falling[super_cold_liquid] = True def _remove_false_radar_liquid( liquid_from_radar: np.ndarray, liquid_from_lidar: np.ndarray, ) -> NDArray[np.bool_]: """Removes radar-liquid below lidar-detected liquid bases.""" lidar_liquid_bases = atmos_utils.find_cloud_bases(liquid_from_lidar) for prof, base in zip(*np.where(lidar_liquid_bases), strict=True): liquid_from_radar[prof, 0:base] = 0 return liquid_from_radar def _find_aerosols( obs: ClassData, bits: CategoryBits, ) -> NDArray[np.bool_]: """Estimates aerosols from lidar backscattering. Aerosols are lidar signals that are: a) not falling, b) not liquid droplets. Args: obs: A :class:`ClassData` instance. bits: A :class:`CategoryBits instance. Returns: 2-D boolean array containing aerosols. """ is_beta = ~obs.beta.mask return is_beta & ~bits.falling & ~bits.droplet def _fix_undetected_melting_layer(bits: CategoryBits) -> None: drizzle_and_falling = _find_drizzle_and_falling(bits) transition = ma.diff(drizzle_and_falling, axis=1) == -1 bits.melting[:, 1:][transition] = True def _find_drizzle_and_falling(bits: CategoryBits) -> np.ndarray: """Classifies pixels as falling, drizzle and others. Args: bits: A :class:`CategoryBits instance. Returns: 2D array where values are 1 (falling, drizzle, supercooled liquids), 2 (drizzle), and masked (all others). """ falling_dry = bits.falling & ~bits.droplet supercooled_liquids = bits.droplet & bits.freezing drizzle = falling_dry & ~bits.freezing drizzle_and_falling = falling_dry.astype(int) + drizzle.astype(int) drizzle_and_falling = ma.copy(drizzle_and_falling) drizzle_and_falling[supercooled_liquids] = 1 drizzle_and_falling[drizzle_and_falling == 0] = ma.masked return drizzle_and_falling def _filter_insects(bits: CategoryBits) -> None: is_melting_layer = bits.melting is_insects = bits.insect is_falling = bits.falling # Remove above melting layer above_melting = utils.ffill(is_melting_layer) ind = np.where(is_insects & above_melting) is_falling[ind] = True is_insects[ind] = False # remove around melting layer: original_insects = np.copy(is_insects) n_gates = 5 for x, y in zip(*np.where(is_melting_layer), strict=True): try: # change insects to drizzle below melting layer pixel ind1 = np.arange(y - n_gates, y) ind11 = np.where(original_insects[x, ind1])[0] n_drizzle = sum(is_falling[x, :y]) if n_drizzle > 5: is_falling[x, ind1[ind11]] = True is_insects[x, ind1[ind11]] = False else: continue # change insects on the right and left of melting layer pixel to drizzle ind1 = np.arange(x - n_gates, x + n_gates + 1) ind11 = np.where(original_insects[ind1, y])[0] is_falling[ind1[ind11], y - 1 : y + 2] = True is_insects[ind1[ind11], y - 1 : y + 2] = False except IndexError: continue bits.falling = is_falling bits.insect = is_insects def _filter_falling(bits: CategoryBits) -> tuple: # filter falling ice speckle noise is_freezing = bits.freezing is_falling = bits.falling is_falling_filtered = skimage.morphology.remove_small_objects( is_falling, 10, connectivity=1, ) is_filtered = is_falling & ~np.array(is_falling_filtered) ice_ind = np.where(is_freezing & is_filtered) is_falling[ice_ind] = False # in warm these are (probably) insects insect_ind = np.where(~is_freezing & is_filtered) is_falling[insect_ind] = False bits.falling = is_falling bits.insect[insect_ind] = True return ice_ind