Skip to content

Commit ee39d17

Browse files
authored
FEAT-#2491: optimized groupby dictionary aggregation (#2534)
Signed-off-by: Dmitry Chigarev <[email protected]>
1 parent ab29ed6 commit ee39d17

File tree

8 files changed

+472
-254
lines changed

8 files changed

+472
-254
lines changed

asv_bench/benchmarks/benchmarks.py

Lines changed: 35 additions & 19 deletions
Original file line number · Diff line number · Diff line change
@@ -57,46 +57,62 @@ def execute(df):
5757
return df.shape
5858

5959

60-
class TimeMultiColumnGroupby:
61-
param_names = ["data_size", "count_columns"]
62-
params = [UNARY_OP_DATA_SIZE, [6]]
63-
64-
def setup(self, data_size, count_columns):
60+
class BaseTimeGroupBy:
61+
def setup(self, data_size, ncols=1):
6562
self.df = generate_dataframe(
6663
ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
6764
)
68-
self.groupby_columns = [col for col in self.df.columns[:count_columns]]
65+
self.groupby_columns = self.df.columns[:ncols].tolist()
66+
6967

70-
def time_groupby_agg_quan(self, data_size, count_columns):
68+
class TimeMultiColumnGroupby(BaseTimeGroupBy):
69+
param_names = ["data_size", "ncols"]
70+
params = [UNARY_OP_DATA_SIZE, [6]]
71+
72+
def time_groupby_agg_quan(self, data_size, ncols):
7173
execute(self.df.groupby(by=self.groupby_columns).agg("quantile"))
7274

73-
def time_groupby_agg_mean(self, data_size, count_columns):
75+
def time_groupby_agg_mean(self, data_size, ncols):
7476
execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))
7577

7678

77-
class TimeGroupByDefaultAggregations:
79+
class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
7880
param_names = ["data_size"]
7981
params = [
8082
UNARY_OP_DATA_SIZE,
8183
]
8284

83-
def setup(self, data_size):
84-
self.df = generate_dataframe(
85-
ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
86-
)
87-
self.groupby_column = self.df.columns[0]
88-
8985
def time_groupby_count(self, data_size):
90-
execute(self.df.groupby(by=self.groupby_column).count())
86+
execute(self.df.groupby(by=self.groupby_columns).count())
9187

9288
def time_groupby_size(self, data_size):
93-
execute(self.df.groupby(by=self.groupby_column).size())
89+
execute(self.df.groupby(by=self.groupby_columns).size())
9490

9591
def time_groupby_sum(self, data_size):
96-
execute(self.df.groupby(by=self.groupby_column).sum())
92+
execute(self.df.groupby(by=self.groupby_columns).sum())
9793

9894
def time_groupby_mean(self, data_size):
99-
execute(self.df.groupby(by=self.groupby_column).mean())
95+
execute(self.df.groupby(by=self.groupby_columns).mean())
96+
97+
98+
class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
99+
param_names = ["data_size", "operation_type"]
100+
params = [UNARY_OP_DATA_SIZE, ["reduction", "aggregation"]]
101+
operations = {
102+
"reduction": ["sum", "count", "prod"],
103+
"aggregation": ["quantile", "std", "median"],
104+
}
105+
106+
def setup(self, data_size, operation_type):
107+
super().setup(data_size)
108+
self.cols_to_agg = self.df.columns[1:4]
109+
operations = self.operations[operation_type]
110+
self.agg_dict = {
111+
c: operations[i % len(operations)] for i, c in enumerate(self.cols_to_agg)
112+
}
113+
114+
def time_groupby_dict_agg(self, data_size, operation_type):
115+
execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict))
100116

101117

102118
class TimeJoin:

modin/backends/pandas/query_compiler.py

Lines changed: 75 additions & 24 deletions
Original file line number · Diff line number · Diff line change
@@ -23,6 +23,7 @@
2323
)
2424
from pandas.core.base import DataError
2525
from typing import Type, Callable
26+
from collections.abc import Iterable, Container
2627
import warnings
2728

2829

@@ -37,6 +38,7 @@
3738
ReductionFunction,
3839
BinaryFunction,
3940
GroupbyReduceFunction,
41+
groupby_reduce_functions,
4042
)
4143

4244

@@ -2443,33 +2445,57 @@ def _callable_func(self, func, axis, *args, **kwargs):
24432445
# nature. They require certain data to exist on the same partition, and
24442446
# after the shuffle, there should be only a local map required.
24452447

