| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268 |
- # Copyright 2014 Baidu, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- # except in compliance with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the
- # License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- # either express or implied. See the License for the specific language governing permissions
- # and limitations under the License.
- """
- This module provides a client class for BOS.
- """
- import io
- import copy
- import http.client
- import os
- import json
- import logging
- import shutil
- import struct
- from builtins import str
- from builtins import bytes
- from future.utils import iteritems, iterkeys, itervalues
- from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
- import threading
- import functools
- import multiprocessing
- import baidubce
- from baidubce import bce_client_configuration
- from baidubce import utils
- from baidubce.auth import bce_v1_signer
- from baidubce.bce_base_client import BceBaseClient
- from baidubce.exception import BceClientError
- from baidubce.exception import BceServerError
- from baidubce.exception import BceHttpClientError
- from baidubce.http import bce_http_client
- from baidubce.http import handler
- from baidubce.http import http_content_types
- from baidubce.http import http_headers
- from baidubce.http import http_methods
- from baidubce.services import bos
- from baidubce.services.bos import bos_handler
- from baidubce.services.bos import storage_class
- from baidubce.utils import required
- from baidubce import compat
# Logger named after this module.
_logger = logging.getLogger(__name__)

# Fetch mode values, kept as bytes.
FETCH_MODE_SYNC = b"sync"
FETCH_MODE_ASYNC = b"async"

# Default value of the ``encryption_algorithm`` argument of
# ``put_bucket_encryption``.
ENCRYPTION_ALGORITHM = "AES256"

# Bytes literal "http"; NOTE(review): presumably used to recognise URLs that
# already carry a protocol prefix -- the use site is not visible here.
HTTP_PROTOCOL_HEAD = b'http'
class UploadTaskHandle:
    """
    Handle used to cancel a multi-threaded upload of a file.

    ``cancel_flag`` starts as ``False``. ``cancel`` sets it to ``True`` and
    ``is_cancel`` reads it; both accesses are made while holding
    ``cancel_lock``.
    """

    def __init__(self):
        self.cancel_flag = False
        self.cancel_lock = threading.Lock()

    def cancel(self):
        """
        Request cancellation by setting the cancel flag.
        """
        # ``with`` guarantees the lock is released even if the body raises.
        with self.cancel_lock:
            self.cancel_flag = True

    def is_cancel(self):
        """
        Return the current value of the cancel flag.

        :return: True once ``cancel`` has been called, otherwise False
        :rtype: bool
        """
        with self.cancel_lock:
            return self.cancel_flag
- class BosClient(BceBaseClient):
- """
- sdk client
- """
def __init__(self, config=None):
    """
    Initialise the client by delegating to ``BceBaseClient.__init__``.

    :param config: configuration handed to the base class unchanged
    :type config: BceClientConfiguration
    """
    BceBaseClient.__init__(self, config)
def list_buckets(self, config=None):
    """
    List the buckets of the user.

    Sends a GET request without a bucket name.

    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :returns: all buckets owned by the user.
    :rtype: baidubce.bce_response.BceResponse
    """
    response = self._send_request(http_methods.GET, config=config)
    return response
@required(bucket_name=(bytes, str))
def get_bucket_location(self, bucket_name, config=None):
    """
    Get the region in which the bucket is located.

    Sends a GET request on the bucket with the ``location`` query parameter
    and returns the ``location_constraint`` attribute of the response.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: region of the bucket
    :rtype: str
    """
    query = {b'location': b''}
    return self._send_request(
        http_methods.GET, bucket_name, params=query, config=config
    ).location_constraint
@required(bucket_name=(bytes, str))
def create_bucket(self, bucket_name, config=None):
    """
    Create a bucket with the given name.

    Sends a PUT request on the bucket.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :returns: the value returned by ``_send_request``
    :rtype: baidubce.bce_response.BceResponse
    """
    response = self._send_request(http_methods.PUT, bucket_name, config=config)
    return response
@required(bucket_name=(bytes, str))
def does_bucket_exist(self, bucket_name, config=None):
    """
    Check whether a bucket with the given name exists.

    Sends a HEAD request on the bucket. A successful request, or a server
    error with status 403 (FORBIDDEN), is reported as ``True``; a server
    error with status 404 (NOT_FOUND) is reported as ``False``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: True or False
    :rtype: bool
    :raises BceHttpClientError: when the request fails for any reason other
        than a 403/404 server error
    """
    try:
        self._send_request(http_methods.HEAD, bucket_name, config=config)
        return True
    except BceHttpClientError as e:
        if isinstance(e.last_error, BceServerError):
            # A 403 answer is treated as "the bucket exists".
            if e.last_error.status_code == http.client.FORBIDDEN:
                return True
            if e.last_error.status_code == http.client.NOT_FOUND:
                return False
        # Bare ``raise`` re-raises the active exception with its original
        # traceback intact.
        raise
@required(bucket_name=(bytes, str))
def get_bucket_acl(self, bucket_name, config=None):
    """
    Get the access control list of a bucket.

    Sends a GET request on the bucket with the ``acl`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request`` (the ACL of the bucket)
    """
    query = {b'acl': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@staticmethod
def _dump_acl_object(acl):
    """
    Return the public attributes of ``acl`` as a plain dict.

    Every entry of ``acl.__dict__`` whose name does not start with an
    underscore is copied. In this file the function is passed as the
    ``default=`` hook of ``json.dumps``.

    :param acl: any object exposing ``__dict__``
    :return: mapping of attribute name to attribute value
    :rtype: dict
    """
    return {
        name: value
        for name, value in iteritems(acl.__dict__)
        if not name.startswith('_')
    }
@required(bucket_name=(bytes, str), acl=(list, dict))
def set_bucket_acl(self, bucket_name, acl, config=None):
    """
    Set the access control list of a bucket.

    Sends a PUT request on the bucket with the ``acl`` query parameter and a
    JSON body of the form ``{"accessControlList": acl}``. Objects inside
    ``acl`` that ``json`` cannot serialise natively are converted with
    ``BosClient._dump_acl_object``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param acl: the grants to apply
    :type acl: list or dict
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    body = json.dumps({'accessControlList': acl},
                      default=BosClient._dump_acl_object)
    # Return the response like the other setters do; previously it was
    # discarded although the docstring promised one.
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              body=body,
                              headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
                              params={b'acl': b''},
                              config=config)
@required(bucket_name=(bytes, str), canned_acl=bytes)
def set_bucket_canned_acl(self, bucket_name, canned_acl, config=None):
    """
    Set a canned ACL on a bucket.

    Sends a PUT request on the bucket with the ``acl`` query parameter and
    ``canned_acl`` as the value of the ``http_headers.BCE_ACL`` header.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param canned_acl: canned ACL value placed in the header
    :type canned_acl: bytes
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    # Return the response for consistency with the other bucket setters;
    # previously it was discarded.
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              headers={http_headers.BCE_ACL: canned_acl},
                              params={b'acl': b''},
                              config=config)
@required(bucket_name=(bytes, str))
def set_bucket_storage_class(self, bucket_name, storage_class, config=None):
    """
    Set the storage class of a bucket.

    ``storage_class`` is passed through ``compat.convert_to_string`` and sent
    as the JSON body ``{"storageClass": ...}`` of a PUT request carrying the
    ``storageClass`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param storage_class: storage class to apply
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    class_name = compat.convert_to_string(storage_class)
    payload = json.dumps({'storageClass': class_name},
                         default=BosClient._dump_acl_object)
    json_headers = {http_headers.CONTENT_TYPE: http_content_types.JSON}
    query = {b'storageClass': b''}
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers=json_headers,
        params=query,
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_storage_class(self, bucket_name, config=None):
    """
    Get the storage class of a bucket.

    Sends a GET request on the bucket with the ``storageClass`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'storageClass': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket(self, bucket_name, config=None):
    """
    Delete a bucket (all objects in the bucket must be deleted first).

    Sends a DELETE request on the bucket.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    response = self._send_request(http_methods.DELETE, bucket_name, config=config)
    return response
- # bucket static website
@required(bucket_name=(bytes, str))
def put_bucket_static_website(self, bucket_name, index=None, not_found=None, config=None):
    """
    Set the index page and the 404 page for static website hosting.

    Sends a PUT request with the ``website`` query parameter. The JSON body
    contains ``index`` and/or ``notFound`` only for the arguments that are
    not None.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param index: object name of the index page
    :type index: string
    :param not_found: object name of the 404 (not found) page
    :type not_found: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    candidates = (('index', index), ('notFound', not_found))
    website = {field: value for field, value in candidates if value is not None}
    payload = json.dumps(website, default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params={b'website': b''},
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_static_website(self, bucket_name, config=None):
    """
    Get the static website hosting configuration of a bucket.

    Sends a GET request on the bucket with the ``website`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'website': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_static_website(self, bucket_name, config=None):
    """
    Delete the static website hosting configuration, turning hosting off.

    Sends a DELETE request on the bucket with the ``website`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'website': b''}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
- # bucket encryption
@required(bucket_name=(bytes, str))
def put_bucket_encryption(self, bucket_name, encryption_algorithm=ENCRYPTION_ALGORITHM, config=None):
    """
    Set server-side encryption for a bucket.

    ``encryption_algorithm`` is passed through ``compat.convert_to_string``
    and sent as the JSON body ``{"encryptionAlgorithm": ...}`` of a PUT
    request carrying the ``encryption`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param encryption_algorithm: server encryption algorithm for the bucket;
        defaults to ``ENCRYPTION_ALGORITHM`` ('AES256')
    :type encryption_algorithm: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    algorithm = compat.convert_to_string(encryption_algorithm)
    payload = json.dumps({"encryptionAlgorithm": algorithm},
                         default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params={b'encryption': b''},
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_encryption(self, bucket_name, config=None):
    """
    Get the server-side encryption status of a bucket.

    Sends a GET request on the bucket with the ``encryption`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'encryption': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_encryption(self, bucket_name, config=None):
    """
    Turn off server-side encryption for a bucket.

    Sends a DELETE request on the bucket with the ``encryption`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'encryption': b''}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
- # Bucket Copyright Protection
@required(bucket_name=(bytes, str), resource=(list))
def put_bucket_copyright_protection(self, bucket_name, resource, config=None):
    """
    Enable image copyright protection and set the protected resources.

    Sends a PUT request with the ``copyrightProtection`` query parameter and
    the JSON body ``{"resource": resource}``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param resource: resource range to be protected
    :type resource: list of string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    payload = json.dumps({"resource": resource},
                         default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params={b'copyrightProtection': b''},
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_copyright_protection(self, bucket_name, config=None):
    """
    Get the image copyright protection configuration of a bucket.

    Sends a GET request on the bucket with the ``copyrightProtection`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'copyrightProtection': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_copyright_protection(self, bucket_name, config=None):
    """
    Turn off image copyright protection for a bucket.

    Sends a DELETE request on the bucket with the ``copyrightProtection``
    query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'copyrightProtection': b''}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
- # bucket replication
@required(bucket_name=(bytes, str), replication=(dict))
def put_bucket_replication(self, bucket_name, replication, config=None):
    """
    Enable cross-region replication for a bucket.

    Sends a PUT request with the ``replication`` query parameter and
    ``replication`` serialised as the JSON body. When ``replication`` has an
    ``"id"`` key, its value is also sent (as bytes) in the ``id`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param replication: configuration for cross-region replication
    :type replication: dict
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'replication': b''}
    if "id" in replication:
        query.update({b"id": compat.convert_to_bytes(replication["id"])})
    payload = json.dumps(replication, default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params=query,
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_replication(self, bucket_name, id=None, config=None):
    """
    Get the cross-region replication configuration of a bucket.

    Sends a GET request with the ``replication`` query parameter; when ``id``
    is not None it is added (as bytes) in the ``id`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param id: replication rule id
    :type id: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'replication': b''}
    if id is not None:
        query.update({b"id": compat.convert_to_bytes(id)})
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_replication(self, bucket_name, id=None, config=None):
    """
    Delete the cross-region replication configuration, turning it off.

    Sends a DELETE request with the ``replication`` query parameter; when
    ``id`` is not None it is added (as bytes) in the ``id`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param id: replication rule id
    :type id: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'replication': b''}
    if id is not None:
        query.update({b"id": compat.convert_to_bytes(id)})
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def get_bucket_replication_progress(self, bucket_name, id=None, config=None):
    """
    Get the status of cross-region replication, for example
    'historyReplicationPercent' and 'latestReplicationTime'.

    Sends a GET request with the ``replicationProgress`` query parameter;
    when ``id`` is not None it is added (as bytes) in the ``id`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param id: replication rule id
    :type id: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'replicationProgress': b''}
    if id is not None:
        query.update({b"id": compat.convert_to_bytes(id)})
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
-
@required(bucket_name=(bytes, str))
def list_bucket_replication(self, bucket_name, config=None):
    """
    List the cross-region replication rules configured on a bucket.

    Sends a GET request with the ``replication`` and ``list`` query
    parameters.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'replication': b'', b'list': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str), inventory=(dict))
def put_bucket_inventory(self, bucket_name, inventory, config=None):
    """
    Set a bucket inventory configuration.

    Sends a PUT request with the ``inventory`` query parameter, the value of
    ``inventory["id"]`` (as bytes) in the ``id`` query parameter, and
    ``inventory`` serialised as the JSON body.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param inventory: configuration for bucket inventory; must contain "id"
    :type inventory: dict
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    :raises KeyError: when ``inventory`` has no ``"id"`` key
    """
    query = {b'inventory': b'', b'id': compat.convert_to_bytes(inventory["id"])}
    payload = json.dumps(inventory, default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params=query,
        config=config,
    )
