"""Module for reading Radiometrics MP3014 microwave radiometer data."""
import csv
import datetime
import logging
import os
import re
from operator import attrgetter
from typing import Any, NamedTuple
import numpy as np
from cloudnetpy import output, utils
from cloudnetpy.cloudnetarray import CloudnetArray
from cloudnetpy.exceptions import InconsistentDataError, ValidTimeStampError
from cloudnetpy.instruments import instruments
from cloudnetpy.metadata import MetaData


def radiometrics2nc(
full_path: str,
output_file: str,
site_meta: dict,
uuid: str | None = None,
date: str | datetime.date | None = None,
) -> str:
"""Converts Radiometrics .csv file into Cloudnet Level 1b netCDF file.
Args:
full_path: Input file name or folder containing multiple input files.
output_file: Output file name, e.g. 'radiometrics.nc'.
site_meta: Dictionary containing information about the site and instrument.
Required key value pairs are `name` and `altitude` (metres above mean
sea level).
uuid: Set specific UUID for the file.
date: Expected date as YYYY-MM-DD of all profiles in the file.
Returns:
UUID of the generated file.
Examples:
>>> from cloudnetpy.instruments import radiometrics2nc
>>> site_meta = {'name': 'Soverato', 'altitude': 21}
>>> radiometrics2nc('radiometrics.csv', 'radiometrics.nc', site_meta)
"""
if isinstance(date, str):
date = datetime.date.fromisoformat(date)
if os.path.isdir(full_path):
valid_filenames = utils.get_sorted_filenames(full_path, ".csv")
else:
valid_filenames = [full_path]
objs = []
for filename in valid_filenames:
obj = Radiometrics(filename)
obj.read_raw_data()
obj.read_data()
objs.append(obj)
radiometrics = RadiometricsCombined(objs, site_meta)
radiometrics.screen_time(date)
radiometrics.sort_timestamps()
radiometrics.time_to_fractional_hours()
radiometrics.data_to_cloudnet_arrays()
radiometrics.add_meta()
if radiometrics.date is None:
msg = "Failed to find valid timestamps from Radiometrics file(s)."
raise ValidTimeStampError(msg)
attributes = output.add_time_attribute(ATTRIBUTES, radiometrics.date)
output.update_attributes(radiometrics.data, attributes)
return output.save_level1b(radiometrics, output_file, uuid)


class Record(NamedTuple):
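    """One parsed data row from a Radiometrics level 2 file."""
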
row_number: int
block_type: int
block_index: int
timestamp: datetime.datetime
values: dict[str, Any]


class Radiometrics:
    """Reader for level 2 files of Radiometrics microwave radiometers.

    References:
        Radiometrics (2008). Profiler Operator's Manual: MP-3000A, MP-2500A,
        MP-1500A, MP-183A.

    """
def __init__(self, filename: str):
self.filename = filename
self.raw_data: list[Record] = []
self.data: dict = {}
self.instrument = instruments.RADIOMETRICS
self.ranges: list[str] = []
def read_raw_data(self) -> None:
"""Reads Radiometrics raw data."""
record_columns = {}
unknown_record_types = set()
rows = []
with open(self.filename, encoding="utf8") as infile:
reader = csv.reader(infile, skipinitialspace=True)
for row in reader:
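                # Rows starting with "Record" are header rows that define
                # the column names for one record type; all other rows are
                # data rows keyed by a running row number.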
if row[0] == "Record":
if row[1] != "Date/Time":
msg = "Unexpected header in Radiometrics file"
raise RuntimeError(msg)
record_type = int(row[2])
columns = row[3:]
record_columns[record_type] = columns
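                    # Record types 10 and 400 carry the range-resolved
                    # profiles; their numeric column headers are the range
                    # gates in km.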
if record_type in (10, 400):
self.ranges = [
column
for column in columns
if re.fullmatch(r"\d+\.\d+", column)
]
else:
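                    # Data row: the record type encodes both the block it
                    # belongs to (the nearest lower multiple of ten, i.e. the
                    # matching header record) and its index within that block.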
record_type = int(row[2])
block_type = record_type // 10 * 10
block_index = record_type - block_type - 1
column_names = record_columns.get(block_type)
if column_names is None:
if record_type not in unknown_record_types:
logging.info("Skipping unknown record type %d", record_type)
unknown_record_types.add(record_type)
continue
record = Record(
row_number=int(row[0]),
timestamp=_parse_datetime(row[1]),
block_type=block_type,
block_index=block_index,
values=dict(zip(column_names, row[3:], strict=True)),
)
rows.append(record)
self.raw_data = sorted(rows, key=attrgetter("row_number"))
def read_data(self) -> None:
"""Reads values."""
times = []
lwps = []
iwvs = []
irts = []
irt_times = []
temps = []
temp_times = []
rhs = []
rh_times = []
ahs = []
ah_times = []
block_titles = {}
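        # Block type 100 rows map a record type to a human-readable title
        # (e.g. "Temperature (K)"). Data rows are decoded via these titles
        # when available, otherwise via the fixed block types 10, 200 and
        # 300 handled below.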
for record in self.raw_data:
if record.block_type == 100:
block_type = int(record.values["Record Type"]) - 1
title = record.values["Title"]
block_titles[block_type] = title
if title := block_titles.get(record.block_type + record.block_index):
if title == "Temperature (K)":
temp_times.append(record.timestamp)
temps.append(
[float(record.values[column]) for column in self.ranges]
)
elif title == "Relative Humidity (%)":
rh_times.append(record.timestamp)
rhs.append([float(record.values[column]) for column in self.ranges])
elif title == "Vapor Density (g/m^3)":
ah_times.append(record.timestamp)
ahs.append([float(record.values[column]) for column in self.ranges])
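            # Files without title records: block 10 carries the integrated
            # quantities together with the temperature (index 0), vapor
            # density (index 1) and relative humidity (index 2) profiles,
            # while blocks 200 and 300 carry the infrared temperature and
            # the integrated liquid/vapor values.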
elif record.block_type == 10:
if record.block_index == 0:
lwp = record.values["Lqint(mm)"]
iwv = record.values["Vint(cm)"]
irt = record.values["Tir(K)"]
times.append(record.timestamp)
lwps.append(float(lwp))
iwvs.append(float(iwv))
irt_times.append(record.timestamp)
irts.append([float(irt)])
temp_times.append(record.timestamp)
temps.append(
[float(record.values[column]) for column in self.ranges]
)
elif record.block_index == 1:
ah_times.append(record.timestamp)
ahs.append([float(record.values[column]) for column in self.ranges])
elif record.block_index == 2:
rh_times.append(record.timestamp)
rhs.append([float(record.values[column]) for column in self.ranges])
elif record.block_type == 200:
irt_times.append(record.timestamp)
irt = record.values["Tir(K)"]
irts.append([float(irt)])
elif record.block_type == 300:
lwp = record.values["Int. Liquid(mm)"]
iwv = record.values["Int. Vapor(cm)"]
times.append(record.timestamp)
lwps.append(float(lwp))
iwvs.append(float(iwv))
self.data["time"] = np.array(times, dtype="datetime64[s]")
self.data["lwp"] = np.array(lwps) # mm => kg m-2
self.data["iwv"] = np.array(iwvs) * 10 # cm => kg m-2
self.data["irt"] = _find_closest(
np.array(irt_times, dtype="datetime64[s]"),
np.array(irts),
self.data["time"],
)
self.data["temperature"] = _find_closest(
np.array(temp_times, dtype="datetime64[s]"),
np.array(temps),
self.data["time"],
)
self.data["relative_humidity"] = _find_closest(
np.array(rh_times, dtype="datetime64[s]"),
np.array(rhs) / 100, # % => 1
self.data["time"],
)
self.data["absolute_humidity"] = _find_closest(
np.array(ah_times, dtype="datetime64[s]"),
np.array(ahs) / 1000, # g m-3 => kg m-3
self.data["time"],
)


class RadiometricsCombined:
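    """Combined data from one or more Radiometrics level 2 files."""
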
site_meta: dict
data: dict
date: datetime.date | None
instrument: instruments.Instrument
def __init__(self, objs: list[Radiometrics], site_meta: dict):
self.site_meta = site_meta
self.data = {}
self.date = None
for obj in objs:
if obj.ranges != objs[0].ranges:
msg = "Inconsistent range between files"
raise InconsistentDataError(msg)
for key in obj.data:
self.data = utils.append_data(self.data, key, obj.data[key])
ranges = [float(x) for x in objs[0].ranges]
self.data["range"] = np.array(ranges) * 1000 # m => km
self.data["height"] = self.data["range"] + self.site_meta["altitude"]
self.instrument = instruments.RADIOMETRICS
def screen_time(self, expected_date: datetime.date | None) -> None:
"""Screens timestamps."""
if expected_date is None:
self.date = self.data["time"][0].astype(object).date()
return
self.date = expected_date
valid_mask = self.data["time"].astype("datetime64[D]") == self.date
if np.count_nonzero(valid_mask) == 0:
raise ValidTimeStampError
for key in self.data:
if key in ("range", "height"):
continue
self.data[key] = self.data[key][valid_mask]
def sort_timestamps(self):
ind = np.argsort(self.data["time"])
for key in self.data:
if key in ("range", "height"):
continue
self.data[key] = self.data[key][ind]
def time_to_fractional_hours(self) -> None:
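        # Convert timestamps to hours since midnight of the day of the
        # first timestamp.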
base = self.data["time"][0].astype("datetime64[D]")
self.data["time"] = (self.data["time"] - base) / np.timedelta64(1, "h")
def data_to_cloudnet_arrays(self) -> None:
"""Converts arrays to CloudnetArrays."""
for key, array in self.data.items():
dimensions = (
("time", "range")
if key in ("temperature", "relative_humidity", "absolute_humidity")
else None
)
self.data[key] = CloudnetArray(array, key, dimensions=dimensions)
def add_meta(self) -> None:
"""Adds some metadata."""
valid_keys = ("latitude", "longitude", "altitude")
for key, value in self.site_meta.items():
name = key.lower()
if name in valid_keys:
self.data[name] = CloudnetArray(float(value), key)


def _parse_datetime(text: str) -> datetime.datetime:
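    # Timestamps are expected as MM/DD/YY HH:MM:SS (optionally with a
    # four-digit year); two-digit years are taken to be in the 2000s.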
date, time = text.split()
month, day, year = map(int, date.split("/"))
hour, minute, second = map(int, time.split(":"))
if year < 100:
year += 2000
return datetime.datetime(
year,
month,
day,
hour,
minute,
second,
)


def _find_closest(x: np.ndarray, y: np.ndarray, x_new: np.ndarray) -> np.ndarray:
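    """Returns, for each element of x_new, the row of y whose x is nearest."""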
return y[np.argmin(np.abs(x_new - x[:, np.newaxis]), axis=0)]


ATTRIBUTES = {
"irt": MetaData(
long_name="Infrared brightness temperatures",
units="K",
),
}