Using Python
Python support is provided through a fork of the boto3 library with features that make the most of IBM Cloud® Object Storage.
It can be installed from the Python Package Index with pip install ibm-cos-sdk.
Source code can be found on GitHub.
The ibm_boto3 library provides complete access to the IBM Cloud® Object Storage API. Endpoints, an API key, and the instance ID must be specified during creation of a service resource or low-level client, as shown in the following basic examples.
The service instance ID is also referred to as a _resource instance ID_. The value can be found by creating a service credential, or through the CLI.
Detailed documentation can be found here.
Creating a client and sourcing credentials
To connect to COS, a client is created and configured using credential information (an API key and a service instance ID). These values can also be sourced automatically from a credentials file or from environment variables.
After generating a Service Credential, the resulting JSON document can be saved to ~/.bluemix/cos_credentials. The SDK automatically sources credentials from this file unless other credentials are explicitly set during client creation. If the cos_credentials file contains HMAC keys, the client authenticates with a signature; otherwise the client uses the provided API key to authenticate with a bearer token (using an API key still requires config=Config(signature_version="oauth") to be included during client creation).
If migrating from AWS S3, credential data can also be sourced from ~/.aws/credentials in the following format:
[default]
aws_access_key_id = {API_KEY}
aws_secret_access_key = {SERVICE_INSTANCE_ID}
Note: if both ~/.bluemix/cos_credentials and ~/.aws/credentials exist, cos_credentials takes precedence.
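As an illustrative sketch (not the SDK's own credential loading), a file in the AWS-style format shown above can be read with Python's standard configparser module, since it is an INI-style file with a [default] section:

```python
import configparser
import os
import tempfile

# Write a sample credentials file in the format shown above
sample = "[default]\naws_access_key_id = my-api-key\naws_secret_access_key = my-instance-id\n"
path = os.path.join(tempfile.mkdtemp(), "credentials")
with open(path, "w") as f:
    f.write(sample)

# Parse the INI-style file and pull out both values
config = configparser.ConfigParser()
config.read(path)
api_key = config["default"]["aws_access_key_id"]
instance_id = config["default"]["aws_secret_access_key"]
print(api_key)      # prints my-api-key
print(instance_id)  # prints my-instance-id
```

The extracted values could then be passed explicitly to the client constructor instead of relying on automatic pickup.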
Gathering required information
The following variables appear in the examples:
- bucket_name must be a unique and DNS-safe string. Because bucket names are unique across the entire system, these values need to be changed if this example is run multiple times. Note that names are held in reserve for 10 to 15 minutes after deletion.
- ibm_api_key_id is the value of apikey found in the Service Credential.
- ibm_service_instance_id is the value of resource_instance_id found in the Service Credential.
- endpoint_url is a service endpoint URL, inclusive of the https:// protocol. This value is not the endpoints value found in the Service Credential. For more information about endpoints, see Endpoints and storage locations.
- LocationConstraint is a valid provisioning code that corresponds to the endpoint value.
Code examples
Code examples are tested against supported Python releases.
In your code, you must remove the angle brackets and any other excess characters that are provided here as illustration.
Initializing configuration
This example creates a resource object. A resource provides an object-oriented interface to COS. This allows for a higher level of abstraction than the low-level calls made with a client object.
Note that some operations (such as Aspera high-speed transfer) require a client object. Aspera itself requires Python version 3.6.
Legacy notice: Support for Aspera is considered legacy. Use the Aspera Transfer SDK instead.
import ibm_boto3
from ibm_botocore.client import Config, ClientError
# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list available at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"
# Create resource
cos_resource = ibm_boto3.resource("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_INSTANCE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT
)
A client provides a low-level interface to the COS S3 API. This allows for processing HTTP responses directly, rather than using the abstracted methods and attributes provided by a resource to access the information contained in headers or XML response payloads.
import ibm_boto3
from ibm_botocore.client import Config, ClientError
# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list available at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"
# Create client
cos_client = ibm_boto3.client("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_INSTANCE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT
)
Key values
- <endpoint> - public endpoint for your cloud Object Storage, with schema prefix (https://) (available from the IBM Cloud console). For more information about endpoints, see Endpoints and storage locations.
- <api-key> - API key generated when creating the service credentials (write access is required for creation and deletion examples)
- <service-instance-id> - resource ID for your cloud Object Storage (available through the IBM Cloud CLI or the IBM Cloud console)
- <location> - default location for your cloud Object Storage (must match the region used for <endpoint>)
SDK reference
Creating a new bucket
The following example uses a client, the low-level interface.
A list of valid provisioning codes for LocationConstraint can be found in the Storage Classes guide.
def create_bucket(bucket_name):
print("Creating new bucket: {0}".format(bucket_name))
try:
cos_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
"LocationConstraint":COS_BUCKET_LOCATION
}
)
print("Bucket: {0} created!".format(bucket_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to create bucket: {0}".format(e))
SDK reference
Methods
Creating a new text file
def create_text_file(bucket_name, item_name, file_text):
print("Creating new item: {0}".format(item_name))
try:
cos_client.put_object(
Bucket=bucket_name,
Key=item_name,
Body=file_text
)
print("Item: {0} created!".format(item_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to create text file: {0}".format(e))
SDK reference
Methods
Listing available buckets
def get_buckets():
print("Retrieving list of buckets")
try:
buckets = cos_client.list_buckets()
for bucket in buckets["Buckets"]:
print("Bucket Name: {0}".format(bucket["Name"]))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
        print("Unable to retrieve list of buckets: {0}".format(e))
SDK reference
Methods
Listing items in a bucket
def get_bucket_contents(bucket_name):
print("Retrieving bucket contents from: {0}".format(bucket_name))
try:
files = cos_client.list_objects(Bucket=bucket_name)
for file in files.get("Contents", []):
print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to retrieve bucket contents: {0}".format(e))
SDK reference
Methods
Getting the file contents of a particular item
def get_item(bucket_name, item_name):
print("Retrieving item from bucket: {0}, key: {1}".format(bucket_name, item_name))
try:
file = cos_client.get_object(Bucket=bucket_name, Key=item_name)
print("File Contents: {0}".format(file["Body"].read()))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to retrieve file contents: {0}".format(e))
SDK reference
Methods
Deleting an item from a bucket
def delete_item(bucket_name, object_name):
try:
cos_client.delete_object(Bucket=bucket_name, Key=object_name)
print("Item: {0} deleted!\n".format(object_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to delete object: {0}".format(e))
SDK reference
Methods
Deleting multiple items from a bucket
The delete request can contain a maximum of 1000 keys to delete. While this is useful for reducing the per-request overhead, be mindful when deleting a large number of keys. Also, take into account the sizes of the objects to ensure suitable performance.
import json
def delete_items(bucket_name):
try:
delete_request = {
"Objects": [
{ "Key": "deletetest/testfile1.txt" },
{ "Key": "deletetest/testfile2.txt" },
{ "Key": "deletetest/testfile3.txt" },
{ "Key": "deletetest/testfile4.txt" },
{ "Key": "deletetest/testfile5.txt" }
]
}
response = cos_client.delete_objects(
Bucket=bucket_name,
Delete=delete_request
)
print("Deleted items for {0}\n".format(bucket_name))
print(json.dumps(response.get("Deleted"), indent=4))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
        print("Unable to delete items: {0}".format(e))
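Since each delete request accepts at most 1000 keys, a longer key list must be split into batches before calling delete_objects. A sketch of building the request payloads (the helper name is illustrative, not part of the SDK):

```python
def build_delete_requests(keys, batch_size=1000):
    # Split the key list into chunks of at most batch_size and wrap
    # each chunk in the payload shape expected by delete_objects
    requests = []
    for i in range(0, len(keys), batch_size):
        batch = keys[i:i + batch_size]
        requests.append({"Objects": [{"Key": k} for k in batch]})
    return requests

# 2500 keys produce three payloads of 1000, 1000, and 500 keys
payloads = build_delete_requests(["item{0}".format(n) for n in range(2500)])
print([len(p["Objects"]) for p in payloads])  # prints [1000, 1000, 500]
```

Each resulting payload can then be passed as the Delete argument of a separate delete_objects call.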
SDK reference
Methods
Deleting a bucket
def delete_bucket(bucket_name):
print("Deleting bucket: {0}".format(bucket_name))
try:
cos_client.delete_bucket(Bucket=bucket_name)
print("Bucket: {0} deleted!".format(bucket_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to delete bucket: {0}".format(e))
SDK reference
Methods
Bucket names are held in reserve for 10 to 15 minutes after deletion.
Executing a multi-part upload
Upload binary files (preferred method)
The upload_fileobj method on the S3 client automatically executes a multi-part upload when necessary. The TransferConfig class is used to determine the threshold for using the multi-part upload.
def multi_part_upload(bucket_name, item_name, file_path):
try:
print("Starting file transfer for {0} to bucket: {1}\n".format(item_name, bucket_name))
# set 5 MB chunks
part_size = 1024 * 1024 * 5
        # set threshold to 15 MB
file_threshold = 1024 * 1024 * 15
# set the transfer threshold and chunk size
transfer_config = ibm_boto3.s3.transfer.TransferConfig(
multipart_threshold=file_threshold,
multipart_chunksize=part_size
)
# the upload_fileobj method will automatically execute a multi-part upload
# in 5 MB chunks for all files over 15 MB
with open(file_path, "rb") as file_data:
cos_client.upload_fileobj(
Bucket=bucket_name,
Key=item_name,
Fileobj=file_data,
Config=transfer_config
)
print("Transfer for {0} Complete!\n".format(item_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to complete multi-part upload: {0}".format(e))
SDK reference
Methods
Manually executing a multi-part upload
If desired, the S3.Client class can be used to perform a multi-part upload. This can be useful if more control over the upload process is needed.
import os
import math
def multi_part_upload_manual(bucket_name, item_name, file_path):
try:
# create client object
cos_client = ibm_boto3.client("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_SERVICE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT
)
print("Starting multi-part upload for {0} to bucket: {1}\n".format(item_name, bucket_name))
# initiate the multi-part upload
mp = cos_client.create_multipart_upload(
Bucket=bucket_name,
Key=item_name
)
upload_id = mp["UploadId"]
# min 20MB part size
part_size = 1024 * 1024 * 20
file_size = os.stat(file_path).st_size
part_count = int(math.ceil(file_size / float(part_size)))
data_packs = []
position = 0
part_num = 0
# begin uploading the parts
with open(file_path, "rb") as file:
for i in range(part_count):
part_num = i + 1
part_size = min(part_size, (file_size - position))
print("Uploading to {0} (part {1} of {2})".format(item_name, part_num, part_count))
file_data = file.read(part_size)
mp_part = cos_client.upload_part(
Bucket=bucket_name,
Key=item_name,
PartNumber=part_num,
Body=file_data,
ContentLength=part_size,
UploadId=upload_id
)
data_packs.append({
"ETag":mp_part["ETag"],
"PartNumber":part_num
})
position += part_size
# complete upload
cos_client.complete_multipart_upload(
Bucket=bucket_name,
Key=item_name,
UploadId=upload_id,
MultipartUpload={
"Parts": data_packs
}
)
print("Upload for {0} Complete!\n".format(item_name))
except ClientError as be:
# abort the upload
cos_client.abort_multipart_upload(
Bucket=bucket_name,
Key=item_name,
UploadId=upload_id
)
print("Multi-part upload aborted for {0}\n".format(item_name))
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to complete multi-part upload: {0}".format(e))
SDK references (continued)
Classes
Methods
Uploading larger objects using a transfer manager
The TransferManager provides another way to run large file transfers by automatically incorporating multi-part uploads whenever the thresholds set in its configuration parameters make it necessary.
def upload_large_file(bucket_name, item_name, file_path):
print("Starting large file upload for {0} to bucket: {1}".format(item_name, bucket_name))
# set the chunk size to 5 MB
part_size = 1024 * 1024 * 5
    # set threshold to 5 MB
file_threshold = 1024 * 1024 * 5
# Create client connection
cos_client = ibm_boto3.client("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_SERVICE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT
)
# set the transfer threshold and chunk size in config settings
transfer_config = ibm_boto3.s3.transfer.TransferConfig(
multipart_threshold=file_threshold,
multipart_chunksize=part_size
)
# create transfer manager
transfer_mgr = ibm_boto3.s3.transfer.TransferManager(cos_client, config=transfer_config)
try:
# initiate file upload
future = transfer_mgr.upload(file_path, bucket_name, item_name)
# wait for upload to complete
future.result()
print ("Large file upload complete!")
except Exception as e:
print("Unable to complete large file upload: {0}".format(e))
finally:
transfer_mgr.shutdown()
Listing items in a bucket (V2)
The S3.Client object has an updated method for listing contents (list_objects_v2). This method allows you to limit the number of records returned and to retrieve the records in batches. This can be useful for paging your results within an application, and may improve performance.
def get_bucket_contents_v2(bucket_name, max_keys):
print("Retrieving bucket contents from: {0}".format(bucket_name))
try:
# create client object
cos_client = ibm_boto3.client("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_SERVICE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT)
more_results = True
next_token = ""
while (more_results):
response = cos_client.list_objects_v2(Bucket=bucket_name, MaxKeys=max_keys, ContinuationToken=next_token)
files = response["Contents"]
for file in files:
print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))
if (response["IsTruncated"]):
next_token = response["NextContinuationToken"]
print("...More results in next batch!\n")
else:
more_results = False
next_token = ""
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to retrieve bucket contents: {0}".format(e))
SDK reference
Methods
Create a backup policy
# Requires the ibm-cos-sdk-config package for ResourceConfigurationV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_cos_sdk_config.resource_configuration_v1 import ResourceConfigurationV1
# Config values
api_key = "<API_KEY>"
vault_crn = "<BACKUP_VAULT_CRN>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
policy_name = "<POLICY_NAME>"
# Authenticator and client setup
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Create policy
create_backup_policy = rc_client.create_backup_policy(
bucket=source_bucket_name,
policy_name=policy_name,
target_backup_vault_crn=vault_crn,
backup_type="continuous",
initial_retention={"delete_after_days": 1}
)
# Print response
print(f"Policy created: {create_backup_policy}")
List backup policies
# Config values
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
# Authenticator and client setup
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# List all backup policies
list_response = rc_client.list_backup_policies(bucket=source_bucket_name)
print("\nList of backup policies:")
for policy in list_response.result.get("backup_policies", []):
print(policy)
Get a backup policy
# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
backup_vault_crn = "<BACKUP_VAULT_CRN>"
policy_name = "<POLICY_NAME>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Create backup policy
create_backup_policy_response = rc_client.create_backup_policy(
bucket=source_bucket_name,
policy_name=policy_name,
target_backup_vault_crn=backup_vault_crn,
backup_type="continuous",
initial_retention={"delete_after_days": 1}
)
# Extract policy ID
policy_id = create_backup_policy_response.result.get("policy_id")
get_backup_policy_response = rc_client.get_backup_policy(
bucket=source_bucket_name,
policy_id=policy_id
)
print("\nFetched Backup Policy Details:")
print(get_backup_policy_response.result)
Delete a backup policy
# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
policy_id = "<POLICY_ID>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Delete the backup policy
delete_backup_policy_response = rc_client.delete_backup_policy(
bucket=source_bucket_name,
policy_id=policy_id
)
print(f"Backup policy '{policy_id}' deleted successfully.")
Create a backup vault
# Config
api_key = "<API_KEY>"
service_instance_id = "<SERVICE_INSTANCE_ID>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
region = "<REGION>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Create a backup vault
create_backup_vault_response = rc_client.create_backup_vault(
service_instance_id=service_instance_id,
backup_vault_name=backup_vault_name,
region=region
)
# Output result
print("Backup vault created:")
print(create_backup_vault_response.result)
List backup vaults
# Config
api_key = "<API_KEY>"
service_instance_id = "<SERVICE_INSTANCE_ID>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# List backup vaults
list_backup_vaults_response = rc_client.list_backup_vaults(
service_instance_id=service_instance_id
)
print("List of backup vaults:")
print(list_backup_vaults_response.result)
Get a backup vault
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Get backup vault
get_backup_vault = rc_client.get_backup_vault(
backup_vault_name=backup_vault_name
)
# Output result
print("Backup vault details:")
print(get_backup_vault.result)
Update a backup vault
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Update backup vault settings (enable management event tracking and usage metrics monitoring)
backup_vault_patch = {
"activity_tracking": {"management_events": True},
"metrics_monitoring": {"usage_metrics_enabled": True},
}
update_backup_vault_response = rc_client.update_backup_vault(
backup_vault_name=backup_vault_name,
backup_vault_patch=backup_vault_patch
)
# Output result
print("Backup vault updated successfully.")
print(update_backup_vault_response)
Delete a backup vault
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Delete the backup vault
delete_vault_response = rc_client.delete_backup_vault(
backup_vault_name=backup_vault_name
)
# Output result
print(f"Successfully deleted backup vault '{backup_vault_name}'.")
List recovery ranges
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# List recovery ranges
recovery_ranges_response = rc_client.list_recovery_ranges(
backup_vault_name=backup_vault_name
)
# Output recovery range results
print("Recovery Ranges:")
print(recovery_ranges_response.result)
Get a recovery range
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
get_recovery_range_response = rc_client.get_source_resource_recovery_range(
backup_vault_name=backup_vault_name,
recovery_range_id=recovery_range_id
)
print("Recovery Range Details:")
print(get_recovery_range_response.result)
Update a recovery range
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
recovery_range_patch_model = {}
recovery_range_patch_model['retention'] = {"delete_after_days": 99}
patch_response = rc_client.patch_source_resource_recovery_range(
backup_vault_name=backup_vault_name,
recovery_range_id=recovery_range_id,
recovery_range_patch=recovery_range_patch_model
)
print("Patch Response Details:")
print(patch_response)
Initiate a restore
# Configuration
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
target_bucket_crn = "<TARGET_BUCKET_CRN>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
restore_point_in_time = "<RESTORE_POINT_IN_TIME>"
# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Initiate restore
create_restore = rc_client.create_restore(
backup_vault_name=backup_vault_name,
recovery_range_id=recovery_range_id,
restore_type="in_place",
restore_point_in_time=restore_point_in_time,
target_resource_crn=target_bucket_crn
)
print(f"Restore initiated : {create_restore}")
List restores
# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# List restore operations
get_store = rc_client.get_restore(
backup_vault_name=backup_vault_name)
print("Restore response:")
print(get_store.result)
Get restore details
# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
backup_vault_crn = "<BACKUP_VAULT_CRN>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
target_bucket_crn = "<TARGET_BUCKET_CRN>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
restore_point_in_time = "<RESTORE_POINT_IN_TIME>"
# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)
# Create restore
create_restore = rc_client.create_restore(
backup_vault_name=backup_vault_name,
recovery_range_id=recovery_range_id,
restore_type="in_place",
restore_point_in_time=restore_point_in_time,
target_resource_crn=target_bucket_crn
)
restore_id = create_restore.result["restore_id"]
# List restore operations
get_store = rc_client.get_restore(
backup_vault_name=backup_vault_name, restore_id=restore_id)
print("Restore response:")
print(get_store.result)
Create a new COS bucket with Object Lock enabled
def create_bucket_with_objectlock(bucket_name):
cos_cli.create_bucket(
Bucket=bucket_name,
ObjectLockEnabledForBucket=True,
)
print("Bucket: {0} created with objectlock enabled".format(bucket_name))
Enable an Object Lock configuration with COMPLIANCE mode on a COS bucket
def objectlock_configuration_on_bucket(bucket_name):
    # Put a default retention period on the COS bucket.
default_retention_rule = {'DefaultRetention': {'Mode': 'COMPLIANCE', 'Years': 1}}
object_lock_config = {'ObjectLockEnabled': 'Enabled', 'Rule': default_retention_rule}
cos_cli.put_object_lock_configuration(Bucket=bucket_name, ObjectLockConfiguration=object_lock_config)
Set an Object Lock configuration with GOVERNANCE mode on a COS bucket
def objectlock_configuration_with_governance_mode_on_bucket(bucket_name):
    # Put a default retention period with GOVERNANCE mode on the COS bucket.
default_retention_rule = {'DefaultRetention': {'Mode': 'GOVERNANCE', 'Years': 1}}
object_lock_config = {'ObjectLockEnabled': 'Enabled', 'Rule': default_retention_rule}
cos_cli.put_object_lock_configuration(Bucket=bucket_name, ObjectLockConfiguration=object_lock_config)
Get the Object Lock configuration of a COS bucket
def get_objectlock_configuration(bucket_name):
    # Read the Object Lock configuration set on the bucket.
    response = cos_cli.get_object_lock_configuration(Bucket=bucket_name)
    print("Objectlock Configuration for {0} =>".format(bucket_name))
    print(response["ObjectLockConfiguration"])
Upload an object with GOVERNANCE mode to a COS bucket
from datetime import datetime
def upload_object_with_governance_mode(bucket_name,object_name,object_content):
cos_cli.put_object(
Bucket=bucket_name,
Key=object_name,
Body=object_content,
ObjectLockMode='GOVERNANCE',
ObjectLockRetainUntilDate=datetime(2025, 11, 15)
)
print("Object: {0} uploaded!".format(object_name))
Set Object Lock retention with COMPLIANCE mode on an object
from datetime import datetime, timedelta
def objectlock_retention(bucket_name,object_name):
    # Put an Object Lock retention period on the object uploaded to the bucket.
    date = datetime.now()+timedelta(seconds=5)
retention_rule = {'Mode': 'COMPLIANCE', 'RetainUntilDate': date}
cos_cli.put_object_retention(Bucket=bucket_name, Key=object_name, Retention=retention_rule)
Set Object Lock retention with GOVERNANCE mode on an object
from datetime import datetime, timedelta
def objectlock_retention_with_governance_mode(bucket_name,object_name):
    # Put an Object Lock retention period with GOVERNANCE mode on the object uploaded to the bucket.
    date = datetime.now()+timedelta(seconds=5)
retention_rule = {'Mode': 'GOVERNANCE', 'RetainUntilDate': date}
cos_cli.put_object_retention(Bucket=bucket_name, Key=object_name, Retention=retention_rule)
Get Object Lock retention
def get_objectlock_retention(bucket_name,object_name):
    # Get the Object Lock retention of the object above.
    response = cos_cli.get_object_retention(Bucket=bucket_name, Key=object_name)
    print("Objectlock Retention for {0}=>".format(object_name))
    print(response["Retention"])
Set an Object Lock legal hold
def objectlock_legal_hold(bucket_name,object_name):
    # Set the Object Lock legal hold status to ON.
    cos_cli.put_object_legal_hold(Bucket=bucket_name, Key=object_name, LegalHold={'Status': 'ON'})
Get an Object Lock legal hold
def get_objectlock_legal_hold(bucket_name,object_name):
    # Get the Object Lock legal hold status of the object above.
    response = cos_cli.get_object_legal_hold(Bucket=bucket_name, Key=object_name)
    print("Objectlock legal-hold for {0}=>".format(object_name))
    print(response["LegalHold"])
Delete an object in GOVERNANCE mode using bypass governance
def delete_object_with_bypass_governance(bucket_name,object_name):
    # Delete an object under retention by bypassing governance mode
cos_cli.delete_object(Bucket=bucket_name, Key=object_name, BypassGovernanceRetention=True)
Using Key Protect
Key Protect can be added to a storage bucket to encrypt sensitive data at rest in the cloud.
Before you begin
To create a bucket with Key Protect enabled, you need the following items:
Retrieving the root key CRN
- Retrieve the instance ID of your Key Protect service.
- Use the Key Protect API to retrieve all of your available keys.
  - You can use curl commands or an API REST client such as Postman to access the Key Protect API.
- Retrieve the CRN of the root key that you will use to enable Key Protect on your bucket. The CRN looks similar to the following:
crn:v1:bluemix:public:kms:us-south:a/3d624cd74a0dea86ed8efe3101341742:90b6a1db-0fe1-4fe9-b91e-962c327df531:key:0bg3e33e-a866-50f2-b715-5cba2bc93234
Creating a bucket with Key Protect enabled
COS_KP_ALGORITHM = "<algorithm>"
COS_KP_ROOTKEY_CRN = "<root-key-crn>"
# Create a new bucket with key protect (encryption)
def create_bucket_kp(bucket_name):
print("Creating new encrypted bucket: {0}".format(bucket_name))
try:
cos_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
"LocationConstraint":COS_BUCKET_LOCATION
},
IBMSSEKPEncryptionAlgorithm=COS_KP_ALGORITHM,
IBMSSEKPCustomerRootKeyCrn=COS_KP_ROOTKEY_CRN
)
print("Encrypted Bucket: {0} created!".format(bucket_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
print("Unable to create encrypted bucket: {0}".format(e))
Key values
- <algorithm> - encryption algorithm used for new objects added to the bucket (default is AES256).
- <root-key-crn> - CRN of the root key obtained from the Key Protect service.
SDK reference
Methods
Using Aspera high-speed transfer
Legacy notice: Support for Aspera is considered legacy. Users are advised to use the Aspera Transfer SDK instead.
By installing the Aspera high-speed transfer library, you can use high-speed file transfers within your application. The Aspera library is closed-source and is therefore an optional dependency of the COS SDK (which uses an Apache license).
Each Aspera session creates an individual ascp process that runs on the client machine to perform the transfer. Ensure that your computing environment allows this process to run.
Initializing the AsperaTransferManager
Before initializing the AsperaTransferManager, make sure you have a working client object (not a resource or session object).
import ibm_boto3
from ibm_botocore.client import Config
from ibm_s3transfer.aspera.manager import AsperaTransferManager
COS_ENDPOINT = "<endpoint>" # Current list available at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>"
COS_RESOURCE_CRN = "<resource-instance-id>"
COS_BUCKET_LOCATION = "<location>"
# Create client
cos_client = ibm_boto3.client("s3",
ibm_api_key_id=COS_API_KEY_ID,
ibm_service_instance_id=COS_RESOURCE_CRN,
config=Config(signature_version="oauth"),
endpoint_url=COS_ENDPOINT
)
transfer_manager = AsperaTransferManager(cos_client)
You will need to provide an IAM API key for Aspera high-speed transfer. HMAC credentials are not currently supported. For more information about IAM, click here.
To get the highest throughput, split the transfer into a specified number of parallel sessions that each send chunks of data whose size is defined by a threshold value.
A typical configuration for using multi-session should be:
- 2500 MBps target rate
- 100 MB threshold (this is the recommended value for most applications)
from ibm_s3transfer.aspera.manager import AsperaConfig
ms_transfer_config = AsperaConfig(multi_session="all",
target_rate_mbps=2500,
multi_session_threshold_mb=100)
In the example above, the SDK spawns enough sessions to attempt to reach the target rate of 2500 MBps.
Alternatively, session management can be explicitly configured in the SDK. This is useful in cases where more precise control over network utilization is required.
A typical configuration for using explicit multi-session should be:
- 2 or 10 sessions
- 100 MB threshold (this is the recommended value for most applications)
from ibm_s3transfer.aspera.manager import AsperaConfig
# Configure 2 sessions for transfer
ms_transfer_config = AsperaConfig(multi_session=2,
multi_session_threshold_mb=100)
# Create the Aspera Transfer Manager
transfer_manager = AsperaTransferManager(client=client,
transfer_config=ms_transfer_config)
For the best performance in most scenarios, always make use of multiple sessions to minimize any processing associated with instantiating Aspera high-speed transfer. If your network capacity is at least 1 Gbps, you should use 10 sessions. Networks with lower bandwidth should use 2 sessions.
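The session guidance above can be captured in a small helper; the function is illustrative only, and the 1 Gbps threshold is taken directly from the recommendation in this section:

```python
def recommended_session_count(bandwidth_gbps):
    # Use 10 sessions on networks of at least 1 Gbps, otherwise 2,
    # per the recommendation above
    return 10 if bandwidth_gbps >= 1 else 2

print(recommended_session_count(10))   # prints 10
print(recommended_session_count(0.5))  # prints 2
```

The returned value could be passed as the multi_session argument of AsperaConfig.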
File upload
bucket_name = "<bucket-name>"
upload_filename = "<absolute-path-to-file>"
object_name = "<item-name>"
# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:
# Perform upload
future = transfer_manager.upload(upload_filename, bucket_name, object_name)
# Wait for upload to complete
future.result()
Key values
- <bucket-name> - name of the target bucket
- <absolute-path-to-file> - directory path and file name of the file to upload
- <item-name> - name of the new file added to the bucket
File download
bucket_name = "<bucket-name>"
download_filename = "<absolute-path-to-file>"
object_name = "<object-to-download>"
# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:
# Get object with Aspera
future = transfer_manager.download(bucket_name, object_name, download_filename)
# Wait for download to complete
future.result()
Key values
- <bucket-name> - name of the bucket in your Object Storage service instance that is enabled for Aspera.
- <absolute-path-to-file> - directory and file name to save the file to on your local system.
- <object-to-download> - name of the file in the bucket to download.
Directory upload
bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY, and have objects in it.
local_upload_directory = "<absolute-path-to-directory>"
# THIS SHOULD NOT HAVE A LEADING "/"
remote_directory = "<object prefix>"
# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:
# Perform upload
future = transfer_manager.upload_directory(local_upload_directory, bucket_name, remote_directory)
# Wait for upload to complete
future.result()
Key values
- <bucket-name> - name of the bucket in your Object Storage service instance that is enabled for Aspera
- <absolute-path-to-directory> - local directory that contains the files to upload. Must have leading and trailing slashes / (for example, /Users/testuser/Documents/Upload/)
- <object prefix> - name of the directory in the bucket to store the files in. Must not have a leading slash / (for example, newuploads/)
Directory download
bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"
# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:
# Get object with Aspera
future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory)
# Wait for download to complete
future.result()
Key values
- <bucket-name> - name of the bucket in your Object Storage service instance that is enabled for Aspera
- <absolute-path-to-directory> - local directory to save the downloaded files to. Must have leading and trailing slashes / (for example, /Users/testuser/Downloads/)
- <object prefix> - name of the directory in the bucket that the files are stored in. Must not have a leading slash / (for example, todownload/)
Using subscribers
Subscribers provide observability into transfers by attaching custom callback methods. All transfers transition between the following phases:
Queued - In Progress - Done
Three subscribers are available, one for each phase:
- CallbackOnQueued() - called when a new transfer is added to the AsperaTransferManager
- CallbackOnProgress() - called when a transfer has transmitted data (fires repeatedly while a transfer is in progress)
- CallbackOnDone() - called when a transfer completes
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"
# Subscriber callbacks
class CallbackOnQueued(AsperaBaseSubscriber):
def __init__(self):
pass
def on_queued(self, future, **kwargs):
print("Directory download queued.")
class CallbackOnProgress(AsperaBaseSubscriber):
def __init__(self):
pass
def on_progress(self, future, bytes_transferred, **kwargs):
print("Directory download in progress: %s bytes transferred" % bytes_transferred)
class CallbackOnDone(AsperaBaseSubscriber):
def __init__(self):
pass
def on_done(self, future, **kwargs):
print("Downloads complete!")
# Create Transfer manager
transfer_manager = AsperaTransferManager(client)
# Attach subscribers
subscribers = [CallbackOnQueued(), CallbackOnProgress(), CallbackOnDone()]
# Get object with Aspera
future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, subscribers)
# Wait for download to complete
future.result()
Key values
- <bucket-name> - name of the bucket in your Object Storage service instance that is enabled for Aspera
- <absolute-path-to-directory> - local directory to save the downloaded files to. Must have leading and trailing slashes / (for example, /Users/testuser/Downloads/)
- <object prefix> - name of the directory in the bucket that the files are stored in. Must not have a leading slash / (for example, todownload/)
The sample code above produces the following output:
Directory download queued.
Directory download in progress: 5632 bytes transferred
Directory download in progress: 1047552 bytes transferred
...
Directory download in progress: 53295130 bytes transferred
Directory download in progress: 62106855 bytes transferred
Downloads complete!
Pause/Resume/Cancel
The SDK provides the ability to manage the progress of file/directory transfers through the following methods of the AsperaTransferFuture object:
- pause()
- resume()
- cancel()
Calling any of these methods has no side effects. Proper cleanup and housekeeping are handled by the SDK.
# Create Transfer manager
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"
with AsperaTransferManager(client) as transfer_manager:
# download a directory with Aspera
future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, None)
# pause the transfer
future.pause()
# resume the transfer
future.resume()
# cancel the transfer
future.cancel()
Troubleshooting Aspera issues
Issue: Developers using any Python version other than 3.6 may encounter failures when installing or using the Aspera SDK.
Cause: If different versions of Python are installed in your environment, installation failures can occur when you try to install the Aspera SDK. This can be caused by a missing or incorrect DLL file in the path.
Solution: The first step in resolving this issue is to reinstall the Aspera library, since a failure may have occurred during installation that affected the DLL files. If this does not resolve the issue, update your Python version. If you are unable to do so, you can instead use the Intel® Distribution for Python, which should allow you to install the Aspera SDK on Python 3.6.x without any issues.
Updating metadata
There are two ways to update the metadata on an existing object:
- A PUT request with the new metadata and the original object contents
- A COPY request with the new metadata, specifying the original object as the copy source
Updating metadata with PUT
Note: the PUT request overwrites the existing contents of the object, so it must first be downloaded and then re-uploaded with the new metadata.
def update_metadata_put(bucket_name, item_name, key, value):
try:
# retrieve the existing item to reload the contents
response = cos_client.get_object(Bucket=bucket_name, Key=item_name)
existing_body = response.get("Body").read()
# set the new metadata
new_metadata = {
key: value
}
cos_client.put_object(Bucket=bucket_name, Key=item_name, Body=existing_body, Metadata=new_metadata)
print("Metadata update (PUT) for {0} Complete!\n".format(item_name))
except ClientError as be:
print("CLIENT ERROR: {0}\n".format(be))
except Exception as e:
log_error("Unable to update metadata: {0}".format(e))
Using COPY to update metadata
def update_metadata_copy(bucket_name, item_name, key, value):
    try:
        # set the new metadata
        new_metadata = {
            key: value
        }
        # set the copy source to itself
        copy_source = {
            "Bucket": bucket_name,
            "Key": item_name
        }
        cos_client.copy_object(Bucket=bucket_name, Key=item_name, CopySource=copy_source, Metadata=new_metadata, MetadataDirective="REPLACE")
        print("Metadata update (COPY) for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to update metadata: {0}".format(e))
Using Immutable Object Storage
Adding a protection configuration to an existing bucket
Objects written to a protected bucket cannot be deleted until the protection period has expired and all legal holds on the object have been removed. An object is given the bucket's default retention value unless an object-specific value is provided when the object is created. Objects in protected buckets that are no longer under retention (the retention period has expired and the object has no legal holds), when overwritten, come under retention again. A new retention period can be provided as part of the object overwrite request; otherwise the object is given the bucket's default retention time.
The retention period settings MinimumRetention, DefaultRetention, and MaximumRetention each range from a minimum of 0 days to a maximum of 365243 days (1000 years).
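These bounds can be checked client-side before sending the configuration. A hypothetical validation helper (the function name is ours, not part of the SDK), assuming the configuration dict shape shown below:

```python
# Documented ceiling for all three retention settings: 1000 years.
MAX_RETENTION_DAYS = 365243

def validate_protection_config(config):
    """Return a list of problems found in a protection configuration dict;
    an empty list means the configuration passes these local checks."""
    problems = []
    minimum = config["MinimumRetention"]["Days"]
    default = config["DefaultRetention"]["Days"]
    maximum = config["MaximumRetention"]["Days"]
    for name, days in (("MinimumRetention", minimum),
                       ("DefaultRetention", default),
                       ("MaximumRetention", maximum)):
        if not 0 <= days <= MAX_RETENTION_DAYS:
            problems.append("{0} must be between 0 and {1} days".format(name, MAX_RETENTION_DAYS))
    if not minimum <= default <= maximum:
        problems.append("DefaultRetention must lie between MinimumRetention and MaximumRetention")
    return problems
```

Running the helper against the sample configuration used in the next code block returns an empty list.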
def add_protection_configuration_to_bucket(bucket_name):
    try:
        new_protection_config = {
            "Status": "Retention",
            "MinimumRetention": {"Days": 10},
            "DefaultRetention": {"Days": 100},
            "MaximumRetention": {"Days": 1000}
        }
        cos_client.put_bucket_protection_configuration(Bucket=bucket_name, ProtectionConfiguration=new_protection_config)
        print("Protection added to bucket {0}\n".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to set bucket protection config: {0}".format(e))
Checking protection on a bucket
def get_protection_configuration_on_bucket(bucket_name):
    try:
        response = cos_client.get_bucket_protection_configuration(Bucket=bucket_name)
        protection_config = response.get("ProtectionConfiguration")
        print("Bucket protection config for {0}\n".format(bucket_name))
        print(protection_config)
        print("\n")
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to get bucket protection config: {0}".format(e))
Uploading protected objects
Objects in protected buckets that are no longer under retention (the retention period has expired and the object has no legal holds), when overwritten, come under retention again. A new retention period can be provided as part of the object overwrite request; otherwise the object is given the bucket's default retention time.
| Value | Type | Description |
|---|---|---|
| `Retention-Period` | Non-negative integer (seconds) | Retention period, in seconds, to store on the object. The object can be neither overwritten nor deleted until the amount of time specified in the retention period has elapsed. If this field and `Retention-Expiration-Date` are both specified, a 400 error is returned. If neither is specified, the bucket's `DefaultRetention` period is used. Zero (0) is a legal value, assuming the bucket's minimum retention period is also 0. |
| `Retention-expiration-date` | Date (ISO 8601 format) | Date on which it will be legal to delete or modify the object. You can specify only this header or the `Retention-Period` header. If both are specified, a 400 error is returned. If neither is specified, the bucket's `DefaultRetention` period is used. |
| `Retention-legal-hold-id` | string | A single legal hold to apply to the object. A legal hold is a Y character long string. The object cannot be overwritten or deleted until all legal holds associated with it are removed. |
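The mutual-exclusion rule in the table can be mirrored client-side to fail fast before a request is sent. A minimal sketch (the function name is ours, not part of the SDK):

```python
def check_retention_args(retention_period=None, retention_expiration_date=None):
    """Client-side mirror of the documented rule: at most one of
    Retention-Period and Retention-Expiration-Date may be supplied;
    supplying both would earn a 400 error from the server."""
    if retention_period is not None and retention_expiration_date is not None:
        raise ValueError("Specify either Retention-Period or Retention-Expiration-Date, not both")
    if retention_period is not None and retention_period < 0:
        raise ValueError("Retention-Period must be a non-negative number of seconds")
    # Neither given: the bucket's DefaultRetention period applies.
```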
def put_object_add_legal_hold(bucket_name, object_name, file_text, legal_hold_id):
    print("Add legal hold {0} to {1} in bucket {2} with a putObject operation.\n".format(legal_hold_id, object_name, bucket_name))
    cos_client.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=file_text,
        RetentionLegalHoldId=legal_hold_id)
    print("Legal hold {0} added to object {1} in bucket {2}\n".format(legal_hold_id, object_name, bucket_name))
def copy_protected_object(source_bucket_name, source_object_name, destination_bucket_name, new_object_name):
    print("Copy protected object {0} from bucket {1} to {2}/{3}.\n".format(source_object_name, source_bucket_name, destination_bucket_name, new_object_name))
    copy_source = {
        "Bucket": source_bucket_name,
        "Key": source_object_name
    }
    cos_client.copy_object(
        Bucket=destination_bucket_name,
        Key=new_object_name,
        CopySource=copy_source,
        RetentionDirective="Copy"
    )
    print("Protected object copied from {0}/{1} to {2}/{3}\n".format(source_bucket_name, source_object_name, destination_bucket_name, new_object_name))
def complete_multipart_upload_with_retention(bucket_name, object_name, upload_id, retention_period):
    print("Completing multi-part upload for object {0} in bucket {1}\n".format(object_name, bucket_name))
    cos_client.complete_multipart_upload(
        Bucket=bucket_name,
        Key=object_name,
        MultipartUpload={
            # "part" is the response from a preceding upload_part call
            "Parts": [{
                "ETag": part["ETag"],
                "PartNumber": 1
            }]
        },
        UploadId=upload_id,
        RetentionPeriod=retention_period
    )
    print("Multi-part upload completed for object {0} in bucket {1}\n".format(object_name, bucket_name))
def upload_file_with_retention(bucket_name, object_name, path_to_file, retention_period):
    print("Uploading file {0} to object {1} in bucket {2}\n".format(path_to_file, object_name, bucket_name))
    args = {
        "RetentionPeriod": retention_period
    }
    cos_client.upload_file(
        Filename=path_to_file,
        Bucket=bucket_name,
        Key=object_name,
        ExtraArgs=args
    )
    print("File upload complete to object {0} in bucket {1}\n".format(object_name, bucket_name))
Adding or removing a legal hold on a protected object
An object can support 100 legal holds:
- A legal hold identifier is a string with a maximum length of 64 characters and a minimum length of 1 character. Valid characters are letters, numbers, and the symbols !, _, ., *, (, ), and -.
- If adding the given legal hold would cause the total number of legal holds on the object to exceed 100, the new legal hold is not added and a 400 error is returned.
- If an identifier is too long, it is not added to the object and a 400 error is returned.
- If an identifier contains invalid characters, it is not added to the object and a 400 error is returned.
- If an identifier is already in use on an object, the existing legal hold is not modified, the response indicates that the identifier was already in use, and a 409 error is returned.
- If an object does not have retention period metadata, a 400 error is returned and adding or removing a legal hold is not allowed.
To add or remove a legal hold, you must have Manager permissions on the bucket.
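The identifier rules above can be enforced locally before the request is made, avoiding a round trip for the 400 cases. A hypothetical client-side check (the regex and function name are ours, derived from the documented constraints):

```python
import re

# 1-64 characters drawn from letters, numbers, and the symbols ! _ . * ( ) -
LEGAL_HOLD_ID_RE = re.compile(r"^[A-Za-z0-9!_.*()\-]{1,64}$")

def is_valid_legal_hold_id(hold_id):
    """Return True when hold_id satisfies the documented length and
    character constraints for a legal hold identifier."""
    return bool(LEGAL_HOLD_ID_RE.match(hold_id))
```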
def add_legal_hold_to_object(bucket_name, object_name, legal_hold_id):
    print("Adding legal hold {0} to object {1} in bucket {2}\n".format(legal_hold_id, object_name, bucket_name))
    cos_client.add_legal_hold(
        Bucket=bucket_name,
        Key=object_name,
        RetentionLegalHoldId=legal_hold_id
    )
    print("Legal hold {0} added to object {1} in bucket {2}!\n".format(legal_hold_id, object_name, bucket_name))

def delete_legal_hold_from_object(bucket_name, object_name, legal_hold_id):
    print("Deleting legal hold {0} from object {1} in bucket {2}\n".format(legal_hold_id, object_name, bucket_name))
    cos_client.delete_legal_hold(
        Bucket=bucket_name,
        Key=object_name,
        RetentionLegalHoldId=legal_hold_id
    )
    print("Legal hold {0} deleted from object {1} in bucket {2}!\n".format(legal_hold_id, object_name, bucket_name))
Extending the retention period of a protected object
The retention period of an object can only be extended. It cannot be decreased from the currently configured value.
The retention extension value is set in one of three ways:
- additional time from the current value (Additional-Retention-Period or similar method)
- a new extension period, in seconds (Extend-Retention-From-Current-Time or similar method)
- a new retention expiration date for the object (New-Retention-Expiration-Date or similar method)
The current retention period stored in the object metadata is either increased by the given additional time or replaced with the new value, depending on the parameter set in the extendRetention request. In all cases, the extension parameter is checked against the current retention period, and the extension is accepted only if the updated retention period is greater than the current one.
Objects in protected buckets that are no longer under retention (the retention period has expired and the object has no legal holds), when overwritten, come under retention again. A new retention period can be provided as part of the object overwrite request; otherwise the object is given the bucket's default retention time.
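The extend-only acceptance rule can be illustrated with a small sketch in plain seconds. The function and parameter names are illustrative only, not SDK parameters:

```python
def resolve_extended_retention(current_period, additional_period=None, new_period=None):
    """Return the updated retention period in seconds, or raise if the
    request would not strictly extend it (only extensions are accepted)."""
    if additional_period is not None:
        # style 1: add time on top of the current value
        candidate = current_period + additional_period
    elif new_period is not None:
        # styles 2/3: replace the current value outright
        candidate = new_period
    else:
        raise ValueError("No extension parameter supplied")
    if candidate <= current_period:
        raise ValueError("Retention can only be extended, not shortened")
    return candidate
```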
def extend_retention_period_on_object(bucket_name, object_name, additional_seconds):
    print("Extend the retention period on {0} in bucket {1} by {2} seconds.\n".format(object_name, bucket_name, additional_seconds))
    cos_client.extend_object_retention(
        Bucket=bucket_name,
        Key=object_name,
        AdditionalRetentionPeriod=additional_seconds
    )
    print("New retention period on {0} is {1}\n".format(object_name, additional_seconds))
Listing legal holds on a protected object
This operation returns:
- the object creation date
- the object retention period, in seconds
- the retention expiration date, calculated from the period and the creation date
- a list of legal holds
  - the legal hold identifier
  - the timestamp when the legal hold was applied
If there are no legal holds on the object, an empty LegalHoldSet is returned. If no retention period is specified on the object, a 404 error is returned.
def list_legal_holds_on_object(bucket_name, object_name):
    print("List all legal holds on object {0} in bucket {1}\n".format(object_name, bucket_name))
    response = cos_client.list_legal_holds(
        Bucket=bucket_name,
        Key=object_name
    )
    print("Legal holds on bucket {0}: {1}\n".format(bucket_name, response))
Creating a hosted static website
This operation requires permissions, as typically only the bucket owner is allowed to configure a bucket to host a static website. The parameters determine the default suffix served to site visitors and an optional error document.
def putBucketWebsiteConfiguration(bucket_name):
    website_defaults = {
        'ErrorDocument': {'Key': 'error.html'},
        'IndexDocument': {'Suffix': 'index.html'},
    }
    cos_client.put_bucket_website(Bucket=bucket_name, WebsiteConfiguration=website_defaults)
    print("Website configuration set on bucket {0}\n".format(bucket_name))
Next steps
For more information, the source code can be found on GitHub.