@required(bucket_name=(bytes, str), inventory_conf_id=(bytes, str))
def get_bucket_inventory(self, bucket_name, inventory_conf_id, config=None):
    """
    Get one bucket inventory configuration.

    Sends a GET request with the ``inventory`` query parameter and
    ``inventory_conf_id`` (as bytes) in the ``id`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param inventory_conf_id: id of the inventory configuration
    :type inventory_conf_id: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    conf_id = compat.convert_to_bytes(inventory_conf_id)
    query = {b'inventory': b'', b'id': conf_id}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str), inventory_conf_id=(bytes, str))
def delete_bucket_inventory(self, bucket_name, inventory_conf_id, config=None):
    """
    Delete one bucket inventory configuration.

    Sends a DELETE request with the ``inventory`` query parameter and
    ``inventory_conf_id`` (as bytes) in the ``id`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param inventory_conf_id: id of the inventory configuration
    :type inventory_conf_id: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    conf_id = compat.convert_to_bytes(inventory_conf_id)
    query = {b'inventory': b'', b'id': conf_id}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def list_bucket_inventory(self, bucket_name, config=None):
    """
    List the inventory configurations of a bucket.

    Sends a GET request on the bucket with the ``inventory`` query parameter
    and no ``id``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'inventory': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def put_bucket_trash(self, bucket_name, trash_dir=None, config=None):
    """
    Enable the trash function of a bucket.

    Sends a PUT request with the ``trash`` query parameter and the JSON body
    ``{"trashDir": ...}``. A non-None ``trash_dir`` is first passed through
    ``compat.convert_to_string``; None is serialised as JSON ``null``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param trash_dir: directory of the trash, optional
    :type trash_dir: string
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    directory = None if trash_dir is None else compat.convert_to_string(trash_dir)
    payload = json.dumps({"trashDir": directory},
                         default=BosClient._dump_acl_object)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        body=payload,
        headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
        params={b'trash': b''},
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_trash(self, bucket_name, config=None):
    """
    Get the trash status of a bucket.

    Sends a GET request on the bucket with the ``trash`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'trash': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_trash(self, bucket_name, config=None):
    """
    Turn off the trash function of a bucket.

    Sends a DELETE request on the bucket with the ``trash`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'trash': b''}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str))
def generate_pre_signed_url(self,
                            bucket_name,
                            key,
                            timestamp=0,
                            expiration_in_seconds=1800,
                            headers=None,
                            params=None,
                            headers_to_sign=None,
                            protocol=None,
                            config=None,
                            httpmethod=http_methods.GET):
    """
    Build an authorized URL for an object, valid for a limited time.

    Protocol precedence: protocol specified in the endpoint > ``protocol``
    argument > default protocol in the config.

    The caller's ``headers`` and ``params`` dicts are not modified; the Host
    header, the optional security token and the authorization string are
    added to internal copies.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param key: object key; must be non-empty and must not equal ``v1``
    :type key: string or unicode
    :param timestamp: passed to ``bce_v1_signer.sign``
    :type timestamp: int
    :param expiration_in_seconds: passed to ``bce_v1_signer.sign``
    :type expiration_in_seconds: int
    :param headers: extra headers to include when signing
    :type headers: dict
    :param params: extra query parameters to sign and put in the URL
    :type params: dict
    :param headers_to_sign: passed to ``bce_v1_signer.sign``; replaced by
        ``{b'host'}`` when ``httpmethod`` is neither GET nor HEAD
    :param protocol: when not None, overrides ``config.protocol``
    :param config: optional configuration merged via ``_merge_config``
    :type config: BceClientConfiguration
    :param httpmethod: HTTP method the URL is signed for
    :return: the pre-signed URL
    :rtype: bytes
    :raises ValueError: when ``key`` is empty or equals ``v1``
    """
    key = compat.convert_to_bytes(key)
    if len(key) == 0 or key == b'v1':
        raise ValueError('generate url the key param error!')
    config = self._merge_config(config, bucket_name)
    # Copy instead of aliasing: entries are inserted below, and writing into
    # the caller's dicts would leak the Host header / authorization string
    # into any later reuse of those dicts.
    headers = dict(headers) if headers else {}
    params = dict(params) if params else {}

    # specified protocol in endpoint > protocol > default protocol in config
    if protocol is not None:
        config.protocol = protocol
    endpoint_protocol, endpoint_host, endpoint_port = \
        utils.parse_host_port(config.endpoint, config.protocol)

    # The port is appended only when it differs from the protocol default.
    full_host = endpoint_host
    if endpoint_port != endpoint_protocol.default_port:
        full_host += b':' + compat.convert_to_bytes(endpoint_port)
    headers[http_headers.HOST] = full_host
    path = self._get_path(config, bucket_name, key)

    # For methods other than GET/HEAD only the Host header is signed.
    if httpmethod != http_methods.GET and httpmethod != http_methods.HEAD:
        headers_to_sign = {b'host'}

    # Compatible with STS request acquisition
    if config.security_token is not None:
        params[http_headers.STS_SECURITY_TOKEN.lower()] = config.security_token

    params[http_headers.AUTHORIZATION.lower()] = bce_v1_signer.sign(
        config.credentials,
        httpmethod,
        path,
        headers,
        params,
        timestamp,
        expiration_in_seconds,
        headers_to_sign)

    return b"%s://%s%s?%s" % (compat.convert_to_bytes(endpoint_protocol.name),
                              full_host,
                              path,
                              utils.get_canonical_querystring(params, False))
@required(bucket_name=(bytes, str), rules=(list, dict))
def put_bucket_lifecycle(self,
                         bucket_name,
                         rules,
                         config=None):
    """
    Put the lifecycle rules of a bucket.

    Sends a PUT request with the ``lifecycle`` query parameter and the JSON
    body ``{"rule": rules}``.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param rules: lifecycle rules
    :type rules: list or dict
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'lifecycle': b''}
    payload = json.dumps({'rule': rules})
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        params=query,
        body=payload,
        config=config,
    )
@required(bucket_name=(bytes, str))
def get_bucket_lifecycle(self, bucket_name, config=None):
    """
    Get the lifecycle rules of a bucket.

    Sends a GET request on the bucket with the ``lifecycle`` query parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'lifecycle': b''}
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_lifecycle(self, bucket_name, config=None):
    """
    Delete the lifecycle rules of a bucket.

    Sends a DELETE request on the bucket with the ``lifecycle`` query
    parameter.

    :param bucket_name: the name of the bucket
    :type bucket_name: string or unicode
    :param config: optional configuration forwarded to ``_send_request``
    :type config: BceClientConfiguration
    :return: the value returned by ``_send_request``
    """
    query = {b'lifecycle': b''}
    return self._send_request(http_methods.DELETE, bucket_name, params=query, config=config)
-
@required(bucket_name=(bytes, str), cors_configuration=list)
def put_bucket_cors(self, bucket_name, cors_configuration, config=None):
    """
    Set the CORS configuration of a bucket.

    Issues a PUT request on the bucket with the ``cors`` query parameter;
    the request body is the JSON document
    ``{"corsConfiguration": cors_configuration}``.

    :type bucket_name: string
    :param bucket_name: name of the bucket to configure
    :type cors_configuration: list
    :param cors_configuration: CORS entries, serialized under ``corsConfiguration``
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    cors_query = {b'cors': b''}
    cors_body = json.dumps({'corsConfiguration': cors_configuration})
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        params=cors_query,
        body=cors_body,
        config=config)
