test_fsspec.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. import io
  2. import numpy as np
  3. import pytest
  4. from pandas._config import using_string_dtype
  5. from pandas import (
  6. DataFrame,
  7. date_range,
  8. read_csv,
  9. read_excel,
  10. read_feather,
  11. read_json,
  12. read_parquet,
  13. read_pickle,
  14. read_stata,
  15. read_table,
  16. )
  17. import pandas._testing as tm
  18. from pandas.util import _test_decorators as td
# Module-level pytest mark: for every test in this file, ignore
# DeprecationWarnings whose message starts with
# "Passing a BlockManager to DataFrame".
pytestmark = pytest.mark.filterwarnings(
    "ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
)
@pytest.fixture
def fsspectest():
    """Register a throwaway ``testmem`` fsspec protocol and yield an instance.

    The filesystem class stores the ``test`` keyword it is constructed with in
    a class-level list, which lets tests check what value arrived in the
    filesystem constructor. The test is skipped when fsspec is not installed.
    """
    pytest.importorskip("fsspec")
    from fsspec import register_implementation
    from fsspec.implementations.memory import MemoryFileSystem
    from fsspec.registry import _registry as registry

    class TestMemoryFS(MemoryFileSystem):
        protocol = "testmem"
        # One-element list defined on the class, so the value written by
        # ``__init__`` is visible through the class and through any instance.
        test = [None]

        def __init__(self, **kwargs) -> None:
            # Record (and remove) the ``test`` keyword before delegating the
            # remaining keywords to MemoryFileSystem.
            self.test[0] = kwargs.pop("test", None)
            super().__init__(**kwargs)

    # clobber=True: replace any implementation already registered as "testmem".
    register_implementation("testmem", TestMemoryFS, clobber=True)
    yield TestMemoryFS()
    # Teardown: drop the registration, reset the recorded keyword and empty
    # the class's ``store``.
    registry.pop("testmem", None)
    TestMemoryFS.test[0] = None
    TestMemoryFS.store.clear()
  39. @pytest.fixture
  40. def df1():
  41. return DataFrame(
  42. {
  43. "int": [1, 3],
  44. "float": [2.0, np.nan],
  45. "str": ["t", "s"],
  46. "dt": date_range("2018-06-18", periods=2),
  47. }
  48. )
  49. @pytest.fixture
  50. def cleared_fs():
  51. fsspec = pytest.importorskip("fsspec")
  52. memfs = fsspec.filesystem("memory")
  53. yield memfs
  54. memfs.store.clear()
  55. def test_read_csv(cleared_fs, df1):
  56. text = str(df1.to_csv(index=False)).encode()
  57. with cleared_fs.open("test/test.csv", "wb") as w:
  58. w.write(text)
  59. df2 = read_csv("memory://test/test.csv", parse_dates=["dt"])
  60. tm.assert_frame_equal(df1, df2)
  61. def test_reasonable_error(monkeypatch, cleared_fs):
  62. from fsspec.registry import known_implementations
  63. with pytest.raises(ValueError, match="nosuchprotocol"):
  64. read_csv("nosuchprotocol://test/test.csv")
  65. err_msg = "test error message"
  66. monkeypatch.setitem(
  67. known_implementations,
  68. "couldexist",
  69. {"class": "unimportable.CouldExist", "err": err_msg},
  70. )
  71. with pytest.raises(ImportError, match=err_msg):
  72. read_csv("couldexist://test/test.csv")
  73. def test_to_csv(cleared_fs, df1):
  74. df1.to_csv("memory://test/test.csv", index=True)
  75. df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0)
  76. tm.assert_frame_equal(df1, df2)
  77. def test_to_excel(cleared_fs, df1):
  78. pytest.importorskip("openpyxl")
  79. ext = "xlsx"
  80. path = f"memory://test/test.{ext}"
  81. df1.to_excel(path, index=True)
  82. df2 = read_excel(path, parse_dates=["dt"], index_col=0)
  83. tm.assert_frame_equal(df1, df2)
  84. @pytest.mark.parametrize("binary_mode", [False, True])
  85. def test_to_csv_fsspec_object(cleared_fs, binary_mode, df1):
  86. fsspec = pytest.importorskip("fsspec")
  87. path = "memory://test/test.csv"
  88. mode = "wb" if binary_mode else "w"
  89. with fsspec.open(path, mode=mode).open() as fsspec_object:
  90. df1.to_csv(fsspec_object, index=True)
  91. assert not fsspec_object.closed
  92. mode = mode.replace("w", "r")
  93. with fsspec.open(path, mode=mode) as fsspec_object:
  94. df2 = read_csv(
  95. fsspec_object,
  96. parse_dates=["dt"],
  97. index_col=0,
  98. )
  99. assert not fsspec_object.closed
  100. tm.assert_frame_equal(df1, df2)
  101. def test_csv_options(fsspectest):
  102. df = DataFrame({"a": [0]})
  103. df.to_csv(
  104. "testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False
  105. )
  106. assert fsspectest.test[0] == "csv_write"
  107. read_csv("testmem://test/test.csv", storage_options={"test": "csv_read"})
  108. assert fsspectest.test[0] == "csv_read"
  109. def test_read_table_options(fsspectest):
  110. # GH #39167
  111. df = DataFrame({"a": [0]})
  112. df.to_csv(
  113. "testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False
  114. )
  115. assert fsspectest.test[0] == "csv_write"
  116. read_table("testmem://test/test.csv", storage_options={"test": "csv_read"})
  117. assert fsspectest.test[0] == "csv_read"
  118. def test_excel_options(fsspectest):
  119. pytest.importorskip("openpyxl")
  120. extension = "xlsx"
  121. df = DataFrame({"a": [0]})
  122. path = f"testmem://test/test.{extension}"
  123. df.to_excel(path, storage_options={"test": "write"}, index=False)
  124. assert fsspectest.test[0] == "write"
  125. read_excel(path, storage_options={"test": "read"})
  126. assert fsspectest.test[0] == "read"
  127. def test_to_parquet_new_file(cleared_fs, df1):
  128. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  129. pytest.importorskip("fastparquet")
  130. df1.to_parquet(
  131. "memory://test/test.csv", index=True, engine="fastparquet", compression=None
  132. )
  133. def test_arrowparquet_options(fsspectest):
  134. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  135. pytest.importorskip("pyarrow")
  136. df = DataFrame({"a": [0]})
  137. df.to_parquet(
  138. "testmem://test/test.csv",
  139. engine="pyarrow",
  140. compression=None,
  141. storage_options={"test": "parquet_write"},
  142. )
  143. assert fsspectest.test[0] == "parquet_write"
  144. read_parquet(
  145. "testmem://test/test.csv",
  146. engine="pyarrow",
  147. storage_options={"test": "parquet_read"},
  148. )
  149. assert fsspectest.test[0] == "parquet_read"
  150. @td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fastparquet
  151. def test_fastparquet_options(fsspectest):
  152. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  153. pytest.importorskip("fastparquet")
  154. df = DataFrame({"a": [0]})
  155. df.to_parquet(
  156. "testmem://test/test.csv",
  157. engine="fastparquet",
  158. compression=None,
  159. storage_options={"test": "parquet_write"},
  160. )
  161. assert fsspectest.test[0] == "parquet_write"
  162. read_parquet(
  163. "testmem://test/test.csv",
  164. engine="fastparquet",
  165. storage_options={"test": "parquet_read"},
  166. )
  167. assert fsspectest.test[0] == "parquet_read"
  168. @pytest.mark.single_cpu
  169. def test_from_s3_csv(s3_public_bucket_with_data, tips_file, s3so):
  170. pytest.importorskip("s3fs")
  171. tm.assert_equal(
  172. read_csv(
  173. f"s3://{s3_public_bucket_with_data.name}/tips.csv", storage_options=s3so
  174. ),
  175. read_csv(tips_file),
  176. )
  177. # the following are decompressed by pandas, not fsspec
  178. tm.assert_equal(
  179. read_csv(
  180. f"s3://{s3_public_bucket_with_data.name}/tips.csv.gz", storage_options=s3so
  181. ),
  182. read_csv(tips_file),
  183. )
  184. tm.assert_equal(
  185. read_csv(
  186. f"s3://{s3_public_bucket_with_data.name}/tips.csv.bz2", storage_options=s3so
  187. ),
  188. read_csv(tips_file),
  189. )
  190. @pytest.mark.single_cpu
  191. @pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"])
  192. def test_s3_protocols(s3_public_bucket_with_data, tips_file, protocol, s3so):
  193. pytest.importorskip("s3fs")
  194. tm.assert_equal(
  195. read_csv(
  196. f"{protocol}://{s3_public_bucket_with_data.name}/tips.csv",
  197. storage_options=s3so,
  198. ),
  199. read_csv(tips_file),
  200. )
  201. @pytest.mark.xfail(using_string_dtype(), reason="TODO(infer_string) fastparquet")
  202. @pytest.mark.single_cpu
  203. @td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fastparquet
  204. def test_s3_parquet(s3_public_bucket, s3so, df1):
  205. pytest.importorskip("fastparquet")
  206. pytest.importorskip("s3fs")
  207. fn = f"s3://{s3_public_bucket.name}/test.parquet"
  208. df1.to_parquet(
  209. fn, index=False, engine="fastparquet", compression=None, storage_options=s3so
  210. )
  211. df2 = read_parquet(fn, engine="fastparquet", storage_options=s3so)
  212. tm.assert_equal(df1, df2)
  213. @td.skip_if_installed("fsspec")
  214. def test_not_present_exception():
  215. msg = "Missing optional dependency 'fsspec'|fsspec library is required"
  216. with pytest.raises(ImportError, match=msg):
  217. read_csv("memory://test/test.csv")
  218. def test_feather_options(fsspectest):
  219. pytest.importorskip("pyarrow")
  220. df = DataFrame({"a": [0]})
  221. df.to_feather("testmem://mockfile", storage_options={"test": "feather_write"})
  222. assert fsspectest.test[0] == "feather_write"
  223. out = read_feather("testmem://mockfile", storage_options={"test": "feather_read"})
  224. assert fsspectest.test[0] == "feather_read"
  225. tm.assert_frame_equal(df, out)
  226. def test_pickle_options(fsspectest):
  227. df = DataFrame({"a": [0]})
  228. df.to_pickle("testmem://mockfile", storage_options={"test": "pickle_write"})
  229. assert fsspectest.test[0] == "pickle_write"
  230. out = read_pickle("testmem://mockfile", storage_options={"test": "pickle_read"})
  231. assert fsspectest.test[0] == "pickle_read"
  232. tm.assert_frame_equal(df, out)
  233. def test_json_options(fsspectest, compression):
  234. df = DataFrame({"a": [0]})
  235. df.to_json(
  236. "testmem://mockfile",
  237. compression=compression,
  238. storage_options={"test": "json_write"},
  239. )
  240. assert fsspectest.test[0] == "json_write"
  241. out = read_json(
  242. "testmem://mockfile",
  243. compression=compression,
  244. storage_options={"test": "json_read"},
  245. )
  246. assert fsspectest.test[0] == "json_read"
  247. tm.assert_frame_equal(df, out)
  248. def test_stata_options(fsspectest):
  249. df = DataFrame({"a": [0]})
  250. df.to_stata(
  251. "testmem://mockfile", storage_options={"test": "stata_write"}, write_index=False
  252. )
  253. assert fsspectest.test[0] == "stata_write"
  254. out = read_stata("testmem://mockfile", storage_options={"test": "stata_read"})
  255. assert fsspectest.test[0] == "stata_read"
  256. tm.assert_frame_equal(df, out.astype("int64"))
  257. def test_markdown_options(fsspectest):
  258. pytest.importorskip("tabulate")
  259. df = DataFrame({"a": [0]})
  260. df.to_markdown("testmem://mockfile", storage_options={"test": "md_write"})
  261. assert fsspectest.test[0] == "md_write"
  262. assert fsspectest.cat("testmem://mockfile")
  263. def test_non_fsspec_options():
  264. pytest.importorskip("pyarrow")
  265. with pytest.raises(ValueError, match="storage_options"):
  266. read_csv("localfile", storage_options={"a": True})
  267. with pytest.raises(ValueError, match="storage_options"):
  268. # separate test for parquet, which has a different code path
  269. read_parquet("localfile", storage_options={"a": True})
  270. by = io.BytesIO()
  271. with pytest.raises(ValueError, match="storage_options"):
  272. read_csv(by, storage_options={"a": True})
  273. df = DataFrame({"a": [0]})
  274. with pytest.raises(ValueError, match="storage_options"):
  275. df.to_parquet("nonfsspecpath", storage_options={"a": True})