Source code for instruments.fd12p

import datetime
import logging
import math
import re
from os import PathLike
from uuid import UUID

import numpy as np
from numpy import ma

from cloudnetpy import output
from cloudnetpy.exceptions import ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.cloudnet_instrument import CSVFile
from cloudnetpy.metadata import MetaData
from cloudnetpy.utils import get_uuid


[docs] def fd12p2nc( input_file: str | PathLike, output_file: str | PathLike, site_meta: dict, uuid: str | UUID | None = None, date: str | datetime.date | None = None, ) -> UUID: """Converts Vaisala FD12P into Cloudnet Level 1b netCDF file. Args: input_file: Filename of input file. output_file: Output filename. site_meta: Dictionary containing information about the site. Required key is `name`. uuid: Set specific UUID for the file. date: Expected date of the measurements as YYYY-MM-DD or datetime.date object. Returns: UUID of the generated file. Raises: ValidTimeStampError: No valid timestamps found. """ if isinstance(date, str): date = datetime.date.fromisoformat(date) uuid = get_uuid(uuid) fd12p = FD12P(site_meta) fd12p.parse_input_file(input_file, date) fd12p.add_data() fd12p.add_date() fd12p.screen_all_masked() fd12p.sort_timestamps() fd12p.remove_duplicate_timestamps() fd12p.convert_units() fd12p.normalize_cumulative_amount("precipitation_amount") fd12p.normalize_cumulative_amount("snowfall_amount") fd12p.add_site_geolocation() attributes = output.add_time_attribute(ATTRIBUTES, fd12p.date) output.update_attributes(fd12p.data, attributes) output.save_level1b(fd12p, output_file, uuid) return uuid
class FD12P(CSVFile): def __init__(self, site_meta: dict) -> None: super().__init__(site_meta) self.instrument = instruments.FD12P self._data = { key: [] for key in ( "time", "visibility", "synop_WaWa", "precipitation_rate", "precipitation_amount", "snowfall_amount", ) } def parse_input_file( self, filename: str | PathLike, expected_date: datetime.date | None = None ) -> None: # In Lindenberg, format is date and time followed by Message 2 without # non-printable characters. with open(filename) as file: invalid_lines = 0 for line in file: try: columns = line.split() if len(columns) != 13: msg = "Invalid column count" raise ValueError(msg) # noqa: TRY301 date = _parse_date(columns[0]) time = _parse_time(columns[1]) visibility = _parse_int(columns[4]) synop = _parse_int(columns[7]) p_rate = _parse_float(columns[10]) # mm/h p_amount = _parse_float(columns[11]) # mm s_amount = _parse_int(columns[12]) # mm self._data["time"].append(datetime.datetime.combine(date, time)) self._data["visibility"].append(visibility) self._data["synop_WaWa"].append(synop) self._data["precipitation_rate"].append(p_rate) self._data["precipitation_amount"].append(p_amount) self._data["snowfall_amount"].append(s_amount) except ValueError: invalid_lines += 1 continue if invalid_lines: logging.info("Skipped %d lines", invalid_lines) for key in ("visibility", "synop_WaWa", "snowfall_amount"): values = np.array( [0 if math.isnan(x) else x for x in self._data[key]], dtype=np.int32 ) mask = np.array([math.isnan(x) for x in self._data[key]]) self._data[key] = ma.array(values, mask=mask) self._data["snowfall_amount"] = self._data["snowfall_amount"].astype(np.float32) if expected_date: self._data["time"] = [ d for d in self._data["time"] if d.date() == expected_date ] if not self._data["time"]: raise ValidTimeStampError def convert_units(self) -> None: precipitation_rate = self.data["precipitation_rate"][:] self.data["precipitation_rate"].data = ( precipitation_rate / 3600 / 1000 ) # mm/h -> m/s for key in ("precipitation_amount", "snowfall_amount"): self.data[key].data = self.data[key][:] / 1000 # mm -> m def screen_all_masked(self) -> None: is_valid = np.ones_like(self.data["time"][:], dtype=np.bool_) for key in self.data: if key == "time": continue is_valid &= ma.getmaskarray(self.data[key][:]) self.screen_time_indices(~is_valid) def _parse_date(date: str) -> datetime.date: match = re.fullmatch(r"(?P<day>\d{2})\.(?P<month>\d{2})\.(?P<year>\d{4})", date) if match is None: msg = f"Invalid date: {date}" raise ValueError(msg) return datetime.date(int(match["year"]), int(match["month"]), int(match["day"])) def _parse_time(time: str) -> datetime.time: match = re.fullmatch( r"(?P<hour>\d{2}):(?P<minute>\d{2})(:(?P<second>\d{2}))?", time ) if match is None: msg = f"Invalid time: {time}" raise ValueError(msg) return datetime.time( int(match["hour"]), int(match["minute"]), int(match["second"]) if match["second"] is not None else 0, ) def _parse_int(value: str) -> float: if "/" in value: return math.nan return int(value) def _parse_float(value: str) -> float: if "/" in value: return math.nan return float(value) ATTRIBUTES = { "visibility": MetaData( long_name="Meteorological optical range (MOR) visibility", units="m", standard_name="visibility_in_air", dimensions=("time",), ), "precipitation_rate": MetaData( long_name="Precipitation rate", standard_name="lwe_precipitation_rate", units="m s-1", dimensions=("time",), ), "precipitation_amount": MetaData( long_name="Precipitation amount", standard_name="lwe_thickness_of_precipitation_amount", units="m", comment="Cumulated precipitation since 00:00 UTC", dimensions=("time",), ), "snowfall_amount": MetaData( long_name="Snowfall amount", units="m", standard_name="thickness_of_snowfall_amount", comment="Cumulated snow since 00:00 UTC", dimensions=("time",), ), }