@required(bucket_name=(bytes, str))
def get_bucket_cors(self, bucket_name, config=None):
    """
    Fetch the CORS configuration of a bucket.

    Issues a GET request on the bucket with the ``cors`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the bucket to query
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    cors_query = {b'cors': b''}
    return self._send_request(
        http_methods.GET, bucket_name, params=cors_query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_cors(self, bucket_name, config=None):
    """
    Remove the CORS configuration of a bucket.

    Issues a DELETE request on the bucket with the ``cors`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the bucket whose CORS configuration is removed
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    cors_query = {b'cors': b''}
    return self._send_request(
        http_methods.DELETE, bucket_name, params=cors_query, config=config)
@required(bucket_name=(bytes, str))
def list_objects(self, bucket_name,
                 max_keys=1000, prefix=None, marker=None, delimiter=None,
                 config=None):
    """
    List one page of objects in a bucket.

    Issues a GET request on the bucket. Each listing option that is not
    ``None`` is sent as a query parameter (``maxKeys``, ``prefix``,
    ``marker``, ``delimiter``).

    :type bucket_name: string
    :param bucket_name: name of the bucket to list
    :type max_keys: int
    :param max_keys: sent as ``maxKeys``; defaults to 1000
    :type prefix: string
    :param prefix: sent as ``prefix`` when given
    :type marker: string
    :param marker: sent as ``marker`` when given
    :type delimiter: string
    :param delimiter: sent as ``delimiter`` when given
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    # Order matters only for dict insertion order; it mirrors the option order.
    options = (
        (b'maxKeys', max_keys),
        (b'prefix', prefix),
        (b'marker', marker),
        (b'delimiter', delimiter),
    )
    params = {name: value for name, value in options if value is not None}
    return self._send_request(http_methods.GET, bucket_name, params=params, config=config)
@required(bucket_name=(bytes, str))
def list_all_objects(self, bucket_name, prefix=None, delimiter=None, config=None):
    """
    Iterate over every object of a bucket, page by page.

    Repeatedly calls ``list_objects`` and yields each entry of
    ``response.contents``. While ``response.is_truncated`` is true, the next
    call uses ``response.next_marker`` as its ``marker``.

    :param bucket_name: name of the bucket to list
    :param prefix: forwarded to ``list_objects``
    :param delimiter: forwarded to ``list_objects``
    :param config: forwarded to ``list_objects``
    :return: generator yielding the items of each page's ``contents``
    """
    next_start = None
    has_more = True
    while has_more:
        page = self.list_objects(
            bucket_name, marker=next_start, prefix=prefix, delimiter=delimiter, config=config)
        for entry in page.contents:
            yield entry
        has_more = page.is_truncated
        if has_more:
            next_start = page.next_marker
@staticmethod
def _get_range_header_dict(range):
    """
    Build the HTTP ``Range`` header dict for a byte range.

    :param range: ``None``, or a list/tuple of two integers (first and last byte)
    :return: ``None`` when ``range`` is ``None``; otherwise a one-entry dict
        mapping ``http_headers.RANGE`` to ``b'bytes=<first>-<last>'``
    :raise TypeError: if ``range`` is neither a list nor a tuple
    :raise ValueError: if ``range`` does not contain exactly two items
    """
    if range is None:
        return None
    if isinstance(range, (list, tuple)):
        if len(range) == 2:
            return {http_headers.RANGE: b'bytes=%d-%d' % tuple(range)}
        raise ValueError('range should have length of 2')
    raise TypeError('range should be a list or a tuple')
@staticmethod
def _parse_bos_object(http_response, response):
    """
    Attach the raw HTTP response and the user metadata headers to ``response``.

    Sets ``response.data`` to ``http_response`` and
    ``response.metadata.user_metadata`` to a dict built from every header
    whose name starts with ``http_headers.BCE_USER_METADATA_PREFIX`` (the
    prefix is stripped from the key). On Python 3 the header names are
    lower-cased before the prefix comparison.

    :param http_response: object providing ``getheaders()``
    :param response: general response object which will be returned to the caller
    :return: always True
    :rtype: bool
    """
    raw_headers = http_response.getheaders()
    if compat.PY3:
        raw_headers = [(name.lower(), value) for name, value in raw_headers]
    meta_prefix = compat.convert_to_string(
        http_headers.BCE_USER_METADATA_PREFIX
    )
    collected = {}
    for name, value in raw_headers:
        if not name.startswith(meta_prefix):
            continue
        stripped = name[len(meta_prefix):]
        collected[compat.convert_to_unicode(stripped)] = \
            compat.convert_to_unicode(value)
    response.metadata.user_metadata = collected
    response.data = http_response
    return True
@required(bucket_name=(bytes, str), key=(bytes, str))
def get_object(self, bucket_name, key, range=None, traffic_limit=None, version_id=None,
               cond_read_write=None, config=None):
    """
    Issue a GET request for an object and hand back the streaming response.

    The response body is attached by ``BosClient._parse_bos_object``.

    :param bucket_name: name of the bucket holding the object
    :param key: object key; must be non-empty and must not start with ``/``
    :param range: optional two-item list/tuple turned into a ``Range`` header
    :param traffic_limit: when given, sent as the ``BOS_TRAFFIC_LIMIT`` header
    :param version_id: when given, sent as the ``versionId`` query parameter
    :param cond_read_write: when given, passed to
        ``_get_cond_read_write_headers`` to extend the request headers
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    :raise BceClientError: if the key is empty or starts with ``/``
    """
    query_params = {}
    if version_id is not None:
        query_params[b'versionId'] = compat.convert_to_bytes(version_id)

    key = compat.convert_to_bytes(key)
    if not key or key.startswith(b"/"):
        raise BceClientError("Key can not be empty or start with '/' .")

    request_headers = BosClient._get_range_header_dict(range)
    # The range helper returns None when no range was requested; a dict is
    # only needed once some other header has to be added.
    if request_headers is None and (traffic_limit is not None or cond_read_write is not None):
        request_headers = {}
    if traffic_limit is not None:
        request_headers[http_headers.BOS_TRAFFIC_LIMIT] = traffic_limit
    if cond_read_write is not None:
        request_headers = self._get_cond_read_write_headers(
            http_methods.GET, request_headers, cond_read_write)

    return self._send_request(
        http_methods.GET,
        bucket_name,
        key,
        headers=request_headers,
        params=query_params,
        config=config,
        body_parser=BosClient._parse_bos_object)
- # restore object
@required(bucket_name=(bytes, str), key=(bytes, str))
def restore_object(self, bucket_name, key, days=None, tier="Standard", config=None):
    """
    Issue a POST request with the ``restore`` query parameter for an object.

    :param bucket_name: name of the bucket holding the object
    :param key: object key
    :param days: when given, sent as the ``BOS_RESTORE_DAYS`` header
    :param tier: one of ``Standard``, ``Expedited`` or ``LowCost``; sent as
        the ``BOS_RESTORE_TIER`` header
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    :raise ValueError: if ``tier`` is not one of the accepted values
    """
    key = compat.convert_to_bytes(key)
    headers = {}
    if days is not None:
        headers[http_headers.BOS_RESTORE_DAYS] = days

    accepted_tiers = ("Standard", "Expedited", "LowCost")
    if compat.convert_to_string(tier) in accepted_tiers:
        headers[http_headers.BOS_RESTORE_TIER] = compat.convert_to_bytes(tier)
    else:
        raise ValueError(
            'invalid tier:{} for restore_object.The valid value is "Standard" or "Expedited" or "LowCost"'
            .format(tier))

    return self._send_request(
        http_methods.POST,
        bucket_name,
        key,
        headers=headers,
        params={b'restore': b''},
        config=config,
        body_parser=BosClient._parse_bos_object)
@staticmethod
def _save_body_to_file(http_response, response, file_name, buf_size=16 * 1024, progress_callback=None):
    """
    Copy the body of ``http_response`` into the file ``file_name``.

    The target file is opened in binary write mode. When a progress callback
    is supplied, the response is wrapped with ``utils.make_progress_adapter``
    using ``response.metadata.content_length`` as the total size.

    ``http_response`` is closed whether or not the copy succeeds, so a failed
    download does not leave the response open.

    :param http_response: readable response object exposing ``close()``
    :param response: response object providing ``metadata.content_length``
    :param file_name: path of the file to write
    :param buf_size: chunk size passed to ``shutil.copyfileobj``
    :param progress_callback: optional callback handed to the progress adapter
    :return: always True
    :rtype: bool
    """
    try:
        with open(file_name, 'wb') as out_file:
            if progress_callback:
                file_size = int(response.metadata.content_length)
                stream = utils.make_progress_adapter(http_response, progress_callback, file_size)
            else:
                stream = http_response
            shutil.copyfileobj(stream, out_file, buf_size)
    finally:
        # Close the response on the error path too, not only after success.
        http_response.close()
    return True
@staticmethod
def _parse_select_message(http_response, response, select_response):
    """
    Initialize ``select_response`` from the HTTP response.

    Delegates to ``select_response.init_from_http_response``.

    :param http_response: raw HTTP response object
    :param response: general response object
    :param select_response: object exposing ``init_from_http_response``
    :return: always True
    :rtype: bool
    """
    select_response.init_from_http_response(http_response, response)
    return True
@required(bucket_name=(bytes, str), key=(bytes, str))
def get_object_as_string(self, bucket_name, key, range=None, version_id=None, cond_read_write=None, config=None):
    """
    Download an object and return its whole body.

    Calls ``get_object`` and reads ``response.data`` to the end. The response
    stream is closed even if reading fails.

    :param bucket_name: name of the bucket holding the object
    :param key: object key
    :param range: forwarded to ``get_object``
    :param version_id: forwarded to ``get_object``
    :param cond_read_write: forwarded to ``get_object``
    :param config: forwarded to ``get_object``
    :return: the value returned by ``response.data.read()``
    """
    key = compat.convert_to_bytes(key)
    response = self.get_object(bucket_name, key, range=range, version_id=version_id,
                               cond_read_write=cond_read_write, config=config)
    try:
        return response.data.read()
    finally:
        # Release the connection even when read() raises.
        response.data.close()
@required(bucket_name=(bytes, str), key=(bytes, str), file_name=(bytes, str))
def get_object_to_file(self, bucket_name, key, file_name, range=None, config=None,
                       progress_callback=None, traffic_limit=None, cond_read_write=None, version_id=None):
    """
    Issue a GET request for an object and write its body to a local file.

    The body is written by ``BosClient._save_body_to_file`` using the
    ``recv_buf_size`` configuration value as the copy buffer size.

    :type bucket_name: string
    :param bucket_name: name of the bucket holding the object
    :type key: string
    :param key: object key; must be non-empty and must not start with ``/``
    :type file_name: string
    :param file_name: path of the file to write
    :type range: tuple
    :param range: optional two-item list/tuple turned into a ``Range`` header
    :param config: optional per-call configuration
    :param progress_callback: optional callback passed to ``_save_body_to_file``
    :param traffic_limit: when given, sent as the ``BOS_TRAFFIC_LIMIT`` header
    :param cond_read_write: when given, passed to
        ``_get_cond_read_write_headers`` to extend the request headers
    :param version_id: when given, sent as the ``versionId`` query parameter
    :return: the result of ``_send_request``
    :raise BceClientError: if the key is empty or starts with ``/``
    """
    query_params = {}
    if version_id is not None:
        query_params[b'versionId'] = compat.convert_to_bytes(version_id)

    key = compat.convert_to_bytes(key)
    if not key or key.startswith(b"/"):
        raise BceClientError("Key can not be empty or start with '/' .")
    file_name = compat.convert_to_bytes(file_name)

    request_headers = BosClient._get_range_header_dict(range)
    # The range helper returns None when no range was requested; a dict is
    # only needed once some other header has to be added.
    if request_headers is None and (traffic_limit is not None or cond_read_write is not None):
        request_headers = {}
    if traffic_limit is not None:
        request_headers[http_headers.BOS_TRAFFIC_LIMIT] = traffic_limit
    if cond_read_write is not None:
        request_headers = self._get_cond_read_write_headers(
            http_methods.GET, request_headers, cond_read_write)

    def write_body_to_file(http_response, response):
        # The buffer size is looked up only when the parser actually runs.
        return BosClient._save_body_to_file(
            http_response,
            response,
            file_name,
            self._get_config_parameter(config, 'recv_buf_size'),
            progress_callback=progress_callback)

    return self._send_request(
        http_methods.GET,
        bucket_name,
        key,
        headers=request_headers,
        params=query_params,
        config=config,
        body_parser=write_body_to_file)
@required(bucket_name=(bytes, str), key=(bytes, str))
def get_object_meta_data(self, bucket_name, key, version_id=None, cond_read_write=None, config=None):
    """
    Issue a HEAD request for an object.

    :type bucket_name: string
    :param bucket_name: name of the bucket holding the object
    :type key: string
    :param key: object key
    :param version_id: when given, sent as the ``versionId`` query parameter
    :param cond_read_write: when given, passed to
        ``_get_cond_read_write_headers`` to build the request headers
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    query_params = {}
    if version_id is not None:
        query_params[b'versionId'] = compat.convert_to_bytes(version_id)

    request_headers = {}
    if cond_read_write is not None:
        request_headers = self._get_cond_read_write_headers(
            http_methods.HEAD, request_headers, cond_read_write)

    key = compat.convert_to_bytes(key)
    return self._send_request(
        http_methods.HEAD,
        bucket_name,
        key,
        headers=request_headers,
        params=query_params,
        config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          data=object,
          content_length=compat.integer_types,
          content_md5=(bytes, str))
def append_object(self, bucket_name, key, data,
                  content_md5,
                  content_length,
                  offset=None,
                  content_type=None,
                  user_metadata=None,
                  content_sha256=None,
                  storage_class=None,
                  user_headers=None,
                  progress_callback=None,
                  traffic_limit=None,
                  object_tagging=None,
                  config=None):
    """
    Send data to an object with a POST request carrying the ``append`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :param data: request body; wrapped with a progress adapter when
        ``progress_callback`` is given
    :param content_md5: MD5 value passed to ``_prepare_object_headers``
    :type content_length: long
    :param content_length: body length; must not exceed
        ``bos.MAX_APPEND_OBJECT_LENGTH``
    :type offset: long
    :param offset: when given, sent as the ``offset`` query parameter
    :param content_type: passed to ``_prepare_object_headers``
    :param user_metadata: passed to ``_prepare_object_headers``
    :param content_sha256: passed to ``_prepare_object_headers``
    :param storage_class: passed to ``_prepare_object_headers``
    :param user_headers: passed to ``_prepare_object_headers``
    :param progress_callback: optional progress callback
    :param traffic_limit: passed to ``_prepare_object_headers``
    :param object_tagging: passed to ``_prepare_object_headers``
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    :raise ValueError: if ``content_length`` exceeds the append limit
    """
    key = compat.convert_to_bytes(key)
    content_md5 = compat.convert_to_bytes(content_md5)
    headers = self._prepare_object_headers(
        content_length=content_length,
        content_md5=content_md5,
        content_type=content_type,
        content_sha256=content_sha256,
        user_metadata=user_metadata,
        storage_class=storage_class,
        user_headers=user_headers,
        traffic_limit=traffic_limit,
        object_tagging=object_tagging)

    length_limit = bos.MAX_APPEND_OBJECT_LENGTH
    if content_length > length_limit:
        raise ValueError('Object length should be less than %d. '
                         'Use multi-part upload instead.' % length_limit)

    query = {b'append': b''}
    if offset is not None:
        query[b'offset'] = offset

    body = utils.make_progress_adapter(data, progress_callback) if progress_callback else data
    return self._send_request(
        http_methods.POST,
        bucket_name,
        key,
        body=body,
        headers=headers,
        params=query,
        config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          data=(bytes, str))
def append_object_from_string(self, bucket_name, key, data,
                              content_md5=None,
                              offset=None,
                              content_type=None,
                              user_metadata=None,
                              content_sha256=None,
                              storage_class=None,
                              user_headers=None,
                              progress_callback=None,
                              traffic_limit=None,
                              config=None):
    """
    Append in-memory content to an object via ``append_object``.

    ``str`` data is encoded with ``baidubce.DEFAULT_ENCODING`` first. The
    content is wrapped in an ``io.BytesIO`` that is closed before returning.
    When ``content_md5`` is not supplied it is computed with
    ``utils.get_md5_from_fp``.

    :param bucket_name: name of the target bucket
    :param key: object key
    :param data: content to append (bytes or str)
    :param content_md5: optional precomputed MD5 value
    :param offset: forwarded to ``append_object``
    :param content_type: forwarded to ``append_object``
    :param user_metadata: forwarded to ``append_object``
    :param content_sha256: forwarded to ``append_object``
    :param storage_class: forwarded to ``append_object``
    :param user_headers: forwarded to ``append_object``
    :param progress_callback: forwarded to ``append_object``
    :param traffic_limit: forwarded to ``append_object``
    :param config: optional per-call configuration
    :return: the result of ``append_object``
    """
    key = compat.convert_to_bytes(key)
    if isinstance(data, str):
        data = data.encode(baidubce.DEFAULT_ENCODING)
    with io.BytesIO(data) as buffer:
        if content_md5 is None:
            md5_buf_size = self._get_config_parameter(config, 'recv_buf_size')
            content_md5 = utils.get_md5_from_fp(buffer, buf_size=md5_buf_size)
        return self.append_object(bucket_name=bucket_name,
                                  key=key,
                                  data=buffer,
                                  content_md5=content_md5,
                                  content_length=len(data),
                                  offset=offset,
                                  content_type=content_type,
                                  user_metadata=user_metadata,
                                  content_sha256=content_sha256,
                                  storage_class=storage_class,
                                  user_headers=user_headers,
                                  progress_callback=progress_callback,
                                  traffic_limit=traffic_limit,
                                  config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          data=object,
          content_length=compat.integer_types,
          content_md5=(bytes, str))
def put_object(self, bucket_name, key, data,
               content_length,
               content_md5,
               content_type=None,
               content_sha256=None,
               user_metadata=None,
               storage_class=None,
               user_headers=None,
               encryption=None,
               customer_key=None,
               customer_key_md5=None,
               progress_callback=None,
               traffic_limit=None,
               object_tagging=None,
               cond_read_write=None,
               config=None):
    """
    Upload an object with a single PUT request.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :param data: request body; wrapped with a progress adapter when
        ``progress_callback`` is given
    :type content_length: long
    :param content_length: body length; must not exceed
        ``bos.MAX_PUT_OBJECT_LENGTH``
    :param content_md5: MD5 value passed to ``_prepare_object_headers``
    :param content_type: passed to ``_prepare_object_headers``
    :param content_sha256: passed to ``_prepare_object_headers``
    :param user_metadata: passed to ``_prepare_object_headers``
    :param storage_class: passed to ``_prepare_object_headers``
    :param user_headers: passed to ``_prepare_object_headers``
    :param encryption: accepted for interface compatibility; not used by
        this method's body
    :param customer_key: accepted for interface compatibility; not used by
        this method's body
    :param customer_key_md5: accepted for interface compatibility; not used
        by this method's body
    :param progress_callback: optional progress callback
    :param traffic_limit: passed to ``_prepare_object_headers``
    :param object_tagging: passed to ``_prepare_object_headers``
    :param cond_read_write: when given, passed to
        ``_get_cond_read_write_headers`` to extend the request headers
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    :raise ValueError: if ``content_length`` exceeds the single-PUT limit
    """
    key = compat.convert_to_bytes(key)
    content_md5 = compat.convert_to_bytes(content_md5)
    headers = self._prepare_object_headers(
        content_length=content_length,
        content_md5=content_md5,
        content_type=content_type,
        content_sha256=content_sha256,
        user_metadata=user_metadata,
        storage_class=storage_class,
        user_headers=user_headers,
        traffic_limit=traffic_limit,
        object_tagging=object_tagging)
    if cond_read_write is not None:
        headers = self._get_cond_read_write_headers(http_methods.PUT, headers, cond_read_write)
    if content_length > bos.MAX_PUT_OBJECT_LENGTH:
        raise ValueError('Object length should be less than %d. '
                         'Use multi-part upload instead.' % bos.MAX_PUT_OBJECT_LENGTH)

    if progress_callback:
        data = utils.make_progress_adapter(data, progress_callback)
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        key,
        body=data,
        headers=headers,
        config=config)
@required(bucket=(bytes, str), key=(bytes, str), data=(bytes, str))
def put_object_from_string(self, bucket, key, data,
                           content_md5=None,
                           content_type=None,
                           content_sha256=None,
                           user_metadata=None,
                           storage_class=None,
                           user_headers=None,
                           encryption=None,
                           customer_key=None,
                           customer_key_md5=None,
                           progress_callback=None,
                           traffic_limit=None,
                           object_tagging=None,
                           cond_read_write=None,
                           config=None):
    """
    Upload in-memory content as an object via ``put_object``.

    ``str`` data is encoded with ``baidubce.DEFAULT_ENCODING`` first. The
    content is wrapped in an ``io.BytesIO`` that is closed before returning.
    When ``content_md5`` is not supplied it is computed with
    ``utils.get_md5_from_fp``.

    :type bucket: string
    :param bucket: name of the target bucket
    :type key: string
    :param key: object key
    :type data: string
    :param data: content to upload (bytes or str)
    :param content_md5: optional precomputed MD5 value
    :param config: optional per-call configuration
    :return: the result of ``put_object``

    All remaining keyword parameters are forwarded unchanged to ``put_object``.
    """
    key = compat.convert_to_bytes(key)
    if isinstance(data, str):
        data = data.encode(baidubce.DEFAULT_ENCODING)
    with io.BytesIO(data) as buffer:
        if content_md5 is None:
            md5_buf_size = self._get_config_parameter(config, 'recv_buf_size')
            content_md5 = utils.get_md5_from_fp(buffer, buf_size=md5_buf_size)
        return self.put_object(bucket, key, buffer,
                               content_length=len(data),
                               content_md5=content_md5,
                               content_type=content_type,
                               content_sha256=content_sha256,
                               user_metadata=user_metadata,
                               storage_class=storage_class,
                               user_headers=user_headers,
                               encryption=encryption,
                               customer_key=customer_key,
                               customer_key_md5=customer_key_md5,
                               progress_callback=progress_callback,
                               traffic_limit=traffic_limit,
                               object_tagging=object_tagging,
                               cond_read_write=cond_read_write,
                               config=config)
@required(bucket=(bytes, str), key=(bytes, str), file_name=(bytes, str))
def put_object_from_file(self, bucket, key, file_name,
                         content_length=None,
                         content_md5=None,
                         content_type=None,
                         content_sha256=None,
                         user_metadata=None,
                         storage_class=None,
                         user_headers=None,
                         encryption=None,
                         customer_key=None,
                         customer_key_md5=None,
                         progress_callback=None,
                         traffic_limit=None,
                         object_tagging=None,
                         cond_read_write=None,
                         config=None,
                         ):
    """
    Upload the content of a local file as an object via ``put_object``.

    The file is opened in binary read mode and closed before returning.
    Values not supplied by the caller are derived from the file:
    ``content_length`` by seeking to the end of the file, ``content_md5``
    with ``utils.get_md5_from_fp`` and ``content_type`` with
    ``utils.guess_content_type_by_file_name``.

    :type bucket: string
    :param bucket: name of the target bucket
    :type key: string
    :param key: object key
    :type file_name: string
    :param file_name: path of the file to upload
    :param content_length: optional number of bytes to upload
    :param content_md5: optional precomputed MD5 value
    :param content_type: optional content type
    :param config: optional per-call configuration
    :return: the result of ``put_object``

    All remaining keyword parameters are forwarded unchanged to ``put_object``.
    """
    key = compat.convert_to_bytes(key)
    with open(file_name, 'rb') as source:
        if content_length is None:
            # Measure the file, then rewind so the upload starts at byte 0.
            source.seek(0, os.SEEK_END)
            content_length = source.tell()
            source.seek(0)
        if content_md5 is None:
            md5_buf_size = self._get_config_parameter(config, 'recv_buf_size')
            content_md5 = utils.get_md5_from_fp(source, length=content_length,
                                                buf_size=md5_buf_size)
        if content_type is None:
            content_type = utils.guess_content_type_by_file_name(file_name)
        return self.put_object(bucket, key, source,
                               content_length=content_length,
                               content_md5=content_md5,
                               content_type=content_type,
                               content_sha256=content_sha256,
                               user_metadata=user_metadata,
                               storage_class=storage_class,
                               user_headers=user_headers,
                               encryption=encryption,
                               customer_key=customer_key,
                               customer_key_md5=customer_key_md5,
                               progress_callback=progress_callback,
                               traffic_limit=traffic_limit,
                               object_tagging=object_tagging,
                               cond_read_write=cond_read_write,
                               config=config)
@required(source_bucket_name=(bytes, str),
          source_key=(bytes, str),
          target_bucket_name=(bytes, str),
          target_key=(bytes, str))
def copy_object(self,
                source_bucket_name, source_key,
                target_bucket_name, target_key,
                etag=None,
                content_type=None,
                user_metadata=None,
                storage_class=None,
                user_headers=None,
                copy_object_user_headers=None,
                traffic_limit=None,
                object_tagging=None,
                source_version_id=None,
                config=None):
    """
    Copy one object to another object with a PUT request on the target.

    The source is identified by the ``BCE_COPY_SOURCE`` header, built from
    the normalized ``/<source_bucket_name>/<source_key>`` path, with
    ``?versionId=<id>`` appended when ``source_version_id`` is given. The
    metadata directive header is ``replace`` when ``user_metadata`` or
    ``content_type`` is supplied and ``copy`` otherwise.

    :type source_bucket_name: string
    :param source_bucket_name: bucket holding the source object
    :type source_key: string
    :param source_key: key of the source object
    :type target_bucket_name: string
    :param target_bucket_name: bucket receiving the copy
    :type target_key: string
    :param target_key: key of the copy
    :param etag: when given, sent as the ``BCE_COPY_SOURCE_IF_MATCH`` header
    :param content_type: passed to ``_prepare_object_headers``
    :param user_metadata: passed to ``_prepare_object_headers``
    :param storage_class: passed to ``_prepare_object_headers``
    :param user_headers: passed to ``_prepare_object_headers``
    :param copy_object_user_headers: when given, merged into the headers by
        ``BosClient._get_user_header``; errors from that helper propagate
        unchanged
    :param traffic_limit: passed to ``_prepare_object_headers``
    :param object_tagging: passed to ``_prepare_object_headers``
    :param source_version_id: optional version of the source object
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``, parsed by
        ``bos_handler.parse_copy_object_response``
    """
    source_key = compat.convert_to_bytes(source_key)
    target_key = compat.convert_to_bytes(target_key)
    headers = self._prepare_object_headers(
        content_type=content_type,
        user_metadata=user_metadata,
        storage_class=storage_class,
        user_headers=user_headers,
        traffic_limit=traffic_limit,
        object_tagging=object_tagging)

    merge_source_key = utils.normalize_string(
        b'/%s/%s' % (
            compat.convert_to_bytes(source_bucket_name),
            source_key), False)
    if source_version_id is not None:
        merge_source_key = merge_source_key + b'?versionId=%s' % compat.convert_to_bytes(source_version_id)
    headers[http_headers.BCE_COPY_SOURCE] = merge_source_key
    if etag is not None:
        headers[http_headers.BCE_COPY_SOURCE_IF_MATCH] = etag
    if user_metadata is not None or content_type is not None:
        headers[http_headers.BCE_COPY_METADATA_DIRECTIVE] = b'replace'
    else:
        headers[http_headers.BCE_COPY_METADATA_DIRECTIVE] = b'copy'
    if copy_object_user_headers is not None:
        # No try/except here: the former handler only re-raised the exception.
        headers = BosClient._get_user_header(headers, copy_object_user_headers, True)
    return self._send_request(
        http_methods.PUT,
        target_bucket_name,
        target_key,
        headers=headers,
        config=config,
        body_parser=bos_handler.parse_copy_object_response)
@required(bucket_name=(bytes, str), key=(bytes, str))
def delete_object(self, bucket_name, key, version_id=None, config=None):
    """
    Issue a DELETE request for an object.

    :type bucket_name: string
    :param bucket_name: name of the bucket holding the object
    :type key: string
    :param key: object key
    :param version_id: when given, sent as the ``versionId`` query parameter
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    query_params = {}
    if version_id is not None:
        query_params[b'versionId'] = compat.convert_to_bytes(version_id)
    key = compat.convert_to_bytes(key)
    return self._send_request(
        http_methods.DELETE,
        bucket_name,
        key,
        params=query_params,
        config=config)
