| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import os
- import shutil
- from collections import defaultdict
- import json
- from datasets.utils.filelock import FileLock
- from modelscope.hub.api import HubApi
- from modelscope.msdatasets.context.dataset_context_config import \
- DatasetContextConfig
- from modelscope.msdatasets.meta.data_meta_config import DataMetaConfig
- from modelscope.msdatasets.utils.dataset_utils import (
- get_dataset_files, get_target_dataset_structure)
- from modelscope.utils.constant import (REPO_TYPE_DATASET, DatasetFormations,
- DatasetPathName, DownloadMode)
- class DataMetaManager(object):
- """Data-meta manager."""
- def __init__(self, dataset_context_config: DatasetContextConfig):
- self.dataset_context_config = dataset_context_config
- self.api = HubApi()
- def fetch_meta_files(self) -> None:
- # Init meta infos
- dataset_name = self.dataset_context_config.dataset_name
- namespace = self.dataset_context_config.namespace
- download_mode = self.dataset_context_config.download_mode
- version = self.dataset_context_config.version
- cache_root_dir = self.dataset_context_config.cache_root_dir
- subset_name = self.dataset_context_config.subset_name
- split = self.dataset_context_config.split
- dataset_version_cache_root_dir = os.path.join(cache_root_dir,
- namespace, dataset_name,
- version)
- meta_cache_dir = os.path.join(dataset_version_cache_root_dir,
- DatasetPathName.META_NAME)
- data_meta_config = self.dataset_context_config.data_meta_config or DataMetaConfig(
- )
- # Get lock file path
- if not subset_name:
- lock_subset_name = DatasetPathName.LOCK_FILE_NAME_ANY
- else:
- lock_subset_name = subset_name
- if not split:
- lock_split = DatasetPathName.LOCK_FILE_NAME_ANY
- else:
- lock_split = split
- lock_file_name = f'{DatasetPathName.META_NAME}{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{dataset_name}' \
- f'{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{version}' \
- f'{DatasetPathName.LOCK_FILE_NAME_DELIMITER}' \
- f'{lock_subset_name}{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{lock_split}.lock'
- lock_file_path = os.path.join(dataset_version_cache_root_dir,
- lock_file_name)
- os.makedirs(dataset_version_cache_root_dir, exist_ok=True)
- # Fetch meta from cache or hub if reuse dataset
- if download_mode == DownloadMode.REUSE_DATASET_IF_EXISTS:
- if os.path.exists(meta_cache_dir) and os.listdir(meta_cache_dir):
- dataset_scripts, dataset_formation = self._fetch_meta_from_cache(
- meta_cache_dir)
- else:
- # Fetch meta-files from modelscope-hub if cache does not exist
- with FileLock(lock_file=lock_file_path):
- os.makedirs(meta_cache_dir, exist_ok=True)
- dataset_scripts, dataset_formation = self._fetch_meta_from_hub(
- dataset_name, namespace, version, meta_cache_dir)
- # Fetch meta from hub if force download
- elif download_mode == DownloadMode.FORCE_REDOWNLOAD:
- # Clean meta-files
- if os.path.exists(meta_cache_dir) and os.listdir(meta_cache_dir):
- shutil.rmtree(meta_cache_dir, ignore_errors=True)
- # Re-download meta-files
- with FileLock(lock_file=lock_file_path):
- os.makedirs(meta_cache_dir, exist_ok=True)
- dataset_scripts, dataset_formation = self._fetch_meta_from_hub(
- dataset_name, namespace, version, meta_cache_dir)
- else:
- raise ValueError(
- f'Expected values of download_mode: '
- f'{DownloadMode.REUSE_DATASET_IF_EXISTS.value} or '
- f'{DownloadMode.FORCE_REDOWNLOAD.value}, but got {download_mode} .'
- )
- # Set data_meta_config
- data_meta_config.meta_cache_dir = meta_cache_dir
- data_meta_config.dataset_scripts = dataset_scripts
- data_meta_config.dataset_formation = dataset_formation
- if '.py' in dataset_scripts:
- tmp_py_scripts = dataset_scripts['.py']
- if len(tmp_py_scripts) > 0:
- data_meta_config.dataset_py_script = tmp_py_scripts[0]
- # Set dataset_context_config
- self.dataset_context_config.data_meta_config = data_meta_config
- self.dataset_context_config.dataset_version_cache_root_dir = dataset_version_cache_root_dir
- self.dataset_context_config.global_meta_lock_file_path = lock_file_path
- def parse_dataset_structure(self):
- # Get dataset_name.json
- dataset_name = self.dataset_context_config.dataset_name
- subset_name = self.dataset_context_config.subset_name
- split = self.dataset_context_config.split
- namespace = self.dataset_context_config.namespace
- version = self.dataset_context_config.version
- data_meta_config = self.dataset_context_config.data_meta_config or DataMetaConfig(
- )
- dataset_json = None
- dataset_py_script = None
- dataset_scripts = data_meta_config.dataset_scripts
- if not dataset_scripts or len(dataset_scripts) == 0:
- raise FileNotFoundError(
- 'Cannot find dataset meta-files, please fetch meta from modelscope hub.'
- )
- if '.py' in dataset_scripts:
- dataset_py_script = dataset_scripts['.py'][0]
- for json_path in dataset_scripts['.json']:
- if json_path.endswith(f'{dataset_name}.json'):
- with open(json_path, encoding='utf-8') as dataset_json_file:
- dataset_json = json.load(dataset_json_file)
- break
- if not dataset_json and not dataset_py_script:
- raise FileNotFoundError(
- f'File {dataset_name}.json and {dataset_name}.py not found,'
- 'please specify at least one meta-file.')
- # Parse meta and get dataset structure
- if dataset_py_script:
- data_meta_config.dataset_py_script = dataset_py_script
- else:
- target_subset_name, target_dataset_structure = get_target_dataset_structure(
- dataset_json, subset_name, split)
- meta_map, file_map, args_map, type_map = get_dataset_files(
- target_dataset_structure, dataset_name, namespace,
- self.dataset_context_config, version)
- data_meta_config.meta_data_files = meta_map
- data_meta_config.zip_data_files = file_map
- data_meta_config.meta_args_map = args_map
- data_meta_config.meta_type_map = type_map
- data_meta_config.target_dataset_structure = target_dataset_structure
- self.dataset_context_config.data_meta_config = data_meta_config
- def fetch_virgo_meta(self) -> None:
- virgo_dataset_id = self.dataset_context_config.dataset_name
- version = int(self.dataset_context_config.version)
- meta_content = self.api.get_virgo_meta(
- dataset_id=virgo_dataset_id, version=version)
- self.dataset_context_config.config_kwargs.update(meta_content)
- def _fetch_meta_from_cache(self, meta_cache_dir):
- local_paths = defaultdict(list)
- dataset_type = None
- for meta_file_name in os.listdir(meta_cache_dir):
- file_ext = os.path.splitext(meta_file_name)[-1]
- if file_ext == DatasetFormations.formation_mark_ext.value:
- dataset_type = int(os.path.splitext(meta_file_name)[0])
- continue
- local_paths[file_ext].append(
- os.path.join(meta_cache_dir, meta_file_name))
- if not dataset_type:
- raise FileNotFoundError(
- f'{DatasetFormations.formation_mark_ext.value} file does not exist, '
- f'please use {DownloadMode.FORCE_REDOWNLOAD.value} .')
- return local_paths, DatasetFormations(dataset_type)
- def _fetch_meta_from_hub(self, dataset_name: str, namespace: str,
- revision: str, meta_cache_dir: str):
- _api = HubApi()
- endpoint = _api.get_endpoint_for_read(
- repo_id=namespace + '/' + dataset_name,
- repo_type=REPO_TYPE_DATASET)
- # Fetch id and type of dataset
- dataset_id, dataset_type = self.api.get_dataset_id_and_type(
- dataset_name, namespace, endpoint)
- # Fetch meta file-list of dataset
- file_list = self.api.get_dataset_meta_file_list(
- dataset_name, namespace, dataset_id, revision)
- # Fetch urls of meta-files
- local_paths, dataset_formation = self.api.get_dataset_meta_files_local_paths(
- dataset_name, namespace, revision, meta_cache_dir, dataset_type,
- file_list)
- return local_paths, dataset_formation
|