"""
r2c_utils.py
Utility functions to read, process, and write EnSim‐format .r2c ASCII files,
reorder MATLAB GRU data arrays, convert to 2D grids, and clean attribute names.
Utility functions to read, process, and write EnSim-format .r2c ASCII files,
reorder MATLAB GRU data arrays, convert to 2D grids, clean attribute names,
and produce small-multiples georeferenced plots (absolute or difference).
"""
import numpy as np
import re
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
from math import ceil
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
[docs]
def read_r2c_file(file_path):
"""
Read an EnSim‐format .r2c ASCII file and return header info, attribute metadata,
and the 3D data array.
Parameters
----------
file_path : str
Path to the .r2c file.
Returns
-------
header_info : dict[str, str]
Mapping from header keys (e.g. ':xCount', ':yOrigin') to their values.
attributes : dict[int, dict]
Mapping from attribute ID → {'name': str, 'type': str|None, 'units': str|None}.
data_matrix : np.ndarray, shape (n_attributes, n_rows, n_cols)
The numeric grid values, reshaped according to header counts.
"""
header_info = {}
attributes = {}
data_values = []
# ─── 1) Parse header ─────────────────────────────────────────────────────────
with open(file_path, 'r') as f:
line = f.readline().strip()
while ':EndHeader' not in line:
if line.startswith(':'):
parts = line.split()
key = parts[0]
if 'Attribute' not in key:
header_info[key] = ' '.join(parts[1:])
else:
attr_id = int(parts[1])
meta_dict = attributes.setdefault(
attr_id, {'name': None, 'type': None, 'units': None}
)
if 'AttributeName' in key:
meta_dict['name'] = ' '.join(parts[2:])
elif 'AttributeType' in key:
meta_dict['type'] = ' '.join(parts[2:])
elif 'AttributeUnits' in key:
meta_dict['units'] = ' '.join(parts[2:])
line = f.readline().strip()
# ─── 2) Read numeric grid values ───────────────────────────────────────────────
with open(file_path, 'r') as f:
# skip header
for line in f:
if line.strip() == ':EndHeader':
break
# read values
for line in f:
data_values.extend([float(tok) for tok in line.split()])
# ─── 3) Reshape into (attrs, rows, cols) ──────────────────────────────────────
n_cols = int(header_info[':xCount'])
n_rows = int(header_info[':yCount'])
n_attributes = len(attributes)
expected = n_attributes * n_rows * n_cols
if len(data_values) != expected:
raise ValueError(f"Expected {expected} values, got {len(data_values)}")
data_matrix = np.array(data_values, dtype=float)
data_matrix = data_matrix.reshape((n_attributes, n_rows, n_cols))
return header_info, attributes, data_matrix
[docs]
def reorder_attributes(attribute_data, new_order_ids, attributes):
"""
Reorder a flat name→2D-array dict according to a list of attribute IDs.
Parameters
----------
attribute_data : dict[str, np.ndarray]
Mapping attribute name → 2D array as read from a .r2c.
new_order_ids : list[int]
1-based attribute IDs in the order you want to write them.
attributes : dict[int, dict]
Original mapping from attribute ID → metadata dict returned by read_r2c_file.
Returns
-------
ordered_attributes : dict[int, dict]
New ID→metadata, where IDs run 1..N in the new sequence.
ordered_attribute_data : dict[str, np.ndarray]
Mapping attribute name → 2D array, in the same new sequence.
"""
ordered_attributes = {
new_idx: attributes[attr_id]
for new_idx, attr_id in enumerate(new_order_ids, start=1)
}
ordered_attribute_data = {
ordered_attributes[i]['name']: attribute_data[
attributes[attr_id]['name']
]
for i, attr_id in enumerate(new_order_ids, start=1)
}
return ordered_attributes, ordered_attribute_data
[docs]
def reorder_matlab_data(gru_data, order_ids):
"""
Reorder a MATLAB‐loaded GRU struct array into a specified field sequence.
Parameters
----------
gru_data : np.ndarray, dtype=object or structured
A 1×1 struct array from scipy.io.loadmat.
order_ids : list[int]
1-based indices indicating the new order of fields.
Returns
-------
np.ndarray of object, shape (len(order_ids),)
Each entry is the original gru_data[field][0,0] in the new sequence.
"""
# get original field names
field_names = [name for name, _ in gru_data.dtype.descr]
# map to new order
new_fields = [field_names[i-1] for i in order_ids]
# extract values
reordered = np.array(
[gru_data[field][0, 0] for field in new_fields],
dtype=object
)
return reordered
[docs]
def convert_to_2d(gru_data):
"""
Convert a 1D object array of 1D arrays into a 2D float array.
Parameters
----------
gru_data : np.ndarray, dtype=object
Object array where each element is a 1D numeric sequence.
Returns
-------
np.ndarray or None
2D array of shape (layers, length) or None on failure.
"""
if isinstance(gru_data, np.ndarray) and gru_data.dtype == object:
try:
return np.vstack([np.array(layer, dtype=float) for layer in gru_data])
except Exception as e:
print(f"[convert_to_2d] Conversion failed: {e}")
return None
return gru_data
[docs]
def write_new_r2c(new_file_path, header_info, ordered_attributes, attribute_data):
"""
Write a new .r2c file from header info, attribute metadata, and grid data.
Parameters
----------
new_file_path : str
Path to write the .r2c file.
header_info : dict[str, str]
Header key→value mapping (as from read_r2c_file).
ordered_attributes : dict[int, dict]
Attribute ID→metadata dict specifying write order.
attribute_data : dict[str, np.ndarray]
Mapping attribute name→2D array of shape (n_rows, n_cols).
"""
with open(new_file_path, 'w') as out:
# write non-attribute header lines
for key, val in header_info.items():
if not key.startswith(':Attribute'):
out.write(f"{key} {val}\n")
# write attribute metadata
for attr_id, meta in ordered_attributes.items():
out.write(f":AttributeName {attr_id} {meta['name']}\n")
if meta.get('type'):
out.write(f":AttributeType {attr_id} {meta['type']}\n")
if meta.get('units'):
out.write(f":AttributeUnits {attr_id} {meta['units']}\n")
out.write(":EndHeader\n")
# write grid rows for each attribute
for attr_id, meta in ordered_attributes.items():
name = meta['name']
if name not in attribute_data:
raise KeyError(f"Missing data for attribute '{name}'")
grid = attribute_data[name]
for row in grid:
out.write(" ".join(map(str, row)) + "\n")
[docs]
def clean_attribute_name(name):
"""
Strip parentheses and trailing whitespace from an attribute name.
Parameters
----------
name : str
Raw attribute name, possibly containing "(units)" or similar.
Returns
-------
str
Cleaned name with text in parentheses removed.
"""
return re.sub(r"\(.*?\)", "", name).strip()
# ─── PLOTTING FUNCTIONS ──────────────────────────────────────────────────────────
[docs]
def plot_diff_panel_geo(
base_mat, scen_mat,
header_info, attributes,
subset_ids, skip_idxs,
panel_title, out_file
):
"""
Small-multiples of (scen – base) for each attribute in subset_ids.
"""
diff_list = [scen_mat[aid-1] - base_mat[aid-1] for aid in subset_ids]
_plot_panel_geo(
diff_list, header_info, attributes,
subset_ids, skip_idxs, panel_title, out_file,
cmap='seismic_r', vmin=-1, vmax=1
)
[docs]
def plot_r2c_panel_geo(
mat,
header_info, attributes,
subset_ids, skip_idxs,
panel_title, out_file,
cmap='viridis', vmin=None, vmax=None
):
"""
Small-multiples of the raw .r2c layers in mat.
"""
arr_list = [mat[aid-1] for aid in subset_ids]
_plot_panel_geo(
arr_list, header_info, attributes,
subset_ids, skip_idxs, panel_title, out_file,
cmap=cmap, vmin=vmin, vmax=vmax
)
def _plot_panel_geo(
arr_list, header_info, attributes,
subset_ids, skip_idxs,
panel_title, out_file,
cmap, vmin, vmax
):
"""
Core routine for both absolute and difference panels.
"""
# grid geometry
x0 = float(header_info[':xOrigin'])
y0 = float(header_info[':yOrigin'])
dx = float(header_info[':xDelta'])
dy = float(header_info.get(':yDelta', header_info[':xDelta']))
nx = int(header_info[':xCount'])
ny = int(header_info[':yCount'])
lons = x0 + np.arange(nx)*dx
lats = y0 + np.arange(ny)*dy
lon_edges = np.r_[lons - dx/2, lons[-1] + dx/2]
lat_edges = np.r_[lats - dy/2, lats[-1] + dy/2]
# shapefile (SRB sub-drainages)
shp = gpd.read_file('/home/fuaday/matlablaptop/SRBshape/SaskRB_SubDrainage2.shp')
# build panels
titles = [clean_attribute_name(attributes[aid]['name']) for aid in subset_ids]
panels = [
(t,a)
for idx,(t,a) in enumerate(zip(titles,arr_list))
if idx not in skip_idxs
]
n = len(panels)
cols = 3
rows = ceil(n/cols)
fig,axes = plt.subplots(rows,cols,
figsize=(2.5*cols, 2.3*rows),
constrained_layout=True,
facecolor='white')
axes = axes.flatten()
for ax,(title,arr) in zip(axes, panels):
arrp = np.where(arr==0, np.nan, arr)
mesh = ax.pcolormesh(lon_edges, lat_edges, arrp,
cmap=cmap, vmin=vmin, vmax=vmax,
shading='auto')
shp.boundary.plot(ax=ax, edgecolor='black', linewidth=0.2)
ax.set_xlim(lon_edges[0], lon_edges[-1])
ax.set_ylim(lat_edges[0], lat_edges[-1])
ax.grid(True, which='major',
color='gray', linestyle='--',
linewidth=0.5, alpha=0.7)
ax.set_title(title, fontsize=10)
ax.set_xlabel("Longitude", fontsize=7)
ax.set_ylabel("Latitude", fontsize=7)
ax.tick_params(axis='both', labelsize=7)
# inset boxplot
flat = arr.flatten()
flat = flat[~np.isnan(flat)]
flat = flat[flat!=0]
inset = ax.inset_axes([0.30, 0.11, 0.65, 0.11])
if flat.size:
inset.boxplot(
flat, vert=False, notch=False, whis=1.5, widths=0.5,
patch_artist=True, showcaps=True, showfliers=True,
showmeans=False, meanline=False,
boxprops=dict(facecolor='lightgray', edgecolor='blue'),
whiskerprops=dict(color='black', linewidth=1.0),
capprops=dict(color='black', linewidth=1.0),
medianprops=dict(color='red', linewidth=1.2),
flierprops=dict(marker='+', color='black', alpha=0.6, markersize=3),
manage_ticks=True
)
lo,hi = np.percentile(flat,[1,99])
inset.set_xlim(lo,hi)
q1,q2,q3 = np.percentile(flat,[1,50,99])
inset.set_xticks([q1,q2,q3])
inset.set_xticklabels([f"{q1:.2f}",f"{q2:.2f}",f"{q3:.2f}"], fontsize=6)
inset.tick_params(axis='x', pad=1, labelsize=6, colors='blue')
else:
inset.text(0.5,0.5,"no change",ha='center',va='center',fontsize=6)
inset.set_xticks([])
inset.patch.set_facecolor('none')
inset.set_yticks([])
inset.set_frame_on(False)
# disable unused axes
for ax in axes[n:]:
ax.axis('off')
# shared colorbar
cbar = fig.colorbar(mesh, ax=axes[:n].tolist(),
orientation='horizontal',
fraction=0.02, pad=0.04)
cbar.set_label(f"{panel_title} (Δ fractional cover)")
fig.suptitle(panel_title, fontsize=16, y=1.02)
fig.savefig(out_file, dpi=300)
plt.show()