@required(bucket_name=(bytes, str), key_list=list)
def delete_multiple_objects(self, bucket_name, key_list, config=None):
    """
    Delete several objects with one POST request carrying the ``delete`` query parameter.

    The request body is the JSON document
    ``{"objects": [{"key": <key>}, ...]}``; each key is converted with
    ``compat.convert_to_string``.

    :type bucket_name: string
    :param bucket_name: name of the bucket holding the objects
    :type key_list: string list
    :param key_list: keys of the objects to delete
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    entries = []
    for object_key in key_list:
        entries.append({'key': compat.convert_to_string(object_key)})
    payload = json.dumps({'objects': entries})
    return self._send_request(
        http_methods.POST,
        bucket_name,
        body=payload,
        params={b'delete': b''},
        config=config)
@required(source_bucket=(bytes, str),
          target_bucket=(bytes, str),
          target_prefix=(bytes, str))
def put_bucket_logging(self,
                       source_bucket,
                       target_bucket,
                       target_prefix=None,
                       config=None):
    """
    Set the logging configuration of a bucket.

    Issues a PUT request on ``source_bucket`` with the ``logging`` query
    parameter; the request body is the JSON document
    ``{"targetBucket": ..., "targetPrefix": ...}``.

    ``target_bucket`` and ``target_prefix`` may be ``bytes``; they are
    converted with ``compat.convert_to_string`` before serialization because
    ``json.dumps`` cannot encode ``bytes`` on Python 3.

    :type source_bucket: string
    :param source_bucket: bucket whose logging configuration is set
    :type target_bucket: string
    :param target_bucket: value sent as ``targetBucket``
    :type target_prefix: string
    :param target_prefix: value sent as ``targetPrefix``; ``None`` is
        serialized as JSON ``null``
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    logging_config = {
        'targetBucket': compat.convert_to_string(target_bucket),
        'targetPrefix': (compat.convert_to_string(target_prefix)
                         if target_prefix is not None else None),
    }
    return self._send_request(http_methods.PUT,
                              source_bucket,
                              params={b'logging': b''},
                              body=json.dumps(logging_config),
                              config=config)
@required(bucket_name=(bytes, str))
def get_bucket_logging(self, bucket_name, config=None):
    """
    Fetch the logging configuration of a bucket.

    Issues a GET request on the bucket with the ``logging`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the bucket to query
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    logging_query = {b'logging': b''}
    return self._send_request(
        http_methods.GET, bucket_name, params=logging_query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_logging(self, bucket_name, config=None):
    """
    Remove the logging configuration of a bucket.

    Issues a DELETE request on the bucket with the ``logging`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the bucket whose logging configuration is removed
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    logging_query = {b'logging': b''}
    return self._send_request(
        http_methods.DELETE, bucket_name, params=logging_query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str))
def initiate_multipart_upload(self,
                              bucket_name,
                              key,
                              content_type=None,
                              storage_class=None,
                              user_headers=None,
                              config=None):
    """
    Start a multipart upload with a POST request carrying the ``uploads`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :param content_type: content type of the object; when omitted,
        ``http_content_types.OCTET_STREAM`` is sent
    :param storage_class: when given, sent as the ``BOS_STORAGE_CLASS`` header
    :param user_headers: when given, merged into the headers by
        ``BosClient._get_user_header``; errors from that helper propagate
        unchanged
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    key = compat.convert_to_bytes(key)
    headers = {}
    if storage_class is not None:
        headers[http_headers.BOS_STORAGE_CLASS] = storage_class
    if content_type is not None:
        headers[http_headers.CONTENT_TYPE] = utils.convert_to_standard_string(content_type)
    else:
        headers[http_headers.CONTENT_TYPE] = http_content_types.OCTET_STREAM
    if user_headers is not None:
        # No try/except here: the former handler only re-raised the exception.
        headers = BosClient._get_user_header(headers, user_headers, False)
    return self._send_request(
        http_methods.POST,
        bucket_name,
        key,
        headers=headers,
        params={b'uploads': b''},
        config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          upload_id=(bytes, str),
          part_number=int,
          part_size=compat.integer_types,
          part_fp=object)
def upload_part(self, bucket_name, key, upload_id,
                part_number, part_size, part_fp, part_md5=None,
                progress_callback=None, traffic_limit=None, config=None):
    """
    Upload one part of a multipart upload with a PUT request.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :type upload_id: string
    :param upload_id: sent as the ``uploadId`` query parameter
    :type part_number: int
    :param part_number: sent as ``partNumber``; must lie between
        ``bos.MIN_PART_NUMBER`` and ``bos.MAX_PART_NUMBER``
    :type part_size: int or long
    :param part_size: sent as the content length; must not exceed
        ``bos.MAX_PUT_OBJECT_LENGTH``
    :type part_fp: file pointer
    :param part_fp: request body; wrapped with a progress adapter when
        ``progress_callback`` is given
    :type part_md5: str
    :param part_md5: when given, sent as the ``CONTENT_MD5`` header
    :param progress_callback: optional progress callback
    :param traffic_limit: when given, sent as the ``BOS_TRAFFIC_LIMIT`` header
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    :raise ValueError: if ``part_number`` or ``part_size`` is out of range
    """
    key = compat.convert_to_bytes(key)
    if not (bos.MIN_PART_NUMBER <= part_number <= bos.MAX_PART_NUMBER):
        raise ValueError('Invalid part_number %d. The valid range is from %d to %d.' % (
            part_number, bos.MIN_PART_NUMBER, bos.MAX_PART_NUMBER))
    if part_size > bos.MAX_PUT_OBJECT_LENGTH:
        raise ValueError('Single part length should be less than %d. '
                         % bos.MAX_PUT_OBJECT_LENGTH)

    headers = {}
    headers[http_headers.CONTENT_LENGTH] = part_size
    headers[http_headers.CONTENT_TYPE] = http_content_types.OCTET_STREAM
    if part_md5 is not None:
        headers[http_headers.CONTENT_MD5] = part_md5

    body = part_fp
    if progress_callback:
        body = utils.make_progress_adapter(part_fp, progress_callback, part_size)

    if traffic_limit is not None:
        headers[http_headers.BOS_TRAFFIC_LIMIT] = traffic_limit

    query = {b'partNumber': part_number, b'uploadId': upload_id}
    return self._send_request(
        http_methods.PUT,
        bucket_name,
        key,
        body=body,
        headers=headers,
        params=query,
        config=config)
