IBM Cloud Docs
使用 Python

使用 Python

Python 支持是通过 boto3 库的派生提供的,其中具有可充分利用 IBM Cloud® Object Storage 的功能。

可以在 Python Package Index 中通过 pip install ibm-cos-sdk 安装 Python。

源代码可在 GitHub.

ibm_boto3 库提供对 IBM Cloud® Object Storage API 的完全访问权。 端点、API 密钥和实例标识必须在创建服务资源或低级别客户机期间指定,如以下基本示例所示。

服务实例 ID 也称为_资源实例 ID_。 通过创建服务凭证或通过 CLI 可以找到此值。

详细文档请点击 此处

创建客户机和获取凭证

为了连接到 COS,将使用凭证信息(API 密钥和服务实例标识)来创建和配置客户机。 这些值还可以自动从凭证文件或环境变量中获取。

生成服务凭证后,生成的 JSON 文档可以保存到 ~/.bluemix/cos_credentials。 除非在客户机创建期间显式设置了其他凭证,否则 SDK 会自动从此文件中获取凭证。 如果 cos_credentials 文件包含 HMAC 密钥,那么客户机将使用签名进行认证,否则客户机将使用提供的 API 密钥通过不记名令牌进行认证 (使用 API 密钥仍要求在客户机创建期间包含 config=Config(signature_version="oauth") )。

如果是从 AWS S3 进行迁移,那么还可以从 ~/.aws/credentials 中获取以下格式的凭证数据:

[default]
aws_access_key_id = {API_KEY}
aws_secret_access_key = {SERVICE_INSTANCE_ID}

:如果同时存在 ~/.bluemix/cos_credentials~/.aws/credentials,则 cos_credentials 优先。

收集必需的信息

示例中显示了以下变量:

  • bucket_name 必须是唯一的 DNS 安全字符串。 因为存储区名称在整个系统中唯一,因此如果多次运行此示例,那么需要更改这些值。 请注意,名称在删除后会保留 10 到 15 分钟。
  • ibm_api_key_id 是在服务凭证中找到的 apikey 的值。
  • ibm_service_instance_id 是在服务凭证中找到的 resource_instance_id 的值。
  • endpoint_url 是服务端点 URL,包含 https:// 协议。 该值不是 服务证书 中的 endpoints 值。 有关端点的更多信息,请参阅端点和存储位置
  • LocationConstraint 是与 值相对应的 endpoint 有效供应代码

代码示例

在受支持的 Python发行版上测试代码示例。

在您的代码中,必须除去此处作为插图提供的尖括号或任何其他多余字符。

初始化配置

此示例创建 resource 对象。 资源提供面向对象的 COS 接口。 这允许比客户机对象提供的低级别调用更高级别的抽象。

请注意,某些操作 (例如 Aspera 高速传输) 需要 client 对象。 Aspera 本身需要 Python V 3.6。

旧通知: 对 Aspera 的支持被视为旧支持。 请改为使用 Aspera Transfer SDK

import ibm_boto3
from ibm_botocore.client import Config, ClientError

# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"

# Create resource
cos_resource = ibm_boto3.resource("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_INSTANCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

客户机提供 COS S3 API 的低级别接口。 这样就可以直接处理 HTTP 响应,而不是利用资源提供的抽象方法和属性来访问包含在标头或 XML 响应有效载荷中的信息。


import ibm_boto3
from ibm_botocore.client import Config, ClientError

# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"

# Create client
cos_client = ibm_boto3.client("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_INSTANCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

键值

  • <endpoint>- 您的云 Object Storage 的公共端点,模式前缀为 (' https:// ') (可从 IBM Cloud 控制面板获取)。 有关端点的更多信息,请参阅端点和存储位置
  • <api-key>- 创建服务证书时生成的 api 密钥(创建和删除示例需要写访问权限)
  • <service-instance-id>- 云 Object Storage 的资源 ID(可通过 IBM Cloud CLIIBM Cloud Dashboard 获取)
  • <location>- 云 Object Storage 的默认位置(必须与 <endpoint> 所用的区域相匹配 )

SDK 引用

创建新存储区

以下示例使用作为低级别接口的客户机。

LocationConstraint 的有效配置代码列表可参考《 存储类别指南 》。

def create_bucket(bucket_name):
    print("Creating new bucket: {0}".format(bucket_name))
    try:
        cos_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                "LocationConstraint":COS_BUCKET_LOCATION
            }
        )
        print("Bucket: {0} created!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create bucket: {0}".format(e))

