IBM Cloud Docs
Usando Python

Usando Python

O suporte ao Python é fornecido por meio de uma bifurcação da biblioteca boto3 com recursos para aproveitar ao máximo o IBM Cloud® Object Storage.

Ele pode ser instalado por meio do Python Package Index via pip install ibm-cos-sdk.

O código-fonte pode ser encontrado em GitHub.

A biblioteca ibm_boto3 fornece acesso completo à API do IBM Cloud® Object Storage. Os terminais, uma chave de API e o ID da instância devem ser especificados durante a criação de um recurso de serviço ou cliente de baixo nível, conforme mostrado nos exemplos básicos a seguir.

A identificação da instância do serviço também é conhecida como identificação da instância do recurso. O valor pode ser localizado criando uma credencial de serviço ou por meio da CLI.

Documentação detalhada pode ser encontrada aqui.

Criando um cliente e credenciais de fornecimento

Para se conectar ao COS, um cliente é criado e configurado usando informações de credenciais (chave de API e ID da instância de serviço). Esses valores também podem ser originados automaticamente de um arquivo de credenciais ou de variáveis de ambiente.

Depois de gerar uma Credencial de serviço, o documento JSON resultante pode ser salvo em ~/.bluemix/cos_credentials. O SDK originará automaticamente as credenciais desse arquivo, a menos que outras credenciais sejam explicitamente configuradas durante a criação do cliente. Se o arquivo cos_credentials contiver chaves HMAC, o cliente será autenticado com uma assinatura, caso contrário, o cliente usará a chave API fornecida para autenticar usando um token de acesso (usando uma chave API ainda requer que o config=Config(signature_version="oauth") seja incluído durante a criação do cliente).

Se estiver migrando do AWS S3, também será possível originar os dados de credenciais de ~/.aws/credentials no formato:

[default]
aws_access_key_id = {API_KEY}
aws_secret_access_key = {SERVICE_INSTANCE_ID}

Observação: Se ambos ~/.bluemix/cos_credentials e ~/.aws/credentials existirem, cos_credentials tem preferência.

Reunir informações necessárias

As variáveis a seguir aparecem nos exemplos:

  • bucket_name deve ser uma sequência exclusiva e protegida por DNS. Como os nomes dos depósitos são exclusivos em todo o sistema, esses valores precisarão ser mudados se este exemplo for executado múltiplas vezes. Observe que os nomes ficam reservados por 10 a 15 minutos após a exclusão.
  • ibm_api_key_id é o valor localizado na Credencial de serviço como apikey.
  • ibm_service_instance_id é o valor localizado na Credencial de serviço como resource_instance_id.
  • endpoint_url é uma URL de terminal em serviço, inclusive do protocolo https://. Esse valor não é o valor endpoints que está localizado na Credencial de serviço. Para obter mais informações sobre terminais, consulte Terminais e locais de armazenamento.
  • LocationConstraint é um código de fornecimento válido que corresponde ao valor endpoint.

Exemplos de código

Os exemplos de código são testados em versões de liberação suportadas do Python

Em seu código, deve-se remover os colchetes ou quaisquer outros caracteres em excesso que são fornecidos aqui como ilustração.

Inicializando a configuração

Este exemplo cria um objeto resource.. Um recurso fornece uma interface orientada a objetos para o COS Isso permite um nível mais alto de abstração do que as chamadas de baixo nível fornecidas por um objeto de cliente.

Observe que algumas operações (como uma transferência de alta velocidade d Aspera ) exigem um client objeto. Aspera requer uma versão d Python3.6.

Aviso sobre legado: O suporte para Aspera é considerado legado. Em vez disso, use o Aspera Transfer SDK.

import ibm_boto3
from ibm_botocore.client import Config, ClientError

# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"

# Create resource
cos_resource = ibm_boto3.resource("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_INSTANCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

Um cliente fornece uma interface de baixo nível para a API do COS S3 Isso permite o processamento direto das respostas do site HTTP, em vez de usar métodos e atributos abstratos fornecidos por um recurso para acessar as informações contidas nos cabeçalhos ou nas cargas de resposta XML.


import ibm_boto3
from ibm_botocore.client import Config, ClientError

# Constants for IBM COS values
COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>" # eg "W00YixxxxxxxxxxMB-odB-2ySfTrFBIQQWanc--P3byk"
COS_INSTANCE_CRN = "<service-instance-id>" # eg "crn:v1:bluemix:public:cloud-object-storage:global:a/3bf0d9003xxxxxxxxxx1c3e97696b71c:d6f04d83-6c4f-4a62-a165-696756d63903::"

# Create client
cos_client = ibm_boto3.client("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_INSTANCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

Valores da chave

  • <endpoint>- endpoint público para o seu cloud Object Storage com prefixo de esquema (' https:// ') (disponível no painel IBM Cloud ). Para obter mais informações sobre terminais, consulte Terminais e locais de armazenamento.
  • <api-key>- chave API gerada ao criar as credenciais do serviço (é necessário acesso de gravação para os exemplos de criação e exclusão)
  • <service-instance-id>- ID do recurso para o seu Object Storage na nuvem (disponível através da CLI IBM Cloud ou do painel IBM Cloud )
  • <location>- localização padrão para o seu Object Storage na nuvem (deve corresponder à região usada para <endpoint>)

Referências do SDK.

Criando um novo depósito

Os exemplos abaixo usam cliente que é uma interface de baixo nível.

Uma lista de códigos de fornecimento válidos para LocationConstraint pode ser referenciada no guia de Classes de armazenamento.

def create_bucket(bucket_name):
    print("Creating new bucket: {0}".format(bucket_name))
    try:
        cos_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                "LocationConstraint":COS_BUCKET_LOCATION
            }
        )
        print("Bucket: {0} created!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create bucket: {0}".format(e))

Referências do SDK.

Métodos

Criando um novo arquivo de texto

