base.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import yaml
  16. from paddlex import create_pipeline
  17. from paddlex.inference import load_pipeline_config
  18. from paddlex.utils.config import AttrDict
  19. from paddlex.utils.deps import DependencyError
  20. from .._abstract import CLISubcommandExecutor
  21. from .._common_args import (
  22. add_common_cli_opts,
  23. parse_common_args,
  24. prepare_common_init_args,
  25. )
# Value passed as ``default_enable_hpi`` to the common-argument helpers used in
# this module (``parse_common_args`` and ``add_common_cli_opts``).
_DEFAULT_ENABLE_HPI = None
  27. def _merge_dicts(d1, d2):
  28. res = d1.copy()
  29. for k, v in d2.items():
  30. if k in res and isinstance(res[k], dict) and isinstance(v, dict):
  31. res[k] = _merge_dicts(res[k], v)
  32. else:
  33. res[k] = v
  34. return res
  35. def _to_builtin(obj):
  36. if isinstance(obj, AttrDict):
  37. return {k: _to_builtin(v) for k, v in obj.items()}
  38. elif isinstance(obj, dict):
  39. return {k: _to_builtin(v) for k, v in obj.items()}
  40. elif isinstance(obj, list):
  41. return [_to_builtin(item) for item in obj]
  42. else:
  43. return obj
  44. class PaddleXPipelineWrapper(metaclass=abc.ABCMeta):
  45. def __init__(
  46. self,
  47. *,
  48. paddlex_config=None,
  49. **common_args,
  50. ):
  51. super().__init__()
  52. self._paddlex_config = paddlex_config
  53. self._common_args = parse_common_args(
  54. common_args, default_enable_hpi=_DEFAULT_ENABLE_HPI
  55. )
  56. self._merged_paddlex_config = self._get_merged_paddlex_config()
  57. self.paddlex_pipeline = self._create_paddlex_pipeline()
  58. @property
  59. @abc.abstractmethod
  60. def _paddlex_pipeline_name(self):
  61. raise NotImplementedError
  62. def export_paddlex_config_to_yaml(self, yaml_path):
  63. with open(yaml_path, "w", encoding="utf-8") as f:
  64. config = _to_builtin(self._merged_paddlex_config)
  65. yaml.safe_dump(config, f)
  66. def close(self):
  67. self.paddlex_pipeline.close()
  68. @classmethod
  69. @abc.abstractmethod
  70. def get_cli_subcommand_executor(cls):
  71. raise NotImplementedError
  72. def _get_paddlex_config_overrides(self):
  73. return {}
  74. def _get_merged_paddlex_config(self):
  75. if self._paddlex_config is None:
  76. config = load_pipeline_config(self._paddlex_pipeline_name)
  77. elif isinstance(self._paddlex_config, str):
  78. config = load_pipeline_config(self._paddlex_config)
  79. else:
  80. config = self._paddlex_config
  81. overrides = self._get_paddlex_config_overrides()
  82. return _merge_dicts(config, overrides)
  83. def _create_paddlex_pipeline(self):
  84. kwargs = prepare_common_init_args(None, self._common_args)
  85. try:
  86. return create_pipeline(config=self._merged_paddlex_config, **kwargs)
  87. except DependencyError as e:
  88. raise RuntimeError(
  89. "A dependency error occurred during pipeline creation. Please refer to the installation documentation to ensure all required dependencies are installed."
  90. ) from e
  91. class PipelineCLISubcommandExecutor(CLISubcommandExecutor):
  92. @property
  93. @abc.abstractmethod
  94. def subparser_name(self):
  95. raise NotImplementedError
  96. def add_subparser(self, subparsers):
  97. subparser = subparsers.add_parser(name=self.subparser_name)
  98. self._update_subparser(subparser)
  99. add_common_cli_opts(
  100. subparser,
  101. default_enable_hpi=_DEFAULT_ENABLE_HPI,
  102. allow_multiple_devices=True,
  103. )
  104. subparser.add_argument(
  105. "--paddlex_config",
  106. type=str,
  107. help="Path to PaddleX pipeline configuration file.",
  108. )
  109. return subparser
  110. @abc.abstractmethod
  111. def _update_subparser(self, subparser):
  112. raise NotImplementedError