diff --git a/pandas/core/apply.py b/pandas/core/apply.py index 6af4557897a0d..fc322312a9195 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -2,14 +2,12 @@ import abc from collections import defaultdict -from contextlib import nullcontext from functools import partial import inspect from typing import ( TYPE_CHECKING, Any, Callable, - ContextManager, DefaultDict, Dict, Hashable, @@ -57,7 +55,6 @@ ABCSeries, ) -from pandas.core.base import SelectionMixin import pandas.core.common as com from pandas.core.construction import ensure_wrapped_if_datetimelike @@ -144,6 +141,18 @@ def __init__( def apply(self) -> DataFrame | Series: pass + @abc.abstractmethod + def agg_or_apply_list_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + pass + + @abc.abstractmethod + def agg_or_apply_dict_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + pass + def agg(self) -> DataFrame | Series | None: """ Provide an implementation for the aggregators. @@ -300,80 +309,76 @@ def agg_list_like(self) -> DataFrame | Series: """ return self.agg_or_apply_list_like(op_name="agg") - def agg_or_apply_list_like( - self, op_name: Literal["agg", "apply"] - ) -> DataFrame | Series: - from pandas.core.groupby.generic import ( - DataFrameGroupBy, - SeriesGroupBy, - ) - from pandas.core.reshape.concat import concat - - obj = self.obj - func = cast(List[AggFuncTypeBase], self.func) - kwargs = self.kwargs - if op_name == "apply": - if isinstance(self, FrameApply): - by_row = self.by_row - - elif isinstance(self, SeriesApply): - by_row = "_compat" if self.by_row else False - else: - by_row = False - kwargs = {**kwargs, "by_row": by_row} + def compute_list_like( + self, + op_name: Literal["agg", "apply"], + selected_obj: Series | DataFrame, + kwargs: dict[str, Any], + ) -> tuple[list[Hashable], list[Any]]: + """ + Compute agg/apply results for like-like input. - if getattr(obj, "axis", 0) == 1: - raise NotImplementedError("axis other than 0 is not supported") + Parameters + ---------- + op_name : {"agg", "apply"} + Operation being performed. + selected_obj : Series or DataFrame + Data to perform operation on. + kwargs : dict + Keyword arguments to pass to the functions. - if not isinstance(obj, SelectionMixin): - # i.e. obj is Series or DataFrame - selected_obj = obj - elif obj._selected_obj.ndim == 1: - # For SeriesGroupBy this matches _obj_with_exclusions - selected_obj = obj._selected_obj - else: - selected_obj = obj._obj_with_exclusions + Returns + ------- + keys : list[hashable] + Index labels for result. + results : list + Data for result. When aggregating with a Series, this can contain any + Python objects. + """ + func = cast(List[AggFuncTypeBase], self.func) + obj = self.obj results = [] keys = [] - is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy)) + # degenerate case + if selected_obj.ndim == 1: + for a in func: + colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj) + args = ( + [self.axis, *self.args] + if include_axis(op_name, colg) + else self.args + ) + new_res = getattr(colg, op_name)(a, *args, **kwargs) + results.append(new_res) - context_manager: ContextManager - if is_groupby: - # When as_index=False, we combine all results using indices - # and adjust index after - context_manager = com.temp_setattr(obj, "as_index", True) - else: - context_manager = nullcontext() + # make sure we find a good name + name = com.get_callable_name(a) or a + keys.append(name) - def include_axis(colg) -> bool: - return isinstance(colg, ABCDataFrame) or ( - isinstance(colg, ABCSeries) and op_name == "agg" - ) + else: + indices = [] + for index, col in enumerate(selected_obj): + colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index]) + args = ( + [self.axis, *self.args] + if include_axis(op_name, colg) + else self.args + ) + new_res = getattr(colg, op_name)(func, *args, **kwargs) + results.append(new_res) + indices.append(index) + keys = selected_obj.columns.take(indices) - with context_manager: - # degenerate case - if selected_obj.ndim == 1: - for a in func: - colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj) - args = [self.axis, *self.args] if include_axis(colg) else self.args - new_res = getattr(colg, op_name)(a, *args, **kwargs) - results.append(new_res) + return keys, results - # make sure we find a good name - name = com.get_callable_name(a) or a - keys.append(name) + def wrap_results_list_like( + self, keys: list[Hashable], results: list[Series | DataFrame] + ): + from pandas.core.reshape.concat import concat - else: - indices = [] - for index, col in enumerate(selected_obj): - colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index]) - args = [self.axis, *self.args] if include_axis(colg) else self.args - new_res = getattr(colg, op_name)(func, *args, **kwargs) - results.append(new_res) - indices.append(index) - keys = selected_obj.columns.take(indices) + obj = self.obj try: return concat(results, keys=keys, axis=1, sort=False) @@ -399,100 +404,93 @@ def agg_dict_like(self) -> DataFrame | Series: """ return self.agg_or_apply_dict_like(op_name="agg") - def agg_or_apply_dict_like( - self, op_name: Literal["agg", "apply"] - ) -> DataFrame | Series: - from pandas import Index - from pandas.core.groupby.generic import ( - DataFrameGroupBy, - SeriesGroupBy, - ) - from pandas.core.reshape.concat import concat - - assert op_name in ["agg", "apply"] + def compute_dict_like( + self, + op_name: Literal["agg", "apply"], + selected_obj: Series | DataFrame, + selection: Hashable | Sequence[Hashable], + kwargs: dict[str, Any], + ) -> tuple[list[Hashable], list[Any]]: + """ + Compute agg/apply results for dict-like input. + + Parameters + ---------- + op_name : {"agg", "apply"} + Operation being performed. + selected_obj : Series or DataFrame + Data to perform operation on. + selection : hashable or sequence of hashables + Used by GroupBy, Window, and Resample if selection is applied to the object. + kwargs : dict + Keyword arguments to pass to the functions. + Returns + ------- + keys : list[hashable] + Index labels for result. + results : list + Data for result. When aggregating with a Series, this can contain any + Python object. + """ obj = self.obj func = cast(AggFuncTypeDict, self.func) - kwargs = {} - if op_name == "apply": - by_row = "_compat" if self.by_row else False - kwargs.update({"by_row": by_row}) - - if getattr(obj, "axis", 0) == 1: - raise NotImplementedError("axis other than 0 is not supported") - - if not isinstance(obj, SelectionMixin): - # i.e. obj is Series or DataFrame - selected_obj = obj - selection = None - else: - selected_obj = obj._selected_obj - selection = obj._selection - func = self.normalize_dictlike_arg(op_name, selected_obj, func) - is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy)) - context_manager: ContextManager - if is_groupby: - # When as_index=False, we combine all results using indices - # and adjust index after - context_manager = com.temp_setattr(obj, "as_index", True) - else: - context_manager = nullcontext() - is_non_unique_col = ( selected_obj.ndim == 2 and selected_obj.columns.nunique() < len(selected_obj.columns) ) - # Numba Groupby engine/engine-kwargs passthrough - if is_groupby: - engine = self.kwargs.get("engine", None) - engine_kwargs = self.kwargs.get("engine_kwargs", None) - kwargs.update({"engine": engine, "engine_kwargs": engine_kwargs}) - - with context_manager: - if selected_obj.ndim == 1: - # key only used for output - colg = obj._gotitem(selection, ndim=1) - result_data = [ - getattr(colg, op_name)(how, **kwargs) for _, how in func.items() + if selected_obj.ndim == 1: + # key only used for output + colg = obj._gotitem(selection, ndim=1) + results = [getattr(colg, op_name)(how, **kwargs) for _, how in func.items()] + keys = list(func.keys()) + elif is_non_unique_col: + # key used for column selection and output + # GH#51099 + results = [] + keys = [] + for key, how in func.items(): + indices = selected_obj.columns.get_indexer_for([key]) + labels = selected_obj.columns.take(indices) + label_to_indices = defaultdict(list) + for index, label in zip(indices, labels): + label_to_indices[label].append(index) + + key_data = [ + getattr(selected_obj._ixs(indice, axis=1), op_name)(how, **kwargs) + for label, indices in label_to_indices.items() + for indice in indices ] - result_index = list(func.keys()) - elif is_non_unique_col: - # key used for column selection and output - # GH#51099 - result_data = [] - result_index = [] - for key, how in func.items(): - indices = selected_obj.columns.get_indexer_for([key]) - labels = selected_obj.columns.take(indices) - label_to_indices = defaultdict(list) - for index, label in zip(indices, labels): - label_to_indices[label].append(index) - - key_data = [ - getattr(selected_obj._ixs(indice, axis=1), op_name)( - how, **kwargs - ) - for label, indices in label_to_indices.items() - for indice in indices - ] - - result_index += [key] * len(key_data) - result_data += key_data - else: - # key used for column selection and output - result_data = [ - getattr(obj._gotitem(key, ndim=1), op_name)(how, **kwargs) - for key, how in func.items() - ] - result_index = list(func.keys()) + + keys += [key] * len(key_data) + results += key_data + else: + # key used for column selection and output + results = [ + getattr(obj._gotitem(key, ndim=1), op_name)(how, **kwargs) + for key, how in func.items() + ] + keys = list(func.keys()) + + return keys, results + + def wrap_results_dict_like( + self, + selected_obj: Series | DataFrame, + result_index: list[Hashable], + result_data: list, + ): + from pandas import Index + from pandas.core.reshape.concat import concat + + obj = self.obj # Avoid making two isinstance calls in all and any below is_ndframe = [isinstance(r, ABCNDFrame) for r in result_data] - # combine results if all(is_ndframe): results = dict(zip(result_index, result_data)) keys_to_use: Iterable[Hashable] @@ -693,6 +691,50 @@ def index(self) -> Index: def agg_axis(self) -> Index: return self.obj._get_agg_axis(self.axis) + def agg_or_apply_list_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + obj = self.obj + kwargs = self.kwargs + + if op_name == "apply": + if isinstance(self, FrameApply): + by_row = self.by_row + + elif isinstance(self, SeriesApply): + by_row = "_compat" if self.by_row else False + else: + by_row = False + kwargs = {**kwargs, "by_row": by_row} + + if getattr(obj, "axis", 0) == 1: + raise NotImplementedError("axis other than 0 is not supported") + + keys, results = self.compute_list_like(op_name, obj, kwargs) + result = self.wrap_results_list_like(keys, results) + return result + + def agg_or_apply_dict_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + assert op_name in ["agg", "apply"] + obj = self.obj + + kwargs = {} + if op_name == "apply": + by_row = "_compat" if self.by_row else False + kwargs.update({"by_row": by_row}) + + if getattr(obj, "axis", 0) == 1: + raise NotImplementedError("axis other than 0 is not supported") + + selection = None + result_index, result_data = self.compute_dict_like( + op_name, obj, selection, kwargs + ) + result = self.wrap_results_dict_like(obj, result_index, result_data) + return result + class FrameApply(NDFrameApply): obj: DataFrame @@ -1258,6 +1300,8 @@ def curried(x): class GroupByApply(Apply): + obj: GroupBy | Resampler | BaseWindow + def __init__( self, obj: GroupBy[NDFrameT], @@ -1283,8 +1327,73 @@ def apply(self): def transform(self): raise NotImplementedError + def agg_or_apply_list_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + obj = self.obj + kwargs = self.kwargs + if op_name == "apply": + kwargs = {**kwargs, "by_row": False} + + if getattr(obj, "axis", 0) == 1: + raise NotImplementedError("axis other than 0 is not supported") -class ResamplerWindowApply(Apply): + if obj._selected_obj.ndim == 1: + # For SeriesGroupBy this matches _obj_with_exclusions + selected_obj = obj._selected_obj + else: + selected_obj = obj._obj_with_exclusions + + # Only set as_index=True on groupby objects, not Window or Resample + # that inherit from this class. + with com.temp_setattr( + obj, "as_index", True, condition=hasattr(obj, "as_index") + ): + keys, results = self.compute_list_like(op_name, selected_obj, kwargs) + result = self.wrap_results_list_like(keys, results) + return result + + def agg_or_apply_dict_like( + self, op_name: Literal["agg", "apply"] + ) -> DataFrame | Series: + from pandas.core.groupby.generic import ( + DataFrameGroupBy, + SeriesGroupBy, + ) + + assert op_name in ["agg", "apply"] + + obj = self.obj + kwargs = {} + if op_name == "apply": + by_row = "_compat" if self.by_row else False + kwargs.update({"by_row": by_row}) + + if getattr(obj, "axis", 0) == 1: + raise NotImplementedError("axis other than 0 is not supported") + + selected_obj = obj._selected_obj + selection = obj._selection + + is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy)) + + # Numba Groupby engine/engine-kwargs passthrough + if is_groupby: + engine = self.kwargs.get("engine", None) + engine_kwargs = self.kwargs.get("engine_kwargs", None) + kwargs.update({"engine": engine, "engine_kwargs": engine_kwargs}) + + with com.temp_setattr( + obj, "as_index", True, condition=hasattr(obj, "as_index") + ): + result_index, result_data = self.compute_dict_like( + op_name, selected_obj, selection, kwargs + ) + result = self.wrap_results_dict_like(selected_obj, result_index, result_data) + return result + + +class ResamplerWindowApply(GroupByApply): axis: AxisInt = 0 obj: Resampler | BaseWindow @@ -1296,7 +1405,7 @@ def __init__( args, kwargs, ) -> None: - super().__init__( + super(GroupByApply, self).__init__( obj, func, raw=False, @@ -1699,6 +1808,12 @@ def validate_func_kwargs( return columns, func +def include_axis(op_name: Literal["agg", "apply"], colg: Series | DataFrame) -> bool: + return isinstance(colg, ABCDataFrame) or ( + isinstance(colg, ABCSeries) and op_name == "agg" + ) + + def warn_alias_replacement( obj: AggObjType, func: Callable, diff --git a/pandas/core/common.py b/pandas/core/common.py index 9db03ac3ae571..c1d78f7c19c98 100644 --- a/pandas/core/common.py +++ b/pandas/core/common.py @@ -524,23 +524,29 @@ def convert_to_list_like( @contextlib.contextmanager -def temp_setattr(obj, attr: str, value) -> Generator[None, None, None]: +def temp_setattr( + obj, attr: str, value, condition: bool = True +) -> Generator[None, None, None]: """Temporarily set attribute on an object. Args: obj: Object whose attribute will be modified. attr: Attribute to modify. value: Value to temporarily set attribute to. + condition: Whether to set the attribute. Provided in order to not have to + conditionally use this context manager. Yields: obj with modified attribute. """ - old_value = getattr(obj, attr) - setattr(obj, attr, value) + if condition: + old_value = getattr(obj, attr) + setattr(obj, attr, value) try: yield obj finally: - setattr(obj, attr, old_value) + if condition: + setattr(obj, attr, old_value) def require_length_match(data, index: Index) -> None: