from datetime import (
    date,
    datetime,
    timedelta,
)
from itertools import product

import numpy as np
import pytest

from pandas.errors import PerformanceWarning

import pandas as pd
from pandas import (
    Categorical,
    DataFrame,
    Grouper,
    Index,
    MultiIndex,
    Series,
    concat,
    date_range,
)
import pandas._testing as tm
from pandas.api.types import CategoricalDtype as CDT
from pandas.core.reshape import reshape as reshape_lib
from pandas.core.reshape.pivot import pivot_table


@pytest.fixture(params=[True, False])
def dropna(request):
    """Parametrized fixture that yields ``True`` and then ``False``."""
    return request.param


@pytest.fixture(params=[([0] * 4, [1] * 4), (range(0, 3), range(1, 4))])
def interval_values(request, closed):
    """Return a Categorical wrapping an IntervalIndex built from the params.

    Each param is a ``(left, right)`` pair of bounds.  ``closed`` is another
    fixture that is not defined in the visible part of this file.
    """
    left, right = request.param
    return Categorical(pd.IntervalIndex.from_arrays(left, right, closed))


class TestPivotTable:
    """Tests for ``pivot_table`` and ``DataFrame.pivot_table``."""

    def setup_method(self):
        """Build ``self.data``: 11 rows, string columns A/B/C, random D/E/F."""
        self.data = DataFrame(
            {
                "A": [
                    "foo",
                    "foo",
                    "foo",
                    "foo",
                    "bar",
                    "bar",
                    "bar",
                    "bar",
                    "foo",
                    "foo",
                    "foo",
                ],
                "B": [
                    "one",
                    "one",
                    "one",
                    "two",
                    "one",
                    "one",
                    "one",
                    "two",
                    "two",
                    "two",
                    "one",
                ],
                "C": [
                    "dull",
                    "dull",
                    "shiny",
                    "dull",
                    "dull",
                    "shiny",
                    "shiny",
                    "dull",
                    "shiny",
                    "shiny",
                    "shiny",
                ],
                # Unseeded random values: the tests below only compare results
                # computed from the same frame against each other.
                "D": np.random.randn(11),
                "E": np.random.randn(11),
                "F": np.random.randn(11),
            }
        )

    def test_pivot_table(self, observed):
        """Function and method forms agree and match groupby-mean-unstack.

        ``observed`` is a fixture that is not defined in the visible part of
        this file.
        """
        index = ["A", "B"]
        columns = "C"
        table = pivot_table(
            self.data, values="D", index=index, columns=columns, observed=observed
        )

        table2 = self.data.pivot_table(
            values="D", index=index, columns=columns, observed=observed
        )
        tm.assert_frame_equal(table, table2)

        # this works
        pivot_table(self.data, values="D", index=index, observed=observed)

        if len(index) > 1:
            assert table.index.names == tuple(index)
        else:
            assert table.index.name == index[0]

        # NOTE(review): `columns` is the string "C", so len(columns) == 1 and
        # only the else branch runs; `columns[0]` is the character "C".
        if len(columns) > 1:
            assert table.columns.names == columns
        else:
            assert table.columns.name == columns[0]

        expected = self.data.groupby(index + [columns])["D"].agg(np.mean).unstack()
        tm.assert_frame_equal(table, expected)

    def test_pivot_table_categorical_observed_equal(self, observed):
        """Pivoting categorical keys matches the plain result cast to category."""
        # issue #24923
        df = DataFrame(
            {"col1": list("abcde"), "col2": list("fghij"), "col3": [1, 2, 3, 4, 5]}
        )

        # Reference result computed before the key columns become categorical.
        expected = df.pivot_table(
            index="col1", values="col3", columns="col2", aggfunc=np.sum, fill_value=0
        )

        expected.index = expected.index.astype("category")
        expected.columns = expected.columns.astype("category")

        df.col1 = df.col1.astype("category")
        df.col2 = df.col2.astype("category")

        result = df.pivot_table(
            index="col1",
            values="col3",
            columns="col2",
            aggfunc=np.sum,
            fill_value=0,
            observed=observed,
        )

        tm.assert_frame_equal(result, expected)

    def test_pivot_table_nocols(self):
        """Builds a three-row frame; the rest of this test is not visible."""
        df = DataFrame(
            {"rows": ["a", "b", "c"], "cols": ["x", "y", "z"], "values": [1, 2, 3]}
        )
        # GH#50538
        # NOTE(review): SOURCE is truncated at this point.  The string literal
        # below is cut off, and the text resumes at the tail of an `__init__`
        # signature.  That `__init__` presumably belongs to the `MockUnstacker`
        # class referenced further down, inside a different test that takes
        # `monkeypatch` -- neither definition is visible here.  The fragment is
        # kept token-for-token and does not parse; restore this region from
        # the upstream file rather than editing it in place.
        msg = "The operation None:
                # __init__ will raise the warning
                super().__init__(*args, **kwargs)
                raise Exception("Don't compute final result.")

        with monkeypatch.context() as m:
            m.setattr(reshape_lib, "_Unstacker", MockUnstacker)
            df = DataFrame(
                {"ind1": np.arange(2**16), "ind2": np.arange(2**16), "count": 0}
            )

            msg = "The following operation may generate"
            with tm.assert_produces_warning(PerformanceWarning, match=msg):
                with pytest.raises(Exception, match="Don't compute final result."):
                    df.pivot_table(
                        index="ind1", columns="ind2", values="count", aggfunc="count"
                    )

    def test_pivot_table_aggfunc_dropna(self, dropna):
        """A list of custom aggfuncs, with and without ``dropna``."""
        # GH 22159
        df = DataFrame(
            {
                "fruit": ["apple", "peach", "apple"],
                "size": [1, 1, 2],
                "taste": [7, 6, 6],
            }
        )

        def ret_one(x):
            return 1

        def ret_sum(x):
            return sum(x)

        def ret_none(x):
            return np.nan

        result = pivot_table(
            df, columns="fruit", aggfunc=[ret_sum, ret_none, ret_one], dropna=dropna
        )

        data = [[3, 1, np.nan, np.nan, 1, 1], [13, 6, np.nan, np.nan, 1, 1]]
        col = MultiIndex.from_product(
            [["ret_sum", "ret_none", "ret_one"], ["apple", "peach"]],
            names=[None, "fruit"],
        )
        expected = DataFrame(data, index=["size", "taste"], columns=col)

        # With dropna=True the expected frame loses its NaN-containing columns
        # (the `ret_none` ones in `data` above).
        if dropna:
            expected = expected.dropna(axis="columns")

        tm.assert_frame_equal(result, expected)

    def test_pivot_table_aggfunc_scalar_dropna(self, dropna):
        """A single aggfunc (``np.mean``), with and without ``dropna``."""
        # GH 22159
        df = DataFrame(
            {"A": ["one", "two", "one"], "x": [3, np.nan, 2], "y": [1, np.nan, np.nan]}
        )

        result = pivot_table(df, columns="A", aggfunc=np.mean, dropna=dropna)

        data = [[2.5, np.nan], [1, np.nan]]
        col = Index(["one", "two"], name="A")
        expected = DataFrame(data, index=["x", "y"], columns=col)

        if dropna:
            expected = expected.dropna(axis="columns")

        tm.assert_frame_equal(result, expected)

    def test_pivot_table_empty_aggfunc(self):
        """Pivoting on an all-``None`` column gives an empty frame."""
        # GH 9186 & GH 13483
        df = DataFrame(
            {
                "A": [2, 2, 3, 3, 2],
                "id": [5, 6, 7, 8, 9],
                "C": ["p", "q", "q", "p", "q"],
                "D": [None, None, None, None, None],
            }
        )
        result = df.pivot_table(index="A", columns="D", values="id", aggfunc=np.size)
        expected = DataFrame(index=Index([], dtype="int64", name="A"))
        expected.columns.name = "D"
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_no_column_raises(self):
        """A ``values`` label that is not a column raises ``KeyError``."""
        # GH 10326
        def agg(arr):
            return np.mean(arr)

        foo = DataFrame({"X": [0, 0, 1, 1], "Y": [0, 1, 0, 1], "Z": [10, 20, 30, 40]})
        with pytest.raises(KeyError, match="notpresent"):
            foo.pivot_table("notpresent", "X", "Y", aggfunc=agg)

    def test_pivot_table_multiindex_columns_doctest_case(self):
        """Dict aggfunc mixing a single function and a list of functions."""
        # The relevant characteristic is that the call
        # to maybe_downcast_to_dtype(agged[v], data[v].dtype) in
        # __internal_pivot_table has `agged[v]` a DataFrame instead of Series,
        # In this case this is because agged.columns is a MultiIndex and 'v'
        # is only indexing on its first level.
        df = DataFrame(
            {
                "A": ["foo", "foo", "foo", "foo", "foo", "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two", "one", "one", "two", "two"],
                "C": [
                    "small",
                    "large",
                    "large",
                    "small",
                    "small",
                    "large",
                    "small",
                    "small",
                    "large",
                ],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9],
            }
        )

        table = pivot_table(
            df,
            values=["D", "E"],
            index=["A", "C"],
            aggfunc={"D": np.mean, "E": [min, max, np.mean]},
        )
        cols = MultiIndex.from_tuples(
            [("D", "mean"), ("E", "max"), ("E", "mean"), ("E", "min")]
        )
        index = MultiIndex.from_tuples(
            [("bar", "large"), ("bar", "small"), ("foo", "large"), ("foo", "small")],
            names=["A", "C"],
        )
        vals = np.array(
            [
                [5.5, 9.0, 7.5, 6.0],
                [5.5, 9.0, 8.5, 8.0],
                [2.0, 5.0, 4.5, 4.0],
                [2.33333333, 6.0, 4.33333333, 2.0],
            ]
        )
        expected = DataFrame(vals, columns=cols, index=index)
        # `vals` is a float array; the min/max columns are cast to int64 in
        # the expected frame.
        expected[("E", "min")] = expected[("E", "min")].astype(np.int64)
        expected[("E", "max")] = expected[("E", "max")].astype(np.int64)
        tm.assert_frame_equal(table, expected)

    def test_pivot_table_sort_false(self):
        """``sort=False`` keeps the index in input order (d1, d4, d3)."""
        # GH#39143
        df = DataFrame(
            {
                "a": ["d1", "d4", "d3"],
                "col": ["a", "b", "c"],
                "num": [23, 21, 34],
                "year": ["2018", "2018", "2019"],
            }
        )
        result = df.pivot_table(
            index=["a", "col"], columns="year", values="num", aggfunc="sum", sort=False
        )
        expected = DataFrame(
            [[23, np.nan], [21, np.nan], [np.nan, 34]],
            columns=Index(["2018", "2019"], name="year"),
            index=MultiIndex.from_arrays(
                [["d1", "d4", "d3"], ["a", "b", "c"]], names=["a", "col"]
            ),
        )
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_sort_false_with_multiple_values(self):
        """``sort=False`` with two value columns keeps row and column order."""
        df = DataFrame(
            {
                "firstname": ["John", "Michael"],
                "lastname": ["Foo", "Bar"],
                "height": [173, 182],
                "age": [47, 33],
            }
        )
        result = df.pivot_table(
            index=["lastname", "firstname"], values=["height", "age"], sort=False
        )
        expected = DataFrame(
            [[173, 47], [182, 33]],
            columns=["height", "age"],
            index=MultiIndex.from_tuples(
                [("Foo", "John"), ("Bar", "Michael")],
                names=["lastname", "firstname"],
            ),
        )
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_with_margins_and_numeric_columns(self):
        """``margins=True`` on a frame whose column labels are integers."""
        # GH 26568
        df = DataFrame([["a", "x", 1], ["a", "y", 2], ["b", "y", 3], ["b", "z", 4]])
        df.columns = [10, 20, 30]

        result = df.pivot_table(
            index=10, columns=20, values=30, aggfunc="sum", fill_value=0, margins=True
        )

        expected = DataFrame([[1, 2, 0, 3], [0, 3, 4, 7], [1, 5, 4, 10]])
        expected.columns = ["x", "y", "z", "All"]
        expected.index = ["a", "b", "All"]
        expected.columns.name = 20
        expected.index.name = 10

        tm.assert_frame_equal(result, expected)

    # NOTE(review): this parametrization uses the same name and values as the
    # module-level `dropna` fixture.
    @pytest.mark.parametrize("dropna", [True, False])
    def test_pivot_ea_dtype_dropna(self, dropna):
        """Mean of an ``Int64`` column yields a ``Float64`` result."""
        # GH#47477
        df = DataFrame({"x": "a", "y": "b", "age": Series([20, 40], dtype="Int64")})
        result = df.pivot_table(
            index="x", columns="y", values="age", aggfunc="mean", dropna=dropna
        )
        expected = DataFrame(
            [[30]],
            index=Index(["a"], name="x"),
            columns=Index(["b"], name="y"),
            dtype="Float64",
        )
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_datetime_warning(self):
        """Margins over a ``["b", "date"]`` index, run under a warning check."""
        # GH#48683
        df = DataFrame(
            {
                "a": "A",
                "b": [1, 2],
                "date": pd.Timestamp("2019-12-31"),
                "sales": [10.0, 11],
            }
        )
        # Passing None presumably asserts that no warning is emitted -- confirm
        # against tm.assert_produces_warning.
        with tm.assert_produces_warning(None):
            result = df.pivot_table(
                index=["b", "date"], columns="a", margins=True, aggfunc="sum"
            )
        expected = DataFrame(
            [[10.0, 10.0], [11.0, 11.0], [21.0, 21.0]],
            index=MultiIndex.from_arrays(
                [
                    Index([1, 2, "All"], name="b"),
                    Index(
                        [pd.Timestamp("2019-12-31"), pd.Timestamp("2019-12-31"), ""],
                        dtype=object,
                        name="date",
                    ),
                ]
            ),
            columns=MultiIndex.from_tuples(
                [("sales", "A"), ("sales", "All")], names=[None, "a"]
            ),
        )
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_with_mixed_nested_tuples(self, using_array_manager):
        """Pivot on a column whose label is the tuple ``(7, "seven")``.

        ``using_array_manager`` is a fixture that is not defined in the
        visible part of this file.
        """
        # GH 50342
        df = DataFrame(
            {
                "A": ["foo", "foo", "foo", "foo", "foo", "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two", "one", "one", "two", "two"],
                "C": [
                    "small",
                    "large",
                    "large",
                    "small",
                    "small",
                    "large",
                    "small",
                    "small",
                    "large",
                ],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9],
                ("col5",): [
                    "foo",
                    "foo",
                    "foo",
                    "foo",
                    "foo",
                    "bar",
                    "bar",
                    "bar",
                    "bar",
                ],
                ("col6", 6): [
                    "one",
                    "one",
                    "one",
                    "two",
                    "two",
                    "one",
                    "one",
                    "two",
                    "two",
                ],
                (7, "seven"): [
                    "small",
                    "large",
                    "large",
                    "small",
                    "small",
                    "large",
                    "small",
                    "small",
                    "large",
                ],
            }
        )
        result = pivot_table(
            df, values="D", index=["A", "B"], columns=[(7, "seven")], aggfunc=np.sum
        )
        expected = DataFrame(
            [[4.0, 5.0], [7.0, 6.0], [4.0, 1.0], [np.nan, 6.0]],
            columns=Index(["large", "small"], name=(7, "seven")),
            index=MultiIndex.from_arrays(
                [["bar", "bar", "foo", "foo"], ["one", "two"] * 2], names=["A", "B"]
            ),
        )
        if using_array_manager:
            # INFO(ArrayManager) column without NaNs can preserve int dtype
            expected["small"] = expected["small"].astype("int64")
        tm.assert_frame_equal(result, expected)