@required(source_bucket_name=(bytes, str),
          source_key=(bytes, str),
          target_bucket_name=(bytes, str),
          target_key=(bytes, str),
          upload_id=(bytes, str),
          part_number=int,
          part_size=compat.integer_types,
          offset=compat.integer_types)
def upload_part_copy(self,
                     source_bucket_name, source_key,
                     target_bucket_name, target_key,
                     upload_id, part_number, part_size, offset,
                     etag=None,
                     content_type=None,
                     user_metadata=None,
                     traffic_limit=None,
                     config=None):
    """
    Upload one part of a multipart upload by copying a byte range of an existing object.

    Issues a PUT request on the target. The source is identified by the
    ``BCE_COPY_SOURCE`` header and the copied bytes by the
    ``BCE_COPY_SOURCE_RANGE`` header, which covers ``offset`` through
    ``offset + part_size - 1``.

    :type source_bucket_name: string
    :param source_bucket_name: bucket holding the source object
    :type source_key: string
    :param source_key: key of the source object
    :type target_bucket_name: string
    :param target_bucket_name: bucket of the multipart upload
    :type target_key: string
    :param target_key: key of the multipart upload
    :type upload_id: string
    :param upload_id: sent as the ``uploadId`` query parameter
    :param part_number: sent as the ``partNumber`` query parameter
    :param part_size: number of bytes to copy
    :param offset: first byte of the source range
    :param etag: when given, sent as the ``BCE_COPY_SOURCE_IF_MATCH`` header
    :param content_type: passed to ``_prepare_object_headers``
    :param user_metadata: passed to ``_prepare_object_headers``
    :param traffic_limit: passed to ``_prepare_object_headers``
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    source_key = compat.convert_to_bytes(source_key)
    target_key = compat.convert_to_bytes(target_key)
    headers = self._prepare_object_headers(
        content_type=content_type,
        user_metadata=user_metadata,
        traffic_limit=traffic_limit)

    source_path = b"/%s/%s" % (compat.convert_to_bytes(source_bucket_name), source_key)
    headers[http_headers.BCE_COPY_SOURCE] = utils.normalize_string(source_path, False)

    last_byte = offset + part_size - 1
    headers[http_headers.BCE_COPY_SOURCE_RANGE] = b"bytes=%d-%d" % (offset, last_byte)

    if etag is not None:
        headers[http_headers.BCE_COPY_SOURCE_IF_MATCH] = etag

    query = {b'partNumber': part_number, b'uploadId': upload_id}
    return self._send_request(
        http_methods.PUT,
        target_bucket_name,
        target_key,
        headers=headers,
        params=query,
        config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          upload_id=(bytes, str),
          part_number=int,
          part_size=compat.integer_types,
          file_name=(bytes, str),
          offset=compat.integer_types)
def upload_part_from_file(self, bucket_name, key, upload_id,
                          part_number, part_size, file_name, offset, part_md5=None,
                          progress_callback=None, traffic_limit=None, config=None):
    """
    Upload one part of a multipart upload, reading the data from a local file.

    Opens ``file_name`` in binary read mode, seeks to ``offset`` and
    delegates to ``upload_part``. The file is closed before returning.

    :param bucket_name: name of the target bucket
    :param key: object key
    :param upload_id: identifier of the multipart upload
    :param part_number: number of the part
    :param part_size: number of bytes in the part
    :param file_name: path of the file to read
    :param offset: position in the file where the part starts
    :param part_md5: forwarded to ``upload_part``
    :param progress_callback: forwarded to ``upload_part``
    :param traffic_limit: forwarded to ``upload_part``
    :param config: forwarded to ``upload_part``
    :return: the result of ``upload_part``
    """
    key = compat.convert_to_bytes(key)
    with open(file_name, 'rb') as part_source:
        part_source.seek(offset)
        return self.upload_part(
            bucket_name, key, upload_id, part_number, part_size, part_source,
            part_md5=part_md5,
            progress_callback=progress_callback,
            traffic_limit=traffic_limit,
            config=config)
@required(bucket_name=(bytes, str),
          key=(bytes, str),
          upload_id=(bytes, str),
          part_list=list)
def complete_multipart_upload(self, bucket_name, key,
                              upload_id, part_list,
                              user_headers=None,
                              user_metadata=None,
                              cond_read_write=None,
                              config=None):
    """
    Complete a multipart upload with a POST request.

    The request body is the JSON document ``{"parts": part_list}`` and the
    upload is identified by the ``uploadId`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :type upload_id: string
    :param upload_id: identifier of the multipart upload
    :type part_list: list
    :param part_list: parts to assemble, serialized under the ``parts`` key
    :param user_headers: passed to ``_prepare_object_headers``
    :param user_metadata: passed to ``_prepare_object_headers``
    :param cond_read_write: when given, passed to
        ``_get_cond_read_write_headers`` to extend the request headers
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    key = compat.convert_to_bytes(key)
    headers = self._prepare_object_headers(
        content_type=http_content_types.JSON,
        user_metadata=user_metadata,
        user_headers=user_headers)
    if cond_read_write is not None:
        headers = self._get_cond_read_write_headers(http_methods.POST, headers, cond_read_write)
    return self._send_request(
        http_methods.POST,
        bucket_name,
        key,
        body=json.dumps({'parts': part_list}),
        headers=headers,
        params={b'uploadId': upload_id},
        # Forward the caller's config like the other request methods do.
        config=config)
@required(bucket_name=(bytes, str), key=(bytes, str), upload_id=(bytes, str))
def abort_multipart_upload(self, bucket_name, key, upload_id, config=None):
    """
    Abort a multipart upload with a DELETE request.

    The upload is identified by the ``uploadId`` query parameter.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :type upload_id: string
    :param upload_id: identifier of the multipart upload to abort
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    key = compat.convert_to_bytes(key)
    return self._send_request(http_methods.DELETE, bucket_name, key,
                              params={b'uploadId': upload_id},
                              # Forward the caller's config like the other request methods do.
                              config=config)
@required(bucket_name=(bytes, str), key=(bytes, str), upload_id=(bytes, str))
def list_parts(self, bucket_name, key, upload_id,
               max_parts=None, part_number_marker=None,
               config=None):
    """
    List one page of the parts of a multipart upload.

    Issues a GET request on the object with the ``uploadId`` query
    parameter, plus ``maxParts`` and ``partNumberMarker`` when supplied.

    :type bucket_name: string
    :param bucket_name: name of the target bucket
    :type key: string
    :param key: object key
    :type upload_id: string
    :param upload_id: identifier of the multipart upload
    :type max_parts: int
    :param max_parts: sent as ``maxParts`` when given
    :type part_number_marker: string
    :param part_number_marker: sent as ``partNumberMarker`` when given
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    key = compat.convert_to_bytes(key)
    query = {b'uploadId': upload_id}
    optional = ((b'maxParts', max_parts), (b'partNumberMarker', part_number_marker))
    for name, value in optional:
        if value is not None:
            query[name] = value
    return self._send_request(http_methods.GET, bucket_name, key, params=query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str), upload_id=(bytes, str))
def list_all_parts(self, bucket_name, key, upload_id, config=None):
    """
    Iterate over every part of a multipart upload, page by page.

    Repeatedly calls ``list_parts`` and yields each entry of
    ``response.parts``. While ``response.is_truncated`` is true, the next
    call uses ``response.next_part_number_marker`` as its marker.

    :param bucket_name: name of the target bucket
    :param key: object key
    :param upload_id: identifier of the multipart upload
    :param config: forwarded to ``list_parts``
    :return: generator yielding the items of each page's ``parts``
    """
    key = compat.convert_to_bytes(key)
    resume_from = None
    has_more = True
    while has_more:
        page = self.list_parts(bucket_name, key, upload_id,
                               part_number_marker=resume_from, config=config)
        for part in page.parts:
            yield part
        has_more = page.is_truncated
        if has_more:
            resume_from = page.next_part_number_marker
@required(bucket_name=(bytes, str))
def list_multipart_uploads(self, bucket_name, max_uploads=None, key_marker=None,
                           prefix=None, delimiter=None,
                           config=None):
    """
    List one page of the multipart uploads of a bucket.

    Issues a GET request on the bucket with the ``uploads`` query parameter.
    Each option that is not ``None`` is added as a query parameter
    (``delimiter``, ``maxUploads``, ``keyMarker``, ``prefix``).

    :type bucket_name: string
    :param bucket_name: name of the bucket to list
    :type max_uploads: int
    :param max_uploads: sent as ``maxUploads`` when given
    :type key_marker: string
    :param key_marker: sent as ``keyMarker`` when given
    :type prefix: string
    :param prefix: sent as ``prefix`` when given
    :type delimiter: string
    :param delimiter: sent as ``delimiter`` when given
    :param config: optional per-call configuration forwarded to ``_send_request``
    :return: the result of ``_send_request``
    """
    query = {b'uploads': b''}
    optional = (
        (b'delimiter', delimiter),
        (b'maxUploads', max_uploads),
        (b'keyMarker', key_marker),
        (b'prefix', prefix),
    )
    for name, value in optional:
        if value is not None:
            query[name] = value
    return self._send_request(http_methods.GET, bucket_name, params=query, config=config)
@required(bucket_name=(bytes, str))
def list_all_multipart_uploads(self, bucket_name, prefix=None, delimiter=None, config=None):
    """
    Iterate over every multipart upload of a bucket, page by page.

    Repeatedly calls ``list_multipart_uploads`` and yields each entry of
    ``response.uploads``. Iteration stops when a page is not truncated. The
    next key marker is ``response.next_key_marker`` when it is set;
    otherwise it is the key of the last upload on the page. If neither is
    available, iteration stops.

    :param bucket_name: name of the bucket to list
    :param prefix: forwarded to ``list_multipart_uploads``
    :param delimiter: forwarded to ``list_multipart_uploads``
    :param config: forwarded to ``list_multipart_uploads``
    :return: generator yielding the items of each page's ``uploads``
    """
    resume_key = None
    while True:
        page = self.list_multipart_uploads(bucket_name,
                                           key_marker=resume_key,
                                           prefix=prefix,
                                           delimiter=delimiter,
                                           config=config)
        for upload in page.uploads:
            yield upload
        if not page.is_truncated:
            return
        if page.next_key_marker is not None:
            resume_key = page.next_key_marker
            continue
        if len(page.uploads) == 0:
            # Truncated page with no marker and no uploads: nothing to resume from.
            return
        resume_key = page.uploads[-1].key
def _upload_task(self, bucket_name, object_key, upload_id,
                 part_number, part_size, file_name, offset, part_list, uploadTaskHandle,
                 progress_callback=None, traffic_limit=None):
    """
    Upload a single part from a file and record its result in ``part_list``.

    Returns immediately when ``uploadTaskHandle.is_cancel()`` is true.
    Otherwise the part is uploaded with ``upload_part_from_file`` and a
    ``{"partNumber": ..., "eTag": ...}`` entry is appended to ``part_list``.
    Any exception is logged at debug level and re-raised.

    :param bucket_name: name of the target bucket
    :param object_key: object key
    :param upload_id: identifier of the multipart upload
    :param part_number: number of the part
    :param part_size: number of bytes in the part
    :param file_name: path of the file to read
    :param offset: position in the file where the part starts
    :param part_list: list receiving the entry for the uploaded part
    :param uploadTaskHandle: object exposing ``is_cancel()``
    :param progress_callback: forwarded to ``upload_part_from_file``
    :param traffic_limit: forwarded to ``upload_part_from_file``
    :return: None
    """
    if uploadTaskHandle.is_cancel():
        _logger.debug("upload task canceled with partNumber={}!".format(part_number))
        return
    try:
        part_response = self.upload_part_from_file(
            bucket_name, object_key, upload_id,
            part_number, part_size, file_name, offset,
            progress_callback=progress_callback,
            traffic_limit=traffic_limit)
        part_entry = {
            "partNumber": part_number,
            "eTag": part_response.metadata.etag
        }
        part_list.append(part_entry)
        _logger.debug("upload task success with partNumber={}!".format(part_number))
    except Exception as e:
        _logger.debug("upload task failed with partNumber={}!".format(part_number))
        raise e
@required(bucket_name=(bytes, str), key=(bytes, str), file_name=(bytes, str))
def put_super_obejct_from_file(self, bucket_name, key, file_name, chunk_size=5,
                               thread_num=None,
                               uploadTaskHandle=None,
                               content_type=None,
                               storage_class=None,
                               user_headers=None,
                               progress_callback=None,
                               traffic_limit=None,
                               cond_read_write=None,
                               config=None):
    """
    Upload a local file as a multipart upload, sending parts concurrently.

    :type bucket_name: string
    :param bucket_name: destination bucket
    :type key: string
    :param key: destination object key
    :type file_name: string
    :param file_name: path of the local file to upload
    :type chunk_size: int
    :param chunk_size: part size in MB; must satisfy 0 < chunk_size <= 5120
    :param thread_num: number of worker threads; ``None`` or any value <= 1
        is replaced by ``multiprocessing.cpu_count()``
    :param uploadTaskHandle: object exposing ``is_cancel()``; a fresh
        ``UploadTaskHandle`` is created when omitted
    :param content_type: forwarded to initiate_multipart_upload
    :param storage_class: forwarded to initiate_multipart_upload
    :param user_headers: forwarded to initiate_multipart_upload
    :param progress_callback: forwarded to every part upload
    :param traffic_limit: forwarded to every part upload
    :param cond_read_write: forwarded to complete_multipart_upload
    :param config: accepted for signature compatibility; this method does
        not read it
    :return: True when the upload was completed, False when it was
        cancelled or at least one part did not upload (the multipart upload
        is aborted in both cases)
    :raise BceClientError: if chunk_size is out of range or the file is
        larger than 50000 GB
    """
    # check params
    if chunk_size > 5 * 1024 or chunk_size <= 0:
        raise BceClientError("chunk size is invalid, it should be more than 0 and not more than 5120!")
    left_size = os.path.getsize(file_name)
    # if file size more than 48.8TB, reject
    if left_size > 50000 * 1024 * 1024 * 1024:
        raise BceClientError("File size must not be more than 48.8TB!")
    if thread_num is None or thread_num <= 1:
        thread_num = multiprocessing.cpu_count()
    part_size = chunk_size * 1024 * 1024
    total_part = left_size // part_size
    if left_size % part_size != 0:
        total_part += 1
    if uploadTaskHandle is None:
        uploadTaskHandle = UploadTaskHandle()

    upload_id = self.initiate_multipart_upload(bucket_name, key,
                                               content_type=content_type,
                                               storage_class=storage_class,
                                               user_headers=user_headers).upload_id
    part_list = []
    # The context manager shuts the pool down so worker threads do not
    # outlive this call.
    with ThreadPoolExecutor(thread_num) as executor:
        all_tasks = []
        offset = 0
        part_number = 1
        while left_size > 0:
            # The final part may be shorter than the nominal part size.
            current_size = min(part_size, left_size)
            all_tasks.append(executor.submit(
                self._upload_task, bucket_name, key, upload_id, part_number, current_size,
                file_name, offset, part_list, uploadTaskHandle, progress_callback, traffic_limit))
            left_size -= current_size
            offset += current_size
            part_number += 1
        # wait all upload task to exit
        wait(all_tasks, return_when=ALL_COMPLETED)

    if uploadTaskHandle.is_cancel():
        _logger.debug("putting super object is canceled!")
        self.abort_multipart_upload(bucket_name, key, upload_id=upload_id)
        return False
    if len(part_list) != total_part:
        # A failed task never appends to part_list, so a short list means failure.
        _logger.debug("putting super object failed!")
        self.abort_multipart_upload(bucket_name, key, upload_id=upload_id)
        return False

    # Parts finish in arbitrary order; completion needs them by part number.
    part_list.sort(key=lambda x: x["partNumber"])
    self.complete_multipart_upload(bucket_name, key, upload_id, part_list, cond_read_write)
    return True
@required(bucket_name=(bytes, str), key=(bytes, str), acl=(list, dict))
def set_object_acl(self, bucket_name, key, acl, config=None):
    """
    Set the access control list of an object.

    Sends a PUT request with the ``acl`` query parameter and a JSON body of
    the form ``{"accessControlList": acl}``.

    :type bucket_name: string
    :param bucket_name: bucket containing the object
    :type key: string
    :param key: object key
    :type acl: list of grant
    :param acl: grants to serialize into the request body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    key = compat.convert_to_bytes(key)
    payload = json.dumps({'accessControlList': acl},
                         default=BosClient._dump_acl_object)
    # Return the response so the documented return value is actually delivered.
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              key,
                              body=payload,
                              headers={http_headers.CONTENT_TYPE: http_content_types.JSON},
                              params={b'acl': b''},
                              config=config)
@required(bucket_name=(bytes, str), key=(bytes, str))
def set_object_canned_acl(self, bucket_name, key,
                          canned_acl=None,
                          grant_read=None,
                          grant_full_control=None,
                          config=None):
    """
    Set an object's ACL through exactly one ACL request header.

    :type bucket_name: string
    :param bucket_name: bucket containing the object
    :type key: string
    :param key: object key
    :type canned_acl: string
    :param canned_acl: value for the ``http_headers.BCE_ACL`` header
    :type grant_read: string
    :param grant_read: value for the ``http_headers.BOS_GRANT_READ`` header,
        for example grant_read = 'id="6c47...4c94",id="8c42...4c94"'
    :type grant_full_control: string
    :param grant_full_control: value for the
        ``http_headers.BOS_GRANT_FULL_CONTROL`` header, for example
        grant_full_control = 'id="6c47...4c94",id="8c42...4c94"'
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    :raise ValueError: if none, or more than one, of the three ACL
        arguments is supplied
    """
    key = compat.convert_to_bytes(key)
    candidates = (
        (http_headers.BCE_ACL, canned_acl),
        (http_headers.BOS_GRANT_READ, grant_read),
        (http_headers.BOS_GRANT_FULL_CONTROL, grant_full_control),
    )
    supplied = [(name, value) for name, value in candidates if value is not None]
    # Validate before building anything: exactly one argument is allowed.
    if not supplied:
        raise ValueError("donn't give any object canned acl arguments!")
    if len(supplied) >= 2:
        raise ValueError("cann't get more than one object canned acl arguments!")
    header_name, header_value = supplied[0]
    headers = {header_name: compat.convert_to_bytes(header_value)}
    # Return the response so the documented return value is actually delivered.
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              key,
                              headers=headers,
                              params={b'acl': b''},
                              config=config)
@required(bucket_name=(bytes, str), key=(bytes, str))
def get_object_acl(self, bucket_name, key, config=None):
    """
    Read the access control list of an object (GET with the ``acl``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket containing the object
    :type key: string
    :param key: object key
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    object_key = compat.convert_to_bytes(key)
    query = {b'acl': b''}
    return self._send_request(http_methods.GET, bucket_name, object_key,
                              params=query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str))
def delete_object_acl(self, bucket_name, key, config=None):
    """
    Delete the access control list of an object (DELETE with the ``acl``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket containing the object
    :type key: string
    :param key: object key
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    object_key = compat.convert_to_bytes(key)
    query = {b'acl': b''}
    return self._send_request(http_methods.DELETE, bucket_name, object_key,
                              params=query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str), url=(bytes, str))
def fetch_object(self, bucket_name, key, url,
                 fetch_mode=None,
                 storage_class=None,
                 config=None):
    """
    Ask the service to fetch a resource from ``url`` and store it under
    ``key`` (POST with the ``fetch`` query parameter).

    :type bucket_name: string
    :param bucket_name: destination bucket
    :type key: string
    :param key: object name to be saved
    :type url: string
    :param url: url of the resource to be fetched
    :type fetch_mode: string
    :param fetch_mode: fetch mode header value; documented valid values
        are 'sync' and 'async'
    :param storage_class: storage class header value, sent when not None
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    key = compat.convert_to_bytes(key)
    headers = {http_headers.BOS_FETCH_SOURCE: compat.convert_to_bytes(url)}
    optional_headers = (
        (http_headers.BOS_FETCH_MODE, fetch_mode),
        (http_headers.BOS_STORAGE_CLASS, storage_class),
    )
    for header_name, header_value in optional_headers:
        if header_value is not None:
            headers[header_name] = header_value
    return self._send_request(http_methods.POST, bucket_name, key,
                              headers=headers,
                              params={b'fetch': b''},
                              config=config)
@required(bucket_name=(bytes, str), target_key=(bytes, str), symlink=(bytes, str), forbid_overwrite=(bool))
def put_object_symlink(self, bucket_name, target_key, symlink, forbid_overwrite=None,
                       user_metadata=None, storage_class=None, target_bucket=None, content_type=None, config=None):
    """
    Create the symlink object ``symlink`` pointing at ``target_key``
    (PUT with the ``symlink`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket in which the symlink is created
    :type target_key: string
    :param target_key: key the symlink points to
    :type symlink: string
    :param symlink: name of the symlink object
    :type forbid_overwrite: bool
    :param forbid_overwrite: when not None, sent as b'true' / b'false' in
        the forbid-overwrite header
    :param user_metadata: passed to _prepare_object_headers
    :param storage_class: passed to _prepare_object_headers
    :param target_bucket: when not None, sent in the symlink-bucket header
    :param content_type: content type; guessed from the symlink name when None
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    target_key = compat.convert_to_bytes(target_key)
    symlink = compat.convert_to_bytes(symlink)
    if content_type is None:
        content_type = utils.guess_content_type_by_file_name(symlink)
    headers = self._prepare_object_headers(user_metadata=user_metadata,
                                           content_type=content_type,
                                           storage_class=storage_class)
    headers[http_headers.BOS_SYMLINK_TARGET] = target_key
    if forbid_overwrite is not None:
        headers[http_headers.BOS_FORBID_OVERWRITE] = b'true' if forbid_overwrite else b'false'
    if target_bucket is not None:
        headers[http_headers.BOS_SYMLINK_BUCKET] = compat.convert_to_bytes(target_bucket)
    return self._send_request(http_methods.PUT, bucket_name, symlink,
                              headers=headers,
                              params={b'symlink': b''},
                              config=config)
