Source code for VectorPreProcessing.remap_climate_to_ddb

import os
import argparse
import numpy as np
import xarray as xs
import geopandas as gpd
import glob
from natsort import natsorted

[docs] def remap_rdrs_climate_data(input_directory, output_directory, input_basin, input_ddb, start_year, end_year): """ Remap RDRS climate data to a drainage database (DDB) format for a range of years. Parameters ---------- input_directory : str Path to the directory containing input NetCDF files. output_directory : str Path to the directory where processed files will be saved. input_basin : str Path to the basin shapefile. input_ddb : str Path to the drainage database NetCDF file. start_year : int Start year of the data to process. end_year : int End year of the data to process. Example ------- >>> from remap_climate_to_ddb import remap_rdrs_climate_data >>> remap_rdrs_climate_data( ... input_directory="path/to/input", ... output_directory="path/to/output", ... input_basin="path/to/basin.shp", ... input_ddb="path/to/ddb.nc", ... start_year=2000, ... end_year=2020 ... ) """ os.makedirs(output_directory, exist_ok=True) # Read basin and drainage database files basin = gpd.read_file(input_basin) db = xs.open_dataset(input_ddb) lon = db.variables['lon'].values lat = db.variables['lat'].values segid = db.variables['subbasin'].values db.close() print("Basin Info:") print(basin.head()) print("Longitude:", lon[:5]) print("Latitude:", lat[:5]) print("Subbasin IDs:", segid[:5]) # List files based on year range files = [] for year in range(start_year, end_year + 1): year_files = glob.glob(os.path.join(input_directory, f"*_{str(year)}*.nc")) files.extend(year_files) print(f"Files for year {year}: {year_files}") # Sort files in natural order files = natsorted(files) print("Sorted files:", files) # Process each file for file_path in files: print(f"Processing file: {file_path}") process_file(file_path, segid, lon, lat, output_directory)
[docs] def remap_rdrs_climate_data_single_year(input_directory, output_directory, input_basin, input_ddb, year): """ Remap RDRS climate data to a drainage database (DDB) format for a single year. Parameters ---------- input_directory : str Path to the directory containing input NetCDF files. output_directory : str Path to the directory where processed files will be saved. input_basin : str Path to the basin shapefile. input_ddb : str Path to the drainage database NetCDF file. year : int Year of the data to process. Example ------- >>> from remap_climate_to_ddb import remap_rdrs_climate_data_single_year >>> remap_rdrs_climate_data_single_year( ... input_directory="path/to/input", ... output_directory="path/to/output", ... input_basin="path/to/basin.shp", ... input_ddb="path/to/ddb.nc", ... year=2020 ... ) """ remap_rdrs_climate_data(input_directory, output_directory, input_basin, input_ddb, year, year)
[docs] def process_file(file_path, segid, lon, lat, output_directory): """ Process a single NetCDF file and remap its data to the drainage database (DDB) format. Parameters ---------- file_path : str Path to the input NetCDF file. segid : numpy.ndarray Array of subbasin IDs from the drainage database. lon : numpy.ndarray Array of longitude values from the drainage database. lat : numpy.ndarray Array of latitude values from the drainage database. output_directory : str Path to the directory where the processed file will be saved. Example ------- >>> from remap_climate_to_ddb import process_file >>> process_file( ... file_path="path/to/input.nc", ... segid=subbasin_ids, ... lon=longitudes, ... lat=latitudes, ... output_directory="path/to/output" ... ) """ print(f"Started processing file: {file_path}") forc = xs.open_dataset(file_path) # Extract indices of forcing IDs based on the drainage database ind = [] for i in range(len(segid)): fid = np.where(np.int32(forc['COMID'].values) == segid[i])[0] ind = np.append(ind, fid) ind = np.int32(ind) # Create a new dataset with data ordered as needed forc_vec = xs.Dataset() variables_to_process = ['RDRS_v2.1_A_PR0_SFC', 'RDRS_v2.1_P_P0_SFC', 'RDRS_v2.1_P_HU_09944', 'RDRS_v2.1_P_TT_09944', 'RDRS_v2.1_P_FB_SFC', 'RDRS_v2.1_P_FI_SFC', 'RDRS_v2.1_P_UVC_09944'] for var in variables_to_process: data = forc[var].values[:, ind] forc_vec[var] = (("time", "subbasin"), data) forc_vec[var].attrs = forc[var].attrs # Correctly setting coordinates: forc_vec = forc_vec.assign_coords( time=forc['time'].values, lon=(['subbasin'], lon[ind]), lat=(['subbasin'], lat[ind]) ) forc_vec['lon'].attrs = { 'long_name': 'longitude', 'units': 'degrees_east' } forc_vec['lat'].attrs = { 'long_name': 'latitude', 'units': 'degrees_north' } # Metadata and attributes forc_vec.attrs.update({ 'Conventions': 'CF-1.6', 'history': 'Processed on Apr 06, 2024', 'License': 'The data were written by Fuad Yassin.', 'featureType': 'timeSeries' }) # Define coordinate system forc_vec['crs'] = xs.DataArray(np.int32(1), attrs={ 'grid_mapping_name': 'latitude_longitude', 'longitude_of_prime_meridian': 0.0, 'semi_major_axis': 6378137.0, 'inverse_flattening': 298.257223563 }) # Define a variable for the points and set the 'timeseries_id' forc_vec['subbasin'] = xs.DataArray(segid, dims=['subbasin']) forc_vec['subbasin'].attrs.update({ 'long_name': 'shape_id', 'units': '1', 'cf_role': 'timeseries_id' }) # Save to netCDF output_path = os.path.join(output_directory, os.path.basename(file_path).replace('.nc', '_modified.nc')) encoding = {var: {'zlib': True, 'complevel': 6} for var in forc_vec.data_vars} forc_vec.to_netcdf(output_path, encoding=encoding) print(f"Processed and saved: {output_path}") forc.close() print(f"Finished processing file: {file_path}")
if __name__ == "__main__": parser = argparse.ArgumentParser(description="Process RDRS climate data.") subparsers = parser.add_subparsers(dest="command") all_years_parser = subparsers.add_parser("all_years", help="Process data for a range of years.") all_years_parser.add_argument("--input_directory", required=True, help="Path to the input directory.") all_years_parser.add_argument("--output_directory", required=True, help="Path to the output directory.") all_years_parser.add_argument("--input_basin", required=True, help="Path to the basin shapefile.") all_years_parser.add_argument("--input_ddb", required=True, help="Path to the drainage database NetCDF file.") all_years_parser.add_argument("--start_year", type=int, required=True, help="Start year of the data to process.") all_years_parser.add_argument("--end_year", type=int, required=True, help="End year of the data to process.") single_year_parser = subparsers.add_parser("single_year", help="Process data for a single year.") single_year_parser.add_argument("--input_directory", required=True, help="Path to the input directory.") single_year_parser.add_argument("--output_directory", required=True, help="Path to the output directory.") single_year_parser.add_argument("--input_basin", required=True, help="Path to the basin shapefile.") single_year_parser.add_argument("--input_ddb", required=True, help="Path to the drainage database NetCDF file.") single_year_parser.add_argument("--year", type=int, required=True, help="Year of the data to process.") args = parser.parse_args() if args.command == "all_years": remap_rdrs_climate_data( args.input_directory, args.output_directory, args.input_basin, args.input_ddb, args.start_year, args.end_year ) elif args.command == "single_year": remap_rdrs_climate_data_single_year( args.input_directory, args.output_directory, args.input_basin, args.input_ddb, args.year )