test_time_grouper.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. from datetime import datetime
  2. from operator import methodcaller
  3. import numpy as np
  4. import pytest
  5. import pandas as pd
  6. from pandas import (
  7. DataFrame,
  8. Index,
  9. Series,
  10. Timestamp,
  11. )
  12. import pandas._testing as tm
  13. from pandas.core.groupby.grouper import Grouper
  14. from pandas.core.indexes.datetimes import date_range
  15. @pytest.fixture
  16. def test_series():
  17. return Series(
  18. np.random.default_rng(2).standard_normal(1000),
  19. index=date_range("1/1/2000", periods=1000),
  20. )
  21. def test_apply(test_series):
  22. grouper = Grouper(freq="YE", label="right", closed="right")
  23. grouped = test_series.groupby(grouper)
  24. def f(x):
  25. return x.sort_values()[-3:]
  26. applied = grouped.apply(f)
  27. expected = test_series.groupby(lambda x: x.year).apply(f)
  28. applied.index = applied.index.droplevel(0)
  29. expected.index = expected.index.droplevel(0)
  30. tm.assert_series_equal(applied, expected)
  31. def test_count(test_series):
  32. test_series[::3] = np.nan
  33. expected = test_series.groupby(lambda x: x.year).count()
  34. grouper = Grouper(freq="YE", label="right", closed="right")
  35. result = test_series.groupby(grouper).count()
  36. expected.index = result.index
  37. tm.assert_series_equal(result, expected)
  38. result = test_series.resample("YE").count()
  39. expected.index = result.index
  40. tm.assert_series_equal(result, expected)
  41. def test_numpy_reduction(test_series):
  42. result = test_series.resample("YE", closed="right").prod()
  43. msg = "using SeriesGroupBy.prod"
  44. with tm.assert_produces_warning(FutureWarning, match=msg):
  45. expected = test_series.groupby(lambda x: x.year).agg(np.prod)
  46. expected.index = result.index
  47. tm.assert_series_equal(result, expected)
  48. def test_apply_iteration():
  49. # #2300
  50. N = 1000
  51. ind = date_range(start="2000-01-01", freq="D", periods=N)
  52. df = DataFrame({"open": 1, "close": 2}, index=ind)
  53. tg = Grouper(freq="ME")
  54. grouper, _ = tg._get_grouper(df)
  55. # Errors
  56. grouped = df.groupby(grouper, group_keys=False)
  57. def f(df):
  58. return df["close"] / df["open"]
  59. # it works!
  60. result = grouped.apply(f)
  61. tm.assert_index_equal(result.index, df.index)
  62. @pytest.mark.parametrize(
  63. "index",
  64. [
  65. Index([1, 2]),
  66. Index(["a", "b"]),
  67. Index([1.1, 2.2]),
  68. pd.MultiIndex.from_arrays([[1, 2], ["a", "b"]]),
  69. ],
  70. )
  71. def test_fails_on_no_datetime_index(index):
  72. name = type(index).__name__
  73. df = DataFrame({"a": range(len(index))}, index=index)
  74. msg = (
  75. "Only valid with DatetimeIndex, TimedeltaIndex "
  76. f"or PeriodIndex, but got an instance of '{name}'"
  77. )
  78. with pytest.raises(TypeError, match=msg):
  79. df.groupby(Grouper(freq="D"))
  80. def test_aaa_group_order():
  81. # GH 12840
  82. # check TimeGrouper perform stable sorts
  83. n = 20
  84. data = np.random.default_rng(2).standard_normal((n, 4))
  85. df = DataFrame(data, columns=["A", "B", "C", "D"])
  86. df["key"] = [
  87. datetime(2013, 1, 1),
  88. datetime(2013, 1, 2),
  89. datetime(2013, 1, 3),
  90. datetime(2013, 1, 4),
  91. datetime(2013, 1, 5),
  92. ] * 4
  93. grouped = df.groupby(Grouper(key="key", freq="D"))
  94. tm.assert_frame_equal(grouped.get_group(datetime(2013, 1, 1)), df[::5])
  95. tm.assert_frame_equal(grouped.get_group(datetime(2013, 1, 2)), df[1::5])
  96. tm.assert_frame_equal(grouped.get_group(datetime(2013, 1, 3)), df[2::5])
  97. tm.assert_frame_equal(grouped.get_group(datetime(2013, 1, 4)), df[3::5])
  98. tm.assert_frame_equal(grouped.get_group(datetime(2013, 1, 5)), df[4::5])
  99. def test_aggregate_normal(resample_method):
  100. """Check TimeGrouper's aggregation is identical as normal groupby."""
  101. data = np.random.default_rng(2).standard_normal((20, 4))
  102. normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
  103. normal_df["key"] = [1, 2, 3, 4, 5] * 4
  104. dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
  105. dt_df["key"] = Index(
  106. [
  107. datetime(2013, 1, 1),
  108. datetime(2013, 1, 2),
  109. datetime(2013, 1, 3),
  110. datetime(2013, 1, 4),
  111. datetime(2013, 1, 5),
  112. ]
  113. * 4,
  114. dtype="M8[ns]",
  115. )
  116. normal_grouped = normal_df.groupby("key")
  117. dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))
  118. expected = getattr(normal_grouped, resample_method)()
  119. dt_result = getattr(dt_grouped, resample_method)()
  120. expected.index = date_range(start="2013-01-01", freq="D", periods=5, name="key")
  121. tm.assert_equal(expected, dt_result)
  122. @pytest.mark.xfail(reason="if TimeGrouper is used included, 'nth' doesn't work yet")
  123. def test_aggregate_nth():
  124. """Check TimeGrouper's aggregation is identical as normal groupby."""
  125. data = np.random.default_rng(2).standard_normal((20, 4))
  126. normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
  127. normal_df["key"] = [1, 2, 3, 4, 5] * 4
  128. dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
  129. dt_df["key"] = [
  130. datetime(2013, 1, 1),
  131. datetime(2013, 1, 2),
  132. datetime(2013, 1, 3),
  133. datetime(2013, 1, 4),
  134. datetime(2013, 1, 5),
  135. ] * 4
  136. normal_grouped = normal_df.groupby("key")
  137. dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))
  138. expected = normal_grouped.nth(3)
  139. expected.index = date_range(start="2013-01-01", freq="D", periods=5, name="key")
  140. dt_result = dt_grouped.nth(3)
  141. tm.assert_frame_equal(expected, dt_result)
  142. @pytest.mark.parametrize(
  143. "method, method_args, unit",
  144. [
  145. ("sum", {}, 0),
  146. ("sum", {"min_count": 0}, 0),
  147. ("sum", {"min_count": 1}, np.nan),
  148. ("prod", {}, 1),
  149. ("prod", {"min_count": 0}, 1),
  150. ("prod", {"min_count": 1}, np.nan),
  151. ],
  152. )
  153. def test_resample_entirely_nat_window(method, method_args, unit):
  154. ser = Series([0] * 2 + [np.nan] * 2, index=date_range("2017", periods=4))
  155. result = methodcaller(method, **method_args)(ser.resample("2d"))
  156. exp_dti = pd.DatetimeIndex(["2017-01-01", "2017-01-03"], dtype="M8[ns]", freq="2D")
  157. expected = Series([0.0, unit], index=exp_dti)
  158. tm.assert_series_equal(result, expected)
  159. @pytest.mark.parametrize(
  160. "func, fill_value",
  161. [("min", np.nan), ("max", np.nan), ("sum", 0), ("prod", 1), ("count", 0)],
  162. )
  163. def test_aggregate_with_nat(func, fill_value):
  164. # check TimeGrouper's aggregation is identical as normal groupby
  165. # if NaT is included, 'var', 'std', 'mean', 'first','last'
  166. # and 'nth' doesn't work yet
  167. n = 20
  168. data = np.random.default_rng(2).standard_normal((n, 4)).astype("int64")
  169. normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
  170. normal_df["key"] = [1, 2, np.nan, 4, 5] * 4
  171. dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
  172. dt_df["key"] = Index(
  173. [
  174. datetime(2013, 1, 1),
  175. datetime(2013, 1, 2),
  176. pd.NaT,
  177. datetime(2013, 1, 4),
  178. datetime(2013, 1, 5),
  179. ]
  180. * 4,
  181. dtype="M8[ns]",
  182. )
  183. normal_grouped = normal_df.groupby("key")
  184. dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))
  185. normal_result = getattr(normal_grouped, func)()
  186. dt_result = getattr(dt_grouped, func)()
  187. pad = DataFrame([[fill_value] * 4], index=[3], columns=["A", "B", "C", "D"])
  188. expected = pd.concat([normal_result, pad])
  189. expected = expected.sort_index()
  190. dti = date_range(
  191. start="2013-01-01",
  192. freq="D",
  193. periods=5,
  194. name="key",
  195. unit=dt_df["key"]._values.unit,
  196. )
  197. expected.index = dti._with_freq(None) # TODO: is this desired?
  198. tm.assert_frame_equal(expected, dt_result)
  199. assert dt_result.index.name == "key"
  200. def test_aggregate_with_nat_size():
  201. # GH 9925
  202. n = 20
  203. data = np.random.default_rng(2).standard_normal((n, 4)).astype("int64")
  204. normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
  205. normal_df["key"] = [1, 2, np.nan, 4, 5] * 4
  206. dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
  207. dt_df["key"] = Index(
  208. [
  209. datetime(2013, 1, 1),
  210. datetime(2013, 1, 2),
  211. pd.NaT,
  212. datetime(2013, 1, 4),
  213. datetime(2013, 1, 5),
  214. ]
  215. * 4,
  216. dtype="M8[ns]",
  217. )
  218. normal_grouped = normal_df.groupby("key")
  219. dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))
  220. normal_result = normal_grouped.size()
  221. dt_result = dt_grouped.size()
  222. pad = Series([0], index=[3])
  223. expected = pd.concat([normal_result, pad])
  224. expected = expected.sort_index()
  225. expected.index = date_range(
  226. start="2013-01-01",
  227. freq="D",
  228. periods=5,
  229. name="key",
  230. unit=dt_df["key"]._values.unit,
  231. )._with_freq(None)
  232. tm.assert_series_equal(expected, dt_result)
  233. assert dt_result.index.name == "key"
  234. def test_repr():
  235. # GH18203
  236. result = repr(Grouper(key="A", freq="h"))
  237. expected = (
  238. "TimeGrouper(key='A', freq=<Hour>, axis=0, sort=True, dropna=True, "
  239. "closed='left', label='left', how='mean', "
  240. "convention='e', origin='start_day')"
  241. )
  242. assert result == expected
  243. result = repr(Grouper(key="A", freq="h", origin="2000-01-01"))
  244. expected = (
  245. "TimeGrouper(key='A', freq=<Hour>, axis=0, sort=True, dropna=True, "
  246. "closed='left', label='left', how='mean', "
  247. "convention='e', origin=Timestamp('2000-01-01 00:00:00'))"
  248. )
  249. assert result == expected
  250. @pytest.mark.parametrize(
  251. "method, method_args, expected_values",
  252. [
  253. ("sum", {}, [1, 0, 1]),
  254. ("sum", {"min_count": 0}, [1, 0, 1]),
  255. ("sum", {"min_count": 1}, [1, np.nan, 1]),
  256. ("sum", {"min_count": 2}, [np.nan, np.nan, np.nan]),
  257. ("prod", {}, [1, 1, 1]),
  258. ("prod", {"min_count": 0}, [1, 1, 1]),
  259. ("prod", {"min_count": 1}, [1, np.nan, 1]),
  260. ("prod", {"min_count": 2}, [np.nan, np.nan, np.nan]),
  261. ],
  262. )
  263. def test_upsample_sum(method, method_args, expected_values):
  264. ser = Series(1, index=date_range("2017", periods=2, freq="h"))
  265. resampled = ser.resample("30min")
  266. index = pd.DatetimeIndex(
  267. ["2017-01-01T00:00:00", "2017-01-01T00:30:00", "2017-01-01T01:00:00"],
  268. dtype="M8[ns]",
  269. freq="30min",
  270. )
  271. result = methodcaller(method, **method_args)(resampled)
  272. expected = Series(expected_values, index=index)
  273. tm.assert_series_equal(result, expected)
  274. def test_groupby_resample_interpolate():
  275. # GH 35325
  276. d = {"price": [10, 11, 9], "volume": [50, 60, 50]}
  277. df = DataFrame(d)
  278. df["week_starting"] = date_range("01/01/2018", periods=3, freq="W")
  279. msg = "DataFrameGroupBy.resample operated on the grouping columns"
  280. with tm.assert_produces_warning(FutureWarning, match=msg):
  281. result = (
  282. df.set_index("week_starting")
  283. .groupby("volume")
  284. .resample("1D")
  285. .interpolate(method="linear")
  286. )
  287. volume = [50] * 15 + [60]
  288. week_starting = list(date_range("2018-01-07", "2018-01-21")) + [
  289. Timestamp("2018-01-14")
  290. ]
  291. expected_ind = pd.MultiIndex.from_arrays(
  292. [volume, week_starting],
  293. names=["volume", "week_starting"],
  294. )
  295. expected = DataFrame(
  296. data={
  297. "price": [
  298. 10.0,
  299. 9.928571428571429,
  300. 9.857142857142858,
  301. 9.785714285714286,
  302. 9.714285714285714,
  303. 9.642857142857142,
  304. 9.571428571428571,
  305. 9.5,
  306. 9.428571428571429,
  307. 9.357142857142858,
  308. 9.285714285714286,
  309. 9.214285714285714,
  310. 9.142857142857142,
  311. 9.071428571428571,
  312. 9.0,
  313. 11.0,
  314. ],
  315. "volume": [50.0] * 15 + [60],
  316. },
  317. index=expected_ind,
  318. )
  319. tm.assert_frame_equal(result, expected)