
"""Module for concatenating netCDF files."""

import logging
import shutil
from collections.abc import Iterable
from os import PathLike
from pathlib import Path
from typing import Literal

import netCDF4
import numpy as np
from numpy import ma

from cloudnetpy import utils


def truncate_netcdf_file(
    filename: str, output_file: str, n_profiles: int, dim_name: str = "time"
) -> None:
    """Truncates netCDF file in dim_name dimension taking only n_profiles.

    Useful for creating small files for tests.
    """
    with (
        netCDF4.Dataset(filename, "r") as nc,
        netCDF4.Dataset(output_file, "w", format=nc.data_model) as nc_new,
    ):
        for dim in nc.dimensions:
            dim_len = None if dim == dim_name else nc.dimensions[dim].size
            nc_new.createDimension(dim, dim_len)
        for attr in nc.ncattrs():
            value = getattr(nc, attr)
            setattr(nc_new, attr, value)
        for key in nc.variables:
            array = nc.variables[key][:]
            dimensions = nc.variables[key].dimensions
            fill_value = getattr(nc.variables[key], "_FillValue", None)
            var = nc_new.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                fill_value=fill_value,
            )
            if dimensions and dim_name in dimensions[0]:
                if array.ndim == 1:
                    var[:] = array[:n_profiles]
                if array.ndim == 2:
                    var[:] = array[:n_profiles, :]
            else:
                var[:] = array
            for attr in nc.variables[key].ncattrs():
                if attr != "_FillValue":
                    value = getattr(nc.variables[key], attr)
                    setattr(var, attr, value)
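
# Usage sketch (hypothetical file names): create a small test fixture from a
# full-size measurement file by keeping only the first 100 time steps:
#
#     truncate_netcdf_file("radar_full.nc", "radar_small.nc", n_profiles=100)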
def update_nc(old_file: str, new_file: str) -> int:
    """Appends data to existing netCDF file.

    Args:
        old_file: Filename of an existing netCDF file.
        new_file: Filename of a new file whose data will be appended to the end.

    Returns:
        1 = success, 0 = failed to add new data.

    Notes:
        Requires 'time' variable with unlimited dimension.
    """
    try:
        with (
            netCDF4.Dataset(old_file, "a") as nc_old,
            netCDF4.Dataset(new_file) as nc_new,
        ):
            valid_ind = _find_valid_time_indices(nc_old, nc_new)
            if len(valid_ind) > 0:
                _update_fields(nc_old, nc_new, valid_ind)
                return 1
            return 0
    except OSError:
        return 0
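
# Usage sketch (hypothetical file names): append fresh measurements to a
# growing daily file; profiles whose timestamps are not newer than the last
# timestamp already stored are skipped:
#
#     if update_nc("day_so_far.nc", "latest_chunk.nc") == 0:
#         logging.warning("No new data appended")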
def concatenate_files(
    filenames: Iterable[PathLike | str],
    output_file: str,
    concat_dimension: str = "time",
    variables: list | None = None,
    new_attributes: dict | None = None,
    ignore: list | None = None,
    interp_dimension: str = "range",
) -> list:
    """Concatenates netCDF files in one dimension.

    Args:
        filenames: List of files to be concatenated.
        output_file: Output file name.
        concat_dimension: Dimension name for concatenation. Default is 'time'.
        variables: List of variables with the 'concat_dimension' to be
            concatenated. Default is None, in which case all variables with
            'concat_dimension' are saved.
        new_attributes: Optional new global attributes as
            {'attribute_name': value}.
        ignore: List of variables to be ignored.
        interp_dimension: Dimension name for interpolation if the dimensions
            are not the same between the files.

    Returns:
        List of filenames that were successfully concatenated.

    Notes:
        Arrays without 'concat_dimension' and scalars are expanded to the
        concat_dimension. Global attributes are taken from the first file.
        Groups, possibly present in a NETCDF4 formatted file, are ignored.
    """
    with _Concat(filenames, output_file, concat_dimension, interp_dimension) as concat:
        concat.create_global_attributes(new_attributes)
        return concat.concat_data(variables, ignore)
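
# Usage sketch (hypothetical file and variable names): merge hourly files
# into one daily file along 'time', keeping only selected variables and
# stamping a new global attribute:
#
#     written = concatenate_files(
#         sorted(Path("hourly").glob("*.nc")),
#         "daily.nc",
#         variables=["Zh", "v", "width"],
#         new_attributes={"history": "Concatenated by concat_lib"},
#     )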
class _Concat:
    common_variables: set[str]

    def __init__(
        self,
        filenames: Iterable[PathLike | str],
        output_file: str,
        concat_dimension: str = "time",
        interp_dim: str = "range",
    ):
        self.filenames = sorted(map(Path, filenames), key=lambda f: f.name)
        self.concat_dimension = concat_dimension
        self.interp_dim = interp_dim
        self.first_filename = self.filenames[0]
        self.first_file = netCDF4.Dataset(self.first_filename)
        self.concatenated_file = self._init_output_file(output_file)

    def create_global_attributes(self, new_attributes: dict | None) -> None:
        """Copies global attributes from one of the source files."""
        _copy_attributes(self.first_file, self.concatenated_file)
        if new_attributes is not None:
            for key, value in new_attributes.items():
                setattr(self.concatenated_file, key, value)

    def concat_data(
        self,
        keep: list | None = None,
        ignore: list | None = None,
    ) -> list:
        """Concatenates data arrays."""
        self._write_initial_data(keep, ignore)
        output = [self.first_filename]
        if len(self.filenames) > 1:
            for filename in self.filenames[1:]:
                try:
                    self._append_data(filename)
                except RuntimeError as e:
                    if "NetCDF: HDF error" in str(e):
                        msg = f"Caught a NetCDF HDF error. Skipping file '{filename}'."
                        logging.exception(msg)
                        continue
                    raise
                output.append(filename)
        return output

    def _write_initial_data(self, keep: list | None, ignore: list | None) -> None:
        len_concat_dim = self.first_file[self.concat_dimension].size
        auto_scale = False
        for key, var in self.first_file.variables.items():
            if (
                # This filtering only affects variables having the concat_dimension
                keep is not None
                and key not in keep
                and key != self.concat_dimension
                and self.concat_dimension in var.dimensions
            ):
                continue
            if ignore and key in ignore:
                continue
            var.set_auto_scale(auto_scale)
            array, dimensions = self._expand_array(var, len_concat_dim)
            fill_value = var.get_fill_value()
            var_new = self.concatenated_file.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                complevel=3,
                shuffle=False,
                fill_value=fill_value,
            )
            var_new.set_auto_scale(auto_scale)
            var_new[:] = array
            _copy_attributes(var, var_new)

    def _expand_array(
        self,
        var: netCDF4.Variable,
        n_data: int,
    ) -> tuple[ma.MaskedArray, tuple[str, ...]]:
        dimensions = var.dimensions
        arr = var[:]
        if self.concat_dimension not in dimensions and var.name != self.interp_dim:
            dimensions = (self.concat_dimension, *dimensions)
            arr = np.repeat(arr[np.newaxis, ...], n_data, axis=0)
        return arr, dimensions

    def _append_data(self, filename: str | PathLike) -> None:
        with netCDF4.Dataset(filename) as file:
            auto_scale = False
            file.set_auto_scale(auto_scale)
            ind0 = len(self.concatenated_file.variables[self.concat_dimension])
            ind1 = ind0 + len(file.variables[self.concat_dimension])
            n_points = ind1 - ind0
            for key in self.concatenated_file.variables:
                if key not in file.variables or key == self.interp_dim:
                    continue
                array, dimensions = self._expand_array(file[key], n_points)
                # Nearest neighbour interpolation in the interp_dim dimension
                # if the dimensions are not the same between the files
                if self.interp_dim in dimensions and (
                    self.first_file[self.interp_dim].size
                    != file[self.interp_dim].size
                ):
                    x = file.variables[self.interp_dim][:]
                    x_target = self.first_file.variables[self.interp_dim][:]
                    idx = np.abs(x[:, None] - x_target[None, :]).argmin(axis=0)
                    array = array[:, idx]
                    out_of_bounds = (x_target < x.min()) | (x_target > x.max())
                    fill_value = self.first_file.variables[key].get_fill_value()
                    array[:, out_of_bounds] = fill_value
                self.concatenated_file.variables[key][ind0:ind1, ...] = array

    def _init_output_file(self, output_file: str) -> netCDF4.Dataset:
        data_model: Literal["NETCDF4", "NETCDF4_CLASSIC"] = (
            "NETCDF4" if self.first_file.data_model == "NETCDF4" else "NETCDF4_CLASSIC"
        )
        nc = netCDF4.Dataset(output_file, "w", format=data_model)
        for dim in self.first_file.dimensions:
            dim_len = (
                None
                if dim == self.concat_dimension
                else self.first_file.dimensions[dim].size
            )
            nc.createDimension(dim, dim_len)
        return nc

    def _close(self) -> None:
        self.first_file.close()
        self.concatenated_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()


def _copy_attributes(
    source: netCDF4.Dataset | netCDF4.Variable,
    target: netCDF4.Dataset | netCDF4.Variable,
) -> None:
    for attr in source.ncattrs():
        if attr != "_FillValue":
            value = getattr(source, attr)
            setattr(target, attr, value)


def _find_valid_time_indices(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
) -> np.ndarray:
    return np.where(nc_new.variables["time"][:] > nc_old.variables["time"][-1])[0]


def _update_fields(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
    valid_ind: np.ndarray,
) -> None:
    ind0 = len(nc_old.variables["time"])
    idx = [ind0 + x for x in valid_ind]
    concat_dimension = nc_old.variables["time"].dimensions[0]
    for field in nc_new.variables:
        if field not in nc_old.variables:
            continue
        dimensions = nc_new.variables[field].dimensions
        if concat_dimension in dimensions:
            concat_ind = dimensions.index(concat_dimension)
            if len(dimensions) == 1:
                nc_old.variables[field][idx] = nc_new.variables[field][valid_ind]
            elif len(dimensions) == 2 and concat_ind == 0:
                nc_old.variables[field][idx, :] = nc_new.variables[field][valid_ind, :]
            elif len(dimensions) == 2 and concat_ind == 1:
                nc_old.variables[field][:, idx] = nc_new.variables[field][:, valid_ind]
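
# Note on _Concat._append_data: when a file's interp_dim grid differs from
# the first file's grid, each target point is mapped to the nearest source
# point via a pairwise-distance argmin. A minimal numpy sketch of that index
# mapping (values are made up):
#
#     x = np.array([0.0, 10.0, 20.0])        # source grid
#     x_target = np.array([1.0, 9.0, 25.0])  # grid of the first file
#     idx = np.abs(x[:, None] - x_target[None, :]).argmin(axis=0)
#     # idx -> array([0, 1, 2]); targets outside [x.min(), x.max()]
#     # (here 25.0) are afterwards overwritten with the fill value.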
def concatenate_text_files(filenames: list, output_filename: str | PathLike) -> None:
    """Concatenates text files."""
    with open(output_filename, "wb") as target:
        for filename in filenames:
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, target)
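
# Usage sketch (hypothetical file names): stitch raw ASCII chunks together
# before parsing; binary mode copies the bytes verbatim:
#
#     concatenate_text_files(["chm_00.txt", "chm_01.txt"], "chm_day.txt")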
def bundle_netcdf_files(
    files: list,
    date: str,
    output_file: str,
    concat_dimensions: tuple[str, ...] = ("time", "profile"),
    variables: list | None = None,
) -> list:
    """Concatenates several netCDF files into a daily file, with some extra
    data manipulation.
    """
    with netCDF4.Dataset(files[0]) as nc:
        concat_dimension = None
        for key in concat_dimensions:
            if key in nc.dimensions:
                concat_dimension = key
                break
        if concat_dimension is None:
            msg = f"Dimension '{concat_dimensions}' not found in the files."
            raise KeyError(msg)
    if len(files) == 1:
        shutil.copy(files[0], output_file)
        return files
    valid_files = []
    for file in files:
        try:
            with netCDF4.Dataset(file) as nc:
                time = nc.variables["time"]
                time_array = time[:]
                time_units = time.units
        except OSError:
            continue
        epoch = utils.get_epoch(time_units)
        for timestamp in time_array:
            if utils.seconds2date(timestamp, epoch)[:3] == date.split("-"):
                valid_files.append(file)
                break
    concatenate_files(
        valid_files,
        output_file,
        concat_dimension=concat_dimension,
        variables=variables,
        ignore=[
            "minimum",
            "maximum",
            "number_integrated_samples",
            "Min_LWP",
            "Max_LWP",
        ],
    )
    return valid_files
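
# Usage sketch (hypothetical file names): build a daily file from raw files,
# keeping only files that contain profiles measured on the given date:
#
#     used = bundle_netcdf_files(
#         ["raw_00.nc", "raw_06.nc", "raw_12.nc"],
#         date="2024-05-01",
#         output_file="daily.nc",
#     )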