"""
Overview
========
The ``GSDESoil`` class provides a pipeline to process, clean, interpolate, and integrate soil property data
into hydrological model inputs, such as those required by MESH. It is designed to handle GSDE-derived statistics
stored in CSV files, convert them to model-ready format using weighted depth-averaging, and merge them into a
basin shapefile based on unique identifiers (e.g., ``COMID``).
Function Descriptions
=====================
.. py:class:: GSDESoil(directory, input_basin, output_shapefile)
Initializes the processor with input/output paths.
:param directory: Directory containing input CSV files.
:type directory: str
:param input_basin: Path to the input shapefile with a COMID field.
:type input_basin: str
:param output_shapefile: Path where the merged shapefile will be saved.
:type output_shapefile: str
.. py:method:: load_data(file_names, search_replace_dict=None, suffix_dict=None)
Loads and merges soil data from multiple CSV files. Columns can be renamed using
search/replace rules and optionally suffixed to avoid name collisions.
:param file_names: List of CSV file names.
:type file_names: list
:param search_replace_dict: Dictionary with filename as key and (search_list, replace_list) as value.
:type search_replace_dict: dict, optional
:param suffix_dict: Dictionary with filename as key and string suffix as value.
:type suffix_dict: dict, optional
.. py:method:: fill_and_clean_data(exclude_cols=['COMID'], exclude_patterns=['OC', 'BD', 'BDRICM', 'BDTICM'], max_val=100)
Cleans soil data by removing outliers, rescaling BDRICM/BDTICM, and filling missing values via forward/backward fill.
:param exclude_cols: Columns to ignore during cleaning.
:type exclude_cols: list
:param exclude_patterns: Substrings used to skip certain columns during range checks.
:type exclude_patterns: list
:param max_val: Maximum threshold for valid data (values above this become NaN).
:type max_val: float
.. py:method:: calculate_weights(gsde_intervals, mesh_intervals)
Computes weights to map GSDE soil depth intervals to model mesh layers.
:param gsde_intervals: List of tuples representing GSDE depth layers (e.g., [(0, 0.045), ...]).
:type gsde_intervals: list of tuple
:param mesh_intervals: List of tuples representing target model layer depths.
:type mesh_intervals: list of tuple
.. py:method:: calculate_mesh_values(column_names)
Applies weights to calculate layer-averaged MESH-compatible soil properties.
:param column_names: Dictionary mapping each property (e.g., "CLAY", "OC") to its source columns.
:type column_names: dict
.. py:method:: merge_and_save_shapefile()
Merges the processed soil data with the input basin shapefile using ``COMID`` and saves the final output.
.. py:method:: set_coordinates(input_ddb)
Optionally reads spatial reference (lon, lat, subbasin) from a NetCDF drainage database.
:param input_ddb: Path to the NetCDF drainage database file.
:type input_ddb: str
Example Usage
=============
.. code-block:: python
from gsde_soil import GSDESoil
# Step 1: Initialize the soil processor with paths to your directories and files
gsde = GSDESoil(
directory='/home/fuaday/scratch/sras-agg-model/gistool-outputs',
input_basin='/home/fuaday/scratch/sras-agg-model/geofabric-outputs/sras_subbasins_MAF_Agg2.shp',
output_shapefile='merged_soil_data_shapefile4.shp'
)
# Step 2: Define the list of input CSV files
file_names = [
'sras_model_stats_CLAY1.csv', 'sras_model_stats_CLAY2.csv',
'sras_model_stats_SAND1.csv', 'sras_model_stats_SAND2.csv',
'sras_model_stats_OC1.csv', 'sras_model_stats_OC2.csv',
'sras_model_stats_BDRICM_M_250m_ll.csv',
'sras_model_stats_BDTICM_M_250m_ll.csv',
'sras_model_slope_degree.csv', 'sras_model_riv_0p1_2.csv'
]
# Step 3: Prepare renaming instructions for each file (search/replace patterns)
search_replace_dict = {
'sras_model_stats_CLAY1.csv': (['.CLAY_depth=4.5', '.CLAY_depth=9.1000004', '.CLAY_depth=16.6', '.CLAY_depth=28.9'], ['CLAY1', 'CLAY2', 'CLAY3', 'CLAY4']),
'sras_model_stats_CLAY2.csv': (['.CLAY_depth=49.299999', '.CLAY_depth=82.900002', '.CLAY_depth=138.3', '.CLAY_depth=229.60001'], ['CLAY5', 'CLAY6', 'CLAY7', 'CLAY8']),
'sras_model_stats_SAND1.csv': (['.SAND_depth=4.5', '.SAND_depth=9.1000004', '.SAND_depth=16.6', '.SAND_depth=28.9'], ['SAND1', 'SAND2', 'SAND3', 'SAND4']),
'sras_model_stats_SAND2.csv': (['.SAND_depth=49.299999', '.SAND_depth=82.900002', '.SAND_depth=138.3', '.SAND_depth=229.60001'], ['SAND5', 'SAND6', 'SAND7', 'SAND8']),
'sras_model_stats_OC1.csv': (['.OC_depth=4.5', '.OC_depth=9.1000004', '.OC_depth=16.6', '.OC_depth=28.9'], ['OC1', 'OC2', 'OC3', 'OC4']),
'sras_model_stats_OC2.csv': (['.OC_depth=49.299999', '.OC_depth=82.900002', '.OC_depth=138.3', '.OC_depth=229.60001'], ['OC5', 'OC6', 'OC7', 'OC8'])
}
# Step 4: Optionally specify suffixes to distinguish overlapping columns
suffix_dict = {
'sras_model_stats_BDRICM_M_250m_ll.csv': 'BDRICM',
'sras_model_stats_BDTICM_M_250m_ll.csv': 'BDTICM'
}
# Step 5: Load the data, applying renaming and suffixes
gsde.load_data(
file_names=file_names,
search_replace_dict=search_replace_dict,
suffix_dict=suffix_dict
)
# Step 6: Clean and prepare the soil data (e.g., remove outliers, fill NaNs)
gsde.fill_and_clean_data()
# Step 7: Define soil profile intervals for GSDE and MESH (depths in meters)
gsde_intervals = [(0, 0.045), (0.045, 0.091), (0.091, 0.166), (0.166, 0.289),
(0.289, 0.493), (0.493, 0.829), (0.829, 1.383), (1.383, 2.296)]
mesh_intervals = [(0, 0.1), (0.1, 0.35), (0.35, 1.2), (1.2, 4.1)]
gsde.calculate_weights(gsde_intervals, mesh_intervals)
# Step 8: Compute mesh-compatible weighted averages of soil properties
column_names = {
'CLAY': ['CLAY1', 'CLAY2', 'CLAY3', 'CLAY4', 'CLAY5', 'CLAY6', 'CLAY7', 'CLAY8'],
'SAND': ['SAND1', 'SAND2', 'SAND3', 'SAND4', 'SAND5', 'SAND6', 'SAND7', 'SAND8'],
'OC': ['OC1', 'OC2', 'OC3', 'OC4', 'OC5', 'OC6', 'OC7', 'OC8']
}
gsde.calculate_mesh_values(column_names)
# Step 9: Merge processed soil data into the basin shapefile and save output
gsde.merge_and_save_shapefile()
"""
import os
import numpy as np
import pandas as pd
import geopandas as gpd
from functools import reduce
import xarray as xr
[docs]
class GSDESoil:
"""
A class to process, clean, interpolate, and merge soil property data from CSV files
with a given basin shapefile, producing model-ready soil inputs.
Attributes
----------
directory : str
Directory containing input CSV files with soil properties.
input_basin : str
Path to the input basin shapefile with a 'COMID' identifier.
output_shapefile : str
Path to the output shapefile with processed soil attributes.
file_paths : list
List of full file paths for input CSVs.
gsde_df : pandas.DataFrame
Combined soil property table after processing.
merged_gdf : geopandas.GeoDataFrame
Final spatial dataset with soil properties merged to polygons.
weights_used : list of list
Weights used to interpolate soil layers into mesh layers.
mesh_intervals : list of tuple
Target depth intervals used for model input (e.g., MESH layers).
lon : ndarray
Longitude values loaded from a NetCDF drainage database.
lat : ndarray
Latitude values loaded from a NetCDF drainage database.
segid : ndarray
Segment IDs (e.g., subbasin or COMID) from a drainage database.
num_soil_lyrs : int
Number of output mesh layers.
"""
def __init__(self, directory, input_basin, output_shapefile):
self.directory = directory
self.input_basin = input_basin
self.output_shapefile = output_shapefile
self.file_paths = []
self.gsde_df = pd.DataFrame()
self.merged_gdf = gpd.GeoDataFrame()
self.weights_used = []
self.mesh_intervals = []
self.lon = []
self.lat = []
self.segid = []
self.num_soil_lyrs = 0
[docs]
def load_data(self, file_names, search_replace_dict=None, suffix_dict=None):
"""
Load and merge multiple CSV files into a single DataFrame. Optionally apply
search-and-replace logic and suffixes to column names to ensure compatibility.
Parameters
----------
file_names : list of str
List of filenames to load from the given directory.
search_replace_dict : dict, optional
Dictionary where keys are filenames and values are (search_list, replace_list) tuples
used to rename columns (e.g., depth labels to CLAY1, CLAY2, etc.).
suffix_dict : dict, optional
Dictionary where keys are filenames and values are suffix strings
to append to column names (useful for distinguishing overlapping variables).
"""
self.file_paths = [os.path.join(self.directory, filename) for filename in file_names]
self.gsde_df = self.load_and_merge_files(self.file_paths, search_replace_dict, suffix_dict)
[docs]
@staticmethod
def load_and_merge_files(file_list, search_replace_dict=None, suffix_dict=None, key='COMID'):
"""
Load multiple CSV files and merge them on a common key. Renames and suffixes
column names as needed during the loading process.
Parameters
----------
file_list : list of str
List of full CSV file paths.
search_replace_dict : dict, optional
Column renaming instructions for each file.
suffix_dict : dict, optional
Suffix strings to append to column names by file.
key : str
Primary key used to merge all data files (default is 'COMID').
Returns
-------
pandas.DataFrame
Merged DataFrame containing columns from all input files.
"""
dfs = []
for fp in file_list:
df = pd.read_csv(fp)
file_name = os.path.basename(fp)
# Apply search and replace for column names if specified
if search_replace_dict and file_name in search_replace_dict:
search_list, replace_list = search_replace_dict[file_name]
for search, replace in zip(search_list, replace_list):
df.columns = [col.replace(search, str(replace)) for col in df.columns]
# Remove periods from column names
df.columns = [col.replace('.', '') for col in df.columns]
# Optionally append a suffix to the column names
if suffix_dict and file_name in suffix_dict:
suffix = suffix_dict[file_name]
if suffix: # Only apply if the suffix is not an empty string
df.columns = [f"{col}{suffix}" if col != key else col for col in df.columns]
dfs.append(df)
# Merge all the dataframes on the specified key
return reduce(lambda left, right: pd.merge(left, right, on=key, how='outer'), dfs)
[docs]
def fill_and_clean_data(self, exclude_cols=['COMID'], exclude_patterns=['OC', 'BD', 'BDRICM', 'BDTICM'], max_val=100):
"""
Clean the soil data by:
- Replacing extreme values with NaN (based on max_val).
- Normalizing and capping specific fields (e.g., BDRICM/BDTICM).
- Filling missing values using forward and backward fill.
Parameters
----------
exclude_cols : list of str
Columns to exclude from NaN replacement.
exclude_patterns : list of str
Column name substrings to skip when applying value caps.
max_val : float
Maximum valid threshold for general soil values.
"""
for col in self.gsde_df.columns:
if col not in exclude_cols:
if not any(pattern in col for pattern in exclude_patterns):
self.gsde_df.loc[self.gsde_df[col] > max_val, col] = np.nan
if 'BDRICM' in col or 'BDTICM' in col:
self.gsde_df.loc[self.gsde_df[col] == 9999.0, col] = np.nan
self.gsde_df[col] = self.gsde_df[col] / 100.0
if 'BDRICM' in col:
self.gsde_df.loc[self.gsde_df[col] > 2.0, col] = 2.0
self.gsde_df.loc[self.gsde_df[col] < 0.1, col] = 0.1
if 'BDTICM' in col:
self.gsde_df.loc[self.gsde_df[col] > 4.1, col] = 4.1
self.gsde_df.loc[self.gsde_df[col] < 0.1, col] = 0.1
self.gsde_df.sort_values('COMID', inplace=True)
self.gsde_df.fillna(method='ffill', inplace=True)
self.gsde_df.fillna(method='bfill', inplace=True)
[docs]
def calculate_weights(self, gsde_intervals, mesh_intervals):
"""
Calculate the contribution weights from each GSDE layer to each model-defined
mesh layer based on depth intervals.
Parameters:
-----------
gsde_intervals : list of tuple
List of tuples representing GSDE depth layers (e.g., [(0, 0.045), ...]).
mesh_intervals : list of tuple
Target model layer depths (e.g., [(0, 0.1), (0.1, 0.35), ...]).
"""
self.mesh_intervals = mesh_intervals
self.num_soil_lyrs = len(mesh_intervals)
weights_used = []
for mesh_interval in mesh_intervals:
start, end = mesh_interval
weights = []
for gsde_interval in gsde_intervals:
gsde_start, gsde_end = gsde_interval
overlap_start = max(start, gsde_start)
overlap_end = min(end, gsde_end)
weight = (overlap_end - overlap_start) / (end - start) if overlap_start < overlap_end else 0
weights.append(weight)
weights_used.append([w / sum(weights) for w in weights if sum(weights) > 0])
self.weights_used = weights_used
[docs]
def calculate_mesh_values(self, column_names):
"""
Apply the calculated weights to soil property columns and generate
epth-integrated values for each mesh layer.
Parameters:
-----------
column_names : dict
Dictionary mapping each property (e.g., "CLAY", "OC") to its source columns.
Example: {'CLAY': ['CLAY1', 'CLAY2', ...], 'OC': ['OC1', 'OC2', ...]}
"""
for prop, cols in column_names.items():
extracted_data = self.gsde_df[cols]
weights_array = np.array(self.weights_used).T
mesh_values = np.dot(extracted_data, weights_array)
if prop == 'OC':
mesh_values *= 0.01 * 1.72
for i, mesh_col in enumerate([f'mesh{prop}{j+1}' for j in range(len(self.mesh_intervals))]):
self.gsde_df[mesh_col] = mesh_values[:, i]
[docs]
def merge_and_save_shapefile(self):
"""
Merge the processed soil data (via COMID) into the input shapefile and save the result.
Output is a GeoDataFrame with mesh values appended as new attributes.
"""
gdf = gpd.read_file(self.input_basin).to_crs(epsg=4326)
gdf['COMID'] = gdf['COMID'].astype(int)
self.merged_gdf = gdf.merge(self.gsde_df, on='COMID', how='left')
self.merged_gdf.to_file(self.output_shapefile)
[docs]
def set_coordinates(self, input_ddb):
"""
Set longitude and latitude values from a NetCDF drainage database.
Parameters:
-----------
input_ddb : str
Path to the NetCDF drainage database file.
"""
db = xr.open_dataset(input_ddb)
self.lon = db.variables['lon'].values
self.lat = db.variables['lat'].values
self.segid = db.variables['subbasin'].values
db.close()