maxcompute_utils.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. import math
  3. import pandas as pd
  class MaxComputeUtil:
      """
      Helper around a PyODPS ``ODPS`` client for reading MaxCompute tables.

      Args:
          access_id: your access id of MaxCompute
          access_key: access key of MaxCompute
          project_name: your project name of MaxCompute
          endpoint: endpoint of MaxCompute

      Attributes:
          _odps: ODPS object
      """

      def __init__(self, access_id, access_key, project_name, endpoint):
          # Imported here rather than at module level, so ``odps`` is only
          # required once an instance is actually created.
          from odps import ODPS
          self._odps = ODPS(access_id, access_key, project_name, endpoint)

      def _get_table(self, table_name):
          """
          Get MaxCompute table object.

          :param table_name: table name
          :return: whatever ``self._odps.get_table`` returns for that name
          """
          return self._odps.get_table(table_name)

      def _read_data(self, table_name: str, pt_condition: str) -> pd.DataFrame:
          """
          Read data from MaxCompute table.

          :param table_name: table name
          :param pt_condition: partition condition,
              Example: pt_condition = 'dt=20230331'
          :return: pandas dataframe with all data
          """
          table = self._get_table(table_name)
          # NOTE(review): limit=False is forwarded to open_reader unchanged;
          # presumably it lifts the row cap on reads - confirm against PyODPS.
          with table.open_reader(partition=pt_condition, limit=False) as reader:
              return reader.to_pandas()

      def fetch_data_to_csv(self, table_name: str, pt_condition: str,
                            output_path: str) -> None:
          """
          Fetch data from MaxCompute table to local file.

          :param table_name: table name
          :param pt_condition: partition condition,
              Example: pt_condition = 'dt=20230331'
          :param output_path: output path of the CSV file (written without
              the dataframe index)
          :return: None
          """
          pd_df = self._read_data(table_name, pt_condition)
          pd_df.to_csv(output_path, index=False)
          print(f'Fetch data to {output_path} successfully.')

      @staticmethod
      def _check_batch_args(reader, batch_size, limit):
          """
          Validate and normalise the batching arguments.

          Args:
              reader: MaxCompute table reader; ``reader.count`` is only read
                  when ``limit`` is falsy (None or 0)
              batch_size: requested batch size
              limit: requested row limit, falsy means "use reader.count"

          Returns:
              ``(batch_size, limit)`` where ``limit`` is resolved and
              ``batch_size`` is capped at ``limit``. For an empty reader this
              is ``(0, 0)``; callers must not divide by the batch size
              without checking (see ``_batch_count``).

          Raises:
              ValueError: if ``batch_size`` is not positive or ``limit`` is
                  negative.
          """
          if not limit:
              limit = reader.count
          if batch_size <= 0:
              raise ValueError(
                  f'batch_size must be positive, but got {batch_size}')
          if limit < 0:
              raise ValueError(f'limit must be non-negative, but got {limit}')
          return min(batch_size, limit), limit

      @staticmethod
      def _batch_count(limit, batch_size, drop_last):
          """Return the number of batches needed to cover ``limit`` rows."""
          # Nothing to read (e.g. empty table): avoid dividing by zero.
          if limit <= 0 or batch_size <= 0:
              return 0
          full_batches, remainder = divmod(limit, batch_size)
          if remainder and not drop_last:
              return full_batches + 1
          return full_batches

      @staticmethod
      def _records_to_dataframe(records, partitions, columns):
          """Convert reader records to a dataframe, dropping partition values."""
          # Each record iterates as (name, value) pairs; the last
          # len(partitions) values are cut off before building the frame.
          partition_count = len(partitions) if partitions else 0
          rows = []
          for record in records:
              values = [val for _, val in record]
              rows.append(values[:len(values) - partition_count])
          return pd.DataFrame(rows, columns=columns)

      @staticmethod
      def gen_reader_batch(reader, batch_size_in: int, limit_in: int,
                           drop_last_in: bool, partitions: list, columns: list):
          """
          Generate batch data from MaxCompute table.

          Args:
              reader: MaxCompute table reader
              batch_size_in: batch size
              limit_in: limit of data, None means fetch all data
              drop_last_in: whether drop last incomplete batch data
              partitions: table partitions
              columns: table columns

          Returns:
              batch data generator; each item is a dataframe. No row at or
              beyond ``limit_in`` is ever requested from the reader, and
              nothing is yielded when there are no rows to read.

          Raises:
              ValueError: on first iteration, if ``batch_size_in`` is not
                  positive or ``limit_in`` is negative.
          """
          batch_size_in, limit_in = MaxComputeUtil._check_batch_args(
              reader, batch_size_in, limit_in)
          batch_num = MaxComputeUtil._batch_count(limit_in, batch_size_in,
                                                  drop_last_in)
          for batch_idx in range(batch_num):
              start = batch_idx * batch_size_in
              # Clamp so the final (partial) batch stops exactly at the limit.
              end = min(start + batch_size_in, limit_in)
              yield MaxComputeUtil._records_to_dataframe(
                  reader[start:end], partitions, columns)

      @staticmethod
      def gen_reader_item(reader, index: int, batch_size_in: int, limit_in: int,
                          drop_last_in: bool, partitions: list, columns: list):
          """
          Get single batch data from MaxCompute table by indexing.

          Args:
              reader: MaxCompute table reader
              index: index of batch data
              batch_size_in: batch size
              limit_in: limit of data, None means fetch all data
              drop_last_in: whether drop last incomplete batch data
              partitions: table partitions
              columns: table columns

          Returns:
              single batch data (dataframe)

          Raises:
              ValueError: if ``batch_size_in`` is not positive, ``limit_in``
                  is negative, or ``index`` is outside ``[0, batch_num)``
                  (which is always the case when there are no rows).
          """
          batch_size_in, limit_in = MaxComputeUtil._check_batch_args(
              reader, batch_size_in, limit_in)
          batch_num = MaxComputeUtil._batch_count(limit_in, batch_size_in,
                                                  drop_last_in)
          if index < 0:
              raise ValueError(f'index must be non-negative, but got {index}')
          if index >= batch_num:
              raise ValueError(
                  f'index must be less than batch_num, but got index={index}, batch_num={batch_num}'
              )
          start = index * batch_size_in
          end = min(start + batch_size_in, limit_in)
          return MaxComputeUtil._records_to_dataframe(
              reader[start:end], partitions, columns)

      def get_table_reader_ins(self, table_name: str, pt_condition: str = None):
          """
          Get the table object together with a reader opened on it.

          :param table_name: table name
          :param pt_condition: partition condition, passed to ``open_reader``
              as ``partition``; defaults to None
          :return: tuple ``(table_ins, reader)``
          """
          table_ins = self._get_table(table_name)
          # NOTE(review): returning from inside the ``with`` block runs the
          # reader's context-manager exit before the caller receives it.
          # Whether the reader stays usable afterwards depends on PyODPS -
          # confirm before changing this.
          with table_ins.open_reader(partition=pt_condition) as reader:
              return table_ins, reader