class TestPivot:
    """Tests for ``DataFrame.pivot``."""

    def test_pivot(self):
        """Basic pivot, with and without ``values``, including axis names."""
        data = {
            "index": ["A", "B", "C", "C", "B", "A"],
            "columns": ["One", "One", "One", "Two", "Two", "Two"],
            "values": [1.0, 2.0, 3.0, 3.0, 2.0, 1.0],
        }

        frame = DataFrame(data)
        pivoted = frame.pivot(index="index", columns="columns", values="values")

        expected = DataFrame(
            {
                "One": {"A": 1.0, "B": 2.0, "C": 3.0},
                "Two": {"A": 1.0, "B": 2.0, "C": 3.0},
            }
        )

        expected.index.name, expected.columns.name = "index", "columns"
        tm.assert_frame_equal(pivoted, expected)

        # name tracking
        assert pivoted.index.name == "index"
        assert pivoted.columns.name == "columns"

        # don't specify values
        pivoted = frame.pivot(index="index", columns="columns")
        assert pivoted.index.name == "index"
        assert pivoted.columns.names == (None, "columns")

    def test_pivot_duplicates(self):
        """Duplicate (index, columns) pairs raise ``ValueError``."""
        data = DataFrame(
            {
                "a": ["bar", "bar", "foo", "foo", "foo"],
                "b": ["one", "two", "one", "one", "two"],
                "c": [1.0, 2.0, 3.0, 3.0, 4.0],
            }
        )
        with pytest.raises(ValueError, match="duplicate entries"):
            data.pivot(index="a", columns="b", values="c")

    def test_pivot_empty(self):
        """Pivoting a frame with columns but no rows gives an empty frame."""
        df = DataFrame(columns=["a", "b", "c"])
        result = df.pivot(index="a", columns="b", values="c")
        expected = DataFrame()
        tm.assert_frame_equal(result, expected, check_names=False)

    def test_pivot_integer_bug(self):
        """Pivot on a frame with integer column labels 0, 1, 2."""
        df = DataFrame(data=[("A", "1", "A1"), ("B", "2", "B2")])

        result = df.pivot(index=1, columns=0, values=2)
        # The return value is discarded; presumably this only checks that
        # repr() does not raise.
        repr(result)
        tm.assert_index_equal(result.columns, Index(["A", "B"], name=0))

    def test_pivot_index_none(self):
        """Pivot without ``index=`` uses the frame's existing index."""
        # GH#3962
        data = {
            "index": ["A", "B", "C", "C", "B", "A"],
            "columns": ["One", "One", "One", "Two", "Two", "Two"],
            "values": [1.0, 2.0, 3.0, 3.0, 2.0, 1.0],
        }

        frame = DataFrame(data).set_index("index")
        result = frame.pivot(columns="columns", values="values")
        expected = DataFrame(
            {
                "One": {"A": 1.0, "B": 2.0, "C": 3.0},
                "Two": {"A": 1.0, "B": 2.0, "C": 3.0},
            }
        )

        expected.index.name, expected.columns.name = "index", "columns"
        tm.assert_frame_equal(result, expected)

        # omit values
        result = frame.pivot(columns="columns")

        expected.columns = MultiIndex.from_tuples(
            [("values", "One"), ("values", "Two")], names=[None, "columns"]
        )
        expected.index.name = "index"
        tm.assert_frame_equal(result, expected, check_names=False)
        assert result.index.name == "index"
        assert result.columns.names == (None, "columns")
        # Drop the "values" level again and repeat the first comparison.
        expected.columns = expected.columns.droplevel(0)
        result = frame.pivot(columns="columns", values="values")

        expected.columns.name = "columns"
        tm.assert_frame_equal(result, expected)

    def test_pivot_index_list_values_none_immutable_args(self):
        """List ``index``/``columns`` with ``values=None`` are not mutated."""
        # GH37635
        df = DataFrame(
            {
                "lev1": [1, 1, 1, 2, 2, 2],
                "lev2": [1, 1, 2, 1, 1, 2],
                "lev3": [1, 2, 1, 2, 1, 2],
                "lev4": [1, 2, 3, 4, 5, 6],
                "values": [0, 1, 2, 3, 4, 5],
            }
        )
        index = ["lev1", "lev2"]
        columns = ["lev3"]
        result = df.pivot(index=index, columns=columns, values=None)

        expected = DataFrame(
            np.array(
                [
                    [1.0, 2.0, 0.0, 1.0],
                    [3.0, np.nan, 2.0, np.nan],
                    [5.0, 4.0, 4.0, 3.0],
                    [np.nan, 6.0, np.nan, 5.0],
                ]
            ),
            index=MultiIndex.from_arrays(
                [(1, 1, 2, 2), (1, 2, 1, 2)], names=["lev1", "lev2"]
            ),
            columns=MultiIndex.from_arrays(
                [("lev4", "lev4", "values", "values"), (1, 2, 1, 2)],
                names=[None, "lev3"],
            ),
        )

        tm.assert_frame_equal(result, expected)

        # The lists passed as arguments must be unchanged after the call.
        assert index == ["lev1", "lev2"]
        assert columns == ["lev3"]