"""Lidar module, containing the :class:`Lidar` class."""
import logging
from typing import Literal
import numpy as np
from numpy import ma
from cloudnetpy.datasource import DataSource
from cloudnetpy.utils import interpolate_2d_nearest
[docs]
class Lidar(DataSource):
"""Lidar class, child of DataSource.
Args:
full_path: Cloudnet Level 1 lidar netCDF file.
"""
def __init__(self, full_path: str):
super().__init__(full_path)
self.append_data(self.getvar("beta"), "beta")
self._add_meta()
[docs]
def interpolate_to_grid(
self, time_new: np.ndarray, height_new: np.ndarray
) -> list[int]:
"""Interpolate beta using nearest neighbor."""
max_height = 100 # m
max_time = 1 / 60 # min -> fraction hour
if self.height is None:
msg = "Unable to interpolate lidar: no height information"
raise RuntimeError(msg)
# Interpolate beta to new grid but ignore profiles that are completely masked
beta = self.data["beta"][:]
indices = [ind for ind, b in enumerate(beta) if ma.all(b) is not ma.masked]
beta_interp = interpolate_2d_nearest(
self.time[indices],
self.height,
beta[indices, :],
time_new,
height_new,
)
# Mask data points that are too far from the original grid
time_gap_ind = get_gap_ind(self.time[indices], time_new, max_time)
height_gap_ind = get_gap_ind(self.height, height_new, max_height)
self._mask_profiles(beta_interp, time_gap_ind, "time")
self._mask_profiles(beta_interp, height_gap_ind, "height")
self.data["beta"].data = beta_interp
return time_gap_ind
@staticmethod
def _mask_profiles(
data: ma.MaskedArray, ind: list[int], dim: Literal["time", "height"]
) -> None:
prefix = f"Unable to interpolate lidar for {len(ind)}"
if dim == "time" and ind:
logging.warning("%s time steps", prefix)
data[ind, :] = ma.masked
elif dim == "height" and ind:
logging.warning("%s altitudes", prefix)
data[:, ind] = ma.masked
def _add_meta(self) -> None:
self.append_data(float(self.getvar("wavelength")), "lidar_wavelength")
self.append_data(0.5, "beta_error")
self.append_data(3.0, "beta_bias")
def get_gap_ind(grid: np.ndarray, new_grid: np.ndarray, threshold: float) -> list[int]:
return [
ind
for ind, value in enumerate(new_grid)
if np.min(np.abs(grid - value)) > threshold
]