Skip to content

Commit fddc86c

Browse files
committed
Updated read from HEALPix catalog file.
1. Updated these codes to read HEALPix data from catalog file. - pyflextrkr/idfeature_driver.py - pyflextrkr/idclouds_tbpf.py - pyflextrkr/remap_healpix.py 2. Updated the example config file: config/config_mcs_tbpf_scream_healpix9.yml 3. Added intake libraries in the environment list: - environment.yml - requirements.txt
1 parent 30fb24c commit fddc86c

File tree

6 files changed

+75
-59
lines changed

6 files changed

+75
-59
lines changed

config/config_mcs_tbpf_scream_healpix9.yml

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,28 @@ timeout: 3600 # [seconds] Dask timeout limit
2626
startdate: '20190901.0000'
2727
enddate: '20200901.0000'
2828

29+
# HEALPix parameters
30+
catalog_file: '/global/cfs/cdirs/m4549/scream-cess-healpix/scream_catalog.yaml'
31+
catalog_source: 'scream2D_hrly'
32+
# Catalog parameters, can have multiple entries
33+
catalog_params:
34+
zoom: 9
35+
# time: 'PT1H'
36+
olr_varname: 'rlut' # OLR variable name
37+
pcp_varname: 'pr' # Precipitation variable name
38+
pcp_convert_factor: 3600000. # Convert precipitation flux from [m/s] to [mm/h]
39+
input_format: 'zarr' # 'zarr' or 'netcdf'
40+
2941
# Specify tracking input data date/time string format
3042
# This is the preprocessed file that contains Tb & rainrate
3143
# E.g., databasename20181101.011503.nc --> yyyymodd.hhmmss
3244
# E.g., databasename2018-11-01_01:15:00 --> yyyy-mo-dd_hh:mm:ss
3345
time_format: 'yyyymoddhh'
34-
databasename: 'scream2D_hrly_rlut_hp9_v6'
35-
precipdata_basename: 'scream2D_hrly_pr_hp9_v6'
36-
hp_zoomlev: '9' # HEALPix zoom level
37-
input_format: 'zarr' # 'zarr' or 'netcdf'
46+
databasename: '' # This is not used for HEALPix data
3847

3948
# Input files directory
40-
clouddata_path: '/pscratch/sd/w/wcmca1/scream-cess-healpix/'
41-
precipdata_path: '/pscratch/sd/w/wcmca1/scream-cess-healpix/'
49+
clouddata_path: '' # Not needed for catalog data
50+
# precipdata_path: '/pscratch/sd/w/wcmca1/scream-cess-healpix/' # Not needed for catalog data
4251
# Working directory for the tracking data
4352
root_path: '/pscratch/sd/w/wcmca1/scream-cess-healpix/mcs_tracking_hp9/'
4453
# Working sub-directory names
@@ -60,9 +69,6 @@ pixel_radius: 10.0 # [km] Spatial resolution of the input data
6069
datatimeresolution: 1.0 # [hour] Temporal resolution of the input data
6170
# Variable names in the input data
6271
olr2tb: True
63-
olr_varname: 'rlut'
64-
pcp_varname: 'pr'
65-
pcp_convert_factor: 3600000. # Convert precipitation flux from [m/s] to [mm/h]
6672
clouddatasource: 'model'
6773
time_dimname: 'time'
6874
x_dimname: 'lon'

environment.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ dependencies:
1010
- dask
1111
- ffmpeg
1212
- healpix
13+
- intake==0.7.0
14+
- intake-xarray==0.7.0
1315
- ipython
1416
- joblib
1517
- matplotlib