SDK 引用

方法

创建新的文本文件

def create_text_file(bucket_name, item_name, file_text):
    print("Creating new item: {0}".format(item_name))
    try:
        cos_client.put_object(
            Bucket=bucket_name,
            Key=item_name,
            Body=file_text
        )
        print("Item: {0} created!".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create text file: {0}".format(e))

SDK 引用

方法

列出可用存储区

def get_buckets():
    print("Retrieving list of buckets")
    try:
        buckets = cos_client.list_buckets()
        for bucket in buckets["Buckets"]:
            print("Bucket Name: {0}".format(bucket["Name"]))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve list buckets: {0}".format(e))

SDK 引用

方法

列出存储区中的项

def get_bucket_contents(bucket_name):
    print("Retrieving bucket contents from: {0}".format(bucket_name))
    try:
        files = cos_client.list_objects(Bucket=bucket_name)
        for file in files.get("Contents", []):
            print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve bucket contents: {0}".format(e))

SDK 引用

方法

获取特定项的文件内容

def get_item(bucket_name, item_name):
    print("Retrieving item from bucket: {0}, key: {1}".format(bucket_name, item_name))
    try:
        file = cos_client.get_object(Bucket=bucket_name, Key=item_name)
        print("File Contents: {0}".format(file["Body"].read()))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve file contents: {0}".format(e))

SDK 引用

方法

从存储区中删除一个项

def delete_item(bucket_name, object_name):
    try:
        cos_client.delete_object(Bucket=bucket_name, Key=object_name)
        print("Item: {0} deleted!\n".format(object_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to delete object: {0}".format(e))

SDK 引用

方法

从存储区中删除多个项

删除请求最多可包含 1000 个要删除的键。 虽然这有助于减少每次请求的性能冲击,但在删除许多键时要注意。 此外,请考虑对象的大小,以确保性能合适。

def delete_items(bucket_name):
    try:
        delete_request = {
            "Objects": [
                { "Key": "deletetest/testfile1.txt" },
                { "Key": "deletetest/testfile2.txt" },
                { "Key": "deletetest/testfile3.txt" },
                { "Key": "deletetest/testfile4.txt" },
                { "Key": "deletetest/testfile5.txt" }
            ]
        }

        response = cos_client.delete_objects(
            Bucket=bucket_name,
            Delete=delete_request
        )

        print("Deleted items for {0}\n".format(bucket_name))
        print(json.dumps(response.get("Deleted"), indent=4))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to copy item: {0}".format(e))

SDK 引用

方法

删除存储区

def delete_bucket(bucket_name):
    print("Deleting bucket: {0}".format(bucket_name))
    try:
        cos_client.delete_bucket(Bucket=bucket_name)
        print("Bucket: {0} deleted!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to delete bucket: {0}".format(e))

SDK 引用

方法

删除后,水桶名称会保留 10-15 分钟。

运行分块上传

上传二进制文件(首选方法)

对象的 upload_fileobjS3 对象的方法会在必要时自动运行多部分上传。 该 TransferConfig 类用于确定使用多部分上传的阈值。

def multi_part_upload(bucket_name, item_name, file_path):
    try:
        print("Starting file transfer for {0} to bucket: {1}\n".format(item_name, bucket_name))
        # set 5 MB chunks
        part_size = 1024 * 1024 * 5

        # set threadhold to 15 MB
        file_threshold = 1024 * 1024 * 15

        # set the transfer threshold and chunk size
        transfer_config = ibm_boto3.s3.transfer.TransferConfig(
            multipart_threshold=file_threshold,
            multipart_chunksize=part_size
        )

        # the upload_fileobj method will automatically execute a multi-part upload
        # in 5 MB chunks for all files over 15 MB
        with open(file_path, "rb") as file_data:
            cos_client.upload_fileobj(
                Bucket=bucket_name,
                Key=item_name,
                Fileobj=file_data,
                Config=transfer_config
            )

        print("Transfer for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to complete multi-part upload: {0}".format(e))

SDK 引用

方法

手动运行分块上传

如果需要,该 S3.Client 类可用于执行多部分上传。 如果需要拥有对上传过程的更多控制权,那么此方法可能非常有用。

def multi_part_upload_manual(bucket_name, item_name, file_path):
    try:
        # create client object
        cos_client = ibm_boto3.client("s3",
            ibm_api_key_id=COS_API_KEY_ID,
            ibm_service_instance_id=COS_SERVICE_CRN,
            config=Config(signature_version="oauth"),
            endpoint_url=COS_ENDPOINT
        )

        print("Starting multi-part upload for {0} to bucket: {1}\n".format(item_name, bucket_name))

        # initiate the multi-part upload
        mp = cos_client.create_multipart_upload(
            Bucket=bucket_name,
            Key=item_name
        )

        upload_id = mp["UploadId"]

        # min 20MB part size
        part_size = 1024 * 1024 * 20
        file_size = os.stat(file_path).st_size
        part_count = int(math.ceil(file_size / float(part_size)))
        data_packs = []
        position = 0
        part_num = 0

        # begin uploading the parts
        with open(file_path, "rb") as file:
            for i in range(part_count):
                part_num = i + 1
                part_size = min(part_size, (file_size - position))

                print("Uploading to {0} (part {1} of {2})".format(item_name, part_num, part_count))

                file_data = file.read(part_size)

                mp_part = cos_client.upload_part(
                    Bucket=bucket_name,
                    Key=item_name,
                    PartNumber=part_num,
                    Body=file_data,
                    ContentLength=part_size,
                    UploadId=upload_id
                )

                data_packs.append({
                    "ETag":mp_part["ETag"],
                    "PartNumber":part_num
                })

                position += part_size

        # complete upload
        cos_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=item_name,
            UploadId=upload_id,
            MultipartUpload={
                "Parts": data_packs
            }
        )
        print("Upload for {0} Complete!\n".format(item_name))
    except ClientError as be:
        # abort the upload
        cos_client.abort_multipart_upload(
            Bucket=bucket_name,
            Key=item_name,
            UploadId=upload_id
        )
        print("Multi-part upload aborted for {0}\n".format(item_name))
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to complete multi-part upload: {0}".format(e))

SDK 引用 (续)

方法

使用 TransferManager 上传大对象

TransferManager 提供了另一种方法,用于在每次有必要设置配置参数时,自动合并分块上传来运行大型文件传输。

def upload_large_file(bucket_name, item_name, file_path):
    print("Starting large file upload for {0} to bucket: {1}".format(item_name, bucket_name))

    # set the chunk size to 5 MB
    part_size = 1024 * 1024 * 5

    # set threadhold to 5 MB
    file_threshold = 1024 * 1024 * 5

    # Create client connection
    cos_client = ibm_boto3.client("s3",
        ibm_api_key_id=COS_API_KEY_ID,
        ibm_service_instance_id=COS_SERVICE_CRN,
        config=Config(signature_version="oauth"),
        endpoint_url=COS_ENDPOINT
    )

    # set the transfer threshold and chunk size in config settings
    transfer_config = ibm_boto3.s3.transfer.TransferConfig(
        multipart_threshold=file_threshold,
        multipart_chunksize=part_size
    )

    # create transfer manager
    transfer_mgr = ibm_boto3.s3.transfer.TransferManager(cos_client, config=transfer_config)

    try:
        # initiate file upload
        future = transfer_mgr.upload(file_path, bucket_name, item_name)

        # wait for upload to complete
        future.result()

        print ("Large file upload complete!")
    except Exception as e:
        print("Unable to complete large file upload: {0}".format(e))
    finally:
        transfer_mgr.shutdown()

列出存储区中的项 (V2)

S3.Client 对象具有用于列出内容的更新方法 (list_objects_v2)。 此方法允许您限制返回的记录数,并批量检索记录。 这对于对应用程序中的结果进行分页可能非常有用,并可能提高性能。

def get_bucket_contents_v2(bucket_name, max_keys):
    print("Retrieving bucket contents from: {0}".format(bucket_name))
    try:
        # create client object
        cos_client = ibm_boto3.client("s3",
            ibm_api_key_id=COS_API_KEY_ID,
            ibm_service_instance_id=COS_SERVICE_CRN,
            config=Config(signature_version="oauth"),
            endpoint_url=COS_ENDPOINT)

        more_results = True
        next_token = ""

        while (more_results):
            response = cos_client.list_objects_v2(Bucket=bucket_name, MaxKeys=max_keys, ContinuationToken=next_token)
            files = response["Contents"]
            for file in files:
                print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))

            if (response["IsTruncated"]):
                next_token = response["NextContinuationToken"]
                print("...More results in next batch!\n")
            else:
                more_results = False
                next_token = ""

    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve bucket contents: {0}".format(e))

