"""Module for converting OPERA HDF5 weather radar data into Cloudnet files."""

import datetime
from collections.abc import Iterable, Sequence
from os import PathLike
from uuid import UUID

import netCDF4
import numpy as np
import numpy.typing as npt
from numpy import ma

from cloudnetpy import output, utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.constants import CM_TO_M, HZ_TO_GHZ, KM_TO_M, M_TO_KM, SPEED_OF_LIGHT
from cloudnetpy.exceptions import CloudnetException, ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.cloudnet_instrument import CloudnetInstrument
from cloudnetpy.metadata import MetaData


def wr2nc(
    input_files: str | PathLike | Sequence[str | PathLike],
    output_file: str | PathLike,
    site_meta: dict,
    uuid: str | UUID | None = None,
    date: str | datetime.date | None = None,
) -> UUID:
    """Converts OPERA HDF5 weather radar data into Cloudnet Level 1b netCDF file.

    Args:
        input_files: List of OPERA HDF5 files.
        output_file: Output filename.
        site_meta: Dictionary containing information about the site. Required
            keys are `name`, `latitude`, `longitude` and `altitude`.
        uuid: Set specific UUID for the file.
        date: Expected date as YYYY-MM-DD.

    Returns:
        UUID of the generated file.
    """
    # Normalise the date argument: an ISO string becomes a datetime.date.
    if isinstance(date, str):
        date = datetime.date.fromisoformat(date)
    uuid = utils.get_uuid(uuid)
    # A single path is accepted as a convenience and wrapped into a list.
    if isinstance(input_files, str | PathLike):
        input_files = [input_files]
    radar = WeatherRadar(input_files, site_meta, date)
    radar.sort_and_dedup_timestamps()
    radar.convert_to_cloudnet_arrays()
    radar.screen_noise()
    radar.add_meta()
    attrs = output.add_time_attribute(ATTRIBUTES, radar.date)
    output.update_attributes(radar.data, attrs)
    output.save_level1b(radar, output_file, uuid)
    return uuid
class WeatherRadar(CloudnetInstrument):
    """Reader for OPERA HDF5 weather radar files."""

    def __init__(
        self,
        filenames: Iterable[str | PathLike],
        site_meta: dict,
        expected_date: datetime.date | None = None,
    ) -> None:
        super().__init__()
        self.site_meta = site_meta
        self._read_data(filenames)
        self._screen_time(expected_date)
        self.instrument = instruments.WRM200

    def _read_data(self, filenames: Iterable[str | PathLike]) -> None:
        """Reads all input files and combines them onto a common range grid.

        Raises:
            ValidTimeStampError: If no file could be read successfully.
        """
        timestamps: list = []
        range_vectors: list = []
        datasets: list = []
        scalars: dict = {}
        for path in filenames:
            try:
                stamp, rng, fields, file_scalars = _read_opera_h5(path)
            except InvalidRangeError:
                # Skip files with a corrupted range axis entirely.
                continue
            timestamps.append(stamp)
            range_vectors.append(rng)
            datasets.append(fields)
            scalars = file_scalars  # scalars from the last readable file win
        if not timestamps:
            raise ValidTimeStampError
        # Use the range vector reaching furthest as the common grid and
        # interpolate the other files onto it.
        target_range = max(range_vectors, key=lambda rng: rng[-1])
        combined: dict = {key: [] for fields in datasets for key in fields}
        for src_range, fields in zip(range_vectors, datasets, strict=True):
            for key, rows in combined.items():
                if key not in fields:
                    # Quantity missing from this file: keep dimensions with
                    # a fully masked profile.
                    rows.append(ma.masked_all((1, len(target_range))))
                    continue
                profile = ma.array([fields[key]])
                if not np.array_equal(src_range, target_range):
                    profile = utils.interpolate_2D_along_y(
                        src_range, profile, target_range
                    )
                rows.append(profile)
        self.raw_time = np.array(timestamps)
        self.raw_range = target_range
        self.raw_data = {key: ma.concatenate(rows) for key, rows in combined.items()}
        self.scalars = scalars

    def _screen_time(self, expected_date: datetime.date | None = None) -> None:
        """Keeps only profiles from the expected date (or infers the date)."""
        if expected_date is None:
            self.date = self.raw_time[0].date()
            return
        keep = [stamp.date() == expected_date for stamp in self.raw_time]
        self.raw_time = self.raw_time[keep]
        for key in self.raw_data:
            self.raw_data[key] = self.raw_data[key][keep]
        self.date = expected_date

    def sort_and_dedup_timestamps(self) -> None:
        """Sorts profiles by time and drops duplicate timestamps."""
        self.raw_time, keep_ind = np.unique(self.raw_time, return_index=True)
        for key in self.raw_data:
            self.raw_data[key] = self.raw_data[key][keep_ind]

    def add_meta(self) -> None:
        """Copies site coordinates from site_meta into the output data."""
        for key, value in self.site_meta.items():
            name = key.lower()
            if name in ("latitude", "longitude", "altitude"):
                self.data[name] = CloudnetArray(float(value), name)

    def convert_to_cloudnet_arrays(self) -> None:
        """Wraps the raw arrays into CloudnetArray objects with Cloudnet names."""
        midnight = datetime.datetime.combine(self.date, datetime.time())
        decimal_hour = (self.raw_time - midnight) / datetime.timedelta(hours=1)
        self.data["time"] = CloudnetArray(decimal_hour.astype(np.float32), "time")
        self.data["range"] = CloudnetArray(self.raw_range, "range")
        self.data["height"] = CloudnetArray(
            self.site_meta["altitude"] + self.raw_range, "height"
        )
        # 2D fields: ODIM quantity name -> Cloudnet variable name.
        for raw_key, name in (
            ("SNR", "SNR"),
            ("DBZH", "Zh"),
            ("VRADH", "v"),
            ("WRADH", "width"),
            ("ZDR", "zdr"),
            ("RHOHV", "rho_hv"),
        ):
            self.data[name] = CloudnetArray(self.raw_data[raw_key], name)
        # Scalar attributes read from the radar files.
        for scalar_key, name in (
            ("FREQ", "radar_frequency"),
            ("NI", "nyquist_velocity"),
            ("NEZ", "calibration_reflectivity_factor"),
        ):
            self.data[name] = CloudnetArray(self.scalars[scalar_key], name)

    def screen_noise(self) -> None:
        """Masks all 2D fields where SNR is below 0 dB."""
        below_noise = self.data["SNR"].data < 0
        for cloudnet_array in self.data.values():
            if cloudnet_array.data.ndim == 2:
                cloudnet_array.mask_indices(below_noise)


