Source code for instruments.pollyxt

"""Module for reading / converting pollyxt data."""

import glob
import logging

import netCDF4
import numpy as np
from numpy import ma
from numpy.testing import assert_almost_equal

from cloudnetpy import output, utils
from cloudnetpy.exceptions import InconsistentDataError, ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.ceilometer import Ceilometer
from cloudnetpy.metadata import MetaData
from cloudnetpy.utils import Epoch


[docs] def pollyxt2nc( input_folder: str, output_file: str, site_meta: dict, uuid: str | None = None, date: str | None = None, ) -> str: """Converts PollyXT Raman lidar data into Cloudnet Level 1b netCDF file. Args: input_folder: Path to pollyxt netCDF files. output_file: Output filename. site_meta: Dictionary containing information about the site with keys: - `name`: Name of the site (mandatory) - `altitude`: Site altitude in [m] (mandatory). - `latitude` (optional). - `longitude` (optional). - `zenith_angle`: If not the default 5 degrees (optional). - `snr_limit`: If not the default 2 (optional). uuid: Set specific UUID for the file. date: Expected date of the measurements as YYYY-MM-DD. Returns: UUID of the generated file. Examples: >>> from cloudnetpy.instruments import pollyxt2nc >>> site_meta = {'name': 'Mindelo', 'altitude': 13, 'zenith_angle': 6, 'snr_limit': 3} >>> pollyxt2nc('/path/to/files/', 'pollyxt.nc', site_meta) """ snr_limit = site_meta.get("snr_limit", 2) polly = PollyXt(site_meta, date) epoch = polly.fetch_data(input_folder) polly.get_date_and_time(epoch) polly.fetch_zenith_angle() polly.calc_screened_products(snr_limit) polly.mask_nan_values() polly.prepare_data() polly.screen_completely_masked_profiles() polly.data_to_cloudnet_arrays(time_dtype="f8") attributes = output.add_time_attribute(ATTRIBUTES, polly.date) output.update_attributes(polly.data, attributes) polly.add_snr_info("beta", snr_limit) return output.save_level1b(polly, output_file, uuid)
class PollyXt(Ceilometer): def __init__(self, site_meta: dict, expected_date: str | None): super().__init__() self.site_meta = site_meta self.expected_date = expected_date self.instrument = instruments.POLLYXT def mask_nan_values(self) -> None: for array in self.data.values(): if getattr(array, "ndim", 0) > 0: array[np.isnan(array)] = ma.masked def calc_screened_products(self, snr_limit: float = 5.0) -> None: keys = ("beta", "depolarisation") for key in keys: self.data[key] = ma.masked_where( self.data["snr"] < snr_limit, self.data[f"{key}_raw"], ) self.data["depolarisation"][self.data["depolarisation"] > 1] = ma.masked self.data["depolarisation"][self.data["depolarisation"] < 0] = ma.masked self.data["beta"][self.data["beta"] < 0] = ma.masked del self.data["snr"] def fetch_zenith_angle(self) -> None: default = 5 self.data["zenith_angle"] = float(self.metadata.get("zenith_angle", default)) def fetch_data(self, input_folder: str) -> Epoch: """Read input data.""" bsc_files = glob.glob(f"{input_folder}/*[0-9]_att*.nc") depol_files = glob.glob(f"{input_folder}/*[0-9]_vol*.nc") bsc_files.sort() depol_files.sort() if not bsc_files: msg = "No pollyxt bsc files found" raise RuntimeError(msg) if len(bsc_files) != len(depol_files): msg = "Inconsistent number of pollyxt bsc / depol files" raise InconsistentDataError(msg) self._fetch_attributes(bsc_files[0]) self.data["range"] = _read_array_from_multiple_files( bsc_files, depol_files, "height", ) calibration_factors: np.ndarray = np.array([]) beta_channel = self._get_valid_beta_channel(bsc_files) bsc_key = f"attenuated_backscatter_{beta_channel}nm" for bsc_file, depol_file in zip(bsc_files, depol_files, strict=True): with ( netCDF4.Dataset(bsc_file, "r") as nc_bsc, netCDF4.Dataset(depol_file, "r") as nc_depol, ): epoch = utils.get_epoch(nc_bsc["time"].unit) try: time = np.array( _read_array_from_file_pair(nc_bsc, nc_depol, "time"), ) except AssertionError as err: logging.warning( "Ignoring files '%s' and '%s': %s", nc_bsc, nc_depol, err, ) continue beta_raw = nc_bsc.variables[bsc_key][:] depol_raw = nc_depol.variables["volume_depolarization_ratio_532nm"][:] snr = nc_bsc.variables[f"SNR_{beta_channel}nm"][:] for array, key in zip( [beta_raw, depol_raw, time, snr], ["beta_raw", "depolarisation_raw", "time", "snr"], strict=True, ): self.data = utils.append_data(self.data, key, array) calibration_factor = nc_bsc.variables[ bsc_key ].Lidar_calibration_constant_used calibration_factor = np.repeat(calibration_factor, len(time)) calibration_factors = np.concatenate( [calibration_factors, calibration_factor], ) self.data["calibration_factor"] = calibration_factors return epoch def screen_completely_masked_profiles(self) -> None: valid_ind = ~np.all(np.ma.getmaskarray(self.data["beta_raw"]), axis=1) for key, item in self.data.items(): if isinstance(item, np.ndarray) and item.shape[0] == len(valid_ind): self.data[key] = item[valid_ind] def _get_valid_beta_channel(self, files: list) -> str: polly_channels = ("1064", "532", "355") for channel in polly_channels: for file in files: with netCDF4.Dataset(file, "r") as nc: beta = nc.variables[f"attenuated_backscatter_{channel}nm"][:] if not _only_zeros_or_masked(beta): if channel != polly_channels[0]: logging.warning( "Using %s nm pollyXT channel for backscatter", channel, ) if self.instrument is None: msg = "No instrument defined" raise RuntimeError(msg) self.instrument.wavelength = float(channel) return channel msg = "No functional pollyXT backscatter channels found" raise ValidTimeStampError(msg) def _fetch_attributes(self, file: str) -> None: with netCDF4.Dataset(file, "r") as nc: if hasattr(nc, "source"): self.serial_number = nc.source.lower() def _read_array_from_multiple_files(files1: list, files2: list, key) -> np.ndarray: array: np.ndarray = np.array([]) for ind, (file1, file2) in enumerate(zip(files1, files2, strict=True)): with netCDF4.Dataset(file1, "r") as nc1, netCDF4.Dataset(file2, "r") as nc2: array1 = _read_array_from_file_pair(nc1, nc2, key) if ind == 0: array = array1 assert_almost_equal( array, array1, err_msg=f"Inconsistent variable '{key}'", decimal=2 ) return np.array(array) def _read_array_from_file_pair( nc_file1: netCDF4.Dataset, nc_file2: netCDF4.Dataset, key: str, ) -> np.ndarray: array1 = nc_file1.variables[key][:] array2 = nc_file2.variables[key][:] assert_almost_equal( array1, array2, err_msg=f"Inconsistent variable '{key}'", decimal=2 ) return array1 def _only_zeros_or_masked(data: ma.MaskedArray) -> bool: return ma.sum(data) == 0 or data.mask.all() ATTRIBUTES = { "depolarisation": MetaData( long_name="Lidar volume linear depolarisation ratio", units="1", comment="SNR-screened lidar volume linear depolarisation ratio at 532 nm.", ), "depolarisation_raw": MetaData( long_name="Lidar volume linear depolarisation ratio", units="1", comment="Non-screened lidar volume linear depolarisation ratio at 532 nm.", ), "calibration_factor": MetaData( long_name="Attenuated backscatter calibration factor", units="1", comment="Calibration factor applied.", ), }