| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import math
- import pandas as pd
class MaxComputeUtil:
    """
    MaxCompute util class.

    Args:
        access_id: your access id of MaxCompute
        access_key: access key of MaxCompute
        project_name: your project name of MaxCompute
        endpoint: endpoint of MaxCompute

    Attributes:
        _odps: ODPS object
    """

    def __init__(self, access_id, access_key, project_name, endpoint):
        # Imported here so `odps` is only needed when a client is constructed;
        # the static batching helpers below never touch it.
        from odps import ODPS
        self._odps = ODPS(access_id, access_key, project_name, endpoint)

    def _get_table(self, table_name):
        """
        Get MaxCompute table object.
        """
        return self._odps.get_table(table_name)

    def _read_data(self, table_name: str, pt_condition: str) -> pd.DataFrame:
        """
        Read data from MaxCompute table.

        :param table_name: table name
        :param pt_condition: partition condition,
            Example: pt_condition = 'dt=20230331'
        :return: pandas dataframe with all data
        """
        t = self._get_table(table_name)
        # NOTE(review): `limit=False` is passed straight to open_reader;
        # presumably it lifts the download row limit — confirm in PyODPS docs.
        with t.open_reader(partition=pt_condition, limit=False) as reader:
            pd_df = reader.to_pandas()
        return pd_df

    def fetch_data_to_csv(self, table_name: str, pt_condition: str,
                          output_path: str) -> None:
        """
        Fetch data from MaxCompute table to local file.

        :param table_name: table name
        :param pt_condition: partition condition,
            Example: pt_condition = 'dt=20230331'
        :param output_path: output path
        :return: None
        """
        pd_df = self._read_data(table_name, pt_condition)
        pd_df.to_csv(output_path, index=False)
        print(f'Fetch data to {output_path} successfully.')

    @staticmethod
    def _check_batch_args(reader, batch_size, limit):
        """
        Validate and normalize batching arguments.

        Args:
            reader: MaxCompute table reader; only its ``count`` attribute is
                read, and only when ``limit`` is falsy.
            batch_size: requested number of records per batch, must be > 0.
            limit: maximum number of records to read; a falsy value
                (``None`` or ``0``) means all ``reader.count`` records.

        Returns:
            ``(batch_size, limit)``. ``batch_size`` is shrunk to ``limit``
            when it exceeds a non-empty data size.

        Raises:
            ValueError: if ``batch_size`` is not positive or ``limit`` is
                negative.
        """
        if not limit:
            limit = reader.count
        if batch_size <= 0:
            raise ValueError(
                f'batch_size must be positive, but got {batch_size}')
        if limit < 0:
            raise ValueError(f'limit must be non-negative, but got {limit}')
        # Only shrink when there is data: on an empty table (limit == 0) the
        # old unconditional clamp produced batch_size == 0 and a
        # ZeroDivisionError in the callers' batch-count arithmetic.
        if 0 < limit < batch_size:
            batch_size = limit
        return batch_size, limit

    @staticmethod
    def _records_to_dataframe(records, partitions, columns) -> pd.DataFrame:
        """
        Convert an iterable of MaxCompute records into a DataFrame.

        Each record is iterated as ``(name, value)`` pairs. The last
        ``len(partitions)`` values of every record are dropped (this assumes
        partition columns are the trailing fields of a record).

        Args:
            records: iterable of records yielding ``(name, value)`` pairs
            partitions: table partitions (``None`` is treated as empty)
            columns: column names for the resulting DataFrame

        Returns:
            DataFrame with one row per record.
        """
        n_partitions = len(partitions) if partitions else 0
        rows = []
        for record in records:
            values = [val for _, val in record]
            rows.append(values[:len(values) - n_partitions])
        return pd.DataFrame(rows, columns=columns)

    @staticmethod
    def gen_reader_batch(reader, batch_size_in: int, limit_in: int,
                         drop_last_in: bool, partitions: list, columns: list):
        """
        Generate batch data from MaxCompute table.

        Exactly ``limit_in // batch_size_in`` full batches are yielded,
        followed by one trailing partial batch when ``drop_last_in`` is False
        and ``limit_in`` is not a multiple of ``batch_size_in``. No record at
        or beyond ``limit_in`` is ever read.

        Args:
            reader: MaxCompute table reader (must support ``count`` and slicing)
            batch_size_in: batch size
            limit_in: limit of data, None (or 0) means fetch all data
            drop_last_in: whether drop last incomplete batch data
            partitions: table partitions
            columns: table columns

        Returns:
            batch data generator (one DataFrame per batch)

        Raises:
            ValueError: if ``batch_size_in`` is not positive or ``limit_in``
                is negative (raised on first iteration).
        """
        batch_size_in, limit_in = MaxComputeUtil._check_batch_args(
            reader, batch_size_in, limit_in)
        for start in range(0, limit_in, batch_size_in):
            end = min(start + batch_size_in, limit_in)
            # Only the final batch can be short; drop it if requested.
            if drop_last_in and end - start < batch_size_in:
                break
            yield MaxComputeUtil._records_to_dataframe(
                reader[start:end], partitions, columns)

    @staticmethod
    def gen_reader_item(reader, index: int, batch_size_in: int, limit_in: int,
                        drop_last_in: bool, partitions: list, columns: list):
        """
        Get single batch data from MaxCompute table by indexing.

        Args:
            reader: MaxCompute table reader (must support ``count`` and slicing)
            index: index of batch data
            batch_size_in: batch size
            limit_in: limit of data, None (or 0) means fetch all data
            drop_last_in: whether drop last incomplete batch data
            partitions: table partitions
            columns: table columns

        Returns:
            single batch data (dataframe)

        Raises:
            ValueError: for a non-positive batch size, a negative limit, or an
                ``index`` outside ``[0, batch_num)``.
        """
        batch_size_in, limit_in = MaxComputeUtil._check_batch_args(
            reader, batch_size_in, limit_in)
        if drop_last_in:
            batch_num = limit_in // batch_size_in
        else:
            batch_num = math.ceil(limit_in / batch_size_in)

        if index < 0:
            raise ValueError(f'index must be non-negative, but got {index}')
        if index >= batch_num:
            raise ValueError(
                f'index must be less than batch_num, but got index={index}, batch_num={batch_num}'
            )

        start = index * batch_size_in
        # Clamp so the trailing partial batch never reads past the limit.
        end = min((index + 1) * batch_size_in, limit_in)
        return MaxComputeUtil._records_to_dataframe(reader[start:end],
                                                    partitions, columns)

    def get_table_reader_ins(self, table_name: str, pt_condition: str = None):
        """
        Get a MaxCompute table object together with a reader opened on it.

        Args:
            table_name: table name
            pt_condition: partition condition, e.g. 'dt=20230331'; passed to
                ``open_reader`` as ``partition`` (default None).

        Returns:
            ``(table_ins, reader)`` tuple
        """
        table_ins = self._get_table(table_name)
        # NOTE(review): the reader is returned from inside its own `with`
        # block, so its __exit__ has already run when the caller receives it.
        # The batch helpers in this class slice a reader afterwards, which
        # presumably works because each slice opens its own download stream —
        # confirm against PyODPS before relying on this.
        with table_ins.open_reader(partition=pt_condition) as reader:
            return table_ins, reader
|