Source code for categorize.droplet

"""This module has functions for liquid layer detection."""

import numpy as np
import scipy.signal
from numpy import ma

from cloudnetpy import utils
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.categorize.containers import ClassData


[docs] def correct_liquid_top( obs: ClassData, is_liquid: np.ndarray, is_freezing: np.ndarray, limit: float = 200, ) -> np.ndarray: """Corrects lidar detected liquid cloud top using radar data. Args: obs: The :class:`ClassData` instance. is_liquid: 2-D boolean array denoting liquid clouds from lidar data. is_freezing: 2-D boolean array of sub-zero temperature, derived from the model temperature and melting layer based on radar data. limit: The maximum correction distance (m) above liquid cloud top. Returns: Corrected liquid cloud array. References: Hogan R. and O'Connor E., 2004, https://bit.ly/2Yjz9DZ. """ is_liquid_corrected = np.copy(is_liquid) liquid_tops = atmos_utils.find_cloud_tops(is_liquid) top_above = utils.n_elements(obs.height, limit) for prof, top in zip(*np.where(liquid_tops), strict=True): ind = _find_ind_above_top(is_freezing[prof, top:], top_above) rad = obs.z[prof, top : top + ind + 1] if not (rad.mask.all() or ~rad.mask.any()): first_masked = ma.where(rad.mask)[0][0] is_liquid_corrected[prof, top : top + first_masked] = True return is_liquid_corrected
def _find_ind_above_top(is_freezing_from_peak: np.ndarray, top_above: int) -> int: first_point_below_zero = np.where(is_freezing_from_peak)[0][0] ind = first_point_below_zero + top_above return min(len(is_freezing_from_peak) - 1, ind)
[docs] def find_liquid( obs: ClassData, peak_amp: float = 1e-6, max_width: float = 300, min_points: int = 3, min_top_der: float = 1e-7, min_lwp: float = 0, min_alt: float = 100, ) -> np.ndarray: """Estimate liquid layers from SNR-screened attenuated backscatter. Args: obs: The :class:`ClassData` instance. peak_amp: Minimum value of peak. Default is 1e-6. max_width: Maximum width of peak. Default is 300 (m). min_points: Minimum number of valid points in peak. Default is 3. min_top_der: Minimum derivative above peak, defined as (beta_peak-beta_top) / (alt_top-alt_peak). Default is 1e-7. min_lwp: Minimum value from linearly interpolated lwp (kg m-2) measured by the mwr. Default is 0. min_alt: Minimum altitude of the peak from the ground. Default is 100 (m). Returns: 2-D boolean array denoting liquid layers. References: The method is based on Tuononen, M. et.al, 2019, https://acp.copernicus.org/articles/19/1985/2019/. """ def _is_proper_peak() -> bool: conditions = ( npoints >= min_points, peak_width < max_width, top_der > min_top_der, is_positive_lwp, peak_alt > min_alt, ) return all(conditions) lwp_int = interpolate_lwp(obs) beta = ma.copy(obs.beta) height = obs.height is_liquid = np.zeros(beta.shape, dtype=bool) base_below_peak = utils.n_elements(height, 200) top_above_peak = utils.n_elements(height, 150) difference = ma.array(np.diff(beta, axis=1)) beta_diff = difference.filled(0) beta = beta.filled(0) peak_indices = _find_strong_peaks(beta, peak_amp) for n, peak in zip(*peak_indices, strict=True): lprof = beta[n, :] dprof = beta_diff[n, :] try: base = ind_base(dprof, peak, base_below_peak, 4) top = ind_top(dprof, peak, height.shape[0], top_above_peak, 4) except IndexError: continue npoints = np.count_nonzero(lprof[base : top + 1]) peak_width = height[top] - height[base] peak_alt = height[peak] - height[0] top_der = (lprof[peak] - lprof[top]) / (height[top] - height[peak]) is_positive_lwp = lwp_int[n] >= min_lwp if _is_proper_peak(): is_liquid[n, base : top + 1] = True return is_liquid
[docs] def ind_base(dprof: np.ndarray, ind_peak: int, dist: int, lim: float) -> int: """Finds base index of a peak in profile. Return the lowermost index of profile where 1st order differences below the peak exceed a threshold value. Args: dprof: 1-D array of 1st discrete difference. Masked values should be 0, e.g. dprof = np.diff(masked_prof).filled(0) ind_peak: Index of (possibly local) peak in the original profile. Note that the peak must be found with some other method before calling this function. dist: Number of elements investigated below *p*. If ( *p* - *dist*)<0, search starts from index 0. lim: Parameter for base index. Values greater than 1.0 are valid. Values close to 1 most likely return the point right below the maximum 1st order difference (within *dist* points below *p*). Values larger than 1 more likely accept some other point, lower in the profile. Returns: Base index of the peak. Raises: IndexError: Can't find proper base index (probably too many masked values in the profile). Examples: Consider a profile >>> x = np.array([0, 0.5, 1, -99, 4, 8, 5]) that contains one bad, masked value >>> mx = ma.masked_array(x, mask=[0, 0, 0, 1, 0, 0, 0]) [0, 0.5, 1.0, --, 4.0, 8.0, 5.0] The 1st order difference is now >>> dx = np.diff(mx).filled(0) [0.5, 0.5, 0, 0, 4, -3] From the original profile we see that the peak index is 5. Let's assume our base can't be more than 4 elements below peak and the threshold value is 2. Thus we call >>> ind_base(dx, 5, 4, 2) 4 When x[4] is the lowermost point that satisfies the condition. Changing the threshold value would alter the result >>> ind_base(dx, 5, 4, 10) 1 See Also: droplet.ind_top() """ start = max(ind_peak - dist, 0) # should not be negative diffs = dprof[start:ind_peak] mind = np.argmax(diffs) return start + np.where(diffs > diffs[mind] / lim)[0][0]
[docs] def ind_top(dprof: np.ndarray, ind_peak: int, nprof: int, dist: int, lim: float) -> int: """Finds top index of a peak in profile. Return the uppermost index of profile where 1st order differences above the peak exceed a threshold value. Args: dprof: 1-D array of 1st discrete difference. Masked values should be 0, e.g. dprof = np.diff(masked_prof).filled(0) nprof: Length of the profile. Top index can't be higher than this. ind_peak: Index of (possibly local) peak in the profile. Note that the peak must be found with some other method before calling this function. dist: Number of elements investigated above *p*. If (*p* + *dist*) > *nprof*, search ends to *nprof*. lim: Parameter for top index. Values greater than 1.0 are valid. Values close to 1 most likely return the point right above the maximum 1st order difference (within *dist* points above *p*). Values larger than 1 more likely accept some other point, higher in the profile. Returns: Top index of the peak. Raises: IndexError: Can not find proper top index (probably too many masked values in the profile). See Also: droplet.ind_base() """ end = min(ind_peak + dist, nprof) # should not be greater than len(profile) diffs = dprof[ind_peak:end] mind = np.argmin(diffs) return ind_peak + np.where(diffs < diffs[mind] / lim)[0][-1] + 1
[docs] def interpolate_lwp(obs: ClassData) -> np.ndarray: """Linear interpolation of liquid water path to fill masked values. Args: obs: The :class:`ClassData` instance. Returns: Liquid water path where the masked values are filled by interpolation. """ if obs.lwp.all() is ma.masked: return np.zeros(obs.time.shape) ind = ma.where(obs.lwp) return np.interp(obs.time, obs.time[ind], obs.lwp[ind])
def _find_strong_peaks(data: np.ndarray, threshold: float) -> tuple: """Finds local maximums from data (greater than *threshold*).""" peaks = scipy.signal.argrelextrema(data, np.greater, order=4, axis=1) strong_peaks = np.where(data[peaks] > threshold) return peaks[0][strong_peaks], peaks[1][strong_peaks]