@required(bucket_name=(bytes, str), symlink=(bytes, str))
def get_object_symlink(self, bucket_name, symlink, config=None):
    """
    Read symlink information (GET with the ``symlink`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket containing the symlink
    :type symlink: string
    :param symlink: name of the symlink object
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    symlink_key = compat.convert_to_bytes(symlink)
    query = {b'symlink': b''}
    return self._send_request(http_methods.GET, bucket_name, symlink_key,
                              params=query, config=config)
@required(bucket_name=(bytes, str), key=(bytes, str), select_object_args=(dict, ))
def select_object(self, bucket_name, key, select_object_args, headers=None, config=None):
    """
    Run a select request against an object (POST with the ``select`` and
    ``type`` query parameters).

    The ``type`` parameter is b"json" or b"csv" when that key appears in
    ``select_object_args["inputSerialization"]``, otherwise b"parquet".

    :type bucket_name: string
    :param bucket_name: bucket name
    :type key: string
    :param key: object name
    :type select_object_args: dict
    :param select_object_args: request parameters, sent as the
        ``selectRequest`` member of the JSON body
    :param headers: optional request headers
    :param config: optional per-call configuration
    :return: the SelectResponse handed to the body parser
    """
    key = compat.convert_to_bytes(key)
    headers = headers or {}

    select_type = b"parquet"
    if "inputSerialization" in select_object_args:
        serialization = select_object_args["inputSerialization"]
        if "json" in serialization:
            select_type = b"json"
        elif "csv" in serialization:
            select_type = b"csv"

    select_response = SelectResponse()

    def _fill_select_response(http_response, response):
        return BosClient._parse_select_message(http_response, response, select_response)

    request_body = json.dumps({'selectRequest': select_object_args},
                              default=BosClient._dump_acl_object)
    self._send_request(
        http_methods.POST,
        bucket_name,
        key,
        body=request_body,
        headers=headers,
        params={b'select': b'', b'type': select_type},
        config=config,
        body_parser=_fill_select_response)
    return select_response
def get_user_quota(self, config=None):
    """
    Fetch the user quota (GET with the ``userQuota`` query parameter).

    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'userQuota': b''}
    return self._send_request(http_methods.GET, params=query, config=config)
-
@required(max_bucket_count=(int), max_capacity_mega_bytes=(int))
def put_user_quota(self, max_bucket_count, max_capacity_mega_bytes, config=None):
    """
    Set the user quota (PUT with the ``userQuota`` query parameter).

    :type max_bucket_count: int
    :param max_bucket_count: sent as ``maxBucketCount`` in the JSON body
    :type max_capacity_mega_bytes: int
    :param max_capacity_mega_bytes: sent as ``maxCapacityMegaBytes`` in the
        JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    quota = {
        'maxBucketCount': max_bucket_count,
        'maxCapacityMegaBytes': max_capacity_mega_bytes,
    }
    return self._send_request(http_methods.PUT,
                              body=json.dumps(quota),
                              params={b'userQuota': b''},
                              config=config)
def delete_user_quota(self, config=None):
    """
    Remove the user quota (DELETE with the ``userQuota`` query parameter).

    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'userQuota': b''}
    return self._send_request(http_methods.DELETE, params=query, config=config)
@required(bucket_name=(bytes, str))
def get_notification(self, bucket_name, config=None):
    """
    Read a bucket's notification settings (GET with the ``notification``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'notification': b''}
    return self._send_request(http_methods.GET,
                              bucket_name=bucket_name,
                              params=query,
                              config=config)
-
@required(bucket_name=(bytes, str), notifications=(list, ))
def put_notification(self, bucket_name, notifications, config=None):
    """
    Store a bucket's notification settings (PUT with the ``notification``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type notifications: list of dict
    :param notifications: sent as the ``notifications`` member of the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    request_body = json.dumps({'notifications': notifications})
    return self._send_request(http_methods.PUT,
                              bucket_name=bucket_name,
                              body=request_body,
                              params={b'notification': b''},
                              config=config)
