I'm trying to rechunk a collection of NetCDF files and create a Zarr dataset on AWS S3. I have 168 original NetCDF4-classic files with arrays of dimensions time: 1, y: 3840, x: 4608, chunked as chunks={'time':1, 'y':768, 'x':922}.
I want to write the output to Zarr, optimized for time series extraction, so I want to include more time records in each chunk. I figured I would use xarray to get the job done, since I have a lot of processors available to take advantage of Dask, and xarray has both xr.open_mfdataset and ds.to_zarr.
I first tried rechunking to chunks={'time':24, 'y':768, 'x':922} to match the input NetCDF4 chunking in x and y, but when I tried to write to Zarr, it complained that it needed uniform chunk sizes in both x and y, allowing a non-uniform size only in the last chunk along the time dimension (and unfortunately, in the x dimension the total size 4608 is not a multiple of the chunk size 922).
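(For reference, here's a quick way to list chunk sizes that divide the spatial dimensions evenly, so every Zarr chunk is uniform; the 256–1024 search window below is just an arbitrary illustrative range.)

# candidate chunk sizes that divide y (3840) and x (4608) with no remainder
ny, nx = 3840, 4608
print('y:', [c for c in range(256, 1025) if ny % c == 0])  # 256, 320, 384, 480, 640, 768, 960
print('x:', [c for c in range(256, 1025) if nx % c == 0])  # 256, 288, 384, 512, 576, 768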
So then I tried chunks={'time':168, 'y':384, 'x':288}, which started working and proceeded very quickly for a few minutes, then got slower and slower. Eventually, after 50 minutes, the cluster died with:
distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
slurmstepd: error: Step 3294889.0 exceeded memory limit (25346188 > 25165824), being killed
Here's the code I'm using:
from dask.distributed import Client
import pandas as pd
import xarray as xr
import s3fs
import zarr
# connect to the existing Dask cluster via its scheduler file
client = Client(scheduler_file='/home/rsignell/scheduler.json')
client

root = '/lustre/projects/hazards/cmgp/woodshole/rsignell/nwm/forcing_short_range/'
bucket_endpoint = 'https://s3.us-west-1.amazonaws.com/'
f_zarr = 'rsignell/nwm/test_week4'

# one week of hourly forcing files
dates = pd.date_range(start='2018-04-01T00:00', end='2018-04-07T23:00', freq='H')
urls = ['{}{}/nwm.t{}z.short_range.forcing.f001.conus.nc'.format(root, a.strftime('%Y%m%d'), a.strftime('%H')) for a in dates]

# open lazily, with Dask chunks matching the internal NetCDF4 chunking
ds = xr.open_mfdataset(urls, concat_dim='time', chunks={'time':1, 'y':768, 'x':922})
ds = ds.drop(['ProjectionCoordinateSystem','time_bounds'])

# rechunk for time series access and load into distributed memory
ds = ds.chunk(chunks={'time':168, 'y':384, 'x':288}).persist()
ds
producing
<xarray.Dataset>
Dimensions: (reference_time: 168, time: 168, x: 4608, y: 3840)
Coordinates:
* reference_time (reference_time) datetime64[ns] 2018-04-01 ...
* x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
* y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
* time (time) datetime64[ns] 2018-04-01T01:00:00 ...
Data variables:
T2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
LWDOWN (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
Q2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
U2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
V2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
PSFC (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
RAINRATE (time, y, x) float32 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
SWDOWN (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
Then I call
# map the S3 bucket/prefix as a key-value store for Zarr
fs = s3fs.S3FileSystem(anon=False, client_kwargs=dict(endpoint_url=bucket_endpoint))
d = s3fs.S3Map(f_zarr, s3=fs)

# zstd compression for every data variable
compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds.data_vars}

# build the Zarr write graph lazily, then execute it on the cluster
delayed_store = ds.to_zarr(store=d, mode='w', encoding=encoding, compute=False)
persist_store = delayed_store.persist(retries=100)
and just before it dies, the Dask Dashboard looks like this:

The total size of the NetCDF4 files is 20 GB, so it seems a bit crazy that the Dask Dashboard shows over 500 GB of memory in use, and that 30 processors, each with 60 GB of RAM, are not sufficient for the job.
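For scale, here's my rough back-of-the-envelope for the uncompressed in-memory footprint (assuming the seven float64 variables plus the float32 RAINRATE are fully materialized; the variable names are just for the arithmetic):

n = 168 * 3840 * 4608              # ~3.0e9 elements per variable
per_var = n * 8 / 1e9              # ~23.8 GB per float64 variable
total = 7 * per_var + n * 4 / 1e9  # 7 float64 vars + 1 float32 var ~ 178 GB
print(total)

So persisting the uncompressed data is already close to 180 GB, before counting whatever intermediate copies the rechunking creates.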
What am I doing wrong, or what would be a better solution?
I notice that you say you want to increase the number of chunks in the time dimension. Or maybe I've misunderstood.
You start with chunks specified as chunks={'time':1, 'y':768, 'x':922}, but then try chunks={'time':168, 'y':384, 'x':288} and find that the second one uses a large amount of memory.
The problem is that the chunks keyword specifies the size of the chunks, not the number of chunks!
In the first case the size of each chunk is 1*768*922 ~ 7e5 elements, whereas in the second case each chunk is 168*384*288 ~ 2e7 elements.
The maximum number of chunks in time is achieved by chunks={'time': 1}.
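A minimal standalone sketch (plain dask.array with the same shapes, not your actual dataset) makes the size-versus-count distinction concrete:

import dask.array as da

a = da.zeros((168, 3840, 4608), chunks=(1, 768, 922))    # your original chunking
b = da.zeros((168, 3840, 4608), chunks=(168, 384, 288))  # your attempted rechunking

print(a.chunksize, a.npartitions)  # (1, 768, 922)   -> 4200 chunks of ~7e5 elements each
print(b.chunksize, b.npartitions)  # (168, 384, 288) -> 160 chunks of ~2e7 elements each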