"""Module for creating classification file."""
from enum import IntEnum
from os import PathLike
from typing import NamedTuple
from uuid import UUID
import numpy as np
import numpy.typing as npt
from numpy import ma
from cloudnetpy import output, utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.constants import M_S_TO_MM_H
from cloudnetpy.datasource import DataSource
from cloudnetpy.metadata import MetaData
from cloudnetpy.products.product_tools import CategorizeBits, QualityBits
[docs]
def generate_classification(
categorize_file: str | PathLike,
output_file: str | PathLike,
uuid: str | UUID | None = None,
) -> UUID:
"""Generates Cloudnet classification product.
This function reads the initial classification masks from a
categorize file and creates a more comprehensive classification
for different atmospheric targets. The results are written in a
netCDF file.
Args:
categorize_file: Categorize file name.
output_file: Output file name.
uuid: Set specific UUID for the file.
Returns:
str: UUID of the generated file.
Examples:
>>> from cloudnetpy.products import generate_classification
>>> generate_classification('categorize.nc', 'classification.nc')
"""
uuid = utils.get_uuid(uuid)
categorize_bits = CategorizeBits(categorize_file)
with DataSource(categorize_file) as source:
classification = _get_target_classification(categorize_bits)
source.append_data(classification, "target_classification")
detection_status = _get_detection_status(categorize_bits)
source.append_data(detection_status, "detection_status")
signal_source_status = _get_signal_source_status(categorize_bits)
source.append_data(signal_source_status, "signal_source_status")
att_status = _get_radar_attenuation_status(source, categorize_bits)
source.append_data(att_status, "radar_attenuation_status")
height = source.getvar("height")
bases, tops = _get_cloud_base_and_top_heights(classification, height)
source.append_data(bases, "cloud_base_height_amsl")
source.append_data(tops, "cloud_top_height_amsl")
source.append_data(
bases - source.altitude,
"cloud_base_height_agl",
)
source.append_data(
tops - source.altitude,
"cloud_top_height_agl",
)
cloud_top_status = _get_cloud_top_height_status(source, tops, att_status)
source.append_data(cloud_top_status, "cloud_top_height_status")
date = source.get_date()
attributes = output.add_time_attribute(CLASSIFICATION_ATTRIBUTES, date)
output.update_attributes(source.data, attributes)
file_type = "classification"
if "liquid_prob" in source.dataset.variables:
file_type += "-voodoo"
output.save_product_file(
file_type, source, output_file, uuid, copy_from_cat=("rain_detected",)
)
return uuid
[docs]
class TopStatus(IntEnum):
RELIABLE = 0
MODERATE_ATT = 1
UNCORR_ATT = 2
SEVERE_ATT = 3
ABOVE_RANGE = 4
[docs]
class AttStatus(IntEnum):
CLEAR = 0
NEGLIGIBLE = 1
SMALL = 2
MODERATE = 3
SEVERE = 4
UNCORRECTED = 5
UNDETECTED = 6
[docs]
class SignalStatus(IntEnum):
CLEAR = 0
BOTH = 1
RADAR_ONLY = 2
LIDAR_ONLY = 3
[docs]
class Target(IntEnum):
CLEAR = 0
DROPLET = 1
DRIZZLE_OR_RAIN = 2
DRIZZLE_OR_RAIN_AND_DROPLET = 3
ICE = 4
ICE_AND_SUPERCOOLED = 5
MELTING = 6
MELTING_AND_DROPLET = 7
AEROSOL = 8
INSECT = 9
INSECT_AND_AEROSOL = 10
[docs]
class DetectionStatus(IntEnum):
CLEAR = 0
LIDAR_ONLY = 1
RADAR_UNCERTAIN_ATT = 2
RADAR_AND_LIDAR = 3
NO_RADAR_UNCERTAIN_ATT = 4
RADAR_ONLY = 5
NO_RADAR_KNOWN_ATT = 6
RADAR_ATT_CORRECTED = 7
CLUTTER = 8
MOLECULAR_SCATT = 9
[docs]
class AttenuationClass(NamedTuple):
small: npt.NDArray
moderate: npt.NDArray
severe: npt.NDArray
def _get_cloud_top_height_status(
product_container: DataSource, tops: npt.NDArray, att_status: npt.NDArray
) -> npt.NDArray:
height = product_container.dataset.variables["height"][:]
dist = np.abs(height[None, :] - tops[:, None])
height_inds = dist.argmin(axis=1)
att_at_top = att_status[np.arange(att_status.shape[0]), height_inds]
status = np.zeros(att_at_top.size, dtype=int)
status[att_at_top == AttStatus.MODERATE] = TopStatus.MODERATE_ATT
status[att_at_top == AttStatus.SEVERE] = TopStatus.SEVERE_ATT
status[att_at_top == AttStatus.UNCORRECTED] = TopStatus.UNCORR_ATT
status[tops >= height[-1]] = TopStatus.ABOVE_RANGE
return status
def _get_radar_attenuation_status(
data_source: DataSource, categorize_bits: CategorizeBits
) -> npt.NDArray:
bits = categorize_bits.quality_bits
is_attenuated = _get_is_attenuated_mask(bits)
is_corrected = _get_is_corrected_mask(bits)
att = _get_attenuation_classes(data_source)
severity = np.zeros_like(att.small, dtype=int)
severity[bits.radar] = AttStatus.NEGLIGIBLE
severity[att.small & bits.radar] = AttStatus.SMALL
severity[att.moderate & bits.radar] = AttStatus.MODERATE
severity[att.severe & bits.radar] = AttStatus.SEVERE
severity[~is_corrected & is_attenuated & bits.radar] = AttStatus.UNCORRECTED
is_severe = severity == AttStatus.SEVERE
above_severe = utils.ffill(is_severe)
severity[above_severe & ~is_severe] = AttStatus.UNDETECTED
return severity
def _get_attenuation_classes(data_source: DataSource) -> AttenuationClass:
def _read_atten(key: str) -> npt.NDArray:
if key not in data_source.dataset.variables:
return np.zeros(data_source.time.shape)
data = data_source.getvar(key)
if isinstance(data, ma.MaskedArray):
return data.filled(0)
return data
liquid_atten = _read_atten("radar_liquid_atten")
rain_atten = _read_atten("radar_rain_atten")
melting_atten = _read_atten("radar_melting_atten")
not_w_band = data_source.getvar("radar_frequency") < 90
if "lwp" not in data_source.dataset.variables or not_w_band:
lwp = np.zeros(data_source.time.shape)
else:
lwp_data = data_source.getvar("lwp")
lwp = lwp_data.filled(0) if isinstance(lwp_data, ma.MaskedArray) else lwp_data
if "rainfall_rate" not in data_source.dataset.variables or not_w_band:
rain_rate = np.zeros(data_source.time.shape)
else:
rain_data = data_source.getvar("rainfall_rate") * M_S_TO_MM_H
rain_rate = (
rain_data.filled(0) if isinstance(rain_data, ma.MaskedArray) else rain_data
)
total_atten = liquid_atten + rain_atten + melting_atten
threshold_moderate = 10 # dB
threshold_severe = 15 # dB
threshold_lwp = 1 # kg/m2
threshold_rain = 3 # mm/h
small = total_atten > 0
moderate = total_atten >= threshold_moderate
severe = (
(total_atten > threshold_severe)
| (lwp[:, np.newaxis] > threshold_lwp)
| (rain_rate[:, np.newaxis] > threshold_rain)
)
return AttenuationClass(small=small, moderate=moderate, severe=severe)
def _get_target_classification(
categorize_bits: CategorizeBits,
) -> ma.MaskedArray:
bits = categorize_bits.category_bits
clutter = categorize_bits.quality_bits.clutter
classification = ma.zeros(bits.freezing.shape, dtype=int)
classification[bits.droplet & ~bits.falling] = Target.DROPLET
classification[~bits.droplet & bits.falling] = Target.DRIZZLE_OR_RAIN
classification[bits.droplet & bits.falling] = Target.DRIZZLE_OR_RAIN_AND_DROPLET
classification[~bits.droplet & bits.falling & bits.freezing] = Target.ICE
classification[bits.droplet & bits.falling & bits.freezing] = (
Target.ICE_AND_SUPERCOOLED
)
classification[bits.melting] = Target.MELTING
classification[bits.melting & bits.droplet] = Target.MELTING_AND_DROPLET
classification[bits.aerosol] = Target.AEROSOL
classification[bits.insect & ~clutter] = Target.INSECT
classification[bits.aerosol & bits.insect & ~clutter] = Target.INSECT_AND_AEROSOL
classification[clutter & ~bits.aerosol] = Target.CLEAR
return classification
def _get_detection_status(categorize_bits: CategorizeBits) -> npt.NDArray:
bits = categorize_bits.quality_bits
is_attenuated = _get_is_attenuated_mask(bits)
is_corrected = _get_is_corrected_mask(bits)
status = np.zeros(bits.radar.shape, dtype=int)
status[bits.lidar & ~bits.radar] = DetectionStatus.LIDAR_ONLY
status[bits.radar & bits.lidar] = DetectionStatus.RADAR_AND_LIDAR
status[~bits.radar & is_attenuated & ~is_corrected] = (
DetectionStatus.NO_RADAR_UNCERTAIN_ATT
)
status[bits.radar & ~bits.lidar & ~is_attenuated] = DetectionStatus.RADAR_ONLY
status[~bits.radar & is_attenuated & is_corrected] = (
DetectionStatus.NO_RADAR_KNOWN_ATT
)
status[bits.radar & is_corrected] = DetectionStatus.RADAR_ATT_CORRECTED
status[bits.radar & is_attenuated & ~is_corrected] = (
DetectionStatus.RADAR_UNCERTAIN_ATT
)
status[bits.clutter] = DetectionStatus.CLUTTER
status[bits.molecular & ~bits.radar] = DetectionStatus.MOLECULAR_SCATT
return status
def _get_is_corrected_mask(bits: QualityBits) -> npt.NDArray:
is_attenuated = _get_is_attenuated_mask(bits)
return (
is_attenuated
& (~bits.attenuated_liquid | bits.corrected_liquid)
& (~bits.attenuated_rain | bits.corrected_rain)
& (~bits.attenuated_melting | bits.corrected_melting)
)
def _get_is_attenuated_mask(bits: QualityBits) -> npt.NDArray:
return bits.attenuated_liquid | bits.attenuated_rain | bits.attenuated_melting
def _get_signal_source_status(categorize_bits: CategorizeBits) -> npt.NDArray:
bits = categorize_bits.quality_bits
status = np.zeros(bits.radar.shape, dtype=int)
status[bits.radar & bits.lidar] = SignalStatus.BOTH
status[bits.radar & ~bits.lidar] = SignalStatus.RADAR_ONLY
status[bits.lidar & ~bits.radar] = SignalStatus.LIDAR_ONLY
return status
def _get_cloud_base_and_top_heights(
classification: npt.NDArray,
height: npt.NDArray,
) -> tuple[npt.NDArray, npt.NDArray]:
cloud_mask = _find_cloud_mask(classification)
if not cloud_mask.any():
return ma.masked_all(cloud_mask.shape[0]), ma.masked_all(cloud_mask.shape[0])
lowest_bases = atmos_utils.find_lowest_cloud_bases(cloud_mask, height)
highest_tops = atmos_utils.find_highest_cloud_tops(cloud_mask, height)
if not (highest_tops - lowest_bases >= 0).all():
msg = "Cloud base higher than cloud top!"
raise ValueError(msg)
return lowest_bases, highest_tops
def _find_cloud_mask(classification: npt.NDArray) -> npt.NDArray:
cloud_mask = np.zeros(classification.shape, dtype=int)
for value in [
Target.DROPLET,
Target.DRIZZLE_OR_RAIN_AND_DROPLET,
Target.ICE,
Target.ICE_AND_SUPERCOOLED,
]:
cloud_mask[classification == value] = 1
return cloud_mask
COMMENTS = {
"target_classification": (
"\n"
"This variable provides the main atmospheric target classifications\n"
"that can be distinguished by radar and lidar."
),
"detection_status": (
"\n"
"This variable reports on the reliability of the radar and lidar data\n"
"used to perform the classification."
),
}
DEFINITIONS = {
"target_classification": utils.status_field_definition(
{
Target.CLEAR: """Clear sky.""",
Target.DROPLET: """Cloud liquid droplets only.""",
Target.DRIZZLE_OR_RAIN: """Drizzle or rain.""",
Target.DRIZZLE_OR_RAIN_AND_DROPLET: """Drizzle or rain
coexisting with cloud liquid droplets.""",
Target.ICE: """Ice particles.""",
Target.ICE_AND_SUPERCOOLED: """Ice coexisting with
supercooled liquid droplets.""",
Target.MELTING: """Melting ice particles.""",
Target.MELTING_AND_DROPLET: """Melting ice particles
coexisting with cloud liquid droplets.""",
Target.AEROSOL: """Aerosol particles, no cloud or precipitation.""",
Target.INSECT: """Insects, no cloud or precipitation.""",
Target.INSECT_AND_AEROSOL: """Aerosol coexisting
with insects, no cloud or precipitation.""",
}
),
"detection_status": utils.status_field_definition(
{
DetectionStatus.CLEAR: """Clear sky.""",
DetectionStatus.LIDAR_ONLY: """Lidar echo only.""",
DetectionStatus.RADAR_UNCERTAIN_ATT: """
Radar echo but reflectivity may be unreliable as attenuation
by rain, melting ice or liquid cloud has not been
corrected.""",
DetectionStatus.RADAR_AND_LIDAR: """Good radar and lidar echos.""",
DetectionStatus.NO_RADAR_UNCERTAIN_ATT: """
No radar echo but rain or liquid cloud beneath mean that
attenuation that would be experienced is unknown.""",
DetectionStatus.RADAR_ONLY: """
Good radar echo only.""",
DetectionStatus.NO_RADAR_KNOWN_ATT: """
No radar echo but known attenuation.""",
DetectionStatus.RADAR_ATT_CORRECTED: """
Radar echo corrected for liquid, rain or melting attenuation.""",
DetectionStatus.CLUTTER: """
Radar ground clutter.""",
DetectionStatus.MOLECULAR_SCATT: """
Lidar clear-air molecular scattering.""",
}
),
"cloud_top_height_status": utils.status_field_definition(
{
TopStatus.RELIABLE: """Reliable.""",
TopStatus.MODERATE_ATT: """Uncertain due to moderate
radar attenuation.""",
TopStatus.UNCORR_ATT: """Uncertain due to incomplete
radar attenuation correction.""",
TopStatus.SEVERE_ATT: """Likely erroneous due to
severe radar attenuation.""",
TopStatus.ABOVE_RANGE: """Cloud top above radar
measurement range.""",
}
),
"signal_source_status": utils.status_field_definition(
{
SignalStatus.CLEAR: """No signal from radar or lidar.""",
SignalStatus.BOTH: """Signal from both radar and lidar.""",
SignalStatus.RADAR_ONLY: """Signal from radar only.""",
SignalStatus.LIDAR_ONLY: """Signal from lidar only.""",
}
),
"radar_attenuation_status": utils.status_field_definition(
{
AttStatus.CLEAR: """No radar signal.""",
AttStatus.NEGLIGIBLE: """Radar signal,
negligible attenuation (corrected).""",
AttStatus.SMALL: """Radar signal,
small attenuation (corrected).""",
AttStatus.MODERATE: """Radar signal,
moderate attenuation (corrected).""",
AttStatus.SEVERE: """Radar signal,
severe attenuation (corrected).""",
AttStatus.UNCORRECTED: """Radar signal,
attenuation present but not corrected.""",
AttStatus.UNDETECTED: """No radar signal, cloud
may be undetected due to severe attenuation beneath.""",
}
),
}
CLASSIFICATION_ATTRIBUTES = {
"target_classification": MetaData(
long_name="Target classification",
comment=COMMENTS["target_classification"],
definition=DEFINITIONS["target_classification"],
units="1",
dimensions=("time", "height"),
),
"detection_status": MetaData(
long_name="Radar and lidar detection status",
comment=COMMENTS["detection_status"],
definition=DEFINITIONS["detection_status"],
units="1",
dimensions=("time", "height"),
),
"signal_source_status": MetaData(
long_name="Signal source status",
units="1",
dimensions=("time", "height"),
definition=DEFINITIONS["signal_source_status"],
),
"radar_attenuation_status": MetaData(
long_name="Radar attenuation status",
units="1",
dimensions=("time", "height"),
definition=DEFINITIONS["radar_attenuation_status"],
),
"cloud_top_height_amsl": MetaData(
long_name="Height of cloud top above mean sea level",
units="m",
dimensions=("time",),
ancillary_variables="cloud_top_height_status",
),
"cloud_base_height_amsl": MetaData(
long_name="Height of cloud base above mean sea level",
units="m",
dimensions=("time",),
),
"cloud_top_height_agl": MetaData(
long_name="Height of cloud top above ground level",
units="m",
dimensions=("time",),
ancillary_variables="cloud_top_height_status",
),
"cloud_base_height_agl": MetaData(
long_name="Height of cloud base above ground level",
units="m",
dimensions=("time",),
),
"cloud_top_height_status": MetaData(
long_name="Cloud top height quality status",
units="1",
dimensions=("time",),
definition=DEFINITIONS["cloud_top_height_status"],
),
}