"""Module for creating classification file."""
import numpy as np
from numpy import ma
from cloudnetpy import output, utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.datasource import DataSource
from cloudnetpy.metadata import MetaData
from cloudnetpy.products.product_tools import CategorizeBits
[docs]
def generate_classification(
categorize_file: str,
output_file: str,
uuid: str | None = None,
) -> str:
"""Generates Cloudnet classification product.
This function reads the initial classification masks from a
categorize file and creates a more comprehensive classification
for different atmospheric targets. The results are written in a
netCDF file.
Args:
categorize_file: Categorize file name.
output_file: Output file name.
uuid: Set specific UUID for the file.
Returns:
str: UUID of the generated file.
Examples:
>>> from cloudnetpy.products import generate_classification
>>> generate_classification('categorize.nc', 'classification.nc')
"""
with DataSource(categorize_file) as product_container:
categorize_bits = CategorizeBits(categorize_file)
classification = _get_target_classification(categorize_bits)
product_container.append_data(classification, "target_classification")
status = _get_detection_status(categorize_bits)
product_container.append_data(status, "detection_status")
bases, tops = _get_cloud_base_and_top_heights(classification, product_container)
product_container.append_data(bases, "cloud_base_height_amsl")
product_container.append_data(tops, "cloud_top_height_amsl")
product_container.append_data(
bases - product_container.altitude,
"cloud_base_height_agl",
)
product_container.append_data(
tops - product_container.altitude,
"cloud_top_height_agl",
)
date = product_container.get_date()
attributes = output.add_time_attribute(CLASSIFICATION_ATTRIBUTES, date)
output.update_attributes(product_container.data, attributes)
file_type = "classification"
if "liquid_prob" in product_container.dataset.variables:
file_type += "-voodoo"
return output.save_product_file(
file_type,
product_container,
output_file,
uuid,
)
def _get_target_classification(
categorize_bits: CategorizeBits,
) -> ma.MaskedArray:
bits = categorize_bits.category_bits
clutter = categorize_bits.quality_bits.clutter
classification = ma.zeros(bits.freezing.shape, dtype=int)
classification[bits.droplet & ~bits.falling] = 1 # Cloud droplets
classification[~bits.droplet & bits.falling] = 2 # Drizzle or rain
classification[bits.droplet & bits.falling] = 3 # Drizzle or rain and droplets
classification[~bits.droplet & bits.falling & bits.freezing] = 4 # ice
classification[bits.droplet & bits.falling & bits.freezing] = 5 # ice + supercooled
classification[bits.melting] = 6 # melting layer
classification[bits.melting & bits.droplet] = 7 # melting + droplets
classification[bits.aerosol] = 8 # aerosols
classification[bits.insect & ~clutter] = 9 # insects
classification[bits.aerosol & bits.insect & ~clutter] = 10 # insects + aerosols
classification[clutter & ~bits.aerosol] = 0
return classification
def _get_detection_status(categorize_bits: CategorizeBits) -> np.ndarray:
bits = categorize_bits.quality_bits
is_attenuated = (
bits.attenuated_liquid | bits.attenuated_rain | bits.attenuated_melting
)
is_corrected = (
is_attenuated
& (~bits.attenuated_liquid | bits.corrected_liquid)
& (~bits.attenuated_rain | bits.corrected_rain)
& (~bits.attenuated_melting | bits.corrected_melting)
)
status = np.zeros(bits.radar.shape, dtype=int)
status[bits.lidar & ~bits.radar] = 1
status[bits.radar & bits.lidar] = 3
status[~bits.radar & is_attenuated & ~is_corrected] = 4
status[bits.radar & ~bits.lidar & ~is_attenuated] = 5
status[~bits.radar & is_attenuated & is_corrected] = 6
status[bits.radar & is_corrected] = 7
status[bits.radar & is_attenuated & ~is_corrected] = 2
status[bits.clutter] = 8
status[bits.molecular & ~bits.radar] = 9
return status
def _get_cloud_base_and_top_heights(
classification: np.ndarray,
product_container: DataSource,
) -> tuple[np.ndarray, np.ndarray]:
height = product_container.getvar("height")
cloud_mask = _find_cloud_mask(classification)
if not cloud_mask.any():
return ma.masked_all(cloud_mask.shape[0]), ma.masked_all(cloud_mask.shape[0])
lowest_bases = atmos_utils.find_lowest_cloud_bases(cloud_mask, height)
highest_tops = atmos_utils.find_highest_cloud_tops(cloud_mask, height)
if not (highest_tops - lowest_bases >= 0).all():
msg = "Cloud base higher than cloud top!"
raise ValueError(msg)
return lowest_bases, highest_tops
def _find_cloud_mask(classification: np.ndarray) -> np.ndarray:
cloud_mask = np.zeros(classification.shape, dtype=int)
for value in [1, 3, 4, 5]:
cloud_mask[classification == value] = 1
return cloud_mask
COMMENTS = {
"target_classification": (
"\n"
"This variable provides the main atmospheric target classifications\n"
"that can be distinguished by radar and lidar."
),
"detection_status": (
"\n"
"This variable reports on the reliability of the radar and lidar data\n"
"used to perform the classification."
),
}
DEFINITIONS = {
"target_classification": utils.status_field_definition(
{
0: "Clear sky.",
1: "Cloud liquid droplets only.",
2: "Drizzle or rain.",
3: "Drizzle or rain coexisting with cloud liquid droplets.",
4: "Ice particles.",
5: "Ice coexisting with supercooled liquid droplets.",
6: "Melting ice particles.",
7: "Melting ice particles coexisting with cloud liquid droplets.",
8: "Aerosol particles, no cloud or precipitation.",
9: "Insects, no cloud or precipitation.",
10: "Aerosol coexisting with insects, no cloud or precipitation.",
}
),
"detection_status": utils.status_field_definition(
{
0: """Clear sky.""",
1: """Lidar echo only.""",
2: """Radar echo but reflectivity may be unreliable as attenuation
by rain, melting ice or liquid cloud has not been
corrected.""",
3: """Good radar and lidar echos.""",
4: """No radar echo but rain or liquid cloud beneath mean that
attenuation that would be experienced is unknown.""",
5: """Good radar echo only.""",
6: """No radar echo but known attenuation.""",
7: """Radar echo corrected for liquid, rain or melting
attenuation.""",
8: """Radar ground clutter.""",
9: """Lidar clear-air molecular scattering.""",
}
),
}
CLASSIFICATION_ATTRIBUTES = {
"target_classification": MetaData(
long_name="Target classification",
comment=COMMENTS["target_classification"],
definition=DEFINITIONS["target_classification"],
units="1",
),
"detection_status": MetaData(
long_name="Radar and lidar detection status",
comment=COMMENTS["detection_status"],
definition=DEFINITIONS["detection_status"],
units="1",
),
"cloud_top_height_amsl": MetaData(
long_name="Height of cloud top above mean sea level",
units="m",
),
"cloud_base_height_amsl": MetaData(
long_name="Height of cloud base above mean sea level",
units="m",
),
"cloud_top_height_agl": MetaData(
long_name="Height of cloud top above ground level",
units="m",
),
"cloud_base_height_agl": MetaData(
long_name="Height of cloud base above ground level",
units="m",
),
}