pyflextrkr/idclouds_tbpf.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ def idclouds_tbpf(
107107
# Read landmask file to get target lat/lon grid
108108
landmask_filename = config.get('landmask_filename', None)
109109
dslm = xr.open_dataset(landmask_filename)
110-
lon = dslm.lon.data
111-
lat = dslm.lat.data
110+
lon = dslm.lon.values
111+
lat = dslm.lat.values
112112
dslm.close()
113113

114114
# Find the HEALPix pixels that are closest to the target grid points
@@ -120,11 +120,14 @@ def idclouds_tbpf(
120120
# Note this would change the calendar type of the original time coordinate
121121
time_coord = input_data[time_coordname]
122122
time_pd = pd.to_datetime(time_coord.dt.strftime("%Y-%m-%dT%H:%M:%S").item())
123-
# Regrid variables, expand time dimension so the variable has dimensions [time, y, x]
124-
olr = input_data[olr_varname].isel(cell=pix).expand_dims({time_dimname:[time_pd]})
125-
pcp = input_data[pcp_varname].isel(cell=pix).expand_dims({time_dimname:[time_pd]})
126-
# Combine DataArrays into a single Dataset
127-
rawdata = xr.Dataset({olr_varname: olr, pcp_varname: pcp})
123+
# Remap DataSet to lat/lon grid, expand time dimension so it has dimensions [time, y, x]
124+
rawdata = input_data.isel(cell=pix).expand_dims({time_dimname:[time_pd]})
125+
126+
# # Remap variables, expand time dimension so the variable has dimensions [time, y, x]
127+
# olr = input_data[olr_varname].isel(cell=pix).expand_dims({time_dimname:[time_pd]})
128+
# pcp = input_data[pcp_varname].isel(cell=pix).expand_dims({time_dimname:[time_pd]})
129+
# # Combine DataArrays into a single Dataset
130+
# rawdata = xr.Dataset({olr_varname: olr, pcp_varname: pcp})
128131

129132
# NetCDF format
130133
elif input_format.lower() == "netcdf":
@@ -164,8 +167,8 @@ def idclouds_tbpf(
164167
logger.debug(f'Added Timestamp: {file_timestamp} calculated from filename to the input data')
165168

166169
# Get data coordinates
167-
lat = rawdata[y_coordname].data
168-
lon = rawdata[x_coordname].data
170+
lat = rawdata[y_coordname].values
171+
lon = rawdata[x_coordname].values
169172
time_decode = rawdata[time_coordname]
170173

171174
# Check coordinate dimensions
@@ -210,8 +213,8 @@ def idclouds_tbpf(
210213
# Subset dataset
211214
rawdata = rawdata[subset_dict]
212215
# Get lat/lon coordinates again
213-
lat = rawdata[y_coordname].data
214-
lon = rawdata[x_coordname].data
216+
lat = rawdata[y_coordname].values
217+
lon = rawdata[x_coordname].values
215218
# Check coordinate dimensions
216219
if (lat.ndim == 1) | (lon.ndim == 1):
217220
# Mesh 1D coordinate into 2D
@@ -223,13 +226,12 @@ def idclouds_tbpf(
223226

224227
# Convert OLR to Tb if olr2tb flag is set
225228
if olr2tb is True:
226-
olr = rawdata[olr_varname].data
229+
olr = rawdata[olr_varname].values
227230
original_ir = olr_to_tb(olr)
228231
else:
229232
# Read Tb from data
230-
original_ir = rawdata[tb_varname].data
231-
rawdata.close()
232-
233+
original_ir = rawdata[tb_varname].values
234+
# rawdata.close()
233235

234236
# Loop over each time
235237
ntimes = get_length(time_decode)
@@ -337,7 +339,7 @@ def idclouds_tbpf(
337339
if final_nclouds > 0:
338340

339341
# Convert precipitation factor to unit [mm/hour]
340-
pcp = rawdata[pcp_varname].data * pcp_convert_factor
342+
pcp = rawdata[pcp_varname].values * pcp_convert_factor
341343

342344
# For 'gpmirimerg', precipitation is averaged to 1-hourly
343345
# and put in first time dimension
@@ -359,8 +361,9 @@ def idclouds_tbpf(
359361
# Replace values <=0 with 0 before smoothing
360362
pcp_linkpf[pcp_linkpf <= 0] = 0
361363

364+
# Check periodic boundary conditions
362365
if pbc_direction != 'none':
363-
# Step 2: Extend and pad data
366+
# Extend and pad data
364367
pcp_linkpf_orig = np.copy(pcp_linkpf)
365368
pcp_linkpf, padded_x, padded_y = pad_and_extend(pcp_linkpf, config)
366369
# Smooth pcp_linkpf using convolve filter (handles NaN)

pyflextrkr/idfeature_driver.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@ def idfeature_driver(config):
2222
logger.info('Identifying features from raw data')
2323

2424
clouddata_path = config["clouddata_path"]
25-
databasename = config["databasename"]
25+
databasename = config.get("databasename", "")
2626
start_basetime = config.get("start_basetime", None)
2727
end_basetime = config.get("end_basetime", None)
28-
# time_format = config["time_format"]
2928
run_parallel = config["run_parallel"]
3029
feature_type = config["feature_type"]
3130
input_format = config.get("input_format", "netcdf")
@@ -46,50 +45,54 @@ def idfeature_driver(config):
4645

4746
if input_format.lower() == "zarr":
4847

49-
# Get precipitation data info from config
50-
precipdata_path = config["precipdata_path"]
51-
precipdata_basename = config["precipdata_basename"]
48+
import intake # For catalogs
49+
50+
# Get catalog info from config
51+
catalog_file = config["catalog_file"]
52+
catalog_source = config["catalog_source"]
53+
catalog_params = config.get("catalog_params", {})
54+
olr_varname = config['olr_varname']
55+
pcp_varname = config['pcp_varname']
5256
start_date = config["startdate"]
5357
end_date = config["enddate"]
5458

55-
# OLR Zarr filename
56-
fn_olr = f"{clouddata_path}{databasename}.zarr"
57-
fn_pr = f"{precipdata_path}{precipdata_basename}.zarr"
58-
# Read HEALPix zarr file
59-
ds_olr = xr.open_dataset(fn_olr)
60-
ds_pr = xr.open_dataset(fn_pr)
59+
# Load the catalog
60+
in_catalog = intake.open_catalog(catalog_file)
61+
# Get the DataSet from the catalog
62+
ds = in_catalog[catalog_source](**catalog_params).to_dask()
63+
64+
# Subset to keep only the required variables
65+
all_vars = list(ds.data_vars)
66+
keep_vars = [olr_varname, pcp_varname]
67+
drop_vars = [var for var in all_vars if var not in keep_vars]
68+
ds = ds.drop_vars(drop_vars)
69+
6170
# Check the calendar type of the time coordinate
62-
calendar = ds_olr['time'].dt.calendar
63-
# Add coordinates (lat and lon)
64-
# ds_olr = ds_olr.pipe(egh.attach_coords)
65-
# ds_pr = ds_pr.pipe(egh.attach_coords)
71+
calendar = ds['time'].dt.calendar
6672
# Convert start_date and end_date to pandas.Timestamp
6773
start_datetime = pd.to_datetime(start_date, format='%Y%m%d.%H%M')
6874
end_datetime = pd.to_datetime(end_date, format='%Y%m%d.%H%M')
6975
# Convert pandas.Timestamp to cftime objects based on the calendar type
7076
start_datetime_cftime = convert_to_cftime(start_datetime, calendar)
7177
end_datetime_cftime = convert_to_cftime(end_datetime, calendar)
7278
# Subset the Dataset using the cftime objects
73-
ds_olr = ds_olr.sel(time=slice(start_datetime_cftime, end_datetime_cftime))
74-
ds_pr = ds_pr.sel(time=slice(start_datetime_cftime, end_datetime_cftime))
79+
ds = ds.sel(time=slice(start_datetime_cftime, end_datetime_cftime))
7580

7681
# Get the number of time steps
77-
nfiles = ds_olr.sizes['time']
82+
nfiles = ds.sizes['time']
7883
logger.info(f"Total number of time steps to process: {nfiles}")
7984

8085
# Serial
8186
if run_parallel == 0:
8287
for ifile in range(0, nfiles):
8388
# Subset one time from the DataSets and combine them
84-
ds = xr.merge([ds_olr.isel(time=ifile), ds_pr.isel(time=ifile)])
85-
id_feature(ds, config)
89+
id_feature(ds.isel(time=ifile), config)
8690
# Parallel
8791
elif run_parallel >= 1:
8892
results = []
8993
for ifile in range(0, nfiles):
9094
# Subset one time from the DataSets and combine them
91-
ds = xr.merge([ds_olr.isel(time=ifile), ds_pr.isel(time=ifile)])
92-
result = dask.delayed(id_feature)(ds, config)
95+
result = dask.delayed(id_feature)(ds.isel(time=ifile), config)
9396
results.append(result)
9497
final_result = dask.compute(*results)
9598
wait(final_result)

pyflextrkr/remap_healpix.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import dask.array as da
77
import healpix as hp
8+
import intake
89
from pyflextrkr.ft_utilities import setup_logging
910

1011
def remap_mask_to_healpix(config):
@@ -30,18 +31,17 @@ def remap_mask_to_healpix(config):
3031
try:
3132
from dask.distributed import get_client
3233
client = get_client()
33-
parallel = True
3434
logger.info(f"Using existing Dask client with {len(client.scheduler_info()['workers'])} workers")
3535
except ValueError:
3636
logger.warning("No Dask client found, continuing without explicit client")
3737
client = None
38-
parallel = False
3938

4039
# Get config parameters
4140
pixeltracking_outpath = config.get("pixeltracking_outpath")
42-
clouddata_path = config.get("clouddata_path")
43-
databasename = config.get("databasename")
44-
hp_zoomlev = config.get("hp_zoomlev")
41+
catalog_file = config.get("catalog_file")
42+
catalog_source = config.get("catalog_source")
43+
catalog_params = config.get("catalog_params", {})
44+
hp_zoomlev = catalog_params.get("zoom")
4545
startdate = config.get("startdate")
4646
enddate = config.get("enddate")
4747
outpath = os.path.dirname(os.path.normpath(pixeltracking_outpath)) + "/"
@@ -50,8 +50,6 @@ def remap_mask_to_healpix(config):
5050
in_mask_filebase = presets.get("mask", {}).get("out_filebase", "mcs_mask_latlon_")
5151
# Input mask Zarr store
5252
in_mask_dir = f"{outpath}{in_mask_filebase}{startdate}_{enddate}.zarr"
53-
# Input HEALPix Zarr store
54-
in_hp_dir = f"{clouddata_path}{databasename}.zarr"
5553

5654
# Build output filename
5755
out_mask_filebase = presets.get("healpix", {}).get("out_filebase", "mcs_mask_hp")
@@ -67,8 +65,8 @@ def remap_mask_to_healpix(config):
6765
if os.path.exists(in_mask_dir) is False:
6866
logger.error(f"Input mask file {in_mask_dir} does not exist. Skipping remap.")
6967
return out_zarr
70-
if os.path.exists(in_hp_dir) is False:
71-
logger.error(f"Input HEALPix file {in_hp_dir} does not exist. Skipping remap.")
68+
if os.path.isfile(catalog_file) is False:
69+
logger.error(f"Catalog file {catalog_file} does not exist. Skipping remap.")
7270
return out_zarr
7371

7472
# Get chunk size from config
@@ -82,8 +80,10 @@ def remap_mask_to_healpix(config):
8280
# Modify mask grid for remapping
8381
ds_mask = prepare_grid_for_analysis(ds_mask)
8482

85-
# Read HEALPix zarr file
86-
ds_hp = xr.open_dataset(in_hp_dir)
83+
# Load the catalog
84+
in_catalog = intake.open_catalog(catalog_file)
85+
# Get the DataSet from the catalog
86+
ds_hp = in_catalog[catalog_source](**catalog_params).to_dask()
8787

8888
# Make remap lat/lon for HEALPix
8989
remap_lons, remap_lats = hp.pix2ang(

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ colormath
55
dask
66
ffmpeg
77
healpix
8+
intake==0.7.0
9+
intake-xarray==0.7.0
810
ipython
911
joblib
1012
matplotlib

0 commit comments

Comments (0)