class InvalidRangeError(CloudnetException):
    """Raised when a file's range axis is clearly corrupted."""


def _read_opera_h5(
    file: str | PathLike,
) -> tuple[datetime.datetime, npt.NDArray, dict[str, npt.NDArray], dict[str, float]]:
    """Reads one OPERA HDF5 file.

    Returns:
        Tuple of (timestamp, range vector in metres, 2D data fields keyed by
        ODIM quantity, scalar metadata).

    Raises:
        InvalidRangeError: If the range axis looks corrupted.
    """
    fields = {}
    with netCDF4.Dataset(file) as rootgrp:
        stamp = datetime.datetime.strptime(
            rootgrp["what"].date + rootgrp["what"].time, "%Y%m%d%H%M%S"
        )
        dataset = rootgrp["dataset1"]
        where = dataset["where"]
        rstart = where.rstart * KM_TO_M
        rscale = where.rscale  # m
        # Bin centres: start of first bin + half a bin + bin index * bin width.
        rng = rstart + rscale / 2 + rscale * np.arange(where.nbins)
        # Sometimes nbins is larger than normal (e.g. 119 vs 8274), leading to
        # very long range and data that looks like garbage.
        if rng[-1] > 100_000:
            raise InvalidRangeError
        nez = dataset["how"].NEZH
        ni = dataset["how"].NI
        wavelength = rootgrp["how"].wavelength * CM_TO_M
        frequency = HZ_TO_GHZ * SPEED_OF_LIGHT / wavelength
        scalars = {"NEZ": nez, "NI": ni, "FREQ": frequency}
        for grpname in (g for g in dataset.groups if g.startswith("data")):
            grp = dataset[grpname]
            what = grp["what"]
            quantity = what.quantity
            is_db = quantity in ("ZDR", "SNR")
            raw = grp["data"][:]
            invalid = (raw == what.nodata) | (raw == what.undetect)
            values = what.offset + what.gain * ma.masked_where(invalid, raw)
            # Average the rays; dB quantities are averaged in linear units.
            if is_db:
                values = utils.lin2db(ma.mean(utils.db2lin(values), axis=0))
            else:
                values = ma.mean(values, axis=0)
            fields[quantity] = values
        # DBZH is available in some files for some days, but for consistency,
        # always recalculate it from SNR. The value calculated from SNR is close
        # to DBZH but not exactly for some unknown reason.
        fields["DBZH"] = fields["SNR"] + nez + 20 * np.log10(rng * M_TO_KM)
    return stamp, rng, fields, scalars


ATTRIBUTES = {
    "calibration_reflectivity_factor": MetaData(
        long_name="Calibration reflectivity factor",
        comment="This parameter is the equivalent radar reflectivity factor at 1 km\n"
        "when the return signal power is equal to the noise power (SNR=0 dB).",
        units="dBZ",
        dimensions=(),
    )
}