 

xarray: Larger-than-memory array using map_blocks, dumping results into a .zarr store

I am trying to parallelize an operation that generates a very large NumPy array, one that usually exhausts the memory of the machine running it.

What I came up with is the following workflow:

  1. Use Dask to generate a lazy placeholder array (da.empty, so it is uninitialized rather than zero-filled)
  2. Use xarray to wrap that lazy array in a DataArray with the appropriate coordinates
  3. Using DataArray.map_blocks, call a function write_values that gets a subset of a NumPy array from a separate file and inserts it into the appropriate location in the xarray.DataArray
  4. Lazily convert to an xarray.Dataset, giving the DataArray a name
  5. Attempt to store to disk via to_zarr (a condensed sketch of these steps follows this list)
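
For orientation, here is a condensed sketch of those five steps. The shapes and coordinate names mirror the full example below, which also defines write_values and numpy_returning_volume; the explicit chunks argument is an assumption, not part of the original script:

import dask.array as da
import xarray as xr
import numpy as np

# Step 1: lazy placeholder array; nothing is allocated until compute time
lazy = da.empty(shape=(500, 1000, 100), chunks=(100, 200, 100), dtype=np.float32)

# Step 2: wrap it with named dimensions and coordinates
arr = xr.DataArray(lazy, dims=("track", "bin", "time"), coords={
    "track": np.arange(500, dtype=np.uint32),
    "bin": np.arange(1000, dtype=np.uint32),
    "time": np.arange(100, dtype=np.uint32)})

# Step 3: have write_values fill each chunk from the external source
arr = arr.map_blocks(write_values, kwargs={"volume": numpy_returning_volume}, template=arr)

# Steps 4-5: name the array and write it to a zarr store
arr.to_dataset(name="seis").to_zarr("sketch.zarr")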

First: Is this an appropriate way to handle an operation that loops through the blocks of a chunked array?

Second: When I run this program, it executes but blows up my memory. Could this be due to the number of tasks Dask creates? How can I optimize it so it never hits the memory limit of my machine?

Third: After this code runs, a zarr store is written to disk, but it does not seem to actually contain the values returned by the external function. Is this the right way to change values in a disk-stored array?

Problem: My function that writes the .zarr to disk does not write the values from numpy_returning_volume. Could it be that I need to write the values while inside the map_blocks function?

Thank you!

Fully working example:

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume:
    def __init__(self):
        # self.data = da.random.random_sample([50000, 50000, 50000])
        self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ]


def write_values(chunk, **kwargs):
    # Derive this chunk's global index bounds from its coordinate labels.
    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    # Pull the matching slab from the external volume and place it in the chunk.
    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return chunk


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords={
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords},
    dims=dimensions)

xarray_data.map_blocks(write_values, kwargs={
                       "volume": numpy_returning_volume}, template=xarray_data).compute()
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(str(store_path), compute=False)

with ProgressBar():
    delayed_results.compute()

asked Jan 24 '26 by gavargas


1 Answer

OMG! I just realized that my problem was the simplest thing in the world! I just needed to assign the result of map_blocks back to a variable, and everything works: map_blocks does not modify the array in place, it returns a new lazy DataArray, and that return value is what has to be written out. Here is the complete working script if anyone is interested. Note that it generates a 6 GB dataset.
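
In short, the only change from the question's script is capturing what map_blocks returns (abridged):

# Before (from the question): the new lazy DataArray returned by map_blocks was discarded
xarray_data.map_blocks(write_values, kwargs={"volume": numpy_returning_volume}, template=xarray_data).compute()

# After: keep the returned DataArray so it is what gets written to zarr
xarray_data = xarray_data.map_blocks(write_values, kwargs={"volume": numpy_returning_volume}, template=xarray_data)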

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume:
    def __init__(self):
        self.data = da.random.random_sample([1000, 2000, 1000])
        # self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        # Slice the lazy dask array and compute it to a concrete NumPy block.
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ].compute()


def write_values(chunk, **kwargs):
    # Derive this chunk's global index bounds from its coordinate labels.
    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    # Pull the matching slab from the external volume and place it in the chunk.
    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return chunk


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords={
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords},
    dims=dimensions)

# This xarray_data = is what I was missing!!
xarray_data = xarray_data.map_blocks(write_values, kwargs={
                       "volume": numpy_returning_volume}, template=xarray_data)
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(str(store_path), compute=False)

with ProgressBar():
    delayed_results.compute()
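
Regarding the "Second" question about memory: each map_blocks task materializes one full chunk, so peak memory scales with chunk size times the number of tasks running in parallel. A minimal sketch of bounding it, assuming the illustrative chunk sizes below (the original script lets da.empty choose its own chunking):

# Assumption: explicit chunks. Each (250, 250, 250) float32 chunk is ~62 MB,
# so each worker task only ever holds a slab of roughly that size in memory.
empty_arr = da.empty(
    shape=(1000, 2000, 1000),
    chunks=(250, 250, 250),
    dtype=np.float32)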

answered Jan 26 '26 by gavargas