@required(bucket_name=(bytes, str))
def delete_notification(self, bucket_name, config=None):
    """
    Remove a bucket's notification settings (DELETE with the
    ``notification`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'notification': b''}
    return self._send_request(http_methods.DELETE,
                              bucket_name=bucket_name,
                              params=query,
                              config=config)
@required(bucket_name=(bytes, str), mirror_args=(list, ))
def put_bucket_mirroring(self, bucket_name, mirror_args, config=None):
    """
    Store a bucket's mirroring configuration (PUT with the ``mirroring``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type mirror_args: list
    :param mirror_args: sent as ``bucketMirroringConfiguration`` in the
        JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    request_body = json.dumps({'bucketMirroringConfiguration': mirror_args},
                              default=BosClient._dump_acl_object)
    return self._send_request(http_methods.PUT,
                              bucket_name=bucket_name,
                              body=request_body,
                              params={b'mirroring': b''},
                              config=config)
@required(bucket_name=(bytes, str))
def get_bucket_mirroring(self, bucket_name, config=None):
    """
    Read a bucket's mirroring configuration (GET with the ``mirroring``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'mirroring': b''}
    return self._send_request(http_methods.GET,
                              bucket_name=bucket_name,
                              params=query,
                              config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_mirroring(self, bucket_name, config=None):
    """
    Remove a bucket's mirroring configuration (DELETE with the
    ``mirroring`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'mirroring': b''}
    return self._send_request(http_methods.DELETE,
                              bucket_name=bucket_name,
                              params=query,
                              config=config)
def put_object_tagging(self, bucket_name, key, obj_tag_args, config=None):
    """
    Store an object's tags from a JSON body (PUT with the ``tagging``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type key: string
    :param key: object name
    :type obj_tag_args: dict
    :param obj_tag_args: tagging arguments, serialized as the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    request_body = json.dumps(obj_tag_args, default=BosClient._dump_acl_object)
    return self._send_request(http_methods.PUT,
                              bucket_name=bucket_name,
                              key=key,
                              body=request_body,
                              params={b'tagging': b''},
                              config=config)
def put_object_tagging_canned(self, bucket_name, key, tag_header, config=None):
    """
    Store an object's tags through the tagging request header (PUT with
    the ``tagging`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type key: string
    :param key: object name
    :param tag_header: value converted to bytes and sent in the
        ``http_headers.BOS_TAGGING`` header
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    headers = {http_headers.BOS_TAGGING: compat.convert_to_bytes(tag_header)}
    return self._send_request(http_methods.PUT,
                              bucket_name=bucket_name,
                              key=key,
                              headers=headers,
                              params={b'tagging': b''},
                              config=config)
def get_object_tagging(self, bucket_name, key, config=None):
    """
    Read an object's tags (GET with the ``tagging`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type key: string
    :param key: object name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'tagging': b''}
    return self._send_request(http_methods.GET,
                              bucket_name=bucket_name,
                              key=key,
                              params=query,
                              config=config)
-
def put_bucket_versioning(self, bucket_name, status, config=None):
    """
    Set a bucket's versioning status (PUT with the ``versioning`` query
    parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type status: string
    :param status: version status (disable/enabled/suspended), sent as
        ``status`` in the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    request_body = json.dumps({'status': status}, default=BosClient._dump_acl_object)
    return self._send_request(http_methods.PUT,
                              bucket_name=bucket_name,
                              body=request_body,
                              params={b'versioning': b''},
                              config=config)
-
def get_bucket_versioning(self, bucket_name, config=None):
    """
    Read a bucket's versioning status (GET with the ``versioning`` query
    parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'versioning': b''}
    return self._send_request(http_methods.GET,
                              bucket_name=bucket_name,
                              params=query,
                              config=config)
-
@required(bucket_name=(bytes, str))
def list_objects_versions(self, bucket_name,
                          max_keys=1000, prefix=None, marker=None, version_marker=None,
                          delimiter=None, config=None):
    """
    List object versions in a bucket (GET with the ``versions`` query
    parameter). Every optional argument that is not None is added as a
    query parameter.

    :type bucket_name: string
    :param bucket_name: bucket name
    :type max_keys: int
    :param max_keys: sent as ``maxKeys``; documented limit is <= 1000
    :type prefix: string
    :param prefix: sent as ``prefix``
    :type marker: string
    :param marker: sent as ``marker``
    :param version_marker: sent as ``versionIdMarker``
    :type delimiter: string
    :param delimiter: sent as ``delimiter``
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    params = {b'versions': b''}
    optional_params = (
        (b'maxKeys', max_keys),
        (b'prefix', prefix),
        (b'marker', marker),
        (b'delimiter', delimiter),
        (b'versionIdMarker', version_marker),
    )
    for param_name, param_value in optional_params:
        if param_value is not None:
            params[param_name] = param_value
    return self._send_request(http_methods.GET, bucket_name, params=params, config=config)
-
@required(bucket_name=(bytes, str))
def init_bucket_object_lock(self,
                            bucket_name,
                            retention_days,
                            config=None):
    """
    Initialise object lock on a bucket (POST with the ``objectlock``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type retention_days: int
    :param retention_days: sent as ``retentionDays`` in the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    request_body = json.dumps({'retentionDays': retention_days})
    return self._send_request(http_methods.POST,
                              bucket_name,
                              params={b'objectlock': b''},
                              body=request_body,
                              config=config)
@required(bucket_name=(bytes, str))
def get_bucket_object_lock(self, bucket_name, config=None):
    """
    Read a bucket's object lock settings (GET with the ``objectlock``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'objectlock': b''}
    return self._send_request(http_methods.GET, bucket_name,
                              params=query, config=config)
@required(bucket_name=(bytes, str))
def delete_bucket_object_lock(self, bucket_name, config=None):
    """
    Remove a bucket's object lock settings (DELETE with the
    ``objectlock`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    query = {b'objectlock': b''}
    return self._send_request(http_methods.DELETE, bucket_name,
                              params=query, config=config)
@required(bucket_name=(bytes, str))
def complete_bucket_object_lock(self,
                                bucket_name,
                                config=None):
    """
    Complete a bucket's object lock (POST with the
    ``completeobjectlock`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    return self._send_request(http_methods.POST,
                              bucket_name,
                              params={b'completeobjectlock': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def extend_bucket_object_lock(self,
                              bucket_name,
                              extend_retent_days,
                              config=None):
    """
    Extend a bucket's object lock retention (POST with the
    ``extendobjectlock`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :type extend_retent_days: int
    :param extend_retent_days: sent as ``extendRetentionDays`` in the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    request_body = json.dumps({'extendRetentionDays': extend_retent_days})
    return self._send_request(http_methods.POST,
                              bucket_name,
                              params={b'extendobjectlock': b''},
                              body=request_body,
                              config=config)
