Source code for cloudnetpy.concat_lib

"""Module for concatenating netCDF files."""

import logging
import shutil
from collections.abc import Iterable
from os import PathLike
from pathlib import Path
from typing import Literal

import netCDF4
import numpy as np

from cloudnetpy import utils
from cloudnetpy.exceptions import InconsistentDataError


def truncate_netcdf_file(
    filename: str, output_file: str, n_profiles: int, dim_name: str = "time"
) -> None:
    """Truncates a netCDF file in the `dim_name` dimension, keeping only the
    first `n_profiles` profiles. Useful for creating small files for tests.
    """
    with (
        netCDF4.Dataset(filename, "r") as nc,
        netCDF4.Dataset(output_file, "w", format=nc.data_model) as nc_new,
    ):
        for dim in nc.dimensions:
            dim_len = None if dim == dim_name else nc.dimensions[dim].size
            nc_new.createDimension(dim, dim_len)
        for attr in nc.ncattrs():
            value = getattr(nc, attr)
            setattr(nc_new, attr, value)
        for key in nc.variables:
            array = nc.variables[key][:]
            dimensions = nc.variables[key].dimensions
            fill_value = getattr(nc.variables[key], "_FillValue", None)
            var = nc_new.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                fill_value=fill_value,
            )
            # Note: substring match against the first dimension name.
            if dimensions and dim_name in dimensions[0]:
                if array.ndim == 1:
                    var[:] = array[:n_profiles]
                if array.ndim == 2:
                    var[:] = array[:n_profiles, :]
            else:
                var[:] = array
            for attr in nc.variables[key].ncattrs():
                if attr != "_FillValue":
                    value = getattr(nc.variables[key], attr)
                    setattr(var, attr, value)
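
A minimal usage sketch (the file names here are hypothetical): truncate a
full-size daily file to its first 100 profiles along the "time" dimension
to produce a lightweight test fixture.

    # Keep only the first 100 profiles along the "time" dimension.
    truncate_netcdf_file("full_day.nc", "test_fixture.nc", n_profiles=100)
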
def update_nc(old_file: str, new_file: str) -> int:
    """Appends data to an existing netCDF file.

    Args:
        old_file: Filename of an existing netCDF file.
        new_file: Filename of a new file whose data will be appended to the end.

    Returns:
        1 = success, 0 = failed to add new data.

    Notes:
        Requires a 'time' variable with an unlimited dimension.
    """
    try:
        with (
            netCDF4.Dataset(old_file, "a") as nc_old,
            netCDF4.Dataset(new_file) as nc_new,
        ):
            valid_ind = _find_valid_time_indices(nc_old, nc_new)
            if len(valid_ind) > 0:
                _update_fields(nc_old, nc_new, valid_ind)
                return 1
            return 0
    except OSError:
        return 0
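
A usage sketch, assuming 'today.nc' already exists and both files have an
unlimited 'time' dimension (file names are hypothetical):

    # Append only the profiles in "latest.nc" whose timestamps are newer
    # than the last timestamp already stored in "today.nc".
    if update_nc("today.nc", "latest.nc") == 1:
        print("New profiles appended")
    else:
        print("Nothing appended: no new data, or a file could not be read")
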
def concatenate_files(
    filenames: Iterable[PathLike | str],
    output_file: str,
    concat_dimension: str = "time",
    variables: list | None = None,
    new_attributes: dict | None = None,
    ignore: list | None = None,
    allow_difference: list | None = None,
) -> list:
    """Concatenates netCDF files in one dimension.

    Args:
        filenames: List of files to be concatenated.
        output_file: Output file name.
        concat_dimension: Dimension name for concatenation. Default is 'time'.
        variables: List of variables with the 'concat_dimension' to be
            concatenated. Default is None, in which case all variables with
            'concat_dimension' are saved.
        new_attributes: Optional new global attributes as {'attribute_name': value}.
        ignore: List of variables to be ignored.
        allow_difference: Names of scalar variables that can differ from one
            file to another (the value from the first file is saved).

    Returns:
        List of filenames that were successfully concatenated.

    Notes:
        Arrays without 'concat_dimension', scalars, and global attributes are
        taken from the first file. Groups, possibly present in a NETCDF4
        formatted file, are ignored.
    """
    with _Concat(filenames, output_file, concat_dimension) as concat:
        concat.get_common_variables()
        concat.create_global_attributes(new_attributes)
        return concat.concat_data(variables, ignore, allow_difference)
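
A usage sketch with hypothetical file and variable names: concatenate three
hourly files along 'time', keep only two time-dependent variables, and stamp
a new global attribute on the result.

    hourly_files = ["chm15k_00.nc", "chm15k_01.nc", "chm15k_02.nc"]
    done = concatenate_files(
        hourly_files,
        "chm15k_day.nc",
        variables=["beta_raw", "zenith"],
        new_attributes={"institution": "Example Institute"},
    )
    print(f"Concatenated {len(done)} of {len(hourly_files)} files")
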
class _Concat:
    common_variables: set[str]

    def __init__(
        self,
        filenames: Iterable[PathLike | str],
        output_file: str,
        concat_dimension: str = "time",
    ):
        self.filenames = sorted(map(Path, filenames), key=lambda f: f.name)
        self.concat_dimension = concat_dimension
        self.first_filename = self.filenames[0]
        self.first_file = netCDF4.Dataset(self.first_filename)
        self.concatenated_file = self._init_output_file(output_file)
        self.common_variables = set()

    def get_common_variables(self) -> None:
        """Finds variables which should have the same values in all files."""
        for key, value in self.first_file.variables.items():
            if self.concat_dimension not in value.dimensions:
                self.common_variables.add(key)

    def create_global_attributes(self, new_attributes: dict | None) -> None:
        """Copies global attributes from one of the source files."""
        _copy_attributes(self.first_file, self.concatenated_file)
        if new_attributes is not None:
            for key, value in new_attributes.items():
                setattr(self.concatenated_file, key, value)

    def concat_data(
        self,
        variables: list | None,
        ignore: list | None,
        allow_vary: list | None,
    ) -> list:
        """Concatenates data arrays."""
        self._write_initial_data(variables, ignore)
        output = [self.first_filename]
        if len(self.filenames) > 1:
            for filename in self.filenames[1:]:
                try:
                    self._append_data(filename, allow_vary)
                except RuntimeError as e:
                    if "NetCDF: HDF error" in str(e):
                        msg = f"Caught a NetCDF HDF error. Skipping file '{filename}'."
                        logging.exception(msg)
                        continue
                    raise
                output.append(filename)
        return output

    def _write_initial_data(self, variables: list | None, ignore: list | None) -> None:
        for key in self.first_file.variables:
            if (
                variables is not None
                and key not in variables
                and key not in self.common_variables
                and key != self.concat_dimension
            ):
                continue
            if ignore and key in ignore:
                continue
            auto_scale = False
            self.first_file[key].set_auto_scale(auto_scale)
            array = self.first_file[key][:]
            dimensions = self.first_file[key].dimensions
            fill_value = getattr(self.first_file[key], "_FillValue", None)
            var = self.concatenated_file.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                complevel=3,
                shuffle=False,
                fill_value=fill_value,
            )
            auto_scale = False
            var.set_auto_scale(auto_scale)
            var[:] = array
            _copy_attributes(self.first_file[key], var)

    def _append_data(self, filename: str | PathLike, allow_vary: list | None) -> None:
        with netCDF4.Dataset(filename) as file:
            auto_scale = False
            file.set_auto_scale(auto_scale)
            ind0 = len(self.concatenated_file.variables[self.concat_dimension])
            ind1 = ind0 + len(file.variables[self.concat_dimension])
            for key in self.concatenated_file.variables:
                if key not in file.variables:
                    continue
                array = file[key][:]
                if key in self.common_variables:
                    if allow_vary is not None and key in allow_vary:
                        continue
                    if not np.array_equal(self.first_file[key][:], array):
                        msg = (
                            f"Inconsistent values in variable '{key}' between "
                            f"files '{self.first_filename}' and '{filename}'"
                        )
                        raise InconsistentDataError(msg)
                    continue
                if array.ndim == 0:
                    continue
                if array.ndim == 1:
                    self.concatenated_file.variables[key][ind0:ind1] = array
                else:
                    self.concatenated_file.variables[key][ind0:ind1, :] = array

    def _init_output_file(self, output_file: str) -> netCDF4.Dataset:
        data_model: Literal["NETCDF4", "NETCDF4_CLASSIC"] = (
            "NETCDF4" if self.first_file.data_model == "NETCDF4" else "NETCDF4_CLASSIC"
        )
        nc = netCDF4.Dataset(output_file, "w", format=data_model)
        for dim in self.first_file.dimensions:
            dim_len = (
                None
                if dim == self.concat_dimension
                else self.first_file.dimensions[dim].size
            )
            nc.createDimension(dim, dim_len)
        return nc

    def _close(self) -> None:
        self.first_file.close()
        self.concatenated_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()


def _copy_attributes(
    source: netCDF4.Dataset | netCDF4.Variable,
    target: netCDF4.Dataset | netCDF4.Variable,
) -> None:
    for attr in source.ncattrs():
        if attr != "_FillValue":
            value = getattr(source, attr)
            setattr(target, attr, value)


def _find_valid_time_indices(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
) -> np.ndarray:
    return np.where(nc_new.variables["time"][:] > nc_old.variables["time"][-1])[0]


def _update_fields(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
    valid_ind: np.ndarray,
) -> None:
    ind0 = len(nc_old.variables["time"])
    idx = [ind0 + x for x in valid_ind]
    concat_dimension = nc_old.variables["time"].dimensions[0]
    for field in nc_new.variables:
        if field not in nc_old.variables:
            continue
        dimensions = nc_new.variables[field].dimensions
        if concat_dimension in dimensions:
            concat_ind = dimensions.index(concat_dimension)
            if len(dimensions) == 1:
                nc_old.variables[field][idx] = nc_new.variables[field][valid_ind]
            elif len(dimensions) == 2 and concat_ind == 0:
                nc_old.variables[field][idx, :] = nc_new.variables[field][valid_ind, :]
            elif len(dimensions) == 2 and concat_ind == 1:
                nc_old.variables[field][:, idx] = nc_new.variables[field][:, valid_ind]
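
To illustrate the selection rule in _find_valid_time_indices, here is a small
self-contained numpy sketch with made-up timestamps: only entries strictly
newer than the last existing timestamp are appended, so overlapping profiles
are skipped.

    import numpy as np

    old_time = np.array([0.0, 1.0, 2.0])       # already in the file
    new_time = np.array([1.5, 2.0, 2.5, 3.0])  # incoming file
    valid_ind = np.where(new_time > old_time[-1])[0]
    print(valid_ind)  # [2 3] -> only 2.5 and 3.0 would be appended
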
def concatenate_text_files(filenames: list, output_filename: str | PathLike) -> None:
    """Concatenates text files."""
    with open(output_filename, "wb") as target:
        for filename in filenames:
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, target)
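
A brief sketch with hypothetical file names; note that the inputs are copied
byte-for-byte, so no separator or newline is inserted between them.

    concatenate_text_files(["log_part1.txt", "log_part2.txt"], "log_full.txt")
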
def bundle_netcdf_files(
    files: list,
    date: str,
    output_file: str,
    concat_dimensions: tuple[str, ...] = ("time", "profile"),
    variables: list | None = None,
) -> list:
    """Concatenates several netCDF files into a daily file with some extra
    data manipulation.
    """
    with netCDF4.Dataset(files[0]) as nc:
        concat_dimension = None
        for key in concat_dimensions:
            if key in nc.dimensions:
                concat_dimension = key
                break
        if concat_dimension is None:
            msg = f"None of the dimensions {concat_dimensions} found in the files."
            raise KeyError(msg)
    if len(files) == 1:
        shutil.copy(files[0], output_file)
        return files
    valid_files = []
    for file in files:
        try:
            with netCDF4.Dataset(file) as nc:
                time = nc.variables["time"]
                time_array = time[:]
                time_units = time.units
        except OSError:
            continue
        epoch = utils.get_epoch(time_units)
        for timestamp in time_array:
            if utils.seconds2date(timestamp, epoch)[:3] == date.split("-"):
                valid_files.append(file)
                break
    concatenate_files(
        valid_files,
        output_file,
        concat_dimension=concat_dimension,
        variables=variables,
        ignore=[
            "minimum",
            "maximum",
            "number_integrated_samples",
            "Min_LWP",
            "Max_LWP",
        ],
    )
    return valid_files
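
A final usage sketch (hypothetical file names): bundle raw files into one
daily file for 2024-05-01. Files that cannot be read, or that contain no
timestamps on that date, are dropped; the list of files actually used is
returned.

    used = bundle_netcdf_files(
        ["raw_00.nc", "raw_08.nc", "raw_16.nc"],
        date="2024-05-01",
        output_file="daily.nc",
    )
    print(used)
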