Source code for instruments.weather_station

import csv
import datetime
import math
import re
from collections import defaultdict
from collections.abc import Iterable
from os import PathLike

import numpy as np
from numpy import ma

from cloudnetpy import output
from cloudnetpy.categorize import atmos_utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.constants import HPA_TO_PA, MM_H_TO_M_S, SEC_IN_HOUR
from cloudnetpy.exceptions import ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.instruments.cloudnet_instrument import CSVFile
from cloudnetpy.instruments.toa5 import read_toa5
from cloudnetpy.utils import datetime2decimal_hours


[docs] def ws2nc( weather_station_file: str | list[str], output_file: str, site_meta: dict, uuid: str | None = None, date: str | None = None, ) -> str: """Converts weather station data into Cloudnet Level 1b netCDF file. Args: weather_station_file: Filename of weather-station ASCII file. output_file: Output filename. site_meta: Dictionary containing information about the site. Required key is `name`. uuid: Set specific UUID for the file. date: Expected date of the measurements as YYYY-MM-DD. Returns: UUID of the generated file. Raises: ValidTimeStampError: No valid timestamps found. """ if not isinstance(weather_station_file, list): weather_station_file = [weather_station_file] ws: WS if site_meta["name"] == "Palaiseau": ws = PalaiseauWS(weather_station_file, site_meta) elif site_meta["name"] == "Bucharest": ws = BucharestWS(weather_station_file, site_meta) elif site_meta["name"] == "Granada": ws = GranadaWS(weather_station_file, site_meta) elif site_meta["name"] == "Kenttärova": ws = KenttarovaWS(weather_station_file, site_meta) elif site_meta["name"] == "Hyytiälä": ws = HyytialaWS(weather_station_file, site_meta) elif site_meta["name"] == "Galați": ws = GalatiWS(weather_station_file, site_meta) elif site_meta["name"] == "Jülich": ws = JuelichWS(weather_station_file, site_meta) elif site_meta["name"] == "Lampedusa": ws = LampedusaWS(weather_station_file, site_meta) elif site_meta["name"] == "Limassol": ws = LimassolWS(weather_station_file, site_meta) else: msg = "Unsupported site" raise ValueError(msg) if date is not None: ws.screen_timestamps(date) ws.convert_time() ws.add_date() ws.add_site_geolocation() ws.add_data() ws.convert_temperature_and_humidity() ws.convert_pressure() ws.convert_rainfall_rate() ws.convert_rainfall_amount() ws.normalize_rainfall_amount() ws.calculate_rainfall_amount() attributes = output.add_time_attribute({}, ws.date) output.update_attributes(ws.data, attributes) return output.save_level1b(ws, output_file, uuid)
class WS(CSVFile): def __init__(self, site_meta: dict): super().__init__(site_meta) self.instrument = instruments.GENERIC_WEATHER_STATION date: list[str] def calculate_rainfall_amount(self) -> None: if "rainfall_amount" in self.data or "rainfall_rate" not in self.data: return resolution = np.median(np.diff(self.data["time"].data)) * SEC_IN_HOUR rainfall_amount = ma.cumsum(self.data["rainfall_rate"].data * resolution) self.data["rainfall_amount"] = CloudnetArray(rainfall_amount, "rainfall_amount") def screen_timestamps(self, date: str) -> None: dates = np.array([str(d.date()) for d in self._data["time"]]) valid_mask = dates == date if not valid_mask.any(): raise ValidTimeStampError for key in self._data: self._data[key] = self._data[key][valid_mask] @staticmethod def format_data(data: dict) -> dict: for key, value in data.items(): new_value = np.array(value) if key != "time": new_value = ma.masked_where(np.isnan(new_value), new_value) data[key] = new_value return data def convert_temperature_and_humidity(self) -> None: temperature_kelvins = atmos_utils.c2k(self.data["air_temperature"][:]) self.data["air_temperature"].data = temperature_kelvins self.data["relative_humidity"].data = self.data["relative_humidity"][:] / 100 def convert_rainfall_rate(self) -> None: if "rainfall_rate" not in self.data: return rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = rainfall_rate / 60 / 1000 # mm/min -> m/s def convert_pressure(self) -> None: if "air_pressure" not in self.data: return self.data["air_pressure"].data = self.data["air_pressure"][:] * HPA_TO_PA def convert_time(self) -> None: pass def convert_rainfall_amount(self) -> None: pass class PalaiseauWS(WS): def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filenames = filenames self._data = self._read_data() def _read_data(self) -> dict: timestamps, values, header = [], [], [] for filename in self.filenames: with open(filename, encoding="latin-1") as f: data = f.readlines() for row in data: if not (columns := row.split()): continue if row.startswith("#"): header_row = "".join(columns) if header_row not in header: header.append(header_row) else: timestamp = datetime.datetime.strptime( columns[0], "%Y-%m-%dT%H:%M:%SZ" ).replace(tzinfo=datetime.timezone.utc) values.append([timestamp] + [float(x) for x in columns[1:]]) timestamps.append(timestamp) self._validate_header(header) return {"time": timestamps, "values": values} def convert_time(self) -> None: decimal_hours = datetime2decimal_hours(self._data["time"]) self.data["time"] = CloudnetArray(decimal_hours, "time") def screen_timestamps(self, date: str) -> None: dates = [str(d.date()) for d in self._data["time"]] valid_ind = [ind for ind, d in enumerate(dates) if d == date] if not valid_ind: raise ValidTimeStampError for key in self._data: self._data[key] = [ x for ind, x in enumerate(self._data[key]) if ind in valid_ind ] def add_data(self) -> None: keys = ( "wind_speed", "wind_direction", "air_temperature", "relative_humidity", "air_pressure", "rainfall_rate", "rainfall_amount", ) for ind, key in enumerate(keys): array = [row[ind + 1] for row in self._data["values"]] array_masked = ma.masked_invalid(array) self.data[key] = CloudnetArray(array_masked, key) def convert_rainfall_amount(self) -> None: self.data["rainfall_amount"].data = ( self.data["rainfall_amount"][:] / 1000 ) # mm -> m @staticmethod def _validate_header(header: list[str]) -> None: expected_identifiers = [ "DateTime(yyyy-mm-ddThh:mm:ssZ)", "Windspeed(m/s)", "Winddirection(deg", "Airtemperature", "Relativehumidity(%)", "Pressure(hPa)", "Precipitationrate(mm/min)", "precipitation", ] column_titles = [row for row in header if "Col." in row] error_msg = "Unexpected weather station file format" if len(column_titles) != len(expected_identifiers): raise ValueError(error_msg) for title, identifier in zip(column_titles, expected_identifiers, strict=True): if identifier not in title: raise ValueError(error_msg) class BucharestWS(PalaiseauWS): def convert_rainfall_rate(self) -> None: rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = rainfall_rate * MM_H_TO_M_S class GranadaWS(WS): def __init__(self, filenames: list[str], site_meta: dict): if len(filenames) != 1: raise ValueError super().__init__(site_meta) self.filename = filenames[0] self._data = self._read_data() def _read_data(self) -> dict: keymap = { "TIMESTAMP": "time", "air_t_Avg": "air_temperature", "rh_Avg": "relative_humidity", "pressure_Avg": "air_pressure", "wind_speed_avg": "wind_speed", "wind_dir_avg": "wind_direction", "rain_Tot": "rainfall_rate", } expected_units = { "air_t_Avg": "degC", "rh_Avg": "%", "pressure_Avg": "hPa", "wind_speed_avg": "m/s", "wind_dir_avg": "Deg", "rain_Tot": "mm", } units, process, rows = read_toa5(self.filename) for key in units: if key in expected_units and expected_units[key] != units[key]: msg = ( f"Expected {key} to have units {expected_units[key]}," f" got {units[key]} instead" ) raise ValueError(msg) data: dict[str, list] = {keymap[key]: [] for key in units if key in keymap} for row in rows: for key, value in row.items(): if key not in keymap: continue parsed = value if keymap[key] != "time": try: parsed = float(value) except ValueError: parsed = math.nan data[keymap[key]].append(parsed) return self.format_data(data) class KenttarovaWS(WS): def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filenames = filenames self._data = self._read_data() def _read_data(self) -> dict: merged: dict = {} for filename in self.filenames: with open(filename, newline="") as f: reader = csv.DictReader(f) raw_data: dict = {key: [] for key in reader.fieldnames} # type: ignore[union-attr] for row in reader: for key, value in row.items(): parsed_value: float | datetime.datetime if key == "Read time (UTC+2)": parsed_value = datetime.datetime.strptime( value, "%Y-%m-%d %H:%M:%S" ) - datetime.timedelta(hours=2) else: try: parsed_value = float(value) except ValueError: parsed_value = math.nan raw_data[key].append(parsed_value) data = { "time": raw_data["Read time (UTC+2)"], "air_temperature": raw_data["Temp 2m (C)"], "relative_humidity": raw_data["Humidity 2m (%)"], "air_pressure": raw_data["Pressure (hPa)"], "wind_speed": raw_data["Wind speed (m/s)"], "wind_direction": raw_data["Wind dir (deg)"], "rainfall_rate": raw_data["Precipitation (?)"], } if merged: merged = {key: [*merged[key], *data[key]] for key in merged} else: merged = data return self.format_data(merged) def convert_rainfall_rate(self) -> None: # Rainfall rate is 10-minute averaged in mm h-1 rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = rainfall_rate * MM_H_TO_M_S / 10 def convert_pressure(self) -> None: # Magic number 10 to convert to realistic Pa self.data["air_pressure"].data = self.data["air_pressure"][:] * 10 class HyytialaWS(WS): """Hyytiälä rain-gauge variables: a = Pluvio400 and b = Pluvio200. E.g. - AaRNRT/mm = amount of non-real-time rain total (Pluvio400) [mm] - BbRT/mm = Bucket content in real-time (Pluvio200) [mm]. """ def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filename = filenames[0] self._data = self._read_data() def _read_data(self) -> dict: with open(self.filename, newline="") as f: # Skip first two lines for _ in range(2): next(f) # Read header header_line = f.readline().strip() fields = header_line[1:].strip().split() reader = csv.DictReader( f, delimiter=" ", skipinitialspace=True, fieldnames=fields ) if reader.fieldnames is None: raise ValueError raw_data: dict = {key: [] for key in reader.fieldnames} raw_data["time"] = [] # Read data for row in reader: for key, value in row.items(): if key: parsed_value: float | datetime.datetime if key == "y": current_time = datetime.datetime( int(value), int(row["m"]), int(row["d"]), int(row["minute"]) // 60, int(row["minute"]) % 60, ) raw_data["time"].append(current_time) else: try: parsed_value = float(value) except (TypeError, ValueError): parsed_value = math.nan if parsed_value in (-99.99, -99.9): parsed_value = math.nan raw_data[key].append(parsed_value) data = { "time": raw_data["time"], "air_temperature": raw_data["Ta/dsC"], "relative_humidity": raw_data["RH/pcnt"], "air_pressure": raw_data["Pa/kPa"], "wind_speed": raw_data["WS/(m/s)"], "wind_direction": raw_data["WD/ds"], "rainfall_rate": raw_data["AaNRT/mm"], } return self.format_data(data) def convert_pressure(self) -> None: self.data["air_pressure"].data = ( self.data["air_pressure"][:] * 1000 ) # kPa to Pa class GalatiWS(WS): def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filename = filenames[0] self._data = self._read_data() def _read_data(self) -> dict: with open(self.filename, newline="") as f: reader = csv.DictReader(f) raw_data: dict = {key: [] for key in reader.fieldnames} # type: ignore[union-attr] for row in reader: for key, value in row.items(): parsed_value: float | datetime.datetime if key == "TimeStamp": parsed_value = datetime.datetime.strptime( value, "%Y-%m-%d %H:%M:%S.%f" ) else: try: parsed_value = float(value) except ValueError: parsed_value = math.nan raw_data[key].append(parsed_value) def read_value(keys: Iterable[str]): for key in keys: if key in raw_data: return raw_data[key] raise KeyError("Didn't find any keys: " + ", ".join(keys)) data = { "time": read_value(["TimeStamp"]), "air_temperature": read_value(["Temperature", "Temperatura"]), "relative_humidity": read_value(["RH", "Umiditate_relativa"]), "air_pressure": read_value( ["Atmospheric_pressure", "Presiune_atmosferica"] ), "rainfall_rate": read_value(["Precipitations", "Precipitatii"]), "wind_speed": read_value(["Wind_speed", "Viteza_vant"]), "wind_direction": read_value(["Wind_direction", "Directie_vant"]), } return self.format_data(data) def add_data(self) -> None: # Skip wind measurements where range was limited to 0-180 degrees if datetime.date(*map(int, self.date)) < datetime.date(2024, 10, 29): del self._data["wind_speed"] del self._data["wind_direction"] return super().add_data() def convert_pressure(self) -> None: mmHg2Pa = 133.322 self.data["air_pressure"].data = self.data["air_pressure"][:] * mmHg2Pa class JuelichWS(WS): def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filename = filenames[0] self._data = self._read_data() def _read_data(self) -> dict: keymap = { "TIMESTAMP": "time", "AirTC_Avg": "air_temperature", "RH": "relative_humidity", "BV_BP_Avg": "air_pressure", "WS_ms_S_WVT": "wind_speed", "WindDir_D1_WVT": "wind_direction", } expected_units = { "AirTC_Avg": "Deg C", "RH": "%", "BV_BP_Avg": "hPa", "WS_ms_S_WVT": "meters/Second", "WindDir_D1_WVT": "Deg", } units, process, rows = read_toa5(self.filename) for key in units: if key in expected_units and expected_units[key] != units[key]: msg = ( f"Expected {key} to have units {expected_units[key]}," f" got {units[key]} instead" ) raise ValueError(msg) data: dict[str, list] = {keymap[key]: [] for key in units if key in keymap} for row in rows: for key, value in row.items(): if key not in keymap: continue parsed = value if keymap[key] != "time": parsed = float(value) data[keymap[key]].append(parsed) return self.format_data(data) class LampedusaWS(WS): """Read Lampedusa weather station data in ICOS format.""" def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filename = filenames[0] self._data = self._read_data() def _read_data(self) -> dict: with open(self.filename, newline="") as f: fields = [ "time", "str1", "str2", "T", "RH", "Td", "P", "WSi", "WDi", "WS10m", "WD10m", "rain1m", "rain2h", "empty", ] reader = csv.DictReader(f, fieldnames=fields) raw_data: dict = {key: [] for key in fields} for row in reader: for key, value in row.items(): fixed_value = value.strip("\0") parsed_value: float | datetime.datetime if key == "time": parsed_value = datetime.datetime.strptime( fixed_value, "%y%m%d %H%M%S" ) else: try: parsed_value = float(fixed_value) except ValueError: parsed_value = math.nan raw_data[key].append(parsed_value) data = { "time": raw_data["time"], "air_temperature": raw_data["T"], "relative_humidity": raw_data["RH"], "air_pressure": raw_data["P"], "wind_speed": raw_data["WSi"], "wind_direction": raw_data["WDi"], "rainfall_rate": raw_data["rain1m"], } return self.format_data(data) class LimassolWS(WS): def __init__(self, filenames: list[str], site_meta: dict): super().__init__(site_meta) self.filenames = filenames self._data = defaultdict(list) for filename in filenames: for key, values in _parse_sirta(filename).items(): self._data[key].extend(values) self._data["time"] = self._data.pop("Date Time (yyyy-mm-ddThh:mm:ss)") def convert_time(self) -> None: decimal_hours = datetime2decimal_hours(self._data["time"]) self.data["time"] = CloudnetArray(decimal_hours, "time") def screen_timestamps(self, date: str) -> None: dates = [str(d.date()) for d in self._data["time"]] valid_ind = [ind for ind, d in enumerate(dates) if d == date] if not valid_ind: raise ValidTimeStampError for key in self._data: self._data[key] = [ x for ind, x in enumerate(self._data[key]) if ind in valid_ind ] def add_data(self) -> None: self.data["air_temperature"] = CloudnetArray( np.array(self._data["Air temperature (°C)"]), "air_temperature" ) self.data["relative_humidity"] = CloudnetArray( np.array(self._data["Relative humidity (%)"]), "relative_humidity" ) self.data["rainfall_rate"] = CloudnetArray( np.array(self._data["Total precipitation (mm)"]), "rainfall_rate" ) # Wind speed and direction are available since 2025-02-13: if ( "Wind speed at 10m (m/s)" in self._data and "Wind direction at 10m (degrees)" in self._data ): self.data["wind_speed"] = CloudnetArray( np.array(self._data["Wind speed at 10m (m/s)"]), "wind_speed" ) self.data["wind_direction"] = CloudnetArray( np.array(self._data["Wind direction at 10m (degrees)"]), "wind_direction", ) else: self.data["wind_speed"] = CloudnetArray( np.array(self._data["Wind speed (m/s)"]), "wind_speed" ) def convert_rainfall_rate(self) -> None: rainfall_rate = self.data["rainfall_rate"][:] self.data["rainfall_rate"].data = ( rainfall_rate / (10 * 60) / 1000 ) # mm/(10 min) -> m/s def _parse_sirta(filename: str | PathLike): """Parse SIRTA-style weather station file.""" with open(filename, "rb") as f: raw_content = f.read() try: content = raw_content.decode("utf-8") except UnicodeDecodeError: content = raw_content.decode("latin-1") lines = [line.strip() for line in content.splitlines()] columns: list[str] = [] output: dict = {} for line in lines: m = re.fullmatch(r"#\s*Col.\s*(\d+)\s*:\s*(.*)", line) if m is None: continue if m[1] != str(len(columns) + 1): msg = f"Expected column {m[1]}, found {len(columns)+1}" raise ValueError(msg) columns.append(m[2]) output[m[2]] = [] for line in lines: if not line or line.startswith("#"): continue values = line.split() if len(columns) != len(values): continue for column, value in zip(columns, values, strict=False): parsed: float | datetime.datetime if column == "Date Time (yyyy-mm-ddThh:mm:ss)": parsed = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").replace( tzinfo=datetime.timezone.utc ) elif column == "Date Time (yyyy-mm-ddThh:mm:ssZ)": parsed = datetime.datetime.strptime( value, "%Y-%m-%dT%H:%M:%SZ" ).replace(tzinfo=datetime.timezone.utc) else: parsed = float(value) output[column].append(parsed) return output