Source code for concat_lib

"""Module for concatenating netCDF files."""

import shutil
from os import PathLike
from typing import Literal

import netCDF4
import numpy as np

from cloudnetpy import utils
from cloudnetpy.exceptions import InconsistentDataError


def truncate_netcdf_file(
    filename: str,
    output_file: str,
    n_profiles: int,
    dim_name: str = "time",
) -> None:
    """Truncates netCDF file in dim_name dimension taking only n_profiles.

    Useful for creating small files for tests.
    """
    with (
        netCDF4.Dataset(filename, "r") as nc,
        netCDF4.Dataset(output_file, "w", format=nc.data_model) as nc_new,
    ):
        for dim in nc.dimensions:
            dim_len = None if dim == dim_name else nc.dimensions[dim].size
            nc_new.createDimension(dim, dim_len)
        for attr in nc.ncattrs():
            value = getattr(nc, attr)
            setattr(nc_new, attr, value)
        for key in nc.variables:
            array = nc.variables[key][:]
            dimensions = nc.variables[key].dimensions
            fill_value = getattr(nc.variables[key], "_FillValue", None)
            var = nc_new.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                fill_value=fill_value,
            )
            if dimensions and dim_name in dimensions[0]:
                if array.ndim == 1:
                    var[:] = array[:n_profiles]
                if array.ndim == 2:
                    var[:] = array[:n_profiles, :]
            else:
                var[:] = array
            for attr in nc.variables[key].ncattrs():
                if attr != "_FillValue":
                    value = getattr(nc.variables[key], attr)
                    setattr(var, attr, value)
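
# Usage sketch (added for illustration, not part of the original module;
# file names are hypothetical placeholders):
#
#     >>> truncate_netcdf_file("radar_full.nc", "radar_small.nc", n_profiles=100)
#
# The output file keeps all dimensions, global attributes and variables of the
# input, but only the first 100 entries along the "time" dimension.
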
def update_nc(old_file: str, new_file: str) -> int:
    """Appends data to existing netCDF file.

    Args:
        old_file: Filename of an existing netCDF file.
        new_file: Filename of a new file whose data will be appended to the end.

    Returns:
        1 = success, 0 = failed to add new data.

    Notes:
        Requires 'time' variable with unlimited dimension.
    """
    try:
        with (
            netCDF4.Dataset(old_file, "a") as nc_old,
            netCDF4.Dataset(new_file) as nc_new,
        ):
            valid_ind = _find_valid_time_indices(nc_old, nc_new)
            if len(valid_ind) > 0:
                _update_fields(nc_old, nc_new, valid_ind)
                return 1
            return 0
    except OSError:
        return 0
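
# Usage sketch (added for illustration, not part of the original module;
# file names are hypothetical placeholders):
#
#     >>> status = update_nc("day_so_far.nc", "latest_chunk.nc")
#     >>> if status == 0:
#     ...     print("no new profiles appended")
#
# Only profiles whose 'time' values are strictly greater than the last
# timestamp already in "day_so_far.nc" are appended, so re-running with an
# overlapping file does not duplicate data.
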
def concatenate_files(
    filenames: list,
    output_file: str,
    concat_dimension: str = "time",
    variables: list | None = None,
    new_attributes: dict | None = None,
    ignore: list | None = None,
    allow_difference: list | None = None,
) -> None:
    """Concatenate netCDF files in one dimension.

    Args:
        filenames: List of files to be concatenated.
        output_file: Output file name.
        concat_dimension: Dimension name for concatenation. Default is 'time'.
        variables: List of variables with the 'concat_dimension' to be
            concatenated. Default is None, in which case all variables with
            'concat_dimension' are saved.
        new_attributes: Optional new global attributes as
            {'attribute_name': value}.
        ignore: List of variables to be ignored.
        allow_difference: Names of scalar variables that can differ from one
            file to another (the value from the first file is saved).

    Notes:
        Arrays without 'concat_dimension', scalars, and global attributes will
        be taken from the first file. Groups, possibly present in a NETCDF4
        formatted file, are ignored.
    """
    with _Concat(filenames, output_file, concat_dimension) as concat:
        concat.get_common_variables()
        concat.create_global_attributes(new_attributes)
        concat.concat_data(variables, ignore, allow_difference)
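
# Usage sketch (added for illustration, not part of the original module;
# file and variable names are hypothetical placeholders):
#
#     >>> concatenate_files(
#     ...     ["chunk_00.nc", "chunk_01.nc", "chunk_02.nc"],
#     ...     "day.nc",
#     ...     variables=["lwp"],
#     ...     new_attributes={"history": "Concatenated from 3 files"},
#     ...     allow_difference=["calibration_offset"],
#     ... )
#
# Raises InconsistentDataError if a variable without the concat dimension
# (and not listed in allow_difference) has differing values between files.
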
class _Concat:
    common_variables: set[str]

    def __init__(
        self,
        filenames: list,
        output_file: str,
        concat_dimension: str = "time",
    ):
        self.filenames = sorted(filenames)
        self.concat_dimension = concat_dimension
        self.first_filename = self.filenames[0]
        self.first_file = netCDF4.Dataset(self.first_filename)
        self.concatenated_file = self._init_output_file(output_file)
        self.common_variables = set()

    def get_common_variables(self) -> None:
        """Finds variables which should have the same values in all files."""
        for key, value in self.first_file.variables.items():
            if self.concat_dimension not in value.dimensions:
                self.common_variables.add(key)

    def create_global_attributes(self, new_attributes: dict | None) -> None:
        """Copies global attributes from one of the source files."""
        _copy_attributes(self.first_file, self.concatenated_file)
        if new_attributes is not None:
            for key, value in new_attributes.items():
                setattr(self.concatenated_file, key, value)

    def concat_data(
        self,
        variables: list | None,
        ignore: list | None,
        allow_vary: list | None,
    ) -> None:
        """Concatenates data arrays."""
        self._write_initial_data(variables, ignore)
        if len(self.filenames) > 1:
            for filename in self.filenames[1:]:
                self._append_data(filename, allow_vary)

    def _write_initial_data(self, variables: list | None, ignore: list | None) -> None:
        for key in self.first_file.variables:
            if (
                variables is not None
                and key not in variables
                and key not in self.common_variables
                and key != self.concat_dimension
            ):
                continue
            if ignore and key in ignore:
                continue
            auto_scale = False
            self.first_file[key].set_auto_scale(auto_scale)
            array = self.first_file[key][:]
            dimensions = self.first_file[key].dimensions
            fill_value = getattr(self.first_file[key], "_FillValue", None)
            var = self.concatenated_file.createVariable(
                key,
                array.dtype,
                dimensions,
                zlib=True,
                complevel=3,
                shuffle=False,
                fill_value=fill_value,
            )
            auto_scale = False
            var.set_auto_scale(auto_scale)
            var[:] = array
            _copy_attributes(self.first_file[key], var)

    def _append_data(self, filename: str, allow_vary: list | None) -> None:
        with netCDF4.Dataset(filename) as file:
            auto_scale = False
            file.set_auto_scale(auto_scale)
            ind0 = len(self.concatenated_file.variables[self.concat_dimension])
            ind1 = ind0 + len(file.variables[self.concat_dimension])
            for key in self.concatenated_file.variables:
                if key not in file.variables:
                    continue
                array = file[key][:]
                if key in self.common_variables:
                    if allow_vary is not None and key in allow_vary:
                        continue
                    if not np.array_equal(self.first_file[key][:], array):
                        msg = (
                            f"Inconsistent values in variable '{key}' between "
                            f"files '{self.first_filename}' and '{filename}'"
                        )
                        raise InconsistentDataError(msg)
                    continue
                if array.ndim == 0:
                    continue
                if array.ndim == 1:
                    self.concatenated_file.variables[key][ind0:ind1] = array
                else:
                    self.concatenated_file.variables[key][ind0:ind1, :] = array

    def _init_output_file(self, output_file: str) -> netCDF4.Dataset:
        data_model: Literal["NETCDF4", "NETCDF4_CLASSIC"] = (
            "NETCDF4" if self.first_file.data_model == "NETCDF4" else "NETCDF4_CLASSIC"
        )
        nc = netCDF4.Dataset(output_file, "w", format=data_model)
        for dim in self.first_file.dimensions:
            dim_len = (
                None
                if dim == self.concat_dimension
                else self.first_file.dimensions[dim].size
            )
            nc.createDimension(dim, dim_len)
        return nc

    def _close(self) -> None:
        self.first_file.close()
        self.concatenated_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()


def _copy_attributes(
    source: netCDF4.Dataset | netCDF4.Variable,
    target: netCDF4.Dataset | netCDF4.Variable,
) -> None:
    for attr in source.ncattrs():
        if attr != "_FillValue":
            value = getattr(source, attr)
            setattr(target, attr, value)


def _find_valid_time_indices(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
) -> np.ndarray:
    return np.where(nc_new.variables["time"][:] > nc_old.variables["time"][-1])[0]


def _update_fields(
    nc_old: netCDF4.Dataset,
    nc_new: netCDF4.Dataset,
    valid_ind: np.ndarray,
) -> None:
    ind0 = len(nc_old.variables["time"])
    idx = [ind0 + x for x in valid_ind]
    concat_dimension = nc_old.variables["time"].dimensions[0]
    for field in nc_new.variables:
        if field not in nc_old.variables:
            continue
        dimensions = nc_new.variables[field].dimensions
        if concat_dimension in dimensions:
            concat_ind = dimensions.index(concat_dimension)
            if len(dimensions) == 1:
                nc_old.variables[field][idx] = nc_new.variables[field][valid_ind]
            elif len(dimensions) == 2 and concat_ind == 0:
                nc_old.variables[field][idx, :] = nc_new.variables[field][valid_ind, :]
            elif len(dimensions) == 2 and concat_ind == 1:
                nc_old.variables[field][:, idx] = nc_new.variables[field][:, valid_ind]
def concatenate_text_files(filenames: list, output_filename: str | PathLike) -> None:
    """Concatenates text files."""
    with open(output_filename, "wb") as target:
        for filename in filenames:
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, target)
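
# Usage sketch (added for illustration, not part of the original module;
# file names are hypothetical placeholders):
#
#     >>> concatenate_text_files(["part1.txt", "part2.txt"], "full.txt")
#
# Files are copied byte-for-byte in the given order, so the inputs need not
# fit in memory.
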
def bundle_netcdf_files(
    files: list,
    date: str,
    output_file: str,
    concat_dimensions: tuple[str, ...] = ("time", "profile"),
    variables: list | None = None,
) -> list:
    """Concatenates several netCDF files into a daily file with some extra
    data manipulation.
    """
    with netCDF4.Dataset(files[0]) as nc:
        concat_dimension = None
        for key in concat_dimensions:
            if key in nc.dimensions:
                concat_dimension = key
                break
        if concat_dimension is None:
            msg = f"Dimension '{concat_dimensions}' not found in the files."
            raise KeyError(msg)
    if len(files) == 1:
        shutil.copy(files[0], output_file)
        return files
    valid_files = []
    for file in files:
        try:
            with netCDF4.Dataset(file) as nc:
                time = nc.variables["time"]
                time_array = time[:]
                time_units = time.units
        except OSError:
            continue
        epoch = utils.get_epoch(time_units)
        for timestamp in time_array:
            if utils.seconds2date(timestamp, epoch)[:3] == date.split("-"):
                valid_files.append(file)
                break
    concatenate_files(
        valid_files,
        output_file,
        concat_dimension=concat_dimension,
        variables=variables,
        ignore=[
            "minimum",
            "maximum",
            "number_integrated_samples",
            "Min_LWP",
            "Max_LWP",
        ],
    )
    return valid_files
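
# Usage sketch (added for illustration, not part of the original module;
# file names are hypothetical placeholders):
#
#     >>> used = bundle_netcdf_files(
#     ...     ["chunk_a.nc", "chunk_b.nc", "chunk_c.nc"],
#     ...     date="2024-01-01",
#     ...     output_file="20240101_daily.nc",
#     ... )
#
# Returns the subset of input files that were readable and contained at least
# one timestamp on the requested date; only those files are concatenated.
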