def create_text_file(bucket_name, item_name, file_text):
    print("Creating new item: {0}".format(item_name))
    try:
        cos_client.put_object(
            Bucket=bucket_name,
            Key=item_name,
            Body=file_text
        )
        print("Item: {0} created!".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create text file: {0}".format(e))

Referências do SDK.

Métodos

Listar depósitos disponíveis

def get_buckets():
    print("Retrieving list of buckets")
    try:
        buckets = cos_client.list_buckets()
        for bucket in buckets["Buckets"]:
            print("Bucket Name: {0}".format(bucket["Name"]))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve list buckets: {0}".format(e))

Referências do SDK.

Métodos

Listar itens em um depósito

def get_bucket_contents(bucket_name):
    print("Retrieving bucket contents from: {0}".format(bucket_name))
    try:
        files = cos_client.list_objects(Bucket=bucket_name)
        for file in files.get("Contents", []):
            print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve bucket contents: {0}".format(e))

Referências do SDK.

Métodos

Obter conteúdo do arquivo de um item específico

def get_item(bucket_name, item_name):
    print("Retrieving item from bucket: {0}, key: {1}".format(bucket_name, item_name))
    try:
        file = cos_client.get_object(Bucket=bucket_name, Key=item_name)
        print("File Contents: {0}".format(file["Body"].read()))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve file contents: {0}".format(e))

Referências do SDK.

Métodos

Excluir um item de um depósito

def delete_item(bucket_name, object_name):
    try:
        cos_client.delete_object(Bucket=bucket_name, Key=object_name)
        print("Item: {0} deleted!\n".format(object_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to delete object: {0}".format(e))

Referências do SDK.

Métodos

Excluir múltiplos itens de um depósito

A solicitação de exclusão pode conter um máximo de 1000 chaves que você deseja excluir. Embora isso seja útil para reduzir o impacto no desempenho por solicitação, tenha cuidado ao excluir muitas chaves. Além disso, leve em conta os tamanhos dos objetos para assegurar o desempenho adequado.

def delete_items(bucket_name):
    try:
        delete_request = {
            "Objects": [
                { "Key": "deletetest/testfile1.txt" },
                { "Key": "deletetest/testfile2.txt" },
                { "Key": "deletetest/testfile3.txt" },
                { "Key": "deletetest/testfile4.txt" },
                { "Key": "deletetest/testfile5.txt" }
            ]
        }

        response = cos_client.delete_objects(
            Bucket=bucket_name,
            Delete=delete_request
        )

        print("Deleted items for {0}\n".format(bucket_name))
        print(json.dumps(response.get("Deleted"), indent=4))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to copy item: {0}".format(e))

Referências do SDK.

Métodos

Excluir um depósito

def delete_bucket(bucket_name):
    print("Deleting bucket: {0}".format(bucket_name))
    try:
        cos_client.delete_bucket(Bucket=bucket_name)
        print("Bucket: {0} deleted!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to delete bucket: {0}".format(e))

Referências do SDK.

Métodos

Os nomes dos buckets ficam reservados por 10 a 15 minutos após a exclusão.

Execute um upload de múltiplas partes

Fazer upload do arquivo binário (método preferencial)

O upload_fileobj método do objeto S3 executa automaticamente um upload em várias partes quando necessário. A TransferConfig classe é usada para determinar o limite para usar o upload em várias partes.

def multi_part_upload(bucket_name, item_name, file_path):
    try:
        print("Starting file transfer for {0} to bucket: {1}\n".format(item_name, bucket_name))
        # set 5 MB chunks
        part_size = 1024 * 1024 * 5

        # set threadhold to 15 MB
        file_threshold = 1024 * 1024 * 15

        # set the transfer threshold and chunk size
        transfer_config = ibm_boto3.s3.transfer.TransferConfig(
            multipart_threshold=file_threshold,
            multipart_chunksize=part_size
        )

        # the upload_fileobj method will automatically execute a multi-part upload
        # in 5 MB chunks for all files over 15 MB
        with open(file_path, "rb") as file_data:
            cos_client.upload_fileobj(
                Bucket=bucket_name,
                Key=item_name,
                Fileobj=file_data,
                Config=transfer_config
            )

        print("Transfer for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to complete multi-part upload: {0}".format(e))

Referências do SDK.

Métodos

Executar manualmente um upload de múltiplas partes

Se desejado, a S3.Client classe pode ser usada para realizar um upload em várias partes. Isso poderá ser útil se mais controle sobre o processo de upload for necessário.

def multi_part_upload_manual(bucket_name, item_name, file_path):
    try:
        # create client object
        cos_client = ibm_boto3.client("s3",
            ibm_api_key_id=COS_API_KEY_ID,
            ibm_service_instance_id=COS_SERVICE_CRN,
            config=Config(signature_version="oauth"),
            endpoint_url=COS_ENDPOINT
        )

        print("Starting multi-part upload for {0} to bucket: {1}\n".format(item_name, bucket_name))

        # initiate the multi-part upload
        mp = cos_client.create_multipart_upload(
            Bucket=bucket_name,
            Key=item_name
        )

        upload_id = mp["UploadId"]

        # min 20MB part size
        part_size = 1024 * 1024 * 20
        file_size = os.stat(file_path).st_size
        part_count = int(math.ceil(file_size / float(part_size)))
        data_packs = []
        position = 0
        part_num = 0

        # begin uploading the parts
        with open(file_path, "rb") as file:
            for i in range(part_count):
                part_num = i + 1
                part_size = min(part_size, (file_size - position))

                print("Uploading to {0} (part {1} of {2})".format(item_name, part_num, part_count))

                file_data = file.read(part_size)

                mp_part = cos_client.upload_part(
                    Bucket=bucket_name,
                    Key=item_name,
                    PartNumber=part_num,
                    Body=file_data,
                    ContentLength=part_size,
                    UploadId=upload_id
                )

                data_packs.append({
                    "ETag":mp_part["ETag"],
                    "PartNumber":part_num
                })

                position += part_size

        # complete upload
        cos_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=item_name,
            UploadId=upload_id,
            MultipartUpload={
                "Parts": data_packs
            }
        )
        print("Upload for {0} Complete!\n".format(item_name))
    except ClientError as be:
        # abort the upload
        cos_client.abort_multipart_upload(
            Bucket=bucket_name,
            Key=item_name,
            UploadId=upload_id
        )
        print("Multi-part upload aborted for {0}\n".format(item_name))
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to complete multi-part upload: {0}".format(e))

Referências do SDK continuadas

Classes

Métodos

Upload de objeto grande usando o TransferManager

O TransferManager fornece uma outra maneira de executar grandes transferências de arquivos, incorporando automaticamente uploads de múltiplas partes sempre que necessário configurando parâmetros de configuração.

def upload_large_file(bucket_name, item_name, file_path):
    print("Starting large file upload for {0} to bucket: {1}".format(item_name, bucket_name))

    # set the chunk size to 5 MB
    part_size = 1024 * 1024 * 5

    # set threadhold to 5 MB
    file_threshold = 1024 * 1024 * 5

    # Create client connection
    cos_client = ibm_boto3.client("s3",
        ibm_api_key_id=COS_API_KEY_ID,
        ibm_service_instance_id=COS_SERVICE_CRN,
        config=Config(signature_version="oauth"),
        endpoint_url=COS_ENDPOINT
    )

    # set the transfer threshold and chunk size in config settings
    transfer_config = ibm_boto3.s3.transfer.TransferConfig(
        multipart_threshold=file_threshold,
        multipart_chunksize=part_size
    )

    # create transfer manager
    transfer_mgr = ibm_boto3.s3.transfer.TransferManager(cos_client, config=transfer_config)

    try:
        # initiate file upload
        future = transfer_mgr.upload(file_path, bucket_name, item_name)

        # wait for upload to complete
        future.result()

        print ("Large file upload complete!")
    except Exception as e:
        print("Unable to complete large file upload: {0}".format(e))
    finally:
        transfer_mgr.shutdown()

Listar itens em um depósito (v2)

O S3.Client possui um método atualizado para listar o conteúdo (list_objects_v2. Esse método permite limitar o número de registros que são retornados e recuperar os registros em lotes. Isso pode ser útil para paginar seus resultados em um aplicativo e melhorar o desempenho.

def get_bucket_contents_v2(bucket_name, max_keys):
    print("Retrieving bucket contents from: {0}".format(bucket_name))
    try:
        # create client object
        cos_client = ibm_boto3.client("s3",
            ibm_api_key_id=COS_API_KEY_ID,
            ibm_service_instance_id=COS_SERVICE_CRN,
            config=Config(signature_version="oauth"),
            endpoint_url=COS_ENDPOINT)

        more_results = True
        next_token = ""

        while (more_results):
            response = cos_client.list_objects_v2(Bucket=bucket_name, MaxKeys=max_keys, ContinuationToken=next_token)
            files = response["Contents"]
            for file in files:
                print("Item: {0} ({1} bytes).".format(file["Key"], file["Size"]))

            if (response["IsTruncated"]):
                next_token = response["NextContinuationToken"]
                print("...More results in next batch!\n")
            else:
                more_results = False
                next_token = ""

    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve bucket contents: {0}".format(e))

Referências do SDK.

Métodos

Criação de uma política de backup

# Config values
api_key = "<API_KEY>"
vault_crn = "<SERVICE_INSTANCE_ID>"
source_bucket_name = "<BACKUP_VAULT_NAME>"
policy_name = "<POLICY_NAME>"

# Authenticator and client setup
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Create policy
create_backup_policy = rc_client.create_backup_policy(
        bucket=source_bucket_name,
        policy_name=policy_name,
        target_backup_vault_crn=vault_crn,
        backup_type="continuous",
        initial_retention={"delete_after_days": 1}
    )

# Print response
print(f" Policy created: { create_backup_policy }")

Listagem de uma política de backup

# Config values
api_key = "<API_KEY>"
source_bucket_name = "<BACKUP_VAULT_NAME>"

# Authenticator and client setup
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# List all backup policies
list_response = rc_client.list_backup_policies(bucket=source_bucket_name)

print("\nList of backup policies:")
for policy in list_response.result.get("backup_policies", []):
    print(policy)

Tenha uma política de backup

# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
backup_vault_crn = "<BACKUP_VAULT_CRN>"
policy_name = "<POLICY_NAME>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Create backup policy
create_backup_policy_response = rc_client.create_backup_policy(
    bucket=source_bucket_name,
    policy_name=policy_name,
    target_backup_vault_crn=backup_vault_crn,
    backup_type="continuous",
    initial_retention={"delete_after_days": 1}
)

# Extract policy ID
policy_id = create_backup_policy_response.result.get("policy_id")

get_backup_policy_response = rc_client.get_backup_policy(
    bucket=source_bucket_name,
    policy_id=policy_id
)

print("\nFetched Backup Policy Details:")
print(get_backup_policy_response.result)

Excluir uma política de backup

# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
policy_id = "<POLICY_ID>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Delete the backup policy
delete_backup_policy_response = rc_client.delete_backup_policy(
    bucket=source_bucket_name,
    policy_id=policy_id
)

print(f"Backup policy '{policy_id}' deleted successfully.")

Criação de um cofre de backup

# Config
api_key = "<API_KEY>"
service_instance_id = "<SERVICE_INSTANCE_ID>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
region = "<REGION>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Create a backup vault
create_backup_vault_response = rc_client.create_backup_vault(
    service_instance_id=service_instance_id,
    backup_vault_name=backup_vault_name,
    region=region
)

# Output result
print("Backup vault created:")
print(create_backup_vault_response.result)

Listagem de cofres de backup

# Config
api_key = "<API_KEY>"
service_instance_id = "<SERVICE_INSTANCE_ID>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# List backup vaults
list_backup_vaults_response = rc_client.list_backup_vaults(
    service_instance_id=service_instance_id
)

print("List of backup vaults:")
print(list_backup_vaults_response.result)

Obter cofres de backup

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Get backup vault
get_backup_vault = rc_client.get_backup_vault(
    backup_vault_name=backup_vault_name
)

# Output result
print("Backup vault details:")
print(get_backup_vault.result)

Atualizar cofres de backup

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Update backup vault settings (disable activity tracking and metrics monitoring)
backup_vault_patch = {
    "activity_tracking": {"management_events": True},
    "metrics_monitoring": {"usage_metrics_enabled": True},
}

update_backup_vault_response = rc_client.update_backup_vault(
    backup_vault_name=backup_vault_name,
    backup_vault_patch=backup_vault_patch
)

# Output result
print("Backup vault updated successfully.")
print(update_backup_vault_response)

Excluir um cofre de backup

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Delete the backup vault
delete_vault_response = rc_client.delete_backup_vault(
    backup_vault_name=backup_vault_name
)

# Output result
print(f"Successfully deleted backup vault '{delete_vault_response}'.")

Faixas de recuperação de listagem

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# List recovery ranges
recovery_ranges_response = rc_client.list_recovery_ranges(
    backup_vault_name=backup_vault_name
)

# Output recovery range results
print("Recovery Ranges:")
print(recovery_ranges_response.result)

Obter faixa de recuperação

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
recovery_range_id = "<RECOVERY_RANGE_ID>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

get_recovery_range_response = rc_client.get_source_resource_recovery_range(
    backup_vault_name=backup_vault_name,
    recovery_range_id=recovery_range_id
)
print("Recovery Range Details:")
print(get_recovery_range_response.result)

Atualizar faixa de recuperação

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
recovery_range_id = "<RECOVERY_RANGE_ID>"

# Setup authenticator and client
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

recovery_range_patch_model = {}
recovery_range_patch_model['retention'] = {"delete_after_days": 99}

patch_response = rc_client.patch_source_resource_recovery_range(
    backup_vault_name=backup_vault_name,
    recovery_range_id=recovery_range_id,
    recovery_range_patch=recovery_range_patch_model
)
print("Patch Response Details:")
print(patch_response)

Iniciando uma restauração

# Configuration
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
target_bucket_crn = "<TARGET_BUCKET_CRN>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
restore_point_in_time = "<RESTORE_POINT_IN_TIME>"

# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Initiate restore
create_restore = rc_client.create_restore(
    backup_vault_name=backup_vault_name,
    recovery_range_id=recovery_range_id,
    restore_type="in_place",
    restore_point_in_time=restore_point_in_time,
    target_resource_crn=target_bucket_crn
)
print(f"Restore initiated : {create_restore}")

Restauração de listagem

# Config
api_key = "<API_KEY>"
backup_vault_name = "<BACKUP_VAULT_NAME>"

# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# List restore operations
get_store = rc_client.get_restore(
    backup_vault_name=backup_vault_name)

print("Restore response:")
print(get_store.result)

Obter detalhes da restauração

# Config
api_key = "<API_KEY>"
source_bucket_name = "<SOURCE_BUCKET_NAME>"
backup_vault_crn = "<BACKUP_VAULT_CRN>"
backup_vault_name = "<BACKUP_VAULT_NAME>"
target_bucket_crn = "<TARGET_BUCKET_CRN>"
recovery_range_id = "<RECOVERY_RANGE_ID>"
restore_point_in_time = "<RESTORE_POINT_IN_TIME>"

# Setup authenticator and clients
authenticator = IAMAuthenticator(apikey=api_key)
rc_client = ResourceConfigurationV1(authenticator=authenticator)

# Create restore
create_restore = rc_client.create_restore(
    backup_vault_name=backup_vault_name,
    recovery_range_id=recovery_range_id,
    restore_type="in_place",
    restore_point_in_time=restore_point_in_time,
    target_resource_crn=target_bucket_crn
)

restore_id = create_restore.result["restore_id"]

# List restore operations
get_store = rc_client.get_restore(
    backup_vault_name=backup_vault_name, restore_id=restore_id)

print("Restore response:")
print(get_store.result)

Crie um novo bucket COS com bloqueio de objeto ativado

 def create_bucket_with_objectlock(bucket_name):
         cos_cli.create_bucket(
             Bucket=bucket_name,
             ObjectLockEnabledForBucket=True,
         )
         print("Bucket: {0} created with objectlock enabled".format(bucket_name))

Coloque a configuração de bloqueio de objeto com modo de conformidade no bucket COS

def objectlock_configuration_on_bucket(bucket_name):

    # Putting default retenion on the COS bucket.
    default_retention_rule = {'DefaultRetention': {'Mode': 'COMPLIANCE', 'Years': 1}}
    object_lock_config = {'ObjectLockEnabled': 'Enabled', 'Rule': default_retention_rule}
    cos_cli.put_object_lock_configuration(Bucket=bucket_name, ObjectLockConfiguration=object_lock_config)

Coloque a configuração de bloqueio de objeto com modo de governança no bucket COS

def objectlock_configuration_with_governance_mode_on_bucket(bucket_name):

    # Putting default retenion on the COS bucket with governance mode.
    default_retention_rule = {'DefaultRetention': {'Mode': 'GOVERNANCE', 'Years': 1}}
    object_lock_config = {'ObjectLockEnabled': 'Enabled', 'Rule': default_retention_rule}
    cos_cli.put_object_lock_configuration(Bucket=bucket_name, ObjectLockConfiguration=object_lock_config)

Obter configuração de bloqueio de objeto no bucket COS

def objectlock_configuration_with_governance_mode_on_bucket(bucket_name):

    # Reading the objectlock configuration set on the bucket.
    response = cos_cli.get_object_lock_configuration(Bucket=bucket_name)
    print("Objectlock Configuration for {0} =>".format(bucket_name))
    print(response.ObjectLockConfiguration)

Carregar um objeto com modo de governança para o bucket COS

def upload_object_with_governance_mode(bucket_name,object_name,object_content):
        cos_cli.put_object(
            Bucket=bucket_name,
            Key=object_name,
            Body=object_content,
            ObjectLockMode='GOVERNANCE',
            ObjectLockRetainUntilDate=datetime(2025, 11, 15)
        )
        print("Object: {0} uploaded!".format(object_name))

Ative a retenção de bloqueio de objeto com o modo de conformidade no objeto

def objectlock_retention(bucket_name,object_name):
        # Put objectlock retenion on the  object uploaded to the bucket.
        date = datetime.now()+timedelta(seconds=5)
        retention_rule = {'Mode': 'COMPLIANCE', 'RetainUntilDate': date}
        cos_cli.put_object_retention(Bucket=bucket_name, Key=object_name, Retention=retention_rule)

Ative a retenção de bloqueio de objeto com o modo de governança no objeto

def objectlock_retention_with_governance_mode(bucket_name,object_name):
        # Put objectlock retenion with governance mode on the  object uploaded to the bucket.
        date = datetime.now()+timedelta(seconds=5)
        retention_rule = {'Mode': 'GOVERNANCE', 'RetainUntilDate': date}
        cos_cli.put_object_retention(Bucket=bucket_name, Key=object_name, Retention=retention_rule)

Obter retenção de bloqueio de objeto

def objectlock_retention_with_governance_mode(bucket_name,object_name):
        # Get objectlock retention of the above object.
        response = cos_cli.get_object_retention(Bucket=bucket_name, Key=object_name)
        print("Objectlock Retention for {0}=>".format(object_name))
        print(response.Retention)

Excluindo um objeto com o modo de governança de bloqueio de objetos usando a governança de bypass

def delete_object_with_bypass_governance(bucket_name,object_name):
        # Deleting an object with retention using bypass governance
        cos_cli.delete_object(Bucket=bucket_name, Key=object_name, BypassGovernanceRetention=True)

Usando o Key Protect

O Key Protect pode ser incluído em um depósito de armazenamento para criptografar dados sensíveis em repouso na nuvem.

Antes de iniciar

Os itens a seguir são necessários para criar um depósito com o Key-Protect ativado:

Recuperando o CRN da chave raiz

  1. Recupere o ID da instância para seu serviço Key Protect
  2. Use a API do Key Protect para recuperar todas as suas chaves disponíveis
  3. Recupere o CRN da chave raiz que você usa para ativar o Key Protect em seu depósito. O CRN é semelhante ao abaixo:

crn:v1:bluemix:public:kms:us-south:a/3d624cd74a0dea86ed8efe3101341742:90b6a1db-0fe1-4fe9-b91e-962c327df531:key:0bg3e33e-a866-50f2-b715-5cba2bc93234

Criando um depósito com o Key-Protect ativado

COS_KP_ALGORITHM = "<algorithm>"
COS_KP_ROOTKEY_CRN = "<root-key-crn>"

# Create a new bucket with key protect (encryption)
def create_bucket_kp(bucket_name):
    print("Creating new encrypted bucket: {0}".format(bucket_name))
    try:
        cos_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                "LocationConstraint":COS_BUCKET_LOCATION
            },
            IBMSSEKPEncryptionAlgorithm=COS_KP_ALGORITHM,
            IBMSSEKPCustomerRootKeyCrn=COS_KP_ROOTKEY_CRN
        )
        print("Encrypted Bucket: {0} created!".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create encrypted bucket: {0}".format(e))

Valores da chave

  • <algorithm>- O algoritmo de criptografia usado para novos objetos adicionados ao bucket (o padrão é AES256 ).
  • <root-key-crn>- CRN da chave raiz obtida do serviço Key Protect.

Referências do SDK.

Métodos

Usando o Aspera High-Speed Transfer

Aviso sobre legado: O suporte para Aspera é considerado legado. É recomendado que os usuários usem o Aspera Transfer SDK[https://developer.ibm.com/apis/catalog/aspera--aspera-transfer-sdk/API Reference].

Aviso sobre legado: O suporte para Aspera é considerado legado. Recomenda-se que os usuários usem o Aspera Transfer SDK

Ao instalar a biblioteca do Aspera high-speed transfer, é possível usar as transferências de arquivos em alta velocidade em seu aplicativo. A biblioteca Aspera é de origem fechada e, portanto, uma dependência opcional para o SDK do COS (que usa uma licença do Apache).

Cada sessão do Aspera cria um processo ascp individual que é executado na máquina do cliente para executar a transferência. Assegure-se de que seu ambiente de computação possa permitir que esse processo seja executado.

Inicializando o AsperaTransferManager

Antes de inicializar o AsperaTransferManager, certifique-se de que você tem um objeto (não um resource ou client session) em funcionamento.

import ibm_boto3
from ibm_botocore.client import Config
from ibm_s3transfer.aspera.manager import AsperaTransferManager

COS_ENDPOINT = "<endpoint>" # Current list avaiable at https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints
COS_API_KEY_ID = "<api-key>"
COS_RESOURCE_CRN = "<resource-instance-id>"
COS_BUCKET_LOCATION = "<location>"

# Create resource
cos_client = ibm_boto3.client("s3",
    ibm_api_key_id=COS_API_KEY_ID,
    ibm_service_instance_id=COS_RESOURCE_CRN,
    config=Config(signature_version="oauth"),
    endpoint_url=COS_ENDPOINT
)

transfer_manager = AsperaTransferManager(cos)

É necessário fornecer uma Chave de API do IAM para os Aspera high-speed transfers. Credenciais de HMAC NÃO são suportadas atualmente Para obter mais informações sobre o IAM, clique aqui.

Para obter o rendimento mais alto, divida a transferência em um número especificado de sessões paralelas que enviam chunks de dados cujo tamanho é definido por um valor do limite.

A configuração típica para usar múltiplas sessões deve ser:

  • Taxa de destino de 2500 MBps
  • Limite de 100 MB (esse é o valor recomendado para a maioria dos aplicativos)
ms_transfer_config = AsperaConfig(multi_session="all",
                                  target_rate_mbps=2500,
                                  multi_session_threshold_mb=100)

No exemplo acima, o sdk gera sessões suficientes para tentar atingir a taxa de destino de 2500 MBps.

O gerenciamento de sessões também pode ser configurado explicitamente no SDK. Isso é útil em casos em que se deseja um controle mais preciso sobre a utilização de rede.

A configuração típica para usar múltiplas sessões explícitas deve ser:

  • 2 ou 10 sessões
  • Limite de 100 MB (esse é o valor recomendado para a maioria dos aplicativos)
from ibm_s3transfer.aspera.manager import AsperaConfig
# Configure 2 sessions for transfer
ms_transfer_config = AsperaConfig(multi_session=2,
                                  multi_session_threshold_mb=100)

# Create the Aspera Transfer Manager
transfer_manager = AsperaTransferManager(client=client,
                                         transfer_config=ms_transfer_config)

Para obter o melhor desempenho na maioria dos cenários, sempre utilize várias sessões para minimizar qualquer processamento associado à instanciação de uma transferência de alta velocidade Aspera. Se a sua capacidade de rede for pelo menos 1 Gbps, será necessário usar 10 sessões. As redes de largura da banda inferior devem usar duas sessões.

Upload de arquivo

bucket_name = "<bucket-name>"
upload_filename = "<absolute-path-to-file>"
object_name = "<item-name>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Perform upload
    future = transfer_manager.upload(upload_filename, bucket_name, object_name)

    # Wait for upload to complete
    future.result()

Valores da chave

  • <bucket-name>- nome do bucket de destino
  • <absolute-path-to-file>- caminho do diretório e nome do arquivo a ser carregado
  • <item-name>- nome do novo arquivo adicionado ao bucket

Download de arquivo

bucket_name = "<bucket-name>"
download_filename = "<absolute-path-to-file>"
object_name = "<object-to-download>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Get object with Aspera
    future = transfer_manager.download(bucket_name, object_name, download_filename)

    # Wait for download to complete
    future.result()

Valores da chave

  • <bucket-name>- nome do bucket na sua instância do serviço Object Storage que tem o recurso Aspera habilitado.
  • <absolute-path-to-file>- diretório e nome do arquivo onde salvar o arquivo no sistema local.
  • <object-to-download>- nome do arquivo no bucket a ser baixado.

Upload de diretório

bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY, and have objects in it.
local_upload_directory = "<absolute-path-to-directory>"
# THIS SHOULD NOT HAVE A LEADING "/"
remote_directory = "<object prefix>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Perform upload
    future = transfer_manager.upload_directory(local_upload_directory, bucket_name, remote_directory)

    # Wait for upload to complete
    future.result()

Valores da chave

  • <bucket-name>- nome do bucket na sua instância do serviço Object Storage que tem o recurso Aspera habilitado
  • <absolute-path-to-directory>- diretório local que contém os arquivos a serem carregados. Deve ter / inicial e final (ou seja, /Users/testuser/Documents/Upload/)
  • <object prefix>- nome do diretório no bucket para armazenar os arquivos. Não deve ter uma barra inicial / (isto é, newuploads/)

Download de diretório

bucket_name = "<bucket-name>"
# THIS DIRECTORY MUST EXIST LOCALLY
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

# Create Transfer manager
with AsperaTransferManager(client) as transfer_manager:

    # Get object with Aspera
    future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory)

    # Wait for download to complete
    future.result()

Valores da chave

  • <bucket-name>- nome do bucket na sua instância do serviço Object Storage que tem o recurso Aspera habilitado
  • <absolute-path-to-directory>- diretório local para salvar os arquivos baixados. Deve ter barra inicial / e final (ou seja, /Users/testuser/Downloads/)
  • <object prefix>- nome do diretório no bucket para armazenar os arquivos. Não deve ter uma barra inicial / (isto é, todownload/)

Usando assinantes

Os assinantes fornecem a observabilidade em transferências, anexando métodos de retorno de chamada customizados. Todas as transferências executam a transição entre as fases a seguir:

Queued - In Progress - Done

Há três assinantes disponíveis para cada fase:

  • CallbackOnQueued() - chamado quando uma nova transferência tiver sido incluída no AsperaTransferManager
  • CallbackOnProgress() - chamado quando uma transferência tiver transmitido dados (disparados repetidamente enquanto a transferência está em andamento).
  • CallbackOnDone() - chamado quando a transferência é concluída
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

# Subscriber callbacks
class CallbackOnQueued(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_queued(self, future, **kwargs):
        print("Directory download queued.")

class CallbackOnProgress(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_progress(self, future, bytes_transferred, **kwargs):
        print("Directory download in progress: %s bytes transferred" % bytes_transferred)

class CallbackOnDone(AsperaBaseSubscriber):
    def __init__(self):
        pass

    def on_done(self, future, **kwargs):
        print("Downloads complete!")

# Create Transfer manager
transfer_manager = AsperaTransferManager(client)

# Attach subscribers
subscribers = [CallbackOnQueued(), CallbackOnProgress(), CallbackOnDone()]

# Get object with Aspera
future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, subscribers)

# Wait for download to complete
future.result()

Valores da chave

  • <bucket-name>- nome do bucket na sua instância do serviço Object Storage que tem o recurso Aspera habilitado
  • <absolute-path-to-directory>- diretório local para salvar os arquivos baixados. Deve ter barra inicial e final / (isto é, /Users/testuser/Downloads/)
  • <object prefix>- nome do diretório no bucket para armazenar os arquivos. Não deve ter uma barra inicial / (isto é, todownload/)

O código de amostra acima produz a saída a seguir:

Directory download queued.
Directory download in progress: 5632 bytes transferred
Directory download in progress: 1047552 bytes transferred
...
Directory download in progress: 53295130 bytes transferred
Directory download in progress: 62106855 bytes transferred
Download complete!

Pausar/Continuar/Cancelar

O SDK fornece a capacidade de gerenciar o progresso de transferências de arquivo/diretório por meio dos métodos a seguir do objeto AsperaTransferFuture:

  • pause()
  • resume()
  • cancel()

Não há efeitos colaterais ao chamar um dos métodos descritos acima. A limpeza e a manutenção adequadas são manipuladas pelo SDK.

# Create Transfer manager
bucket_name = "<bucket-name>"
local_download_directory = "<absolute-path-to-directory>"
remote_directory = "<object prefix>"

with AsperaTransferManager(client) as transfer_manager:

    # download a directory with Aspera
    future = transfer_manager.download_directory(bucket_name, remote_directory, local_download_directory, None, None)

    # pause the transfer
    future.pause()

    # resume the transfer
    future.resume()

    # cancel the transfer
    future.cancel()

Resolução de problemas do Aspera

Problema: Desenvolvedores usando qualquer versão do Python além do 3.6 podem ter falhas ao instalar ou usar o Aspera SDK.

Causa: Se houver diferentes versões do Python instaladas em seu ambiente, você poderá encontrar falhas de instalação ao tentar instalar o Aspera SDK. Isso pode ser causado por arquivos DLL ausentes ou uma DLL errada no caminho.

Solução: a primeira etapa para resolver esse problema seria reinstalar as bibliotecas do Aspera. Pode ter havido uma falha durante a instalação. Como resultado, isso pode ter afetado os arquivos DLL. Se isso não resolver os problemas, será solicitado que você atualize sua versão do Python. Se não conseguir fazer isso, você pode usar a instalação Intel® Distribution para Python *. Isso deve permitir a instalação do Aspera SDK no Python 3.6.x sem problemas.

Atualizando metadados

Há duas maneiras de atualizar os metadados em um objeto existente:

  • Uma solicitação PUT com os novos metadados e o conteúdo do objeto original
  • Executando uma solicitação COPY com os novos metadados especificando o objeto original como a origem da cópia

Usando PUT para atualizar metadados

Observação: a PUT solicitação substitui o conteúdo existente do objeto, portanto, ele deve primeiro ser baixado e reenviado com os novos metadados.

def update_metadata_put(bucket_name, item_name, key, value):
    try:
        # retrieve the existing item to reload the contents
        response = cos_client.get_object(Bucket=bucket_name, Key=item_name)
        existing_body = response.get("Body").read()

        # set the new metadata
        new_metadata = {
            key: value
        }

        cos_client.put_object(Bucket=bucket_name, Key=item_name, Body=existing_body, Metadata=new_metadata)

        print("Metadata update (PUT) for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        log_error("Unable to update metadata: {0}".format(e))

Usando COPY para atualizar metadados

def update_metadata_copy(bucket_name, item_name, key, value):
    try:
        # set the new metadata
        new_metadata = {
            key: value
        }

        # set the copy source to itself
        copy_source = {
            "Bucket": bucket_name,
            "Key": item_name
        }

        cos_client.copy_object(Bucket=bucket_name, Key=item_name, CopySource=copy_source, Metadata=new_metadata, MetadataDirective="REPLACE")

        print("Metadata update (COPY) for {0} Complete!\n".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        log_error("Unable to update metadata: {0}".format(e))

Usando o Immutable Object Storage

Incluir uma configuração de proteção em um depósito existente

Os objetos gravados em um depósito protegido não podem ser excluídos até que o período de proteção tenha expirado e todas as retenções legais no objeto sejam removidas. O valor de retenção padrão do depósito é fornecido para um objeto, a menos que um valor específico do objeto seja fornecido quando o objeto é criado. Os objetos em depósitos protegidos que não estão mais sob retenção (o período de retenção expirou e o objeto não tem nenhuma retenção legal), quando sobrescritos, ficarão novamente sob retenção. O novo período de retenção pode ser fornecido como parte da solicitação de sobrescrição do objeto ou o tempo de retenção padrão do depósito será fornecido para o objeto.

Os valores mínimo e máximo suportados para as configurações do período MinimumRetention de retenção, DefaultRetention, e MaximumRetention são um mínimo de 0 dias e um máximo de 365243 dias (1000 anos).

def add_protection_configuration_to_bucket(bucket_name):
    try:
        new_protection_config = {
            "Status": "Retention",
            "MinimumRetention": {"Days": 10},
            "DefaultRetention": {"Days": 100},
            "MaximumRetention": {"Days": 1000}
        }

        cos_client.put_bucket_protection_configuration(Bucket=bucket_name, ProtectionConfiguration=new_protection_config)

        print("Protection added to bucket {0}\n".format(bucket_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to set bucket protection config: {0}".format(e))

Verificar a proteção em um depósito

def get_protection_configuration_on_bucket(bucket_name):
    try:
        response = cos_client.get_bucket_protection_configuration(Bucket=bucket_name)
        protection_config = response.get("ProtectionConfiguration")

        print("Bucket protection config for {0}\n".format(bucket_name))
        print(protection_config)
        print("\n")
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to get bucket protection config: {0}".format(e))

Fazer upload de um objeto protegido

Os objetos em depósitos protegidos que não estão mais sob retenção (o período de retenção expirou e o objeto não tem nenhuma retenção legal), quando sobrescritos, ficarão novamente sob retenção. O novo período de retenção pode ser fornecido como parte da solicitação de sobrescrição do objeto ou o tempo de retenção padrão do depósito será fornecido para o objeto.

Valor Tipo Descrição
Retention-Period Número inteiro não negativo (segundos) O período de retenção para armazenar o objeto em segundos. O objeto não pode ser sobrescrito nem excluído até que o período de tempo especificado no período de retenção tenha decorrido. Se esse campo e Retention-Expiration-Date forem especificados, um erro 400 será retornado. Se nenhum for especificado, o período DefaultRetention do depósito será usado. Zero (0) é um valor legal que supõe que o período mínimo de retenção do depósito também é 0.
Retention-expiration-date Data (formato ISO 8601) A data na qual será legal excluir ou modificar o objeto. É possível especificar somente isso ou o cabeçalho Retention-Period. Se ambos forem especificados, um erro 400 será retornado. Se nenhum for especificado, o período DefaultRetention do depósito será usado.
Retention-legal-hold-id string Uma única retenção legal para aplicar ao objeto. Uma retenção legal é uma sequência longa de caracteres Y. O objeto não pode ser sobrescrito nem excluído até que todas as retenções legais associadas ao objeto sejam removidas.
def put_object_add_legal_hold(bucket_name, object_name, file_text, legal_hold_id):
    print("Add legal hold {0} to {1} in bucket {2} with a putObject operation.\n".format(legal_hold_id, object_name, bucket_name))
    cos_client.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=file_text,
        RetentionLegalHoldId=legal_hold_id)
    print("Legal hold {0} added to object {1} in bucket {2}\n".format(legal_hold_id, object_name, bucket_name))

def copy_protected_object(source_bucket_name, source_object_name, destination_bucket_name, new_object_name):
    print("Copy protected object {0} from bucket {1} to {2}/{3}.\n".format(source_object_name, source_bucket_name, destination_bucket_name, new_object_name))

    copy_source = {
        "Bucket": source_bucket_name,
        "Key": source_object_name
    }

    cos_client.copy_object(
        Bucket=destination_bucket_name,
        Key=new_object_name,
        CopySource=copy_source,
        RetentionDirective="Copy"
    )

    print("Protected object copied from {0}/{1} to {2}/{3}\n".format(source_bucket_name, source_object_name, destination_bucket_name, new_object_name));

def complete_multipart_upload_with_retention(bucket_name, object_name, upload_id, retention_period):
    print("Completing multi-part upload for object {0} in bucket {1}\n".format(object_name, bucket_name))
    cos_client.complete_multipart_upload(
        Bucket=bucket_name,
        Key=object_name,
        MultipartUpload={
            "Parts":[{
                "ETag": part["ETag"],
                "PartNumber": 1
            }]
        },
        UploadId=upload_id,
        RetentionPeriod=retention_period
    )

    print("Multi-part upload completed for object {0} in bucket {1}\n".format(object_name, bucket_name))

def upload_file_with_retention(bucket_name, object_name, path_to_file, retention_period):
    print("Uploading file {0} to object {1} in bucket {2}\n".format(path_to_file, object_name, bucket_name))

    args = {
        "RetentionPeriod": retention_period
    }

    cos_client.upload_file(
        Filename=path_to_file,
        Bucket=bucket_name,
        Key=object_name,
        ExtraArgs=args
    )

    print("File upload complete to object {0} in bucket {1}\n".format(object_name, bucket_name))

Ampliar o período de retenção de um objeto protegido

O período de retenção de um objeto pode somente ser ampliado. Ele não pode ser diminuído do valor configurado atualmente.

O valor de expansão de retenção é configurado de uma de três maneiras:

  • tempo adicional do valor atual (Additional-Retention-Period ou método semelhante)
  • novo período de extensão em segundos (Extend-Retention-From-Current-Time ou método semelhante)
  • nova data de validade de retenção do objeto (New-Retention-Expiration-Date ou método semelhante)

O período de retenção atual armazenado nos metadados do objeto é aumentado pelo tempo adicional determinado ou substituído pelo novo valor, dependendo do parâmetro configurado na solicitação extendRetention. Em todos os casos, o parâmetro de ampliação de retenção é verificado com relação ao período de retenção atual e o parâmetro ampliado será aceito somente se o período de retenção atualizado for maior que o período de retenção atual.

Os objetos em depósitos protegidos que não estão mais sob retenção (o período de retenção expirou e o objeto não tem nenhuma retenção legal), quando sobrescritos, ficarão novamente sob retenção. O novo período de retenção pode ser fornecido como parte da solicitação de sobrescrição do objeto ou o tempo de retenção padrão do depósito será fornecido para o objeto.

def extend_retention_period_on_object(bucket_name, object_name, additional_seconds):
    print("Extend the retention period on {0} in bucket {1} by {2} seconds.\n".format(object_name, bucket_name, additional_seconds))

    cos_client.extend_object_retention(
        Bucket=bucket_ame,
        Key=object_name,
        AdditionalRetentionPeriod=additional_seconds
    )

    print("New retention period on {0} is {1}\n".format(object_name, additional_seconds))

Listar retenções legais em um objeto protegido

Essa operação retorna:

  • Data de criação do objeto
  • Período de retenção do objeto em segundos
  • Data de expiração de retenção calculada com base no período e data de criação
  • Lista de retenções legais
  • Identificador de retenção legal
  • Registro de data e hora em que a retenção legal foi aplicada

Se não houver retenções legais no objeto, um LegalHoldSet vazio será retornado. Se não houver nenhum período de retenção especificado no objeto, um erro 404 será retornado.

def list_legal_holds_on_object(bucket_name, object_name):
    print("List all legal holds on object {0} in bucket {1}\n".format(object_name, bucket_name));

    response = cos_client.list_legal_holds(
        Bucket=bucket_name,
        Key=object_name
    )

    print("Legal holds on bucket {0}: {1}\n".format(bucket_name, response))

Criar um website estático hospedado

Essa operação requer permissões, já que apenas o proprietário do depósito geralmente tem permissão para configurar um depósito para hospedar um website estático Os parâmetros determinam o sufixo padrão para os visitantes do site, bem como um documento de erro opcional

def putBucketWebsiteConfiguration(bucket_name):
    website_defaults = {
        'ErrorDocument': {'Key': 'error.html'},
        'IndexDocument': {'Suffix': 'index.html'},
    }
    cos_client.put_bucket_website(Bucket=bucket_name, WebsiteConfiguration=website_defaults)
    print("Website configuration set on bucket {0}\n".format(bucket_name))

Próximas etapas

Para obter mais informações, o código fonte pode ser localizado em GitHub.