# data_meta_manager.py
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. import os
  3. import shutil
  4. from collections import defaultdict
  5. import json
  6. from datasets.utils.filelock import FileLock
  7. from modelscope.hub.api import HubApi
  8. from modelscope.msdatasets.context.dataset_context_config import \
  9. DatasetContextConfig
  10. from modelscope.msdatasets.meta.data_meta_config import DataMetaConfig
  11. from modelscope.msdatasets.utils.dataset_utils import (
  12. get_dataset_files, get_target_dataset_structure)
  13. from modelscope.utils.constant import (REPO_TYPE_DATASET, DatasetFormations,
  14. DatasetPathName, DownloadMode)
  15. class DataMetaManager(object):
  16. """Data-meta manager."""
  17. def __init__(self, dataset_context_config: DatasetContextConfig):
  18. self.dataset_context_config = dataset_context_config
  19. self.api = HubApi()
  20. def fetch_meta_files(self) -> None:
  21. # Init meta infos
  22. dataset_name = self.dataset_context_config.dataset_name
  23. namespace = self.dataset_context_config.namespace
  24. download_mode = self.dataset_context_config.download_mode
  25. version = self.dataset_context_config.version
  26. cache_root_dir = self.dataset_context_config.cache_root_dir
  27. subset_name = self.dataset_context_config.subset_name
  28. split = self.dataset_context_config.split
  29. dataset_version_cache_root_dir = os.path.join(cache_root_dir,
  30. namespace, dataset_name,
  31. version)
  32. meta_cache_dir = os.path.join(dataset_version_cache_root_dir,
  33. DatasetPathName.META_NAME)
  34. data_meta_config = self.dataset_context_config.data_meta_config or DataMetaConfig(
  35. )
  36. # Get lock file path
  37. if not subset_name:
  38. lock_subset_name = DatasetPathName.LOCK_FILE_NAME_ANY
  39. else:
  40. lock_subset_name = subset_name
  41. if not split:
  42. lock_split = DatasetPathName.LOCK_FILE_NAME_ANY
  43. else:
  44. lock_split = split
  45. lock_file_name = f'{DatasetPathName.META_NAME}{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{dataset_name}' \
  46. f'{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{version}' \
  47. f'{DatasetPathName.LOCK_FILE_NAME_DELIMITER}' \
  48. f'{lock_subset_name}{DatasetPathName.LOCK_FILE_NAME_DELIMITER}{lock_split}.lock'
  49. lock_file_path = os.path.join(dataset_version_cache_root_dir,
  50. lock_file_name)
  51. os.makedirs(dataset_version_cache_root_dir, exist_ok=True)
  52. # Fetch meta from cache or hub if reuse dataset
  53. if download_mode == DownloadMode.REUSE_DATASET_IF_EXISTS:
  54. if os.path.exists(meta_cache_dir) and os.listdir(meta_cache_dir):
  55. dataset_scripts, dataset_formation = self._fetch_meta_from_cache(
  56. meta_cache_dir)
  57. else:
  58. # Fetch meta-files from modelscope-hub if cache does not exist
  59. with FileLock(lock_file=lock_file_path):
  60. os.makedirs(meta_cache_dir, exist_ok=True)
  61. dataset_scripts, dataset_formation = self._fetch_meta_from_hub(
  62. dataset_name, namespace, version, meta_cache_dir)
  63. # Fetch meta from hub if force download
  64. elif download_mode == DownloadMode.FORCE_REDOWNLOAD:
  65. # Clean meta-files
  66. if os.path.exists(meta_cache_dir) and os.listdir(meta_cache_dir):
  67. shutil.rmtree(meta_cache_dir, ignore_errors=True)
  68. # Re-download meta-files
  69. with FileLock(lock_file=lock_file_path):
  70. os.makedirs(meta_cache_dir, exist_ok=True)
  71. dataset_scripts, dataset_formation = self._fetch_meta_from_hub(
  72. dataset_name, namespace, version, meta_cache_dir)
  73. else:
  74. raise ValueError(
  75. f'Expected values of download_mode: '
  76. f'{DownloadMode.REUSE_DATASET_IF_EXISTS.value} or '
  77. f'{DownloadMode.FORCE_REDOWNLOAD.value}, but got {download_mode} .'
  78. )
  79. # Set data_meta_config
  80. data_meta_config.meta_cache_dir = meta_cache_dir
  81. data_meta_config.dataset_scripts = dataset_scripts
  82. data_meta_config.dataset_formation = dataset_formation
  83. if '.py' in dataset_scripts:
  84. tmp_py_scripts = dataset_scripts['.py']
  85. if len(tmp_py_scripts) > 0:
  86. data_meta_config.dataset_py_script = tmp_py_scripts[0]
  87. # Set dataset_context_config
  88. self.dataset_context_config.data_meta_config = data_meta_config
  89. self.dataset_context_config.dataset_version_cache_root_dir = dataset_version_cache_root_dir
  90. self.dataset_context_config.global_meta_lock_file_path = lock_file_path
  91. def parse_dataset_structure(self):
  92. # Get dataset_name.json
  93. dataset_name = self.dataset_context_config.dataset_name
  94. subset_name = self.dataset_context_config.subset_name
  95. split = self.dataset_context_config.split
  96. namespace = self.dataset_context_config.namespace
  97. version = self.dataset_context_config.version
  98. data_meta_config = self.dataset_context_config.data_meta_config or DataMetaConfig(
  99. )
  100. dataset_json = None
  101. dataset_py_script = None
  102. dataset_scripts = data_meta_config.dataset_scripts
  103. if not dataset_scripts or len(dataset_scripts) == 0:
  104. raise FileNotFoundError(
  105. 'Cannot find dataset meta-files, please fetch meta from modelscope hub.'
  106. )
  107. if '.py' in dataset_scripts:
  108. dataset_py_script = dataset_scripts['.py'][0]
  109. for json_path in dataset_scripts['.json']:
  110. if json_path.endswith(f'{dataset_name}.json'):
  111. with open(json_path, encoding='utf-8') as dataset_json_file:
  112. dataset_json = json.load(dataset_json_file)
  113. break
  114. if not dataset_json and not dataset_py_script:
  115. raise FileNotFoundError(
  116. f'File {dataset_name}.json and {dataset_name}.py not found,'
  117. 'please specify at least one meta-file.')
  118. # Parse meta and get dataset structure
  119. if dataset_py_script:
  120. data_meta_config.dataset_py_script = dataset_py_script
  121. else:
  122. target_subset_name, target_dataset_structure = get_target_dataset_structure(
  123. dataset_json, subset_name, split)
  124. meta_map, file_map, args_map, type_map = get_dataset_files(
  125. target_dataset_structure, dataset_name, namespace,
  126. self.dataset_context_config, version)
  127. data_meta_config.meta_data_files = meta_map
  128. data_meta_config.zip_data_files = file_map
  129. data_meta_config.meta_args_map = args_map
  130. data_meta_config.meta_type_map = type_map
  131. data_meta_config.target_dataset_structure = target_dataset_structure
  132. self.dataset_context_config.data_meta_config = data_meta_config
  133. def fetch_virgo_meta(self) -> None:
  134. virgo_dataset_id = self.dataset_context_config.dataset_name
  135. version = int(self.dataset_context_config.version)
  136. meta_content = self.api.get_virgo_meta(
  137. dataset_id=virgo_dataset_id, version=version)
  138. self.dataset_context_config.config_kwargs.update(meta_content)
  139. def _fetch_meta_from_cache(self, meta_cache_dir):
  140. local_paths = defaultdict(list)
  141. dataset_type = None
  142. for meta_file_name in os.listdir(meta_cache_dir):
  143. file_ext = os.path.splitext(meta_file_name)[-1]
  144. if file_ext == DatasetFormations.formation_mark_ext.value:
  145. dataset_type = int(os.path.splitext(meta_file_name)[0])
  146. continue
  147. local_paths[file_ext].append(
  148. os.path.join(meta_cache_dir, meta_file_name))
  149. if not dataset_type:
  150. raise FileNotFoundError(
  151. f'{DatasetFormations.formation_mark_ext.value} file does not exist, '
  152. f'please use {DownloadMode.FORCE_REDOWNLOAD.value} .')
  153. return local_paths, DatasetFormations(dataset_type)
  154. def _fetch_meta_from_hub(self, dataset_name: str, namespace: str,
  155. revision: str, meta_cache_dir: str):
  156. _api = HubApi()
  157. endpoint = _api.get_endpoint_for_read(
  158. repo_id=namespace + '/' + dataset_name,
  159. repo_type=REPO_TYPE_DATASET)
  160. # Fetch id and type of dataset
  161. dataset_id, dataset_type = self.api.get_dataset_id_and_type(
  162. dataset_name, namespace, endpoint)
  163. # Fetch meta file-list of dataset
  164. file_list = self.api.get_dataset_meta_file_list(
  165. dataset_name, namespace, dataset_id, revision)
  166. # Fetch urls of meta-files
  167. local_paths, dataset_formation = self.api.get_dataset_meta_files_local_paths(
  168. dataset_name, namespace, revision, meta_cache_dir, dataset_type,
  169. file_list)
  170. return local_paths, dataset_formation