Source code for cloudnetpy.output

"""Functions for file writing."""

import datetime
import logging
from dataclasses import fields
from os import PathLike
from uuid import UUID

import netCDF4
import numpy as np
from numpy import ma

from cloudnetpy import utils, version
from cloudnetpy.categorize.containers import Observations
from cloudnetpy.categorize.model import Model
from cloudnetpy.datasource import DataSource
from cloudnetpy.instruments.instruments import Instrument
from cloudnetpy.metadata import COMMON_ATTRIBUTES, MetaData


def save_level1b(
    obj,
    output_file: PathLike | str,
    uuid: UUID | str | None = None,
) -> str:
    """Saves Cloudnet Level 1b file."""
    dimensions = _get_netcdf_dimensions(obj)
    with init_file(output_file, dimensions, obj.data, uuid) as nc:
        file_uuid = nc.file_uuid
        fix_attribute_name(nc)
        location = obj.site_meta["name"]
        nc.cloudnet_file_type = obj.instrument.domain
        nc.title = get_l1b_title(obj.instrument, location)
        if isinstance(obj.date, list):
            nc.year, nc.month, nc.day = obj.date
        elif isinstance(obj.date, datetime.date):
            nc.year = str(obj.date.year)
            nc.month = str(obj.date.month).zfill(2)
            nc.day = str(obj.date.day).zfill(2)
        else:
            raise TypeError
        nc.location = location
        nc.history = get_l1b_history(obj.instrument)
        nc.source = get_l1b_source(obj.instrument)
        if hasattr(obj, "serial_number") and obj.serial_number is not None:
            nc.serial_number = obj.serial_number
        nc.references = get_references()
    return file_uuid
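
# Usage sketch for save_level1b (hypothetical: ``ceilo`` stands for an
# instrument-level object built elsewhere in CloudnetPy, carrying the
# ``data``, ``date``, ``site_meta``, ``instrument`` and ``serial_number``
# attributes this function reads):
#
#     file_uuid = save_level1b(ceilo, "20240101_site_chm15k.nc")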


def _get_netcdf_dimensions(obj) -> dict:
    dimensions = {
        key: len(obj.data[key][:]) for key in ("time", "range") if key in obj.data
    }
    # RPG cloud radar
    if "chirp_start_indices" in obj.data:
        dimensions["chirp_sequence"] = len(obj.data["chirp_start_indices"][:])
    # disdrometer
    if hasattr(obj, "n_diameter") and hasattr(obj, "n_velocity"):
        dimensions["diameter"] = obj.n_diameter
        dimensions["velocity"] = obj.n_velocity
        dimensions["nv"] = 2
    # HATPRO l1c
    if "tb" in obj.data:
        dimensions["frequency"] = obj.data["tb"][:].shape[1]
        dimensions["receiver_nb"] = len(obj.data["receiver_nb"][:])
        dimensions["band"] = 2
        dimensions["t_amb_nb"] = 2
    if "irt" in obj.data:
        dimensions["ir_channel"] = obj.data["irt"][:].shape[1]
    return dimensions


def save_product_file(
    short_id: str,
    obj: DataSource,
    file_name: str,
    uuid: str | None = None,
    copy_from_cat: tuple = (),
) -> str:
    """Saves a standard Cloudnet product file.

    Args:
        short_id: Short file identifier, e.g. 'lwc', 'iwc', 'drizzle',
            'classification'.
        obj: Instance containing product specific attributes: `time`,
            `dataset`, `data`.
        file_name: Name of the output file to be generated.
        uuid: Set specific UUID for the file.
        copy_from_cat: Variables to be copied from the categorize file.

    """
    human_readable_file_type = _get_identifier(short_id)
    dimensions = {
        "time": len(obj.time),
        "height": len(obj.dataset.variables["height"]),
    }
    with init_file(file_name, dimensions, obj.data, uuid) as nc:
        file_uuid = nc.file_uuid
        nc.cloudnet_file_type = short_id
        vars_from_source = (
            "altitude",
            "latitude",
            "longitude",
            "time",
            "height",
            *copy_from_cat,
        )
        copy_variables(obj.dataset, nc, vars_from_source)
        nc.title = (
            f"{human_readable_file_type.capitalize()} products from"
            f" {obj.dataset.location}"
        )
        nc.source_file_uuids = get_source_uuids([nc, obj])
        copy_global(
            obj.dataset,
            nc,
            ("location", "day", "month", "year", "source", "voodoonet_version"),
        )
        merge_history(nc, human_readable_file_type, obj)
        nc.references = get_references(short_id)
    return file_uuid
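
# Usage sketch for save_product_file (hypothetical names: ``lwc_source``
# stands for a DataSource-derived product object with ``time``, ``dataset``
# and ``data`` attributes):
#
#     uuid = save_product_file("lwc", lwc_source, "20240101_site_lwc.nc")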


def get_l1b_source(instrument: Instrument) -> str:
    """Returns level 1b file source."""
    parts = [
        item
        for item in [instrument.manufacturer, instrument.model]
        if item is not None
    ]
    return " ".join(parts) if parts else instrument.category.capitalize()


def get_l1b_history(instrument: Instrument) -> str:
    """Returns level 1b file history."""
    return f"{utils.get_time()} - {instrument.domain} file created"


def get_l1b_title(instrument: Instrument, location: str) -> str:
    """Returns level 1b file title."""
    if instrument.model:
        prefix = " ".join(
            item
            for item in [instrument.model, instrument.category]
            if item is not None
        )
    else:
        prefix = instrument.category.capitalize()
    return f"{prefix} from {location}"


def get_references(identifier: str | None = None, extra: list | None = None) -> str:
    """Returns references.

    Args:
        identifier: Cloudnet file type, e.g., 'iwc'.
        extra: List of additional references to include.

    """
    references = "https://doi.org/10.21105/joss.02123"
    match identifier:
        case "der":
            references += (
                ", https://doi.org/10.1175/1520-0426(2002)019<0835:TROSCD>2.0.CO;2"
            )
        case "ier":
            references += (
                ", https://doi.org/10.1175/JAM2340.1"
                ", https://doi.org/10.1175/JAM2543.1"
                ", https://doi.org/10.5194/amt-13-5335-2020"
            )
        case "lwc" | "categorize":
            references += ", https://doi.org/10.1175/BAMS-88-6-883"
        case "iwc":
            references += ", https://doi.org/10.1175/JAM2340.1"
        case "drizzle":
            references += ", https://doi.org/10.1175/JAM-2181.1"
    if extra is not None:
        for reference in extra:
            references += f", {reference}"
    return references
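
# The return value is a comma-separated string; both examples below follow
# directly from the match statement above (the extra URL is a placeholder):
#
#     >>> get_references()
#     'https://doi.org/10.21105/joss.02123'
#     >>> get_references("iwc", extra=["https://example.org/paper"])
#     'https://doi.org/10.21105/joss.02123, https://doi.org/10.1175/JAM2340.1, https://example.org/paper'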


def get_source_uuids(data: Observations | list[netCDF4.Dataset | DataSource]) -> str:
    """Returns file_uuid attributes of objects.

    Args:
        data: Observations instance or list of objects with a `dataset`
            attribute.

    Returns:
        str: UUIDs separated by commas.

    """
    if isinstance(data, Observations):
        obs = [getattr(data, field.name) for field in fields(data)]
    elif isinstance(data, list):
        obs = data
    uuids = [
        obj.dataset.file_uuid
        for obj in obs
        if hasattr(obj, "dataset") and hasattr(obj.dataset, "file_uuid")
    ]
    unique_uuids = sorted(set(uuids))
    return ", ".join(unique_uuids)


def merge_history(
    nc: netCDF4.Dataset,
    file_type: str,
    data: Observations | DataSource,
) -> None:
    """Merges history fields from one or several files and creates a new record.

    Args:
        nc: The netCDF Dataset instance.
        file_type: Long description of the file.
        data: Observations or DataSource instance whose datasets carry
            history attributes.

    """
    new_record = f"{utils.get_time()} - {file_type} file created"
    histories = []
    if (
        isinstance(data, DataSource)
        and hasattr(data, "dataset")
        and hasattr(data.dataset, "history")
    ):
        history = data.dataset.history
        histories.append(history)
    if isinstance(data, Observations):
        for field in fields(data):
            obj = getattr(data, field.name)
            if hasattr(obj, "dataset") and hasattr(obj.dataset, "history"):
                history = obj.dataset.history
                history = history.split("\n")[-1] if isinstance(obj, Model) else history
                histories.append(history)
    histories.sort(reverse=True)
    old_history = [f"\n{history}" for history in histories]
    old_history_str = "".join(old_history)
    nc.history = f"{new_record}{old_history_str}"


def add_source_instruments(nc: netCDF4.Dataset, data: Observations) -> None:
    """Adds source attribute to categorize file."""
    sources = []
    for field in fields(data):
        obj = getattr(data, field.name)
        if hasattr(obj, "source"):
            sources.append(obj.source)
    if sources:
        formatted_sources = [sources[0]] + [f"\n{source}" for source in sources[1:]]
        nc.source = "".join(formatted_sources)


def init_file(
    file_name: PathLike | str,
    dimensions: dict,
    cloudnet_arrays: dict,
    uuid: UUID | str | None = None,
) -> netCDF4.Dataset:
    """Initializes a Cloudnet file for writing.

    Args:
        file_name: File name to be generated.
        dimensions: Dictionary containing dimensions for this file.
        cloudnet_arrays: Dictionary containing :class:`CloudnetArray` instances.
        uuid: Set specific UUID for the file.

    """
    nc = netCDF4.Dataset(file_name, "w", format="NETCDF4_CLASSIC")
    for key, dimension in dimensions.items():
        nc.createDimension(key, dimension)
    _write_vars2nc(nc, cloudnet_arrays)
    add_standard_global_attributes(nc, uuid)
    return nc
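
# Minimal sketch of how init_file is used (hypothetical dimension sizes;
# ``arrays`` stands for a dict of CloudnetArray instances keyed by variable
# name, as in save_level1b and save_product_file above):
#
#     with init_file("out.nc", {"time": 1440, "range": 500}, arrays) as nc:
#         nc.title = "Example file"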


def copy_variables(
    source: netCDF4.Dataset,
    target: netCDF4.Dataset,
    keys: tuple,
) -> None:
    """Copies variables (and their attributes) from one file to another.

    Args:
        source: Source object.
        target: Target object.
        keys: Variable names to be copied.

    """
    for key in keys:
        if key in source.variables:
            variable = source.variables[key]
            fill_value = getattr(variable, "_FillValue", False)
            var_out = target.createVariable(
                key,
                variable.datatype,
                variable.dimensions,
                fill_value=fill_value,
            )
            var_out.setncatts(
                {
                    k: variable.getncattr(k)
                    for k in variable.ncattrs()
                    if k != "_FillValue"
                },
            )
            var_out[:] = variable[:]
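
# Typical call (mirrors the use in save_product_file above): copy the
# coordinate variables from a source categorize file into a freshly
# initialized product file:
#
#     copy_variables(obj.dataset, nc, ("time", "height", "altitude"))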


def copy_global(
    source: netCDF4.Dataset,
    target: netCDF4.Dataset,
    attributes: tuple,
) -> None:
    """Copies global attributes from one file to another.

    Args:
        source: Source object.
        target: Target object.
        attributes: List of attributes to be copied.

    """
    source_attributes = source.ncattrs()
    for attr in attributes:
        if attr in source_attributes:
            setattr(target, attr, source.getncattr(attr))


def add_time_attribute(
    attributes: dict,
    date: list[str] | datetime.date,
    key: str = "time",
) -> dict:
    """Adds time attribute with correct units."""
    if isinstance(date, list):
        date_str = "-".join(date)
    elif isinstance(date, datetime.date):
        date_str = date.isoformat()
    else:
        raise TypeError
    units = f"hours since {date_str} 00:00:00 +00:00"
    if key not in attributes:
        attributes[key] = MetaData(units=units)
    else:
        attributes[key] = attributes[key]._replace(units=units)
    return attributes
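
# Example output (follows from the f-string above; MetaData is the named
# tuple imported from cloudnetpy.metadata):
#
#     >>> attrs = add_time_attribute({}, datetime.date(2024, 1, 1))
#     >>> attrs["time"].units
#     'hours since 2024-01-01 00:00:00 +00:00'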


def add_source_attribute(attributes: dict, data: Observations) -> dict:
    """Adds source attribute to variables."""
    variables = {
        "radar": (
            "v",
            "width",
            "v_sigma",
            "ldr",
            "Z",
            "zdr",
            "sldr",
            "radar_frequency",
            "nyquist_velocity",
            "rainfall_rate",
        ),
        "lidar": ("beta", "lidar_wavelength"),
        "mwr": ("lwp",),
        "model": ("uwind", "vwind", "Tw", "q", "pressure", "temperature"),
        "disdrometer": ("rainfall_rate",),
    }
    for instrument, keys in variables.items():
        if getattr(data, instrument) is None:
            continue
        source = getattr(data, instrument).dataset.source
        for key in keys:
            if key in attributes:
                attributes[key] = attributes[key]._replace(source=source)
            else:
                attributes[key] = MetaData(source=source)
    return attributes


def update_attributes(cloudnet_variables: dict, attributes: dict) -> None:
    """Overrides existing CloudnetArray attributes.

    Overrides existing attributes using hard-coded values. New attributes
    are added.

    Args:
        cloudnet_variables: CloudnetArray instances.
        attributes: Product-specific attributes.

    """
    for key in cloudnet_variables:
        if key in COMMON_ATTRIBUTES:
            cloudnet_variables[key].set_attributes(COMMON_ATTRIBUTES[key])
        if key in attributes:
            cloudnet_variables[key].set_attributes(attributes[key])
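
# Sketch (hypothetical product attributes; assumes MetaData accepts a
# ``long_name`` keyword, consistent with its use in cloudnetpy.metadata):
#
#     update_attributes(
#         obj.data,
#         {"lwc": MetaData(long_name="Liquid water content")},
#     )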


def _write_vars2nc(nc: netCDF4.Dataset, cloudnet_variables: dict) -> None:
    """Iterates over Cloudnet instances and writes them to a netCDF file."""
    for obj in cloudnet_variables.values():
        if ma.isMaskedArray(obj.data):
            fill_value = netCDF4.default_fillvals[obj.data_type]
        else:
            fill_value = False
        size = obj.dimensions or _get_dimensions(nc, obj.data)
        nc_variable = nc.createVariable(
            obj.name,
            obj.data_type,
            size,
            zlib=True,
            fill_value=fill_value,
        )
        nc_variable[:] = obj.data
        for attr in obj.fetch_attributes():
            setattr(nc_variable, attr, getattr(obj, attr))


def _get_dimensions(nc: netCDF4.Dataset, data: np.ndarray) -> tuple:
    """Finds correct dimensions for a variable."""
    if utils.isscalar(data):
        return ()
    variable_size: list = []
    file_dims = nc.dimensions
    array_dims = data.shape
    for length in array_dims:
        dim = [key for key in file_dims if file_dims[key].size == length][0]  # noqa: RUF015
        variable_size = [*variable_size, dim]
    return tuple(variable_size)


def _get_identifier(short_id: str) -> str:
    valid_ids = (
        "lwc",
        "iwc",
        "drizzle",
        "classification",
        "der",
        "ier",
        "classification-voodoo",
    )
    if short_id not in valid_ids:
        msg = f"Invalid file identifier: {short_id}"
        raise ValueError(msg)
    if short_id == "iwc":
        return "ice water content"
    if short_id == "lwc":
        return "liquid water content"
    if short_id == "ier":
        return "ice effective radius"
    if short_id == "der":
        return "droplet effective radius"
    return short_id


def add_standard_global_attributes(
    nc: netCDF4.Dataset,
    uuid: UUID | str | None = None,
) -> None:
    nc.Conventions = "CF-1.8"
    nc.cloudnetpy_version = version.__version__
    nc.file_uuid = str(uuid) if uuid is not None else utils.get_uuid()


def fix_attribute_name(nc: netCDF4.Dataset) -> None:
    """Changes incorrect 'unit' variable attribute to correct 'units'.

    This is true at least for the 'drg' variable in raw MIRA files.
    """
    for var in nc.variables:
        if "unit" in nc[var].ncattrs():
            logging.info('Renaming "unit" attribute into "units"')
            nc[var].setncattr("units", nc[var].unit)
            nc[var].delncattr("unit")


def fix_time_attributes(nc: netCDF4.Dataset) -> None:
    nc.variables["time"].standard_name = "time"
    nc.variables["time"].long_name = "Time UTC"
    nc.variables["time"].calendar = "standard"
    nc.variables["time"].units = (
        f"hours since {nc.year}-{nc.month}-{nc.day} 00:00:00 +00:00"
    )


def replace_attribute_with_standard_value(
    nc: netCDF4.Dataset,
    variables: tuple,
    attributes: tuple,
) -> None:
    for key in variables:
        if key in COMMON_ATTRIBUTES and key in nc.variables:
            for attr in attributes:
                if (value := getattr(COMMON_ATTRIBUTES[key], attr)) is not None:
                    setattr(nc.variables[key], attr, value)
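
# Usage sketch (hypothetical variable and attribute names; overwrites the
# listed attributes with the standard values from COMMON_ATTRIBUTES):
#
#     replace_attribute_with_standard_value(nc, ("time",), ("units", "long_name"))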