SDK 引用

方法

使用 Key Protect

可以将 Key Protect 添加到存储区,以对云中的敏感数据进行静态加密。

准备工作

要创建启用了 Key Protect 的存储区,需要以下各项:

检索根密钥 CRN

  1. 检索 Key Protect 服务的实例标识
  2. 使用 Key Protect API 来检索所有可用密钥
  3. 检索用于在存储区上启用 Key Protect 的根密钥的 CRN。 CRN 类似于以下内容:

crn:v1:bluemix:public:kms:us-south:a/3d624cd74a0dea86ed8efe3101341742:90b6a1db-0fe1-4fe9-b91e-962c327df531:key:0bg3e33e-a866-50f2-b715-5cba2bc93234

创建启用了 Key Protect 的存储区

COS_KP_ALGORITHM = "<algorithm>"
COS_KP_ROOTKEY_CRN = "<root-key-crn>"

# Create a new bucket with key protect (encryption)
def create_bucket_kp(bucket_name):
    print("Creating new encrypted bucket: {0}".format(bucket_name))
    try:
        cos_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                "LocationConstraint":COS_BUCKET_LOCATION
            },
            IBMSSEKPEncryptionAlgorithm=COS_KP_ALGORITHM,
            IBMSSEKPCustomerRootKeyCrn=COS_KP_ROOTKEY_CRN
        )
        print("Encrypted Bucket: {0} created!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create encrypted bucket: {0}".format(e))

