Source code for instruments.rain_e_h3

import csv
import datetime
from os import PathLike
from uuid import UUID

import numpy as np

from cloudnetpy import output
from cloudnetpy.exceptions import ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.cloudnet_instrument import CSVFile
from cloudnetpy.utils import get_uuid


def rain_e_h32nc(
    input_file: str | PathLike,
    output_file: str | PathLike,
    site_meta: dict,
    uuid: str | UUID | None = None,
    date: str | datetime.date | None = None,
) -> UUID:
    """Convert a rain_e_h3 rain gauge file into a Cloudnet Level 1b netCDF file.

    Args:
        input_file: Filename of rain_e_h3 CSV file.
        output_file: Output filename.
        site_meta: Dictionary containing information about the site. Required key
            is `name`.
        uuid: Set specific UUID for the file.
        date: Expected date of the measurements as YYYY-MM-DD or datetime.date
            object.

    Returns:
        UUID of the generated file.

    Raises:
        ValidTimeStampError: No valid timestamps found.

    """
    gauge = RainEH3(site_meta)
    # Accept the expected date either as an ISO string or as a date object.
    expected_date = (
        datetime.date.fromisoformat(date) if isinstance(date, str) else date
    )
    file_uuid = get_uuid(uuid)

    # Read the raw CSV and run the processing steps in their required order.
    gauge.parse_input_file(input_file, expected_date)
    gauge.add_data()
    gauge.add_date()
    gauge.convert_units()
    gauge.normalize_cumulative_amount("rainfall_amount")
    gauge.add_site_geolocation()
    gauge.sort_timestamps()
    gauge.remove_duplicate_timestamps()

    # Attach variable attributes and write the result to disk.
    time_attributes = output.add_time_attribute({}, gauge.date)
    output.update_attributes(gauge.data, time_attributes)
    output.save_level1b(gauge, output_file, file_uuid)
    return file_uuid


class RainEH3(CSVFile):
    """Reader for rain_e_h3 rain gauge CSV files in the talker protocol.

    Parsed rows are collected into ``self._data`` under the keys ``time``,
    ``rainfall_rate`` and ``rainfall_amount``.
    """

    def __init__(self, site_meta: dict) -> None:
        """Initialise the reader with empty value lists.

        Args:
            site_meta: Dictionary containing information about the site; passed
                unchanged to the parent class.

        """
        super().__init__(site_meta)
        self.instrument = instruments.RAIN_E_H3
        self._data = {
            "time": [],
            "rainfall_rate": [],
            "rainfall_amount": [],
        }

    def parse_input_file(
        self, filepath: str | PathLike, date: datetime.date | None = None
    ) -> None:
        """Read a semicolon-separated rain_e_h3 file into ``self._data``.

        The file format is chosen from the median number of columns per row, so
        a few truncated or malformed rows do not change the detected format.

        Args:
            filepath: Path of the CSV file (decoded as latin1).
            date: If given, only rows measured on this date are kept.

        Raises:
            ValidTimeStampError: The file is empty or no valid timestamps found.
            NotImplementedError: The median column count is neither 16 nor 22.

        """
        with open(filepath, encoding="latin1") as f:
            data = list(csv.reader(f, delimiter=";"))
        if not data:
            # The median of an empty list is NaN, and casting NaN to int is
            # undefined; an empty file simply contains no valid timestamps.
            raise ValidTimeStampError
        n_values = np.median([len(row) for row in data]).astype(int)
        if n_values == 22:
            self._read_talker_protocol_22_columns(data, date)
        elif n_values == 16:
            self._read_talker_protocol_16_columns(data, date)
        else:
            msg = "Only talker protocol with 16 or 22 columns is supported."
            raise NotImplementedError(msg)

    def _read_talker_protocol_16_columns(
        self, data: list, date: datetime.date | None = None
    ) -> None:
        """Read rows in the old Lindenberg data format.

        Rows that do not have exactly 16 columns, whose timestamp cannot be
        parsed, or that fall outside ``date`` are skipped.

        Columns:
            0  date DD.MM.YYYY
            1  time
            2  precipitation intensity in mm/h
            3  precipitation accumulation in mm
            4  housing contact
            5  top temperature
            6  bottom temperature
            7  heater status
            8  error code
            9  system status
            10 talker interval in seconds
            11 operating hours
            12 device type
            13 user data storage 1
            14 user data storage 2
            15 user data storage 3

        Args:
            data: Rows of the CSV file, each a list of strings.
            date: If given, only rows measured on this date are kept.

        Raises:
            ValidTimeStampError: No valid timestamps found.

        """
        for row in data:
            if len(row) != 16:
                continue
            try:
                dt = datetime.datetime.strptime(
                    f"{row[0]} {row[1]}", "%d.%m.%Y %H:%M:%S"
                )
            except ValueError:
                continue
            if date is not None and date != dt.date():
                continue
            # Convert before appending so the three lists always stay in step.
            rate, amount = float(row[2]), float(row[3])
            self._data["time"].append(dt)
            self._data["rainfall_rate"].append(rate)
            self._data["rainfall_amount"].append(amount)
        if not self._data["time"]:
            raise ValidTimeStampError

    def _read_talker_protocol_22_columns(
        self, data: list, date: datetime.date | None = None
    ) -> None:
        """Read rows with columns according to header in Lindenberg data.

        Rows that do not have exactly 22 columns, whose timestamp cannot be
        parsed, or that fall outside ``date`` are skipped. The serial number is
        taken from the last accepted row.

        Columns:
            0  datetime utc
            1  date
            2  time
            3  precipitation intensity in mm/h
            4  precipitation accumulation in mm
            5  housing contact
            6  top temperature
            7  bottom temperature
            8  heater status
            9  error code
            10 system status
            11 talker interval in seconds
            12 operating hours
            13 device type
            14 user data storage 1
            15 user data storage 2
            16 user data storage 3
            17 user data storage 4
            18 serial number
            19 hardware version
            20 firmware version
            21 external temperature * checksum

        Args:
            data: Rows of the CSV file, each a list of strings.
            date: If given, only rows measured on this date are kept.

        Raises:
            ValidTimeStampError: No valid timestamps found.

        """
        for row in data:
            if len(row) != 22:
                continue
            try:
                dt = datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                continue
            if date is not None and date != dt.date():
                continue
            # Convert before appending so the three lists always stay in step.
            rate, amount = float(row[3]), float(row[4])
            self._data["time"].append(dt)
            self._data["rainfall_rate"].append(rate)
            self._data["rainfall_amount"].append(amount)
            self.serial_number = row[18]
        if not self._data["time"]:
            raise ValidTimeStampError

    def convert_units(self) -> None:
        """Convert rainfall rate from mm/h to m/s and rainfall amount from mm to m.

        Operates on ``self.data``, which is filled outside this class
        (presumably by the parent's ``add_data`` -- confirm against CSVFile).
        """
        rainfall_rate = self.data["rainfall_rate"][:]
        self.data["rainfall_rate"].data = rainfall_rate / 3600 / 1000  # mm/h -> m/s
        self.data["rainfall_amount"].data = (
            self.data["rainfall_amount"][:] / 1000
        )  # mm -> m