tsdb_client.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # Copyright 2014 Baidu, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  4. # except in compliance with the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the
  9. # License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  10. # either express or implied. See the License for the specific language governing permissions
  11. # and limitations under the License.
  12. """
  13. This module provides a client class for TSDB.
  14. """
  15. import io
  16. import copy
  17. import json
  18. import logging
  19. import gzip
  20. from baidubce import bce_client_configuration
  21. from baidubce import utils
  22. from baidubce.auth import bce_v1_signer
  23. from baidubce.bce_base_client import BceBaseClient
  24. from baidubce.http import bce_http_client
  25. from baidubce.http import handler
  26. from baidubce.http import http_content_types
  27. from baidubce.http import http_headers
  28. from baidubce.http import http_methods
  29. from baidubce.services.tsdb import tsdb_handler
  30. _logger = logging.getLogger(__name__)
  31. class TsdbClient(BceBaseClient):
  32. """
  33. sdk client
  34. """
  35. def __init__(self, config, database=None):
  36. self.database = database
  37. BceBaseClient.__init__(self, config)
  38. def write_datapoints(self, datapoints, use_gzip=True):
  39. """
  40. write datapoints
  41. :param datapoints: a list of datapoint dict
  42. :type datapoints: list
  43. :param use_gzip: open gzip compress
  44. :type use_gzip: boolean
  45. """
  46. path = b'/v1/datapoint'
  47. body = json.dumps({"datapoints": datapoints}).encode('utf-8')
  48. headers={http_headers.CONTENT_TYPE: http_content_types.JSON}
  49. if use_gzip:
  50. body = self._gzip_compress(body)
  51. headers[http_headers.CONTENT_ENCODING] = b'gzip'
  52. return self._send_request(
  53. http_methods.POST,
  54. path=path,
  55. body=body,
  56. headers=headers,
  57. body_parser=tsdb_handler.parse_json
  58. )
  59. def get_metrics(self):
  60. """
  61. list metrics
  62. :return: a list of metric
  63. :rtype: baidubce.bce_response.BceResponse
  64. """
  65. path = b"/v1/metric"
  66. return self._send_request(http_methods.GET, path=path, body_parser=tsdb_handler.parse_json)
  67. def get_fields(self, metric):
  68. """
  69. get fields
  70. :type metric: string
  71. :param metric:
  72. :return: field dict. {field1:{type: 'Number'},field2:{type: 'String'}}
  73. :rtype: baidubce.bce_response.BceResponse
  74. """
  75. metric = utils.convert_to_standard_string(metric)
  76. path = b'/v1/metric/' + metric + b'/field'
  77. return self._send_request(http_methods.GET, path=path, body_parser=tsdb_handler.parse_json)
  78. def get_tags(self, metric):
  79. """
  80. get tags
  81. :type metric: string
  82. :param metric:
  83. :return: {tagk1:[tagk11,tagk21,..],tagk2:[tagk21,tagk22,..]..}
  84. :rtype: baidubce.bce_response.BceResponse
  85. """
  86. metric = utils.convert_to_standard_string(metric)
  87. path = b'/v1/metric/' + metric + b'/tag'
  88. return self._send_request(http_methods.GET, path=path, body_parser=tsdb_handler.parse_json)
  89. def get_datapoints(self, query_list, disable_presampling=False):
  90. """
  91. query datapoints
  92. :param query_list: a list of query dict
  93. :type query_list: list
  94. :param disable_presampling: open of close presampling result query
  95. :type disable_presampling: boolean
  96. :return: a list of result dict
  97. :rtype: baidubce.bce_response.BceResponse
  98. """
  99. path = b'/v1/datapoint'
  100. params = {'query': '', 'disablePresampling': disable_presampling}
  101. body = json.dumps({"queries": query_list})
  102. return self._send_request(http_methods.PUT, path=path, params=params,
  103. body=body, body_parser=tsdb_handler.parse_json)
  104. def get_rows_with_sql(self, statement):
  105. """
  106. get_rows_with_sql
  107. :param statement: sql statement
  108. :type statement: string
  109. :return: {rows:[[],[],...], columns: []}
  110. :rtype: baidubce.bce_response.BceResponse
  111. """
  112. path = b'/v1/row'
  113. params = {'sql': statement}
  114. return self._send_request(http_methods.GET, path=path, params=params,
  115. body_parser=tsdb_handler.parse_json)
  116. def generate_pre_signed_url(self,
  117. query_list,
  118. timestamp=0,
  119. expiration_in_seconds=1800,
  120. disable_presampling=False,
  121. headers=None,
  122. headers_to_sign=None,
  123. protocol=None,
  124. config=None):
  125. """
  126. Get an authorization url with expire time
  127. :type timestamp: int
  128. :param timestamp: None
  129. :type expiration_in_seconds: int
  130. :param expiration_in_seconds: None
  131. :type options: dict
  132. :param options: None
  133. :return:
  134. **URL string**
  135. """
  136. path = b'/v1/datapoint'
  137. params = {
  138. 'query': json.dumps({"queries": query_list}),
  139. 'disablePresampling': disable_presampling
  140. }
  141. return self._generate_pre_signed_url(path, timestamp, expiration_in_seconds,
  142. params, headers, headers_to_sign, protocol, config)
  143. def generate_pre_signed_url_with_sql(self,
  144. statement,
  145. timestamp=0,
  146. expiration_in_seconds=1800,
  147. headers=None,
  148. headers_to_sign=None,
  149. protocol=None,
  150. config=None):
  151. """
  152. Get an authorization url with sql
  153. :type timestamp: int
  154. :param timestamp: None
  155. :type expiration_in_seconds: int
  156. :param expiration_in_seconds: None
  157. :type options: dict
  158. :param options: None
  159. :return:
  160. **URL string**
  161. """
  162. path = b'/v1/row'
  163. params = {'sql': statement}
  164. return self._generate_pre_signed_url(path, timestamp, expiration_in_seconds,
  165. params, headers, headers_to_sign, protocol, config)
  166. def _generate_pre_signed_url(
  167. self, path, timestamp=0,
  168. expiration_in_seconds=1800,
  169. params=None,
  170. headers=None,
  171. headers_to_sign=None,
  172. protocol=None,
  173. config=None):
  174. """
  175. Get an authorization url with expire time
  176. :type timestamp: int
  177. :param timestamp: None
  178. :type expiration_in_seconds: int
  179. :param expiration_in_seconds: None
  180. :type options: dict
  181. :param options: None
  182. :return:
  183. **URL string**
  184. """
  185. config = self._merge_config(config)
  186. headers = headers or {}
  187. params = params or {}
  188. # specified protocol > protocol in endpoint > default protocol
  189. endpoint_protocol, endpoint_host, endpoint_port = utils.parse_host_port(
  190. config.endpoint, config.protocol)
  191. protocol = protocol or endpoint_protocol
  192. full_host = endpoint_host
  193. if endpoint_port != config.protocol.default_port:
  194. full_host += b':' + str(endpoint_port)
  195. headers[http_headers.HOST] = full_host
  196. params[http_headers.AUTHORIZATION.lower()] = bce_v1_signer.sign(
  197. config.credentials,
  198. http_methods.GET,
  199. path,
  200. headers,
  201. params,
  202. timestamp,
  203. expiration_in_seconds,
  204. headers_to_sign)
  205. return "%s://%s%s?%s" % (protocol.name,
  206. full_host.decode(),
  207. path.decode(),
  208. utils.get_canonical_querystring(params, False).decode())
  209. def _gzip_compress(self, str):
  210. out = io.BytesIO()
  211. with gzip.GzipFile(fileobj=out, mode="w") as f:
  212. f.write(str)
  213. return out.getvalue()
  214. def _merge_config(self, config):
  215. if config is None:
  216. return self.config
  217. else:
  218. new_config = copy.copy(self.config)
  219. new_config.merge_non_none_values(config)
  220. return new_config
  221. def _send_request(
  222. self, http_method, path,
  223. body=None,
  224. headers=None,
  225. params=None,
  226. config=None,
  227. body_parser=None):
  228. config = self._merge_config(config)
  229. if headers is None:
  230. headers = {http_headers.CONTENT_TYPE: http_content_types.JSON}
  231. if body_parser is None:
  232. body_parser = handler.parse_json
  233. if self.database is not None:
  234. if params is None:
  235. params = {}
  236. params.update({b'database': self.database})
  237. return bce_http_client.send_request(
  238. config, bce_v1_signer.sign, [handler.parse_error, body_parser],
  239. http_method, path, body, headers, params)