键值

  • <algorithm>- 对添加到数据桶的新对象使用的加密算法(默认为 AES256 )。
  • <root-key-crn>- 从 Key Protect 服务获取的根密钥的 CRN。

SDK 引用

方法

使用 Aspera 高速传输

旧通知: 对 Aspera 的支持被视为旧支持。 建议用户使用 Aspera Transfer SDK[https://developer.ibm.com/apis/catalog/aspera--aspera-transfer-sdk/API Reference]。

旧通知: 对 Aspera 的支持被视为旧支持。 建议用户使用 Aspera Transfer SDK

通过安装 Aspera 高速传输库,可以在应用程序中使用高速文件传输。 Aspera 库是封闭式源代码库,因此具有 COS SDK(使用 Apache 许可证)的可选依赖项。

每个 Aspera 会话都会创建一个单独的 ascp 进程,此进程在客户机上运行以执行传输。 请确保计算环境允许此进程运行。

初始化 AsperaTransferManager

在初始化 AsperaTransferManager 之前,请确保您有一个正在运行的 client (不是 resourcesession )对象。

import ibm_boto3
from ibm_botocore.client import Config
from ibm_s3transfer.aspera.manager import AsperaTransferManager

COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>"
COS_RESOURCE_CRN = "<resource-instance-id>"
COS_BUCKET_LOCATION = "<location>"

# Create resource
cos_client = ibm_boto3.client("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_RESOURCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

transfer_manager = AsperaTransferManager(cos)

您需要提供 IAM API 密钥以用于 Aspera 高速传输。 当前 支持 HMAC 凭证。 有关 IAM 的更多信息,请单击此处

要实现最高吞吐量,请将传输拆分成指定数量的并行会话,这些会话发送的数据块的大小由阈值定义。

使用多会话的典型配置应该如下:

  • 2500 MBps 目标速率
  • 100 MB 阈值(这是针对大多数应用程序的建议值
ms_transfer_config = AsperaConfig(multi_session="all",
                                  target_rate_mbps=2500,
                                  multi_session_threshold_mb=100)

在以上示例中,SDK 将衍生足够的会话来尝试达到目标速率 2500 MBps。

此外,可以在 SDK 中显式配置会话管理。 对于需要更精确地控制网络利用率的情况,此功能会非常有用。

使用显式多会话的典型配置应该如下:

  • 2 个或 10 个会话
  • 100 MB 阈值(这是针对大多数应用程序的建议值
from ibm_s3transfer.aspera.manager import AsperaConfig
# Configure 2 sessions for transfer
ms_transfer_config = AsperaConfig(multi_session=2,
                                  multi_session_threshold_mb=100)

# Create the Aspera Transfer Manager
transfer_manager = AsperaTransferManager(client=client,
                                         transfer_config=ms_transfer_config)

在大多数情况下,要想获得最佳性能,应始终使用多个会话,以尽量减少与实例化 Aspera 高速传输相关的任何处理。 如果网络容量至少为 1 Gbps,那么应该使用 10 个会话。 带宽低于 1 Gbps 的网络应该使用 2 个会话。

文件上传

bucket_name = "<bucket-name>"
upload_filename = "<absolute-path-to-file>"
object_name = "<item-name>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Perform upload
    future = transfer_manager.upload(upload_filename, bucket_name, object_name)

    # Wait for upload to complete
    future.result()

键值

  • <bucket-name>- 目标水桶名称
  • <absolute-path-to-file>- 要上传文件的目录路径和文件名
  • <item-name>- 水桶中新增文件的名称

文件下载

bucket_name = "<bucket-name>"
download_filename = "<absolute-path-to-file>"
object_name = "<object-to-download>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Get object with Aspera
    future = transfer_manager.download(bucket_name, object_name, download_filename)

    # Wait for download to complete
    future.result()

键值

  • <bucket-name>- Object Storage 服务实例中已启用 Aspera 的桶的名称。
  • <absolute-path-to-file>- 将文件保存到本地系统的目录和文件名。
  • <object-to-download>- 要下载的邮筒中文件的名称。

目录上传

bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY, and have objects in it.
local_upload_directory = "<absolute-path-to-directory>"
# THIS SHOULD NOT HAVE A LEADING "/"
remote_directory = "<object prefix>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Perform upload
    future = transfer_manager.upload_directory(local_upload_directory, bucket_name, remote_directory)

    # Wait for upload to complete
    future.result()

键值

  • <bucket-name>- Object Storage 服务实例中已启用 Aspera 的水桶名称
  • <absolute-path-to-directory>- 本地目录,其中包含要上传的文件。 必须具有前导和尾部 /(例如,/Users/testuser/Documents/Upload/
  • <object prefix>- 存储文件的存储桶目录名称。 不能有前导斜杠 /(例如,newuploads/

目录下载

bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Get object with Aspera
    future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory)

    # Wait for download to complete
    future.result()

键值

  • <bucket-name>- Object Storage 服务实例中已启用 Aspera 的水桶名称
  • <absolute-path-to-directory>- 保存下载文件的本地目录。 必须有前斜线和后斜线 / (即 /Users/testuser/Downloads/)
  • <object prefix>- 存储文件的存储桶目录名称。 不能有前导斜杠 /(例如,todownload/

使用订户

订户通过附加定制回调方法来提供对传输的可观察性。 所有传输都会在以下各阶段之间进行转换:

Queued - In Progress - Done

每个阶段有三个可用订户:

  • CallbackOnQueued() - 将新传输添加到 AsperaTransferManager 时调用
  • CallbackOnProgress() - 传输已传输数据时调用(正在传输期间会重复触发)。
  • CallbackOnDone() - 传输完成后调用
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

# Subscriber callbacks
class CallbackOnQueued(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_queued(self, future, **kwargs):
        print("Directory download queued.")

class CallbackOnProgress(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_progress(self, future, bytes_transferred, **kwargs):
        print("Directory download in progress: %s bytes transferred" % bytes_transferred)

class CallbackOnDone(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_done(self, future, **kwargs):
        print("Downloads complete!")

# Create Transfer manager
transfer_manager = AsperaTransferManager(client)

# Attach subscribers
subscribers = [CallbackOnQueued(), CallbackOnProgress(), CallbackOnDone()]

# Get object with Aspera
future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, subscribers)

# Wait for download to complete
future.result()

键值

  • <bucket-name>- Object Storage 服务实例中已启用 Aspera 的水桶名称
  • <absolute-path-to-directory>- 保存下载文件的本地目录。 必须具有前导和尾部斜杠 /(例如,/Users/testuser/Downloads/
  • <object prefix>- 存储文件的存储桶目录名称。 不能有前导斜杠 /(例如,todownload/

上面的样本代码会生成以下输出:

Directory download queued.
Directory download in progress: 5632 bytes transferred
Directory download in progress: 1047552 bytes transferred
...
Directory download in progress: 53295130 bytes transferred
Directory download in progress: 62106855 bytes transferred
Download complete!

暂停/恢复/取消

SDK 提供了通过 AsperaTransferFuture 对象的以下方法来管理文件/目录传输进度的能力:

  • pause()
  • resume()
  • cancel()

调用以上概述的任一方法都不会有任何副作用。 合适的清除和整理工作由 SDK 负责处理。

# Create Transfer manager
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

with AsperaTransferManager(client) as transfer_manager:

    # download a directory with Aspera
    future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, None)

    # pause the transfer
    future.pause()

    # resume the transfer
    future.resume()

    # cancel the transfer
    future.cancel()

对 Aspera 问题进行故障诊断

问题: 使用除 3.6 之外的任何 Python 版本的开发者在安装或使用 Aspera SDK时可能会迂到故障。

原因: 如果环境中安装了不同版本的 Python,那么在尝试安装 Aspera SDK 时可能会遇到安装失败。 原因可能是路径中缺少 DLL 文件或 DLL 不正确。

**解决方案:**解决此问题的第一步是重新安装 Aspera 库。 安装期间可能发生了故障。 结果,这可能影响了 DLL 文件。 如果这样做无法解决这些问题,那么您需要更新 Python 版本。 如果您无法这样做,则可以使用安装 Intel® Distribution for Python *。 这应该允许您在 Python 3.6.x 上安装 Aspera SDK,而不会出现任何问题。

更新元数据

有两种方法可更新现有对象上的元数据:

  • 对新的元数据和原始对象内容执行 PUT 请求
  • 使用将原始对象指定为复制源的新元数据来运行 COPY 请求

使用 PUT 更新元数据

注意: PUT 请求会覆盖对象的现有内容,因此必须首先下载并重新上传新的元数据。

def update_metadata_put(bucket_name, item_name, key, value):
    try:
        # retrieve the existing item to reload the contents
        response = cos_client.get_object(Bucket=bucket_name, Key=item_name)
        existing_body = response.get("Body").read()

        # set the new metadata
        new_metadata = {
            key: value
        }

        cos_client.put_object(Bucket=bucket_name, Key=item_name, Body=existing_body, Metadata=new_metadata)

        print("Metadata update (PUT) for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        log_error("Unable to update metadata: {0}".format(e))

使用 COPY 更新元数据

def update_metadata_copy(bucket_name, item_name, key, value):
    try:
        # set the new metadata
        new_metadata = {
            key: value
        }

        # set the copy source to itself
        copy_source = {
            "Bucket": bucket_name,
            "Key": item_name
        }

        cos_client.copy_object(Bucket=bucket_name, Key=item_name, CopySource=copy_source, Metadata=new_metadata, MetadataDirective="REPLACE")

        print("Metadata update (COPY) for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        log_error("Unable to update metadata: {0}".format(e))

使用不可变对象存储器

向现有存储区添加保护配置

对于写入受保护存储区的对象,在保护时间段到期并且除去了对象上的所有合法保留之前,无法删除这些对象。 除非在创建对象时提供了特定于对象的值,否则将向对象提供存储区的缺省保留时间值。 如果覆盖受保护存储区中不再保留的对象(保留期已到期,并且对象没有任何合法保留),那么会再次保留这些对象。 可以在对象覆盖请求中提供新的保留期,否则会为对象提供存储区的缺省保留时间。

保留期设置 MinimumRetentionDefaultRetentionMaximumRetention 的最小和最大支持值分别为 0 天和 365243 天(1000 年)。

def add_protection_configuration_to_bucket(bucket_name):
    try:
        new_protection_config = {
            "Status": "Retention",
            "MinimumRetention": {"Days": 10},
            "DefaultRetention": {"Days": 100},
            "MaximumRetention": {"Days": 1000}
        }

        cos_client.put_bucket_protection_configuration(Bucket=bucket_name, ProtectionConfiguration=new_protection_config)

        print("Protection added to bucket {0}\n".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to set bucket protection config: {0}".format(e))

检查存储区上的保护

def get_protection_configuration_on_bucket(bucket_name):
    try:
        response = cos_client.get_bucket_protection_configuration(Bucket=bucket_name)
        protection_config = response.get("ProtectionConfiguration")

        print("Bucket protection config for {0}\n".format(bucket_name))
        print(protection_config)
        print("\n")
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to get bucket protection config: {0}".format(e))

上传受保护对象

如果覆盖受保护存储区中不再保留的对象(保留期已到期,并且对象没有任何合法保留),那么会再次保留这些对象。 可以在对象覆盖请求中提供新的保留期,否则会为对象提供存储区的缺省保留时间。

类型 描述
Retention-Period 非负整数(秒) 要在对象上存储的保留期(以秒为单位)。 在保留期中指定的时间长度到期之前,无法覆盖也无法删除对象。 如果同时指定了此字段和 Retention-Expiration-Date,将返回 400 错误。 如果这两个字段均未指定,将使用存储区的 DefaultRetention 时间段。 假定存储区的最短保留期为 0,那么零 (0) 是合法值。
Retention-expiration-date 日期(ISO 8601 格式) 能够合法删除或修改对象的日期。 只能指定此项或指定 Retention-Period 头。 如果同时指定这两项,将返回 400 错误。 如果这两项均未指定,将使用存储区的 DefaultRetention 时间段。
Retention-legal-hold-id string 要应用于对象的单个合法保留。 合法保留是长度为 Y 个字符的字符串。 在除去与对象关联的所有合法保留之前,无法覆盖或删除对象。
def put_object_add_legal_hold(bucket_name, object_name, file_text, legal_hold_id):
    print("Add legal hold {0} to {1} in bucket {2} with a putObject operation.\n".format(legal_hold_id, object_name, bucket_name))
    cos_client.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=file_text,
        RetentionLegalHoldId=legal_hold_id)
    print("Legal hold {0} added to object {1} in bucket {2}\n".format(legal_hold_id, object_name, bucket_name))

def copy_protected_object(source_bucket_name, source_object_name, destination_bucket_name, new_object_name):
    print("Copy protected object {0} from bucket {1} to {2}/{3}.\n".format(source_object_name, source_bucket_name, destination_bucket_name, new_object_name))

    copy_source = {
        "Bucket": source_bucket_name,
        "Key": source_object_name
    }

    cos_client.copy_object(
        Bucket=destination_bucket_name,
        Key=new_object_name,
        CopySource=copy_source,
        RetentionDirective="Copy"
    )

    print("Protected object copied from {0}/{1} to {2}/{3}\n".format(source_bucket_name, source_object_name, destination_bucket_name, new_object_name));

def complete_multipart_upload_with_retention(bucket_name, object_name, upload_id, retention_period):
    print("Completing multi-part upload for object {0} in bucket {1}\n".format(object_name, bucket_name))
    cos_client.complete_multipart_upload(
        Bucket=bucket_name,
        Key=object_name,
        MultipartUpload={
            "Parts":[{
                "ETag": part["ETag"],
                "PartNumber": 1
            }]
        },
        UploadId=upload_id,
        RetentionPeriod=retention_period
    )

    print("Multi-part upload completed for object {0} in bucket {1}\n".format(object_name, bucket_name))

def upload_file_with_retention(bucket_name, object_name, path_to_file, retention_period):
    print("Uploading file {0} to object {1} in bucket {2}\n".format(path_to_file, object_name, bucket_name))

    args = {
        "RetentionPeriod": retention_period
    }

    cos_client.upload_file(
        Filename=path_to_file,
        Bucket=bucket_name,
        Key=object_name,
        ExtraArgs=args
    )

    print("File upload complete to object {0} in bucket {1}\n".format(object_name, bucket_name))

延长受保护对象的保留期

对象的保留期只能延长。 不能在当前配置值的基础上缩短。

保留时间延长值可通过以下三种方式之一进行设置:

  • 在当前值的基础上增加时间(Additional-Retention-Period 或类似方法)
  • 新的延长时间段(以秒为单位)(Extend-Retention-From-Current-Time 或类似方法)
  • 对象的新保留到期日期(New-Retention-Expiration-Date 或类似方法)

根据 extendRetention 请求中设置的参数,对象元数据中存储的当前保留期可通过给定更多时间延长,也可替换为新值。 在所有情况下,都会根据当前保留期来检查延长保留时间参数,并且仅当更新的保留期大于当前保留期时,才会接受延长参数。

如果覆盖受保护存储区中不再保留的对象(保留期已到期,并且对象没有任何合法保留),那么会再次保留这些对象。 可以在对象覆盖请求中提供新的保留期,否则会为对象提供存储区的缺省保留时间。

def extend_retention_period_on_object(bucket_name, object_name, additional_seconds):
    print("Extend the retention period on {0} in bucket {1} by {2} seconds.\n".format(object_name, bucket_name, additional_seconds))

    cos_client.extend_object_retention(
        Bucket=bucket_ame,
        Key=object_name,
        AdditionalRetentionPeriod=additional_seconds
    )

    print("New retention period on {0} is {1}\n".format(object_name, additional_seconds))

列出受保护对象上的合法保留

此操作会返回以下内容:

  • 对象创建日期
  • 对象保留期(秒)
  • 根据时间段和创建日期计算的保留到期日期
  • 合法保留的列表
  • 合法保留标识
  • 应用合法保留时的时间戳记

如果对象上没有合法保留,那么会返回空的 LegalHoldSet。 如果在对象上未指定保留期,那么会返回 404 错误。

def list_legal_holds_on_object(bucket_name, object_name):
    print("List all legal holds on object {0} in bucket {1}\n".format(object_name, bucket_name));

    response = cos_client.list_legal_holds(
        Bucket=bucket_name,
        Key=object_name
    )

    print("Legal holds on bucket {0}: {1}\n".format(bucket_name, response))

创建托管静态 Web 站点

此操作需要许可权,因为通常仅允许存储区所有者配置存储区以托管静态 Web 站点。 这些参数确定站点访问者的缺省后缀以及可选错误文档。

def putBucketWebsiteConfiguration(bucket_name):
    website_defaults = {
        'ErrorDocument': {'Key': 'error.html'},
        'IndexDocument': {'Suffix': 'index.html'},
    }
    cos_client.put_bucket_website(Bucket=bucket_name, WebsiteConfiguration=website_defaults)
    print("Website configuration set on bucket {0}\n".format(bucket_name))

后续步骤

有关更多信息,可以在 GitHub中找到源代码。