bls_client.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  4. # except in compliance with the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the
  9. # License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  10. # either express or implied. See the License for the specific language governing permissions
  11. # and limitations under the License.
  12. """
  13. This module provides a client class for BLS.
  14. """
  15. import copy
  16. import json
  17. import sys
  18. import uuid
  19. from baidubce import bce_base_client, compat
  20. from baidubce.auth import bce_v1_signer
  21. from baidubce.http import handler, bce_http_client, http_methods
  22. from baidubce.services.bls import bls_handler
  23. if sys.version_info[0] == 2:
  24. value_type = (str, unicode)
  25. else:
  26. value_type = (str, bytes)
# Upper bound for the `limit` query parameter; larger values are clamped to this.
MAX_BATCH_RECORD_NUMBER = 1000
# `limit` sent when the caller does not specify one.
DEFAULT_BATCH_RECORD_NUMBER = 100
# Sort order sent when the caller passes a `sort` value other than 'asc'.
DEFAULT_SORT = "desc"
class BlsClient(bce_base_client.BceBaseClient):
    """
    BLS base sdk client
    """
    # Path segment inserted between the API version prefix and the per-request path.
    log_prefix = b'/logstore'
    # API version prefixes (bytes, so they can be concatenated with the bytes paths).
    version = b'/v1'
    version_v2 = b'/v2'
    version_v3 = b'/v3'
    # Default content-type header applied to a request when the caller supplies none.
    content_type_header_key = b"content-type"
    content_type_header_value = b"application/json;charset=UTF-8"
    # Header that carries the per-request id generated when the caller supplies none.
    request_id_header_key = b"x-bce-request-id"
    def __init__(self, config=None):
        """
        Create a BLS client.

        :param config: client configuration, forwarded unchanged to
            ``BceBaseClient.__init__``. Defaults to None.
        :type config: presumably baidubce.BceClientConfiguration or None -- confirm against the base class
        """
        bce_base_client.BceBaseClient.__init__(self, config)
  43. def _merge_config(self, config=None):
  44. if config is None:
  45. return self.config
  46. else:
  47. new_config = copy.copy(self.config)
  48. new_config.merge_non_none_values(config)
  49. return new_config
  50. def _send_request(self, http_method, path, version=b'/v1',
  51. body=None, headers=None, params=None, config=None, body_parser=None):
  52. config = self._merge_config(config)
  53. if body_parser is None:
  54. body_parser = handler.parse_json
  55. if headers is None:
  56. headers = {}
  57. if self.content_type_header_key not in headers:
  58. headers[self.content_type_header_key] = self.content_type_header_value
  59. if self.request_id_header_key not in headers:
  60. headers[self.request_id_header_key] = uuid.uuid4()
  61. return bce_http_client.send_request(
  62. config, bce_v1_signer.sign, [bls_handler.parse_error, body_parser],
  63. http_method, version + BlsClient.log_prefix + path, body, headers, params)
  64. def pull_log_records(self, log_store_name, start_time, end_time, log_stream_name, project=None,
  65. limit=None, marker=None, config=None):
  66. """
  67. Pull log records from specified log store.
  68. :param config:
  69. :param marker:
  70. :param limit:
  71. :param project:
  72. :param log_stream_name:
  73. :param log_store_name: The name of log store which will be pulled.
  74. :type log_store_name: string
  75. :param start_time: Start time of pulling log records.
  76. :type start_time: string
  77. :param end_time: End time of pulling log records.
  78. :type end_time: string
  79. :return: A list of log records.
  80. :rtype: list
  81. """
  82. log_store_name = compat.convert_to_bytes(log_store_name)
  83. path = b'/%s/logrecord' % log_store_name
  84. params = {b'startDateTime': start_time, b'endDateTime': end_time, b'logStreamName': log_stream_name}
  85. if project is not None:
  86. params[b'project'] = project
  87. if limit is None:
  88. params[b'limit'] = DEFAULT_BATCH_RECORD_NUMBER
  89. if limit is not None:
  90. if limit > MAX_BATCH_RECORD_NUMBER:
  91. limit = MAX_BATCH_RECORD_NUMBER
  92. params[b'limit'] = limit
  93. if marker is not None:
  94. params[b'marker'] = marker
  95. return self._send_request(http_methods.GET, path, params=params, config=config)
  96. def push_log_records(self, log_store_name, log_stream_name, log_records, project=None, type=None, tags=None,
  97. config=None):
  98. """
  99. Push log records into the specified log stream in the given log store.
  100. :param log_store_name: The name of the log store which will receive the log records.
  101. :type log_store_name: str
  102. :param log_stream_name: The name of the log stream to write to.
  103. :type log_stream_name: str
  104. :param log_records: A list of log records to push.
  105. Each record should be an instance of `LogRecordModel`.
  106. Example:
  107. [
  108. LogRecordModel(message="test log", timestamp=1715231012000),
  109. LogRecordModel(message="another log", timestamp=1715231044000)
  110. ]
  111. :type log_records: List[LogRecordModel]
  112. :param project: (Optional) The project name to which the log store belongs.
  113. :type project: str or None
  114. :param type: (Optional) Type of logs, defaults to 'TEXT' if not specified.
  115. :type type: str or None
  116. :param tags: (Optional) A list of tags associated with the push.
  117. Each tag should be an instance of `TagModel`.
  118. Example:
  119. [
  120. TagModel("env", "prod"),
  121. TagModel("service", "auth")
  122. ]
  123. :type tags: List[TagModel] or None
  124. :param config: (Optional) Custom request config.
  125. :type config: baidubce.BceClientConfiguration or None
  126. :return: A dictionary containing the result of the push operation.
  127. :rtype: dict
  128. """
  129. log_store_name = compat.convert_to_bytes(log_store_name)
  130. path = b'/%s/logrecord' % log_store_name
  131. params = {}
  132. tags_list = [dict(tag) for tag in tags]
  133. type = type if type is not None else 'TEXT'
  134. records_payload = [dict(record) for record in log_records]
  135. if project is not None:
  136. params['project'] = project
  137. body = {
  138. "logStreamName": log_stream_name,
  139. "tags": tags_list,
  140. "type": type,
  141. "logRecords": records_payload
  142. }
  143. return self._send_request(http_methods.POST, path, params=params, body=json.dumps(body), config=config)
  144. def query_log_records(self, log_store_name, start_time, end_time, log_stream_name, project=None, query=None,
  145. sort=None, limit=None, marker=None, config=None):
  146. """
  147. query log records from specified log store.
  148. :param sort: desc or asc
  149. :type sort: string
  150. :param query: query statement, eg: match *
  151. :type query: string
  152. :param config:
  153. :param marker:
  154. :param limit:
  155. :param project:
  156. :param log_stream_name:
  157. :param log_store_name: The name of log store which will be pulled.
  158. :type log_store_name: string
  159. :param start_time: Start time of pulling log records.
  160. :type start_time: string
  161. :param end_time: End time of pulling log records.
  162. :type end_time: string
  163. :return: A list of log records.
  164. :rtype: list
  165. """
  166. log_store_name = compat.convert_to_bytes(log_store_name)
  167. path = b'/%s/logrecord' % log_store_name
  168. params = {b'startDateTime': start_time, b'endDateTime': end_time, b'logStreamName': log_stream_name}
  169. if project is not None:
  170. params[b'project'] = project
  171. if limit is None:
  172. params[b'limit'] = DEFAULT_BATCH_RECORD_NUMBER
  173. if limit is not None:
  174. if limit > MAX_BATCH_RECORD_NUMBER:
  175. limit = MAX_BATCH_RECORD_NUMBER
  176. params[b'limit'] = limit
  177. if marker is not None:
  178. params[b'marker'] = marker
  179. if query is not None:
  180. params[b'query'] = query
  181. if sort is not None:
  182. if sort == 'asc':
  183. params[b'sort'] = sort
  184. else:
  185. params[b'sort'] = DEFAULT_SORT
  186. return self._send_request(http_methods.GET, path, params=params, config=config)
  187. def pull_log_records_v3(self, log_store_name, start_time, end_time, log_stream_name, project=None, query=None,
  188. limit=None, marker=None, sort=None, config=None):
  189. """
  190. Pull log records v3 from specified log store.
  191. :param query: query statement, eg: match *
  192. :type query: string
  193. :param sort: desc or asc
  194. :type: sort: string
  195. :param config:
  196. :param marker:
  197. :param limit:
  198. :param project:
  199. :param log_stream_name:
  200. :param log_store_name: The name of log store which will be pulled.
  201. :type log_store_name: string
  202. :param start_time: Start time of pulling log records.
  203. :type start_time: string
  204. :param end_time: End time of pulling log records.
  205. :type end_time: string
  206. :return: A list of log records.
  207. :rtype: list
  208. """
  209. log_store_name = compat.convert_to_bytes(log_store_name)
  210. path = b'/%s/logrecord/pull' % log_store_name
  211. params = {b'startDateTime': start_time, b'endDateTime': end_time, b'logStreamName': log_stream_name}
  212. if project is not None:
  213. params[b'project'] = project
  214. if limit is None:
  215. params[b'limit'] = DEFAULT_BATCH_RECORD_NUMBER
  216. if limit is not None:
  217. if limit > MAX_BATCH_RECORD_NUMBER:
  218. limit = MAX_BATCH_RECORD_NUMBER
  219. params[b'limit'] = limit
  220. if marker is not None:
  221. params[b'marker'] = marker
  222. if sort is not None:
  223. if sort == 'asc':
  224. params[b'sort'] = sort
  225. else:
  226. params[b'sort'] = DEFAULT_SORT
  227. if query is not None:
  228. params[b'query'] = query
  229. return self._send_request(http_methods.GET, path, version=self.version_v3, params=params, config=config)