test_gcs.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. from io import BytesIO
  2. import os
  3. import pathlib
  4. import tarfile
  5. import zipfile
  6. import numpy as np
  7. import pytest
  8. from pandas.compat.pyarrow import pa_version_under17p0
  9. from pandas import (
  10. DataFrame,
  11. Index,
  12. date_range,
  13. read_csv,
  14. read_excel,
  15. read_json,
  16. read_parquet,
  17. )
  18. import pandas._testing as tm
  19. from pandas.util import _test_decorators as td
# Applied by pytest to every test in this module: ignore DeprecationWarnings
# whose message starts with "Passing a BlockManager to DataFrame".
pytestmark = pytest.mark.filterwarnings(
    "ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
)
  23. @pytest.fixture
  24. def gcs_buffer():
  25. """Emulate GCS using a binary buffer."""
  26. pytest.importorskip("gcsfs")
  27. fsspec = pytest.importorskip("fsspec")
  28. gcs_buffer = BytesIO()
  29. gcs_buffer.close = lambda: True
  30. class MockGCSFileSystem(fsspec.AbstractFileSystem):
  31. @staticmethod
  32. def open(*args, **kwargs):
  33. gcs_buffer.seek(0)
  34. return gcs_buffer
  35. def ls(self, path, **kwargs):
  36. # needed for pyarrow
  37. return [{"name": path, "type": "file"}]
  38. # Overwrites the default implementation from gcsfs to our mock class
  39. fsspec.register_implementation("gs", MockGCSFileSystem, clobber=True)
  40. return gcs_buffer
  41. # Patches pyarrow; other processes should not pick up change
  42. @pytest.mark.single_cpu
  43. @pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
  44. def test_to_read_gcs(gcs_buffer, format, monkeypatch, capsys, request):
  45. """
  46. Test that many to/read functions support GCS.
  47. GH 33987
  48. """
  49. df1 = DataFrame(
  50. {
  51. "int": [1, 3],
  52. "float": [2.0, np.nan],
  53. "str": ["t", "s"],
  54. "dt": date_range("2018-06-18", periods=2),
  55. }
  56. )
  57. path = f"gs://test/test.{format}"
  58. if format == "csv":
  59. df1.to_csv(path, index=True)
  60. df2 = read_csv(path, parse_dates=["dt"], index_col=0)
  61. elif format == "excel":
  62. path = "gs://test/test.xlsx"
  63. df1.to_excel(path)
  64. df2 = read_excel(path, parse_dates=["dt"], index_col=0)
  65. elif format == "json":
  66. df1.to_json(path)
  67. df2 = read_json(path, convert_dates=["dt"])
  68. elif format == "parquet":
  69. pytest.importorskip("pyarrow")
  70. pa_fs = pytest.importorskip("pyarrow.fs")
  71. class MockFileSystem(pa_fs.FileSystem):
  72. @staticmethod
  73. def from_uri(path):
  74. print("Using pyarrow filesystem")
  75. to_local = pathlib.Path(path.replace("gs://", "")).absolute().as_uri()
  76. return pa_fs.LocalFileSystem(to_local)
  77. request.applymarker(
  78. pytest.mark.xfail(
  79. not pa_version_under17p0,
  80. raises=TypeError,
  81. reason="pyarrow 17 broke the mocked filesystem",
  82. )
  83. )
  84. with monkeypatch.context() as m:
  85. m.setattr(pa_fs, "FileSystem", MockFileSystem)
  86. df1.to_parquet(path)
  87. df2 = read_parquet(path)
  88. captured = capsys.readouterr()
  89. assert captured.out == "Using pyarrow filesystem\nUsing pyarrow filesystem\n"
  90. elif format == "markdown":
  91. pytest.importorskip("tabulate")
  92. df1.to_markdown(path)
  93. df2 = df1
  94. tm.assert_frame_equal(df1, df2)
  95. def assert_equal_zip_safe(result: bytes, expected: bytes, compression: str):
  96. """
  97. For zip compression, only compare the CRC-32 checksum of the file contents
  98. to avoid checking the time-dependent last-modified timestamp which
  99. in some CI builds is off-by-one
  100. See https://en.wikipedia.org/wiki/ZIP_(file_format)#File_headers
  101. """
  102. if compression == "zip":
  103. # Only compare the CRC checksum of the file contents
  104. with zipfile.ZipFile(BytesIO(result)) as exp, zipfile.ZipFile(
  105. BytesIO(expected)
  106. ) as res:
  107. for res_info, exp_info in zip(res.infolist(), exp.infolist()):
  108. assert res_info.CRC == exp_info.CRC
  109. elif compression == "tar":
  110. with tarfile.open(fileobj=BytesIO(result)) as tar_exp, tarfile.open(
  111. fileobj=BytesIO(expected)
  112. ) as tar_res:
  113. for tar_res_info, tar_exp_info in zip(
  114. tar_res.getmembers(), tar_exp.getmembers()
  115. ):
  116. actual_file = tar_res.extractfile(tar_res_info)
  117. expected_file = tar_exp.extractfile(tar_exp_info)
  118. assert (actual_file is None) == (expected_file is None)
  119. if actual_file is not None and expected_file is not None:
  120. assert actual_file.read() == expected_file.read()
  121. else:
  122. assert result == expected
  123. @pytest.mark.parametrize("encoding", ["utf-8", "cp1251"])
  124. def test_to_csv_compression_encoding_gcs(
  125. gcs_buffer, compression_only, encoding, compression_to_extension
  126. ):
  127. """
  128. Compression and encoding should with GCS.
  129. GH 35677 (to_csv, compression), GH 26124 (to_csv, encoding), and
  130. GH 32392 (read_csv, encoding)
  131. """
  132. df = DataFrame(
  133. 1.1 * np.arange(120).reshape((30, 4)),
  134. columns=Index(list("ABCD")),
  135. index=Index([f"i-{i}" for i in range(30)]),
  136. )
  137. # reference of compressed and encoded file
  138. compression = {"method": compression_only}
  139. if compression_only == "gzip":
  140. compression["mtime"] = 1 # be reproducible
  141. buffer = BytesIO()
  142. df.to_csv(buffer, compression=compression, encoding=encoding, mode="wb")
  143. # write compressed file with explicit compression
  144. path_gcs = "gs://test/test.csv"
  145. df.to_csv(path_gcs, compression=compression, encoding=encoding)
  146. res = gcs_buffer.getvalue()
  147. expected = buffer.getvalue()
  148. assert_equal_zip_safe(res, expected, compression_only)
  149. read_df = read_csv(
  150. path_gcs, index_col=0, compression=compression_only, encoding=encoding
  151. )
  152. tm.assert_frame_equal(df, read_df)
  153. # write compressed file with implicit compression
  154. file_ext = compression_to_extension[compression_only]
  155. compression["method"] = "infer"
  156. path_gcs += f".{file_ext}"
  157. df.to_csv(path_gcs, compression=compression, encoding=encoding)
  158. res = gcs_buffer.getvalue()
  159. expected = buffer.getvalue()
  160. assert_equal_zip_safe(res, expected, compression_only)
  161. read_df = read_csv(path_gcs, index_col=0, compression="infer", encoding=encoding)
  162. tm.assert_frame_equal(df, read_df)
  163. def test_to_parquet_gcs_new_file(monkeypatch, tmpdir):
  164. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  165. pytest.importorskip("fastparquet")
  166. pytest.importorskip("gcsfs")
  167. from fsspec import AbstractFileSystem
  168. df1 = DataFrame(
  169. {
  170. "int": [1, 3],
  171. "float": [2.0, np.nan],
  172. "str": ["t", "s"],
  173. "dt": date_range("2018-06-18", periods=2),
  174. }
  175. )
  176. class MockGCSFileSystem(AbstractFileSystem):
  177. def open(self, path, mode="r", *args):
  178. if "w" not in mode:
  179. raise FileNotFoundError
  180. return open(os.path.join(tmpdir, "test.parquet"), mode, encoding="utf-8")
  181. monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
  182. df1.to_parquet(
  183. "gs://test/test.csv", index=True, engine="fastparquet", compression=None
  184. )
  185. @td.skip_if_installed("gcsfs")
  186. def test_gcs_not_present_exception():
  187. with tm.external_error_raised(ImportError):
  188. read_csv("gs://test/test.csv")