_pyxlsb.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. # pyright: reportMissingImports=false
  2. from __future__ import annotations
  3. from typing import TYPE_CHECKING
  4. from pandas.compat._optional import import_optional_dependency
  5. from pandas.util._decorators import doc
  6. from pandas.core.shared_docs import _shared_docs
  7. from pandas.io.excel._base import BaseExcelReader
  8. if TYPE_CHECKING:
  9. from pyxlsb import Workbook
  10. from pandas._typing import (
  11. FilePath,
  12. ReadBuffer,
  13. Scalar,
  14. StorageOptions,
  15. )
  16. class PyxlsbReader(BaseExcelReader["Workbook"]):
  17. @doc(storage_options=_shared_docs["storage_options"])
  18. def __init__(
  19. self,
  20. filepath_or_buffer: FilePath | ReadBuffer[bytes],
  21. storage_options: StorageOptions | None = None,
  22. engine_kwargs: dict | None = None,
  23. ) -> None:
  24. """
  25. Reader using pyxlsb engine.
  26. Parameters
  27. ----------
  28. filepath_or_buffer : str, path object, or Workbook
  29. Object to be parsed.
  30. {storage_options}
  31. engine_kwargs : dict, optional
  32. Arbitrary keyword arguments passed to excel engine.
  33. """
  34. import_optional_dependency("pyxlsb")
  35. # This will call load_workbook on the filepath or buffer
  36. # And set the result to the book-attribute
  37. super().__init__(
  38. filepath_or_buffer,
  39. storage_options=storage_options,
  40. engine_kwargs=engine_kwargs,
  41. )
  42. @property
  43. def _workbook_class(self) -> type[Workbook]:
  44. from pyxlsb import Workbook
  45. return Workbook
  46. def load_workbook(
  47. self, filepath_or_buffer: FilePath | ReadBuffer[bytes], engine_kwargs
  48. ) -> Workbook:
  49. from pyxlsb import open_workbook
  50. # TODO: hack in buffer capability
  51. # This might need some modifications to the Pyxlsb library
  52. # Actual work for opening it is in xlsbpackage.py, line 20-ish
  53. return open_workbook(filepath_or_buffer, **engine_kwargs)
  54. @property
  55. def sheet_names(self) -> list[str]:
  56. return self.book.sheets
  57. def get_sheet_by_name(self, name: str):
  58. self.raise_if_bad_sheet_by_name(name)
  59. return self.book.get_sheet(name)
  60. def get_sheet_by_index(self, index: int):
  61. self.raise_if_bad_sheet_by_index(index)
  62. # pyxlsb sheets are indexed from 1 onwards
  63. # There's a fix for this in the source, but the pypi package doesn't have it
  64. return self.book.get_sheet(index + 1)
  65. def _convert_cell(self, cell) -> Scalar:
  66. # TODO: there is no way to distinguish between floats and datetimes in pyxlsb
  67. # This means that there is no way to read datetime types from an xlsb file yet
  68. if cell.v is None:
  69. return "" # Prevents non-named columns from not showing up as Unnamed: i
  70. if isinstance(cell.v, float):
  71. val = int(cell.v)
  72. if val == cell.v:
  73. return val
  74. else:
  75. return float(cell.v)
  76. return cell.v
  77. def get_sheet_data(
  78. self,
  79. sheet,
  80. file_rows_needed: int | None = None,
  81. ) -> list[list[Scalar]]:
  82. data: list[list[Scalar]] = []
  83. previous_row_number = -1
  84. # When sparse=True the rows can have different lengths and empty rows are
  85. # not returned. The cells are namedtuples of row, col, value (r, c, v).
  86. for row in sheet.rows(sparse=True):
  87. row_number = row[0].r
  88. converted_row = [self._convert_cell(cell) for cell in row]
  89. while converted_row and converted_row[-1] == "":
  90. # trim trailing empty elements
  91. converted_row.pop()
  92. if converted_row:
  93. data.extend([[]] * (row_number - previous_row_number - 1))
  94. data.append(converted_row)
  95. previous_row_number = row_number
  96. if file_rows_needed is not None and len(data) >= file_rows_needed:
  97. break
  98. if data:
  99. # extend rows to max_width
  100. max_width = max(len(data_row) for data_row in data)
  101. if min(len(data_row) for data_row in data) < max_width:
  102. empty_cell: list[Scalar] = [""]
  103. data = [
  104. data_row + (max_width - len(data_row)) * empty_cell
  105. for data_row in data
  106. ]
  107. return data