2446-
groupby_count = GroupbyReduceFunction.register(
2447-
lambda df, **kwargs: df.count(**kwargs), lambda df, **kwargs: df.sum(**kwargs)
2448-
)
2449-
groupby_any = GroupbyReduceFunction.register(
2450-
lambda df, **kwargs: df.any(**kwargs), lambda df, **kwargs: df.any(**kwargs)
2451-
)
2452-
groupby_min = GroupbyReduceFunction.register(
2453-
lambda df, **kwargs: df.min(**kwargs), lambda df, **kwargs: df.min(**kwargs)
2454-
)
2455-
groupby_prod = GroupbyReduceFunction.register(
2456-
lambda df, **kwargs: df.prod(**kwargs), lambda df, **kwargs: df.prod(**kwargs)
2457-
)
2458-
groupby_max = GroupbyReduceFunction.register(
2459-
lambda df, **kwargs: df.max(**kwargs), lambda df, **kwargs: df.max(**kwargs)
2460-
)
2461-
groupby_all = GroupbyReduceFunction.register(
2462-
lambda df, **kwargs: df.all(**kwargs), lambda df, **kwargs: df.all(**kwargs)
2463-
)
2464-
groupby_sum = GroupbyReduceFunction.register(
2465-
lambda df, **kwargs: df.sum(**kwargs), lambda df, **kwargs: df.sum(**kwargs)
2466-
)
2448+
groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"])
2449+
groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"])
2450+
groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"])
2451+
groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"])
2452+
groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"])
2453+
groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"])
2454+
groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"])
24672455
groupby_size = GroupbyReduceFunction.register(
2468-
lambda df, **kwargs: pandas.DataFrame(df.size()),
2469-
lambda df, **kwargs: df.sum(),
2470-
method="size",
2456+
*groupby_reduce_functions["size"], method="size"
24712457
)
24722458

2459+
def _groupby_dict_reduce(
2460+
self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False
2461+
):
2462+
map_dict = {}
2463+
reduce_dict = {}
2464+
rename_columns = any(
2465+
not isinstance(fn, str) and isinstance(fn, Iterable)
2466+
for fn in agg_func.values()
2467+
)
2468+
for col, col_funcs in agg_func.items():
2469+
if not rename_columns:
2470+
map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs]
2471+
continue
2472+
2473+
if isinstance(col_funcs, str):
2474+
col_funcs = [col_funcs]
2475+
2476+
map_fns = []
2477+
for i, fn in enumerate(col_funcs):
2478+
if not isinstance(fn, str) and isinstance(fn, Iterable):
2479+
new_col_name, func = fn
2480+
elif isinstance(fn, str):
2481+
new_col_name, func = fn, fn
2482+
else:
2483+
raise TypeError
2484+
2485+
map_fns.append((new_col_name, groupby_reduce_functions[func][0]))
2486+
reduce_dict[(col, new_col_name)] = groupby_reduce_functions[func][1]
2487+
map_dict[col] = map_fns
2488+
return GroupbyReduceFunction.register(map_dict, reduce_dict)(
2489+
query_compiler=self,
2490+
by=by,
2491+
axis=axis,
2492+
groupby_args=groupby_kwargs,
2493+
map_args=agg_kwargs,
2494+
reduce_args=agg_kwargs,
2495+
numeric_only=False,
2496+
drop=drop,
2497+
)
2498+
24732499
def groupby_agg(
24742500
self,
24752501
by,
@@ -2481,6 +2507,31 @@ def groupby_agg(
24812507
groupby_kwargs,
24822508
drop=False,
24832509
):
2510+
def is_reduce_fn(fn, deep_level=0):
2511+
if not isinstance(fn, str) and isinstance(fn, Container):
2512+
# `deep_level` parameter specifies the number of nested containers that was met:
2513+
# - if it's 0, then we're outside of container, `fn` could be either function name
2514+
# or container of function names/renamers.
2515+
# - if it's 1, then we're inside container of function names/renamers. `fn` must be
2516+
# either function name or renamer (renamer is some container which length == 2,
2517+
# the first element is the new column name and the second is the function name).
2518+
assert deep_level == 0 or (
2519+
deep_level > 0 and len(fn) == 2
2520+
), f"Got the renamer with incorrect length, expected 2 got {len(fn)}."
2521+
return (
2522+
all(is_reduce_fn(f, deep_level + 1) for f in fn)
2523+
if deep_level == 0
2524+
else is_reduce_fn(fn[1], deep_level + 1)
2525+
)
2526+
return isinstance(fn, str) and fn in groupby_reduce_functions
2527+
2528+
if isinstance(agg_func, dict) and all(
2529+
is_reduce_fn(x) for x in agg_func.values()
2530+
):
2531+
return self._groupby_dict_reduce(
2532+
by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop
2533+
)
2534+
24842535
if callable(agg_func):
24852536
agg_func = wrap_udf_function(agg_func)
24862537

modin/data_management/functions/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -17,7 +17,7 @@
1717
from .reductionfunction import ReductionFunction
1818
from .foldfunction import FoldFunction
1919
from .binary_function import BinaryFunction
20-
from .groupby_function import GroupbyReduceFunction
20+
from .groupby_function import GroupbyReduceFunction, groupby_reduce_functions
2121

2222
__all__ = [
2323
"Function",
@@ -27,4 +27,5 @@
2727
"FoldFunction",
2828
"BinaryFunction",
2929
"GroupbyReduceFunction",
30+
"groupby_reduce_functions",
3031
]

0 commit comments

Comments (0)