Improved cohorts. #140

Closed
@dcherian

Description


Summary

We should be able to improve method="cohorts" by first applying the groupby reduction blockwise and then "shuffling". This should substantially reduce the amount of data being moved around.

Inspired by dask/dask#9302

Current status

In #55 I added support for cohorts with nD by arrays.

-                # indexes for a subset of groups
-                subset_idx = idx[np.isin(by, cohort)]
-                array_subset = array[..., subset_idx]
-                numblocks = len(array_subset.chunks[-1])
+                # equivalent of xarray.DataArray.where(mask, drop=True)
+                mask = np.isin(by, cohort)
+                indexer = [np.unique(v) for v in np.nonzero(mask)]
+                array_subset = array
+                for ax, idxr in zip(range(-by.ndim, 0), indexer):
+                    array_subset = np.take(array_subset, idxr, axis=ax)
+                numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis])
 
                 # get final result for these groups
                 r, *g = partial_agg(
                     array_subset,
-                    by[subset_idx],
+                    by[np.ix_(*indexer)],
                     expected_groups=cohort,
+                    # reindex to expected_groups at the blockwise step.
+                    # this approach avoids replacing non-cohort members with
+                    # np.nan or some other sentinel value, and preserves dtypes
+                    reindex=True,
                     # if only a single block along axis, we can just work blockwise
                     # inspired by https://github.com/dask/dask/issues/8361
-                    method="blockwise" if numblocks == 1 else "map-reduce",
+                    method="blockwise" if numblocks == 1 and len(axis) == by.ndim else "map-reduce",
                 )

The previous 1D version was easy, and inspired by xarray's original algorithm: basically, select out the cohorts first:

-                subset_idx = idx[np.isin(by, cohort)]
-                array_subset = array[..., subset_idx]
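
For a concrete toy illustration (plain numpy, nothing flox-specific), the 1D selection is a single fancy-indexing step along the last axis:

```python
import numpy as np

# toy data: group labels along the last axis, any number of leading axes
by = np.array([0, 1, 0, 2, 1, 2])
array = np.arange(12).reshape(2, 6)
cohort = [0, 1]
idx = np.arange(by.size)

# indexes for a subset of groups, then one take along the last axis
subset_idx = idx[np.isin(by, cohort)]   # array([0, 1, 2, 4])
array_subset = array[..., subset_idx]   # shape (2, 4)
```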

The new one works like calling .where(mask, drop=True), which really just leads to an explosion in the number of tasks:

+                # equivalent of xarray.DataArray.where(mask, drop=True)
+                mask = np.isin(by, cohort)
+                indexer = [np.unique(v) for v in np.nonzero(mask)]
+                array_subset = array
+                for ax, idxr in zip(range(-by.ndim, 0), indexer):
+                    array_subset = np.take(array_subset, idxr, axis=ax)
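
Here is a small numpy sketch of that nD selection (illustrative only): the mask is collapsed to one indexer per grouped axis, and np.take is applied along each of those axes, selecting the bounding box of the cohort's positions:

```python
import numpy as np

by = np.array([[0, 1, 2],
               [0, 1, 2]])              # 2D labels over the last two axes
array = np.arange(2 * 2 * 3).reshape(2, 2, 3)
cohort = [0, 2]

# equivalent of xarray.DataArray.where(mask, drop=True)
mask = np.isin(by, cohort)
indexer = [np.unique(v) for v in np.nonzero(mask)]   # [array([0, 1]), array([0, 2])]
array_subset = array
for ax, idxr in zip(range(-by.ndim, 0), indexer):
    array_subset = np.take(array_subset, idxr, axis=ax)
# array_subset.shape == (2, 2, 2)
```

With a dask array, that per-cohort, per-axis selection adds its own layer of indexing tasks for every cohort, which is where the blowup comes from.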

Importantly, the model here is to first "shuffle" (i.e. select out the cohorts) and then reduce using dask_groupby_agg.

Proposal

The insight from dask/dask#9302, as I understand it, is that it's better to groupby-reduce blockwise first and then shuffle. Ideally, that initial blockwise reduction is very effective and substantially reduces the amount of data duplication and replication. Also, we always apply the blockwise reduction to all cohorts, so we might as well apply it just once.

So the new model is: blockwise reduce -> shuffle to cohorts -> tree-reduce.
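
As a toy, numpy-only sketch of that model (sums only, nothing like flox's actual machinery):

```python
import numpy as np

def blockwise_reduce(block, labels):
    """Partial sums for every group present in this block."""
    groups, inv = np.unique(labels, return_inverse=True)
    return dict(zip(groups, np.bincount(inv, weights=block)))

by = np.tile([0, 1, 2, 3], 6)            # toy labels: 4 groups
array = np.arange(24, dtype=float)
cohorts = [(0, 1), (2, 3)]               # toy cohorts

# step 1: blockwise reduction, applied once, for all cohorts
block_ids = np.array_split(np.arange(array.size), 3)
partials = [blockwise_reduce(array[ix], by[ix]) for ix in block_ids]

# steps 2 + 3: "shuffle" the small per-block partials to their cohort,
# then combine within each cohort (the tree-reduce, collapsed to one step here)
result = {}
for cohort in cohorts:
    for g in cohort:
        result[g] = sum(p[g] for p in partials if g in p)

print(result)   # {0: 60.0, 1: 66.0, 2: 72.0, 3: 78.0}
```

The point is that step 1 runs exactly once for all cohorts, and only the small per-block partials get moved around in step 2.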

This shuffling would need to happen here after the blockwise call:

flox/flox/core.py, line 1137 in fbc2af8

It will be quite hard, but doable, because our intermediate structures are dicts, not arrays, though we could consider moving to structured arrays.
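
For example, a per-block intermediate that is currently a dict of arrays could in principle be packed into a single structured array; the field names below are purely illustrative, not flox's actual schema:

```python
import numpy as np

# hypothetical per-block intermediate, as a dict of equal-length arrays
intermediate_dict = {
    "groups": np.array([0, 1, 2]),
    "sum": np.array([4.0, 6.0, 8.0]),
    "count": np.array([2, 2, 2]),
}

# the same data as one numpy structured array, which array-based shuffling
# machinery could move around as a unit
dtype = [("groups", "i8"), ("sum", "f8"), ("count", "i8")]
intermediate = np.empty(3, dtype=dtype)
for name, values in intermediate_dict.items():
    intermediate[name] = values
```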

Another wacky idea might be to stick our dicts in a dask dataframe and use dask.dataframe.shuffle to move the data to the right places (inspired by https://discourse.pangeo.io/t/tables-x-arrays-and-rasters/1945).
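
A rough sketch of that wacky idea, with toy data and none of flox's internals: compute per-block partials, put them in a dask DataFrame, shuffle on the group label so each group's partials land in one partition, then combine within each partition:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

rng = np.random.default_rng(0)
by = np.tile([0, 1, 2, 3], 25)                   # toy group labels
array = rng.random(by.size)

# "blockwise" step: one (sum, count) partial per (block, group) pair
partials = pd.DataFrame(
    [
        {"group": g,
         "sum": array[ix][by[ix] == g].sum(),
         "count": int((by[ix] == g).sum())}
        for ix in np.array_split(np.arange(array.size), 4)
        for g in np.unique(by[ix])
    ]
)

ddf = dd.from_pandas(partials, npartitions=4)
shuffled = ddf.shuffle("group")                  # co-locate each group's partials
combined = shuffled.map_partitions(
    lambda df: df.groupby("group", as_index=False).sum()
)
print(combined.compute().set_index("group"))
```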