@required(bucket_name=(bytes, str))
def get_bucket_quota(self,
                     bucket_name,
                     config=None):
    """
    Read a bucket's quota (GET with the ``quota`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    return self._send_request(http_methods.GET,
                              bucket_name,
                              params={b'quota': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def put_bucket_quota(self,
                     bucket_name,
                     quota_conf,
                     config=None):
    """
    Store a bucket's quota configuration (PUT with the ``quota`` query
    parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param quota_conf: quota configuration, serialized as the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    request_body = json.dumps(quota_conf, default=BosClient._dump_acl_object)
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              body=request_body,
                              params={b'quota': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def delete_bucket_quota(self,
                        bucket_name,
                        config=None):
    """
    Remove a bucket's quota (DELETE with the ``quota`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    return self._send_request(http_methods.DELETE,
                              bucket_name,
                              params={b'quota': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def get_bucket_tagging(self,
                       bucket_name,
                       config=None):
    """
    Read a bucket's tags (GET with the ``tagging`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    return self._send_request(http_methods.GET,
                              bucket_name,
                              params={b'tagging': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def put_bucket_tagging(self,
                       bucket_name,
                       tag_conf,
                       config=None):
    """
    Store a bucket's tagging configuration (PUT with the ``tagging``
    query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param tag_conf: tagging configuration, serialized as the JSON body
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    request_body = json.dumps(tag_conf, default=BosClient._dump_acl_object)
    return self._send_request(http_methods.PUT,
                              bucket_name,
                              body=request_body,
                              params={b'tagging': b''},
                              config=config)
-
@required(bucket_name=(bytes, str))
def delete_bucket_tagging(self,
                          bucket_name,
                          config=None):
    """
    Remove a bucket's tags (DELETE with the ``tagging`` query parameter).

    :type bucket_name: string
    :param bucket_name: bucket name
    :param config: optional per-call configuration
    :return: the value returned by _send_request
    """
    # The decorator previously listed ``cors_configuration``, which is not a
    # parameter of this method; only bucket_name is validated.
    return self._send_request(http_methods.DELETE,
                              bucket_name,
                              params={b'tagging': b''},
                              config=config)
@staticmethod
def _prepare_object_headers(
        content_length=None,
        content_md5=None,
        content_type=None,
        content_sha256=None,
        etag=None,
        user_metadata=None,
        storage_class=None,
        user_headers=None,
        encryption=None,
        customer_key=None,
        customer_key_md5=None,
        traffic_limit=None,
        object_tagging=None,):
    """
    Build the request header dict for an object operation.

    Each argument that is not None contributes its header; the content
    type header is always present and falls back to
    ``http_content_types.OCTET_STREAM``.

    :param user_metadata: dict; each key is prefixed with
        ``http_headers.BCE_USER_METADATA_PREFIX``
    :param user_headers: dict filtered through ``_get_user_header``
    :return: dict of headers
    :raise ValueError: if content_length is negative, the accumulated
        metadata size exceeds ``bos.MAX_USER_METADATA_SIZE``, or two
        metadata keys differ only by case
    :raise TypeError: if user_metadata is not a dict
    """
    headers = {}

    if content_length is not None:
        if content_length and content_length < 0:
            raise ValueError('content_length should not be negative.')
        headers[http_headers.CONTENT_LENGTH] = compat.convert_to_bytes(content_length)

    if content_md5 is not None:
        headers[http_headers.CONTENT_MD5] = utils.convert_to_standard_string(content_md5)

    if content_type is None:
        headers[http_headers.CONTENT_TYPE] = http_content_types.OCTET_STREAM
    else:
        headers[http_headers.CONTENT_TYPE] = utils.convert_to_standard_string(content_type)

    if content_sha256 is not None:
        headers[http_headers.BCE_CONTENT_SHA256] = content_sha256

    if etag is not None:
        headers[http_headers.ETAG] = b'"%s"' % utils.convert_to_standard_string(etag)

    if user_metadata is not None:
        total_size = 0
        seen_keys = set()
        if not isinstance(user_metadata, dict):
            raise TypeError('user_metadata should be of type dict.')
        for meta_key, meta_value in iteritems(user_metadata):
            # Lower-cased keys reveal entries that collide case-insensitively.
            seen_keys.add(meta_key.lower())
            meta_key = utils.convert_to_standard_string(meta_key)
            meta_value = utils.convert_to_standard_string(meta_value)
            header_name = http_headers.BCE_USER_METADATA_PREFIX + meta_key
            headers[header_name] = meta_value
            total_size += len(header_name) + len(meta_value)
        if total_size > bos.MAX_USER_METADATA_SIZE:
            raise ValueError(
                'Metadata size should not be greater than %d.' % bos.MAX_USER_METADATA_SIZE)
        if len(seen_keys) != len(user_metadata):
            raise ValueError('user_metadata has duplicate elements.')

    if storage_class is not None:
        headers[http_headers.BOS_STORAGE_CLASS] = storage_class

    if encryption is not None:
        headers[http_headers.BOS_SERVER_SIDE_ENCRYPTION] = \
            utils.convert_to_standard_string(encryption)

    if customer_key is not None:
        headers[http_headers.BOS_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY] = \
            utils.convert_to_standard_string(customer_key)

    if customer_key_md5 is not None:
        headers[http_headers.BOS_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5] = \
            utils.convert_to_standard_string(customer_key_md5)

    if user_headers is not None:
        headers = BosClient._get_user_header(headers, user_headers, False)

    if traffic_limit is not None:
        headers[http_headers.BOS_TRAFFIC_LIMIT] = traffic_limit

    if object_tagging is not None:
        headers[http_headers.BOS_TAGGING] = compat.convert_to_bytes(object_tagging)

    return headers
@staticmethod
def _get_cond_read_write_headers(http_method, headers, cond_read_write):
    """
    Copy conditional read/write headers into ``headers`` after validating
    their names.

    The allowed names are ``http_headers.BOS_COND_READ_WRITE_HEADERS``; for
    GET and HEAD the if-(un)modified-since and if-(none-)match headers are
    allowed as well.

    :type http_method: string
    :param http_method: HTTP method of the request
    :param headers: dict updated in place
    :param cond_read_write: mapping of header name to value
    :return: the same ``headers`` dict
    :raise ValueError: if a name is not allowed for ``http_method``
    """
    allowed = http_headers.BOS_COND_READ_WRITE_HEADERS
    if http_method in (http_methods.GET, http_methods.HEAD):
        allowed = allowed.union({
            http_headers.BOS_IF_MODIFIED_SINCE,
            http_headers.BOS_IF_UNMODIFIED_SINCE,
            http_headers.BOS_IF_MATCH,
            http_headers.BOS_IF_NONE_MATCH,
        })

    for name, value in iteritems(cond_read_write):
        name = utils.convert_to_standard_string(name)
        if name not in allowed:
            raise ValueError('%s is not valid in %s' % (name, http_method))
        headers[name] = value
    return headers
@staticmethod
def _get_user_header(headers, user_headers, is_copy=False):
    """
    Copy the allowed entries of ``user_headers`` into ``headers``.

    The allow-list is ``http_headers.BOS_BASE_ALLOW_HEADERS`` plus either
    the copy-source conditional headers (``is_copy`` true) or the
    cache/encoding/disposition/expires/process headers. Entries whose name
    is not allowed are silently skipped.

    :param headers: dict updated in place
    :param user_headers: dict of caller supplied headers
    :param is_copy: selects which extra header names are allowed
    :return: the same ``headers`` dict
    :raise TypeError: if user_headers is not a dict
    """
    if not isinstance(user_headers, dict):
        raise TypeError('user_headers should be of type dict.')

    if is_copy:
        extra_allowed = {
            http_headers.BCE_COPY_SOURCE_IF_NONE_MATCH,
            http_headers.BCE_COPY_SOURCE_IF_UNMODIFIED_SINCE,
            http_headers.BCE_COPY_SOURCE_IF_MODIFIED_SINCE,
        }
    else:
        extra_allowed = {
            http_headers.CACHE_CONTROL,
            http_headers.CONTENT_ENCODING,
            http_headers.CONTENT_DISPOSITION,
            http_headers.EXPIRES,
            http_headers.BOS_PROCESS,
        }
    allowed = http_headers.BOS_BASE_ALLOW_HEADERS.union(extra_allowed)

    for name, value in iteritems(user_headers):
        name = utils.convert_to_standard_string(name)
        # The object-expires value is passed through without conversion.
        if name != http_headers.BOS_OBJECT_EXPIRES:
            value = utils.convert_to_standard_string(value)
        if name in allowed:
            headers[name] = value
    return headers
- def _get_config_parameter(self, config, attr):
- result = None
- if config is not None:
- result = getattr(config, attr)
- if result is not None:
- return result
- return getattr(self.config, attr)
@staticmethod
def _get_path(config, bucket_name=None, key=None, use_backup_endpoint=False):
    """
    Build the request path for a bucket/key pair.

    The bucket name is left out of the path when ``config.cname_enabled``
    is set or when the host is recognised by ``utils.is_cname_like_host``
    or ``utils.is_custom_host``.

    :param config: configuration providing endpoint, backup_endpoint,
        protocol and cname_enabled
    :param bucket_name: bucket name or None
    :param key: object key or None
    :param use_backup_endpoint: examine ``config.backup_endpoint`` instead
        of ``config.endpoint``
    :return: path built with ``utils.append_uri``
    """
    # Parse the endpoint that will actually receive the request. The host
    # was previously selected but never used, so the backup endpoint was
    # always judged by the primary endpoint's host name.
    host = config.backup_endpoint if use_backup_endpoint else config.endpoint
    _protocol, host_name, _port = utils.parse_host_port(host, config.protocol)
    bucket_in_host = (config.cname_enabled
                      or utils.is_cname_like_host(host_name)
                      or utils.is_custom_host(host_name, bucket_name))
    if bucket_in_host:
        return utils.append_uri(bos.URL_PREFIX, key)
    return utils.append_uri(bos.URL_PREFIX, bucket_name, key)
-
- def _merge_config(self, config, bucket_name):
- new_config = copy.copy(self.config)
- if config is not None:
- new_config.merge_non_none_values(config)
-
- endpoint = self._change_user_endpoint(new_config, bucket_name)
- new_config.endpoint = endpoint
- return new_config
-
def _change_user_endpoint(self, config, bucket_name):
    """
    Decide which endpoint a request should use.

    The configured endpoint is returned unchanged for IPv4 hosts, when
    ``config.path_style_enable`` is set, and for cname hosts. Otherwise,
    when a bucket name is given and the host is a three-label BOS-suffixed
    host, the bucket name is prepended to the host as an extra label.

    :param config: configuration providing endpoint, protocol,
        path_style_enable and cname_enabled
    :param bucket_name: bucket name or None
    :return: endpoint to use
    :raise ValueError: if cname mode is requested for a path-style BOS
        host, or a four-label host starts with a label different from
        ``bucket_name``
    """
    _protocol, host_name, _port = \
        utils.parse_host_port(config.endpoint, config.protocol)
    host_labels = compat.convert_to_bytes(host_name).split(b'.')
    endpoint = config.endpoint
    path_style_host = utils.is_bos_suffixed_host(host_name) and len(host_labels) == 3

    # 1. check ipv4 or path style
    if utils.check_ipv4(host_name) or config.path_style_enable:
        return config.endpoint

    # 2. check cname domain
    if config.cname_enabled or utils.is_cname_like_host(host_name):
        if path_style_host:
            raise ValueError(
                'endpoint is not cname domain, please set cname_enabled=False')
        return config.endpoint

    # default use virtual-hosted endpoint
    if bucket_name is not None and path_style_host:
        if not endpoint.startswith(HTTP_PROTOCOL_HEAD):
            return compat.convert_to_bytes(bucket_name) + b'.' + \
                compat.convert_to_bytes(endpoint)
        # split http head so the bucket label lands right after the scheme
        scheme_and_rest = endpoint.split(b'//')
        if len(scheme_and_rest) < 2:
            return config.endpoint
        with_bucket = scheme_and_rest[0] + b'//' + compat.convert_to_bytes(bucket_name) + \
            b'.' + scheme_and_rest[1]
        return compat.convert_to_bytes(with_bucket)

    # check virtual-hosted endpoint's bucket_name is not query bucket_name
    if len(host_labels) == 4 and bucket_name is not None:
        if host_labels[0] != compat.convert_to_bytes(bucket_name):
            raise ValueError('your endpoint\'s bucket_name is not equal your query bucket_name!')
    return config.endpoint
@staticmethod
def _need_retry_for_bos(config, error):
    """
    Tell whether a request should be retried for a BOS specific reason.

    :param config: not read by this check
    :param error: the error to inspect
    :return: True only when ``error`` is a BceServerError whose status
        code is 403 Forbidden
    """
    # if you need add more retry condition, please add it here
    forbidden = isinstance(error, BceServerError) and \
        error.status_code == http.client.FORBIDDEN
    if forbidden:
        _logger.debug('BOS retry condition matched: 403 Forbidden')
        return True
    return False
@staticmethod
def _need_retry_backup_endpoint(error):
    """
    Tell whether a failed request should be retried on the backup endpoint.

    :param error: the error to inspect
    :return: True for any IOError, and for a BceServerError with status
        500, 503 or 403 or with code ``BceServerError.REQUEST_EXPIRED``;
        False otherwise
    """
    # always retry on IOError
    if isinstance(error, IOError):
        return True
    # Only retry on a subset of service exceptions
    if not isinstance(error, BceServerError):
        return False
    if error.status_code == http.client.INTERNAL_SERVER_ERROR:
        return True
    if error.status_code == http.client.SERVICE_UNAVAILABLE:
        return True
    if error.code == BceServerError.REQUEST_EXPIRED:
        return True
    if error.status_code == http.client.FORBIDDEN:
        return True
    return False
def _send_request(
        self, http_method, bucket_name=None, key=None,
        body=None, headers=None, params=None,
        config=None,
        body_parser=None):
    """
    Sign and send one request, with up to two follow-up attempts.

    After a ``BceHttpClientError`` the request is retried once against the
    backup endpoint (when one is configured and
    ``_need_retry_backup_endpoint`` agrees) and then once more when
    ``_need_retry_for_bos`` accepts the error's ``last_error``.

    :param http_method: HTTP method
    :param bucket_name: bucket name or None
    :param key: object key or None
    :param body: request body
    :param headers: request headers; the security token header is added
        when ``config.security_token`` is set
    :param params: query parameters
    :param config: per-call configuration merged over ``self.config``
    :param body_parser: response handler; defaults to ``handler.parse_json``
    :return: the value returned by ``bce_http_client.send_request``
    :raise BceHttpClientError: the error of the last attempt made
    """
    config = self._merge_config(config, bucket_name)
    path = BosClient._get_path(config, bucket_name, key)
    if body_parser is None:
        body_parser = handler.parse_json
    if config.security_token is not None:
        headers = headers or {}
        headers[http_headers.STS_SECURITY_TOKEN] = config.security_token

    try:
        return bce_http_client.send_request(
            config, bce_v1_signer.sign, [handler.parse_error, body_parser],
            http_method, path, body, headers, params)
    except BceHttpClientError as ex:
        last_exception = ex

    # retry backup endpoint
    # NOTE(review): this predicate receives the BceHttpClientError itself,
    # while the BOS retry below inspects its ``last_error`` -- confirm which
    # object the backup-endpoint check is meant to see.
    if config.backup_endpoint is not None and \
            BosClient._need_retry_backup_endpoint(last_exception):
        try:
            # str format string: a bytes literal is logged as "b'...'" on Python 3.
            _logger.debug('Retry for backup endpoint error code: %d.', last_exception.status_code)
            path = BosClient._get_path(config, bucket_name, key, True)
            return bce_http_client.send_request(
                config, bce_v1_signer.sign, [handler.parse_error, body_parser],
                http_method, path, body, headers, params, True)
        except BceHttpClientError as ex:
            last_exception = ex

    # retry for bos error
    if BosClient._need_retry_for_bos(config, last_exception.last_error):
        try:
            _logger.debug('Retry for BOS error code: %d.', last_exception.status_code)
            return bce_http_client.send_request(
                config, bce_v1_signer.sign, [handler.parse_error, body_parser],
                http_method, path, body, headers, params)
        except BceHttpClientError as ex:
            last_exception = ex

    # Every path that reaches this point has recorded a failure.
    raise last_exception
class SelectMessage(object):
    """
    One message returned by the select object api.

    A message has no attributes until one of the ``set_*_message`` methods
    has been called; ``type`` is then "Records", "Cont" or "End".
    """

    def set_record_message(self, headers, payload, crc):
        """
        Turn this instance into a "Records" message.

        :param headers: parsed message headers
        :param payload: record payload
        :param crc: checksum read from the message
        """
        self.type = "Records"
        self.headers, self.payload, self.crc = headers, payload, crc

    def set_cont_message(self, headers, bytes_scanned, bytes_returned, crc):
        """
        Turn this instance into a "Cont" (continue) message.

        :param headers: parsed message headers
        :param bytes_scanned: scanned byte count carried by the message
        :param bytes_returned: returned byte count carried by the message
        :param crc: checksum read from the message
        """
        self.type = "Cont"
        self.headers, self.crc = headers, crc
        self.bytes_scanned = bytes_scanned
        self.bytes_returned = bytes_returned

    def set_end_message(self, headers, crc):
        """
        Turn this instance into an "End" message.

        :param headers: parsed message headers
        :param crc: checksum read from the message
        """
        self.type = "End"
        self.headers, self.crc = headers, crc

    def __str__(self):
        """Render the headers, followed by type specific details."""
        kind = self.type
        if kind == "Records":
            return '{}\n{}'.format(self.headers, self.payload)
        if kind == "Cont":
            return '{}\nbytes_scanned/bytes_returned={}/{}'.format(
                self.headers, self.bytes_scanned, self.bytes_returned)
        return '{}'.format(self.headers)
class SelectResponse(object):
    """
    Reads the message stream returned by the select object api.
    """

    def __init__(self):
        # Set to True once an "End" message has been yielded.
        self.finish = False

    def init_from_http_response(self, http_response, response):
        """
        Attach the raw HTTP response and the parsed BCE response.

        :param http_response: object providing ``read(n)`` and ``close()``
        :param response: BCE response; its ``metadata.bce_request_id`` is
            used when an error is raised
        """
        self.http_response = http_response
        self.response = response

    def result(self):
        """
        Generator yielding one SelectMessage per message in the stream.

        Iteration stops after the "End" message or when the stream is
        exhausted. The HTTP response is closed when the generator finishes,
        fails or is closed.

        :raise BceServerError: if the "End" message carries an error code
            other than "success"
        """
        stream = self.http_response
        try:
            while not self.finish:
                prelude = stream.read(8)
                if not prelude:
                    # Plain return ends the generator; raising StopIteration
                    # here is turned into RuntimeError by PEP 479.
                    return
                total_len = struct.unpack('>I', prelude[0:4])[0]
                headers_len = struct.unpack('>I', prelude[4:8])[0]
                headers_map = self._parse_select_headers(stream.read(headers_len))
                message_type = headers_map['message-type']
                msg = SelectMessage()
                if message_type == 'Records':
                    # Of total_len, headers_len bytes are headers and 12 bytes
                    # are not payload (8 byte prelude read above + 4 byte crc
                    # read below).
                    payload = stream.read(total_len - headers_len - 12)
                    crc = struct.unpack('>I', stream.read(4))[0]
                    msg.set_record_message(headers_map, compat.convert_to_string(payload), crc)
                    yield msg
                elif message_type == 'Cont':
                    raw_scanned = stream.read(8)
                    raw_returned = stream.read(8)
                    crc = struct.unpack('>I', stream.read(4))[0]
                    bytes_scanned = struct.unpack('>Q', raw_scanned)[0]
                    bytes_returned = struct.unpack('>Q', raw_returned)[0]
                    msg.set_cont_message(headers_map, bytes_scanned, bytes_returned, crc)
                    yield msg
                elif message_type == 'End':
                    crc = struct.unpack('>I', stream.read(4))[0]
                    if headers_map["error-code"] != "success":
                        raise BceServerError(headers_map['error-message'],
                                             code=headers_map['error-code'],
                                             request_id=self.response.metadata.bce_request_id)
                    msg.set_end_message(headers_map, crc)
                    self.finish = True
                    yield msg
        finally:
            self.http_response.close()

    @staticmethod
    def _parse_select_headers(headers):
        """
        Parse the header section of a select message.

        Each header is encoded as a 1 byte key length, the key, a 2 byte
        big-endian value length and the value.

        :param headers: raw header bytes
        :return: dict mapping header name to value, both converted with
            ``compat.convert_to_string``
        """
        parsed = {}
        index = 0
        while index < len(headers):
            # headers key length
            key_len = struct.unpack('B', headers[index: index + 1])[0]
            index += 1
            # headers key
            key = headers[index: index + key_len]
            index += key_len
            # headers value length
            value_len = struct.unpack('>H', headers[index: index + 2])[0]
            index += 2
            # headers value
            value = headers[index: index + value_len]
            index += value_len
            parsed[compat.convert_to_string(key)] = compat.convert_to_string(value)
        return parsed
|