| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834 |
- # Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- # except in compliance with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the
- # License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- # either express or implied. See the License for the specific language governing permissions
- # and limitations under the License.
- """
- This module provides a client for IAM.
- """
- import copy
- import json
- import logging
- from future.utils import iteritems
- from baidubce.auth import bce_v1_signer
- from baidubce.bce_base_client import BceBaseClient
- from baidubce.http import bce_http_client
- from baidubce.http import handler
- from baidubce.http import http_content_types
- from baidubce.http import http_headers
- from baidubce.http import http_methods
- from baidubce.services import iam
- from baidubce.utils import required
- _logger = logging.getLogger(__name__)
- class IamClient(BceBaseClient):
- """
- sdk client
- """
- def __init__(self, config=None):
- BceBaseClient.__init__(self, config)
- def _send_iam_request(self,
- http_method,
- path,
- body=None,
- headers=None,
- params=None,
- config=None,
- body_parser=None):
- config = self._merge_config(config)
- path = iam.URL_PREFIX + path
- if body_parser is None:
- body_parser = handler.parse_json
- return bce_http_client.send_request(
- config, bce_v1_signer.sign, [handler.parse_error, body_parser],
- http_method, path, body, headers, params)
- def _merge_config(self, config):
- if config is None:
- return self.config
- else:
- new_config = copy.copy(self.config)
- new_config.merge_non_none_values(config)
- return new_config
- # ######################################### #role management# #################################################### #
- def get_role(self, role_name):
- """
- :type role_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/role/" + role_name
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def create_role(self, create_role_request):
- """
- :type create_role_request: dict
- :return:
- **HttpResponse**
- """
- if create_role_request is None:
- body = None
- else:
- if not isinstance(create_role_request, dict):
- raise TypeError(b'create_role_request should be dict')
- else:
- body = json.dumps(create_role_request)
- path = b"/role"
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def update_role(self, role_name, update_role_request):
- """
- :type role_name: bytes
- :type update_role_request: dict
- :return:
- **HttpResponse**
- """
- if update_role_request is None:
- body = None
- else:
- if not isinstance(update_role_request, dict):
- raise TypeError(b'update_role_request should be dict')
- else:
- body = json.dumps(update_role_request)
- path = b"/role/" + role_name
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def delete_role(self, role_name):
- """
- :type role_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/role/" + role_name
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_role(self):
- """
- :return:
- **HttpResponse**
- """
- path = b"/role"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- # ######################################### #policy management# ################################################## #
- def create_policy(self, create_policy_request):
- """
- :type create_policy_request: dict
- :return:
- **HttpResponse**
- """
- if create_policy_request is None:
- body = None
- else:
- if not isinstance(create_policy_request, dict):
- raise TypeError(b'create_policy_request should be dict')
- else:
- body = json.dumps(create_policy_request)
- path = b"/policy"
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def get_policy(self, policy_name, policy_type):
- """
- :type policy_name: bytes
- :type policy_type: bytes
- :return:
- **HttpResponse**
- """
- path = b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- @required(policy_name=(bytes, str))
- def update_policy(self, policy_name, update_policy_request):
- """
- :type update_policy_request: dict
- :return:
- **HttpResponse**
- """
- policy_name_bytes = policy_name
- if isinstance(policy_name, str):
- policy_name_bytes = policy_name.encode('utf-8')
- if update_policy_request is None:
- body = None
- else:
- if not isinstance(update_policy_request, dict):
- raise TypeError(b'update_policy_request should be dict')
- else:
- body = json.dumps(update_policy_request)
- path = b"/policy/" + policy_name_bytes
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def delete_policy(self, policy_name):
- """
- :type policy_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/policy/" + policy_name
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_policy(self, policy_type=None, name_filter=None):
- """
- :type policy_type: bytes
- :type name_filter: bytes
- :param name_filter: bytes
- :return:
- **HttpResponse**
- """
- path = b"/policy"
- params = {b"policyType": policy_type, b"nameFilter": name_filter}
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def attach_policy_to_user(self, user_name, policy_name, policy_type=None):
- """
- :type user_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def detach_policy_from_user(self, user_name, policy_name, policy_type=None):
- """
- :type user_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def list_policies_from_user(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/policy"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def attach_policy_to_group(self, group_name, policy_name, policy_type=None):
- """
- :type group_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/policy/" + policy_name
- params = {"policyType": policy_type}
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def detach_policy_from_group(self, group_name, policy_name, policy_type=None):
- """
- :type group_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def list_policies_from_group(self, group_name):
- """
- :type group_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/policy"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def attach_policy_to_role(self, role_name, policy_name, policy_type=None):
- """
- :type role_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/role/" + role_name + b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def detach_policy_from_role(self, role_name, policy_name, policy_type=None):
- """
- :type role_name: bytes
- :type policy_name: bytes
- :type policy_type: bytes
- :param policy_type: None
- :return:
- **HttpResponse**
- """
- path = b"/role/" + role_name + b"/policy/" + policy_name
- params = {b"policyType": policy_type}
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def list_policies_from_role(self, role_name):
- """
- :type role_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/role/" + role_name + b"/policy"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- @required(policy_id=(bytes, str), grant_type=(bytes, str))
- def list_attached_entities_by_grant_type(self, policy_id, grant_type):
- """
- :type policy_id: bytes :type grant_type:
- : grant_type: UserPolicy, GroupPolicy
- :return:
- **HttpResponse**
- """
- policy_id_bytes = policy_id
- if isinstance(policy_id, str):
- policy_id_bytes = policy_id.encode('utf-8')
- grant_type_bytes = grant_type
- if isinstance(grant_type, str):
- grant_type_bytes = grant_type.encode('utf-8')
- path = b"/policy/" + policy_id_bytes + b"/grant/" + grant_type_bytes
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- # ######################################### #user management# #################################################### #
- def create_user(self, create_user_request):
- """
- :type create_user_request: dict
- :return:
- **HttpResponse**
- """
- if create_user_request is None:
- body = None
- else:
- if not isinstance(create_user_request, dict):
- raise TypeError(b'create_user_request should be dict')
- else:
- body = json.dumps(create_user_request)
- path = b'/user'
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def get_user(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b'/user/' + user_name
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def update_user(self, user_name, update_user_request):
- """
- :type user_name: bytes
- :type update_user_request: dict
- :return:
- **HttpResponse**
- """
- if update_user_request is None:
- body = None
- else:
- if not isinstance(update_user_request, dict):
- raise TypeError(b'update_user_request should be dict')
- else:
- body = json.dumps(update_user_request)
- path = b"/user/" + user_name
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def delete_user(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_user(self):
- """
- :return:
- **HttpResponse**
- """
- path = b"/user"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def update_user_login_profile(self, user_name, update_user_login_profile_request):
- """
- :type user_name: bytes
- :type update_user_login_profile_request: dict
- :return:
- **HttpResponse**
- """
- if update_user_login_profile_request is None:
- body = None
- else:
- if not isinstance(update_user_login_profile_request, dict):
- raise TypeError(b'update_user_login_profile_request should be dict')
- else:
- body = json.dumps(update_user_login_profile_request)
- path = b"/user/" + user_name + b"/loginProfile"
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def get_user_login_profile(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/loginProfile"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def delete_user_login_profile(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/loginProfile"
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def create_user_accesskey(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/accesskey"
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def disable_user_accesskey(self, user_name, accesskey_id):
- """
- :type user_name: bytes
- :type accesskey_id: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/accesskey/" + accesskey_id
- params = {"disable": ""}
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def enable_user_accesskey(self, user_name, accesskey_id):
- """
- :type user_name: bytes
- :type accesskey_id: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/accesskey/" + accesskey_id
- params = {"enable": ""}
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- params=params
- )
- def delete_user_accesskey(self, user_name, accesskey_id):
- """
- :type user_name: bytes
- :type accesskey_id: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/accesskey/" + accesskey_id
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_user_accesskey(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/accesskey"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- @required(user_name=(bytes, str), mfa_type=(bytes, str))
- def unbind_user_mfa_device(self, user_name, mfa_type):
- """
- :type user_name: bytes :type mfa_type:
- :mfa_type: TOTP
- :return:
- none
- """
- user_name_bytes = user_name
- if isinstance(user_name, str):
- user_name_bytes = user_name.encode('utf-8')
- mfa_type_bytes = mfa_type
- if isinstance(mfa_type, str):
- mfa_type_bytes = mfa_type.encode('utf-8')
- path = b"/user/" + user_name_bytes + b"/mfaType/" + mfa_type_bytes
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- # ######################################### #group management# ################################################### #
- def create_group(self, create_group_request):
- """
- :type create_group_request:dict
- :return:
- **HttpResponse**
- """
- if create_group_request is None:
- body = None
- else:
- if not isinstance(create_group_request, dict):
- raise TypeError(b'create_group_request should be dict')
- else:
- body = json.dumps(create_group_request)
- path = b"/group"
- return self._send_iam_request(
- http_methods.POST,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def get_group(self, group_name):
- """
- :type group_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def update_group(self, group_name, update_group_request):
- """
- :type group_name: bytes
- :type update_group_request: dict
- :return:
- **HttpResponse**
- """
- if update_group_request is None:
- body = None
- else:
- if not isinstance(update_group_request, dict):
- raise TypeError(b'update_group_request should be dict')
- else:
- body = json.dumps(update_group_request)
- path = b"/group/" + group_name
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path,
- body=body
- )
- def delete_group(self, group_name):
- """
- :type group_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_group(self):
- """
- :return:
- **HttpResponse**
- """
- path = b"/group"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def add_user_to_group(self, group_name, user_name):
- """
- :type group_name: bytes
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/user/" + user_name
- return self._send_iam_request(
- http_methods.PUT,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def remove_user_from_group(self, group_name, user_name):
- """
- :type group_name: bytes
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/user/" + user_name
- return self._send_iam_request(
- http_methods.DELETE,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_user_group(self, user_name):
- """
- :type user_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/user/" + user_name + b"/group"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
- def list_group_user(self, group_name):
- """
- :type group_name: bytes
- :return:
- **HttpResponse**
- """
- path = b"/group/" + group_name + b"/user"
- return self._send_iam_request(
- http_methods.GET,
- headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
- path=path
- )
|