# Source code for categorize.radar

"""Radar module, containing the :class:`Radar` class."""

import logging
import math
from os import PathLike

import numpy as np
import numpy.typing as npt
from numpy import ma
from scipy import constants

from cloudnetpy import utils
from cloudnetpy.categorize.attenuations import RadarAttenuation
from cloudnetpy.constants import GHZ_TO_HZ, SEC_IN_HOUR, SPEED_OF_LIGHT
from cloudnetpy.datasource import DataSource


[docs] class Radar(DataSource): """Radar class, child of DataSource. Args: full_path: Cloudnet Level 1 radar netCDF file. Attributes: radar_frequency (float): Radar frequency (GHz). folding_velocity (float): Radar's folding velocity (m/s). location (str): Location of the radar, copied from the global attribute `location` of the input file. source_type (str): Type of the radar, copied from the global attribute `source` of the *radar_file*. Can be free form string but must include either 'rpg' or 'mira' denoting one of the two supported radars. See Also: :func:`instruments.rpg2nc()`, :func:`instruments.mira2nc()` """ def __init__(self, full_path: str | PathLike) -> None: super().__init__(full_path, radar=True) self.radar_frequency = float(self.getvar("radar_frequency")) self.location = getattr(self.dataset, "location", "") self.source_type = getattr(self.dataset, "source", "") self.height: npt.NDArray self.altitude: float self._init_data()
[docs] def rebin_to_grid(self, time_new: npt.NDArray) -> list: """Rebins radar data in time. Args: time_new: Target time array as fraction hour. """ bad_time_indices = [] for key, array in self.data.items(): match key: case "ldr" | "sldr" | "Z": array.db2lin() bad_time_indices = array.rebin_data(self.time, time_new) array.lin2db() case "v": array.data = self._rebin_velocity( array.data, time_new, ) case "v_sigma": array.data, _ = utils.rebin_2d( self.time, array.data, time_new, "std", mask_zeros=True, ) case "width": array.rebin_data(self.time, time_new) case "rainfall_rate": array.rebin_data(self.time, time_new) case _: continue return list(bad_time_indices)
[docs] def remove_incomplete_pixels(self) -> None: """Mask radar pixels where one or more required quantities are missing. All valid radar pixels **must** contain proper values for `Z`, and `v` and also for `width` if exists. Otherwise there is some kind of problem with the data and the pixel should not be used in any further analysis. """ good_ind = ~ma.getmaskarray(self.data["Z"][:]) & ~ma.getmaskarray( self.data["v"][:], ) if "width" in self.data: good_ind = good_ind & ~ma.getmaskarray(self.data["width"][:]) for array in self.data.values(): if array.data.ndim == 2: array.mask_indices(~good_ind)
[docs] def filter_speckle_noise(self) -> None: """Removes speckle noise from radar data. Any isolated radar pixel, i.e. "hot pixel", is assumed to exist due to speckle noise. This is a crude approach and a more sophisticated method could be implemented here later. """ for key in ("Z", "v", "width", "ldr", "v_sigma"): if key in self.data: self.data[key].filter_vertical_stripes()
[docs] def filter_1st_gate_artifact(self) -> None: """Removes 1st range gate velocity artifact.""" velocity_limit = 4 ind = np.where(self.data["v"][:, 0] > velocity_limit) self.data["v"][:][ind, 0] = ma.masked
[docs] def filter_stripes(self, variable: str) -> None: """Filters vertical and horizontal stripe-shaped artifacts from radar data.""" if variable not in self.data: return data = ma.copy(self.data[variable][:]) n_points_in_profiles = ma.count(data, axis=1) n_profiles_with_data = np.count_nonzero(n_points_in_profiles) if n_profiles_with_data < 300: return n_vertical = self._filter( data, axis=1, min_coverage=0.5, z_limit=10, distance=4, n_blocks=100, ) n_horizontal = self._filter( data, axis=0, min_coverage=0.3, z_limit=-30, distance=3, n_blocks=20, ) if n_vertical > 0 or n_horizontal > 0: logging.debug( "Filtered %s vertical and %s horizontal stripes " "from radar data using %s", n_vertical, n_horizontal, variable, )
def _filter( self, data: npt.NDArray, axis: int, min_coverage: float, z_limit: float, distance: float, n_blocks: int, ) -> int: if axis == 0: data = data.T echo = self.data["Z"][:].T else: echo = self.data["Z"][:] len_block = int(np.floor(data.shape[0] / n_blocks)) block_indices = np.arange(len_block) n_removed_total = 0 for block_number in range(n_blocks): data_block = data[block_indices, :] n_values = ma.count(data_block, axis=1) try: q1 = np.quantile(n_values, 0.25) q3 = np.quantile(n_values, 0.75) except IndexError: continue if q1 == q3: continue threshold = distance * (q3 - q1) + q3 indices = np.where( (n_values > threshold) & (n_values > (min_coverage * data.shape[1])), )[0] true_ind = [int(x) for x in (block_number * len_block + indices)] n_removed = len(indices) if n_removed > 5: continue if n_removed > 0: n_removed_total += n_removed for ind in true_ind: ind2 = np.where(echo[ind, :] < z_limit) bad_indices = (ind, ind2) if axis == 1 else (ind2, ind) self.data["v"][:][bad_indices] = ma.masked block_indices += len_block return n_removed_total
[docs] def correct_atten(self, attenuations: RadarAttenuation) -> None: """Corrects radar echo for liquid and gas attenuation. Args: attenuations: Radar attenuation object. References: The method is based on Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ and the original Cloudnet Matlab implementation. """ z_corrected = self.data["Z"][:] + attenuations.gas.amount ind = ma.where(attenuations.liquid.amount) z_corrected[ind] += attenuations.liquid.amount[ind] ind = ma.where(attenuations.rain.amount) z_corrected[ind] += attenuations.rain.amount[ind] ind = ma.where(attenuations.melting.amount) z_corrected[ind] += attenuations.melting.amount[ind] self.append_data(z_corrected, "Z")
[docs] def calc_errors( self, attenuations: RadarAttenuation, is_clutter: npt.NDArray, ) -> None: """Calculates uncertainties of radar echo. Calculates and adds `Z_error`, `Z_sensitivity` and `Z_bias` :class:`CloudnetArray` instances to `data` attribute. Args: attenuations: 2-D attenuations due to atmospheric gases. is_clutter: 2-D boolean array denoting pixels contaminated by clutter. References: The method is based on Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ and the original Cloudnet Matlab implementation. """ def _calc_sensitivity() -> npt.NDArray: """Returns sensitivity of radar as function of altitude.""" mean_gas_atten = ma.mean(attenuations.gas.amount, axis=0) z_sensitivity = z_power_min + log_range + mean_gas_atten zc = ma.median(ma.array(z, mask=~is_clutter), axis=0) valid_values = np.logical_not(zc.mask) z_sensitivity[valid_values] = zc[valid_values] return z_sensitivity def _calc_error() -> npt.NDArray | float: """Returns error of radar as function of altitude. References: Hogan, R. J., 1998: Dual-wavelength radar studies of clouds. PhD Thesis, University of Reading, UK. """ noise_threshold = 3 n_pulses = _number_of_independent_pulses() ln_to_log10 = 10 / np.log(10) z_precision = ma.divide(ln_to_log10, np.sqrt(n_pulses)) * ( 1 + (utils.db2lin(z_power_min - z_power) / noise_threshold) ) z_error = utils.l2norm( z_precision, attenuations.liquid.error.filled(0), attenuations.rain.error.filled(0), attenuations.melting.error.filled(0), ) z_error[ attenuations.liquid.uncorrected | attenuations.rain.uncorrected | attenuations.melting.uncorrected ] = ma.masked return z_error def _number_of_independent_pulses() -> float: """Returns number of independent pulses. References: Atlas, D., 1964: Advances in radar meteorology. Advances in Geophys., 10, 318-478. 
""" if "width" not in self.data: default_width = 0.3 width = np.zeros_like(self.data["Z"][:]) width[~width.mask] = default_width else: width = self.data["width"][:] dwell_time = utils.mdiff(self.time) * SEC_IN_HOUR wl = SPEED_OF_LIGHT / (self.radar_frequency * GHZ_TO_HZ) return 4 * np.sqrt(math.pi) * dwell_time * width / wl def _calc_z_power_min() -> float: if ma.all(z_power.mask): return 0 return np.percentile(z_power.compressed(), 0.1) z = self.data["Z"][:] radar_range = self.to_m(self.dataset.variables["range"]) log_range = utils.lin2db(radar_range, scale=20) z_power = z - log_range z_power_min = _calc_z_power_min() self.append_data(_calc_error(), "Z_error") self.append_data(_calc_sensitivity(), "Z_sensitivity") self.append_data(1.0, "Z_bias")
[docs] def add_meta(self) -> None: """Copies misc. metadata from the input file.""" for key in ("time", "height", "radar_frequency"): self.append_data(np.array(getattr(self, key)), key)
[docs] def add_location(self, time_new: npt.NDArray) -> None: """Add latitude, longitude and altitude from nearest timestamp.""" idx = np.searchsorted(self.time, time_new) idx_left = np.clip(idx - 1, 0, len(self.time) - 1) idx_right = np.clip(idx, 0, len(self.time) - 1) diff_left = np.abs(time_new - self.time[idx_left]) diff_right = np.abs(time_new - self.time[idx_right]) idx_closest = np.where(diff_left < diff_right, idx_left, idx_right) for key in ("latitude", "longitude", "altitude"): data = self.getvar(key) if not utils.isscalar(data): data = data[idx_closest] if not np.any(ma.getmaskarray(data)): data = np.array(data) self.append_data(data, key)
def _init_data(self) -> None: self.append_data(self.getvar("Zh"), "Z", units="dBZ") self.append_data(self.getvar("v"), "v_sigma") for key in ("v", "ldr", "width", "sldr", "rainfall_rate", "nyquist_velocity"): if key in self.dataset.variables: data = self.dataset.variables[key] self.append_data(data, key) def _rebin_velocity( self, data: npt.NDArray, time_new: npt.NDArray, ) -> npt.NDArray: """Rebins Doppler velocity in polar coordinates.""" folding_velocity = self._get_expanded_folding_velocity() # with the new shape (maximum value in every bin) max_folding_binned, _ = utils.rebin_2d( self.time, folding_velocity, time_new, "max", ) # store this in the file self.append_data(max_folding_binned, "nyquist_velocity") if "correction_bits" in self.dataset.variables: bits = self.dataset.variables["correction_bits"][:] dealiasing_bit = utils.isbit(bits, 0) if ma.all(dealiasing_bit): return utils.rebin_2d(self.time, data, time_new, "mean")[0] if ma.any(dealiasing_bit): msg = "Data are only party dealiased. Deal with this later." 
raise NotImplementedError(msg) # with original shape (repeat maximum value for each point in every bin) max_folding_full, _ = utils.rebin_2d( self.time, folding_velocity, time_new, "max", keepdim=True, ) data_scaled = data * (np.pi / max_folding_full) vel_x = ma.cos(data_scaled) vel_y = ma.sin(data_scaled) vel_x_mean, _ = utils.rebin_2d(self.time, vel_x, time_new) vel_y_mean, _ = utils.rebin_2d(self.time, vel_y, time_new) vel_scaled = ma.arctan2(vel_y_mean, vel_x_mean) return vel_scaled / (np.pi / max_folding_binned) def _get_expanded_folding_velocity(self) -> npt.NDArray: if "nyquist_velocity" in self.dataset.variables: fvel = self.getvar("nyquist_velocity") elif "prf" in self.dataset.variables: prf = self.getvar("prf") fvel = _prf_to_folding_velocity(prf, self.radar_frequency) else: msg = "Unable to determine folding velocity" raise RuntimeError(msg) n_time = self.getvar("time").size n_height = self.height.size if fvel.shape == (n_time, n_height): # Folding velocity is already expanded in radar file # Not yet in current files return fvel if utils.isscalar(fvel): # e.g. MIRA return np.broadcast_to(fvel, (n_time, n_height)) # RPG radars have chirp segments starts = self.getvar("chirp_start_indices") n_seg = starts.size if starts.ndim == 1 else starts.shape[1] starts = np.broadcast_to(starts, (n_time, n_seg)) fvel = np.broadcast_to(fvel, (n_time, n_seg)) # Indices should start from zero (first range gate) # In pre-processed RV Meteor files the first index is 1, so normalize: # Normalize starts so indices begin from zero first_values = starts[:, [0]] if not np.all(np.isin(first_values, [0, 1])): msg = "First value of chirp_start_indices must be 0 or 1" raise ValueError(msg) starts = starts - first_values chirp_size = np.diff(starts, append=n_height) return np.repeat(fvel.ravel(), chirp_size.ravel()).reshape((n_time, n_height))
def _prf_to_folding_velocity(prf: npt.NDArray, radar_frequency: float) -> npt.NDArray: ghz_to_hz = 1e9 if len(prf) != 1: msg = "Unable to determine folding velocity" raise RuntimeError(msg) return prf[0] * constants.c / (4 * radar_frequency * ghz_to_hz)