Utilizzo del registro dello schema Event Streams

Schema Registry fornisce un repository centralizzato per la gestione e la convalida degli schemi. Gli schemi in un registro dello schema forniscono il contratto esplicito che un programma che genera un evento fornisce ad altri programmi che utilizzano tali eventi.

Panoramica degli schemi

Apache Kafka può gestire qualsiasi dato, ma non convalida le informazioni nei messaggi. Tuttavia, una gestione efficiente dei dati spesso richiede che includa informazioni specifiche in una determinata forma. Utilizzando schemi, è possibile definire la struttura dei dati in un messaggio, garantendo che sia i producer che i consumer utilizzino la struttura corretta.

Gli schemi consentono ai produttori di creare dati conformi a una struttura predefinita, definendo i campi che devono essere presenti insieme al tipo di ciascun campo. Questa definizione consente ai consumatori di analizzare tali dati e di interpretarli correttamente. Event Streams Il piano Enterprise supporta schemi e include un Registro schemi per utilizzare e gestire schemi.

È comune per tutti i messaggi di un argomento utilizzare lo stesso schema. La chiave ed il valore di un messaggio possono essere descritti da uno schema.

Diagramma della panoramica degli schemi.
Panoramica degli schemi

Registro schema

Gli schemi sono memorizzati in Event Streams Schema Registry. Oltre a memorizzare una cronologia delle versioni degli schemi, fornisce un'interfaccia per richiamarli. Ogni istanza del piano Enterprise Event Streams ha il proprio registro dello schema. È possibile memorizzare un massimo di 1000 schemi in un'istanza Enterprise.

Produttori e consumatori convalidano i dati rispetto allo schema specificato memorizzato nel registro dello schema (oltre a passare attraverso i broker Kafka ). Non è necessario trasferire gli schemi nei messaggi in questo modo, il che significa che i messaggi possono essere più piccoli.

Diagramma dell'architettura del registro di schema.
Architettura registro schema

Apache Avro formato dati

Gli schemi vengono definiti utilizzando Apache Avro, una tecnologia di serializzazione dei dati open source comunemente utilizzata con Apache Kafka. Fornisce un formato efficiente per la codifica dei dati, sia utilizzando il formato binario compatto, sia un formato JSON più verboso, ma leggibile dall'uomo.

Il Registro Schemi dell' Event Streams e utilizza formati di dati dell' Apache Avro. Quando i messaggi vengono inviati in formato Avro, contengono i dati e l'identificativo univoco per lo schema utilizzato. L'identificativo specifica quale schema nel registro deve essere utilizzato per il messaggio.

Avro supporta una vasta gamma di tipi di dati, inclusi i tipi primitivi (null, booleano, int, long, float, double, bytes e string) e i tipi complessi (record, enum, array, map, union e fixed).

Diagramma formato Avro.
Formato messaggio Avro

Serializzazione e deserializzazione

Un'applicazione di produzione utilizza un serializzatore per produrre messaggi conformi a uno schema specifico. Come accennato in precedenza, il messaggio contiene i dati in formato Avro, insieme all'identificatore dello schema.

Un'applicazione di consumo utilizza quindi un deserializer per utilizzare i messaggi serializzati utilizzando lo stesso schema. Quando un utente legge un messaggio inviato in formato Avro, il deserializer trova l'identificativo dello schema nel messaggio e richiama lo schema dal Registro schema per deserializzare i dati.

Questo processo fornisce un modo efficiente per garantire che i dati nei messaggi siano conformi alla struttura richiesta.

Il registro dello schema Event Streams supporta il serializzatore e il deserializzatore Kafka AVRO.

Diagramma di serializzazione e deserializzazione.
Serializer e deserializer

Diagramma di compatibilità e versioni.
Compatibilità e versioni

Versioni e compatibilità

Ogni volta che aggiungi uno schema e qualsiasi versione successiva dello stesso schema, Event Streams può convalidare automaticamente il formato e rifiutare lo schema se si verificano problemi. È possibile evolvere gli schemi nel tempo per soddisfare i requisiti che cambiano. Creare una nuova versione di uno schema esistente e il registro dello schema garantisce che la nuova versione sia compatibile con la nuova versione, il che significa che i produttori e i consumer che utilizzano la versione esistente non sono danneggiati dalla nuova versione.

Gli schemi vengono confrontati per evitare la creazione di schemi duplicati in cui gli schemi differiscono solo in un modo che non influisce sulla semantica dello schema. In alcuni casi, l'ordine delle proprietà JSON all'interno di uno schema può essere cruciale per il modo in cui lo schema viene utilizzato per la codifica e la decodificazione dei dati, ma in altri casi potrebbe non essere rilevante.

Ad esempio, la proprietà name di uno schema del record non viene utilizzata come parte del processo di codifica e decodifica in modo da poterla posizionare in qualsiasi punto all'interno dell'oggetto JSON del record. Tutte queste variazioni sono considerate lo stesso schema.

La proprietà fields nel JSON di un schema di record è un caso in cui il suo ordine è importante. La specifica Avro richiede che i campi del record siano codificati e decodificati nell'ordine in cui appaiono nello schema utilizzato per l'operazione di codifica e decodifica.

Ad esempio, considerare i seguenti tre schemi.

Schema 1

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "author",
      "type": "string"
    }
  ]
}

Schema 2

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "name": "author",
      "type": "string"
    },
    {
      "name": "title",
      "type": "string"
    }
  ]
}

Schema 3

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "type": "string"
      "name": "author",
    },
    {
      "type": "string"
      "name": "title",
    }
  ]
}

Lo schema 1 e lo schema 2 sono schemi distinti e il registro li memorizza come schemi separati. Non possono essere utilizzati in modo intercambiabile perché elencano i campi author e title in un ordine diverso. I dati codificati con lo schema 1 non verranno decodificati correttamente se il processo di decodifica ha utilizzato lo schema 2.

Quando si utilizza SerDes per creare il nuovo schema nell'ordine Schema 1, Schema 2 e Schema 3, il risultato sarà due nuovi schemi. Lo schema 1 e lo schema 2 sono diversi ma lo schema 3 è l'equivalente dello schema 2.

Quando si creano gli schemi utilizzando l'API REST, gli schemi vengono considerati corrispondenti solo se sono testualmente gli stessi, inclusi tutti i campi descrittivi e di ordinamento degli attributi. Ciò consente il caso in cui si desidera che lo schema 3 sia uno schema diverso.

Abilitazione del registro dello schema

Il registro dello schema è abilitato per impostazione predefinita per Event Streams istanze del servizio del piano Enterprise. Il registro dello schema non è disponibile per altri piani di manutenzione ( Event Streams ).

Accesso al registro dello schema

Per accedere allo Schema Registry, è necessario l' URL e dello Schema Registry, che si trova nelle credenziali di servizio del proprio servizio. Per visualizzare queste credenziali nell'IU, fai clic sulla tua istanza del servizio, seleziona Credenziali del servizio nel riquadro di navigazione a sinistra, quindi fai clic sul collegamento Visualizza le credenziali situato accanto a una delle credenziali del servizio elencate nella tabella:

Diagramma delle credenziali del servizio.
Kafka blocco credenziali

Il valore dell' kafka_http_url, è anche l' URL e del Registro Schema.

Autenticazione

Per accedere al registro schema, è necessaria anche una serie di credenziali che possono essere utilizzate per l'autenticazione con il registro. Esistono due opzioni, l'autenticazione di base con una chiave API o l'autenticazione del token di connessione.

Gli esempi in questo documento mostrano l'utilizzo della chiave API, ma è possibile utilizzare entrambe le opzioni.

Autenticazione con chiave API

Le credenziali del servizio hanno un apikey che puoi utilizzare come credenziale per l'autenticazione con il registro dello schema.

Puoi anche eseguire l'autenticazione utilizzando una chiave API che è stata concessa da un ID servizio, a condizione che l'ID servizio disponga di una politica che gli consenta almeno l'accesso del ruolo "lettore" all'istanza Event Streams. Questo approccio è più flessibile ed è una scelta migliore se si concede l'accesso a più persone o team. Per ulteriori dettagli, vedi l'argomento della guida Gestione dell'accesso alle tue risorse Event Streams.

La chiave API viene fornita come parte della password di un'intestazione di autenticazione di base dell' HTTP. La parte nome utente dell'intestazione è la parola "token".

Il comando curl da utilizzare è il seguente, dove $APIKEY viene sostituito con la tua chiave API:

curl -u token:$APIKEY ...

Autenticazione con token di connessione

È anche possibile utilizzare un token di connessione per un ID di sistema o un utente come credenziale. Questo è in genere un approccio più sicuro, in quanto ha meno potenziale per esporre la chiave API e il token di connessione scade automaticamente dopo qualche tempo.

Per ottenere un token, utilizza il comando IBM Cloud CLI ibmcloud iam oauth-tokens per generare il token. Includi questo token in un'intestazione " HTTP " nel formato "Authorization: Bearer $TOKEN", dove $TOKEN è il token del portatore:

curl -H "Authorization: Bearer $TOKEN" ...

Importazione dei dati da altri registri di schemi

È possibile importare i dati nel registro dello schema che è stato esportato da altri registri dello schema. Quando i dati vengono importati, l'ID globale associato a ciascuna versione della risorsa utente viene conservato. Ciò significa che puoi continuare a utilizzare i dati già archiviati in Kafka utilizzando gli stessi valori ID globale dello schema.

La CLI Event Streams supporta l'importazione dei dati utilizzando il formato di importazione ed esportazione del registro Apicurio, come nel seguente esempio.

ibmcloud es schema-import import.zip

È possibile generare i dati da importare utilizzando il programma di utilità Apicurio registry exportConfluent, che esporta i dati da un registro schema Confluent. Event Streams è stato testato con la versione 2.6.x di questa utilità.

Se il registro di schema Event Streams ha già una voce con lo stesso ID globale di una versione della risorsa utente che si sta importando, l'operazione di importazione non riesce e viene richiesto di rimuovere la versione della risorsa utente se si desidera continuare.

Endpoint REST registro schema

L'API REST offre quattro funzionalità principali:

  1. Creazione, lettura ed eliminazione di schemi.
  2. Creazione, lettura ed eliminazione di singole versioni di uno schema.
  3. Lettura e aggiornamento della regola di compatibilità globale per il registro.
  4. Creare, leggere, aggiornare ed eliminare le regole di compatibilità che si applicano ai singoli schemi.

Per le azioni che modificano la versione dello schema, come la creazione, l'aggiornamento o l'eliminazione della risorsa utente, le versioni della risorsa utente e le regole, viene generato un evento del programma di traccia dell'attività per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Errori

Se si verifica una condizione di errore, il registro dello schema restituisce un codice di stato di tipo " non-2XX " ( HTTP ). Il corpo della risposta contiene un oggetto JSON nel formato seguente:

{
    "error_code":404,
    "message":"No artifact with id 'my-schema' might be found."
}

Le proprietà dell'oggetto JSON di errore sono le seguenti:

Nome proprietà Descrizione
Error_Code Il codice di stato della risposta ( HTTP ).
Messaggio Una descrizione della causa del problema.
Incidente Questo campo viene incluso solo se l'errore è il risultato di un problema con il registro dello schema. Questo valore può essere utilizzato dal servizio IBM per correlare una richiesta alle informazioni diagnostiche acquisite dal registro.

Imposta uno stato schema

Questo punto finale viene utilizzato per impostare lo stato di uno schema nel registro su ENABLED o DISABLED. Lo stato di uno schema può essere impostato emettendo una richiesta PUT all'endpoint /artifacts/{schema-id}/state (dove {schema-id} è l'ID dello schema). Se la richiesta ha esito positivo, viene restituita una risposta vuota e un codice di stato di 204 (nessun contenuto).

Richiesta curl di esempio:

curl -u token:$APIKEY –X PUT $URL/artifacts/my-schema/state -d '{"state": "DISABLED"}'

L'impostazione dello stato di uno schema richiede:

  • Accesso ruolo gestore alla risorsa dello schema che corrisponde allo schema modificato.

Imposta uno stato di versione dello schema

Questo endpoint viene utilizzato per impostare lo stato di una versione di uno schema nel registro su ENABLED o DISABLED. Lo stato di una versione dello schema può essere impostato emettendo una richiesta PUT all'endpoint /artifacts/{schema-id}/versions/{version}/state (dove {schema-id} è l'ID dello schema e {version} è il numero di versione della versione dello schema). Se la richiesta ha esito positivo, viene restituita una risposta vuota e un codice di stato di 204 (nessun contenuto).

Richiesta curl di esempio:

curl -u token:$APIKEY –X PUT $URL/artifacts/my-schema/versions/1/state -d '{"state": "DISABLED"}'

L'impostazione di uno stato di versione dello schema richiede:

  • Accesso del ruolo gestore alla risorsa dello schema che corrisponde allo schema che si sta modificando.

Crea uno schema

Questo endpoint viene utilizzato per memorizzare uno schema nel registro. I dati dello schema vengono inviati come corpo della richiesta POST. È possibile includere un ID per lo schema utilizzando il file ‘X-Registry-ArtifactId' intestazione della richiesta. Se questa intestazione non è presente nella richiesta, viene generato un ID. L'intestazione del tipo di contenuto deve essere impostata su "application/json".

Richiesta curl di esempio:

curl -u token:$APIKEY -H 'Content-Type: application/json' -H 'X-Registry-ArtifactId: my-schema' $URL/artifacts -d '{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}'

Risposta di esempio:

{"id":"my-schema","type":"AVRO","version":1,"createdBy":"","createdOn":1579267788258,"modifiedBy":"","modifiedOn":1579267788258,"globalId":75}

La creazione di uno schema richiede almeno entrambi:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso del ruolo writer alla risorsa dello schema che corrisponde allo schema creato.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Elenca schemi

È possibile creare un elenco di ID di tutti gli schemi memorizzati nel registro effettuando una richiesta GET all'endpoint /artifacts. È possibile formattare la risposta utilizzando il parametro jsonformat (sono supportati solo i formati string e object ). Il formato stringa è quello predefinito e restituisce un array di ID risorsa utente (stringhe). Solo le risorse utente abilitate sono incluse nell'array quando questa opzione è impostata. Il formato dell'oggetto restituisce un oggetto JSON che contiene un array in cui ciascuna voce nell'array corrisponde a una risorsa utente nel registro. Quando questa opzione è impostata, vengono restituite entrambe le risorse abilitate e disabilitate.

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts

oppure

curl -u token:$APIKEY $URL/artifacts?jsonformat=string

oppure

curl -u token:$APIKEY $URL/artifacts?jsonformat=object

Risposta di esempio quando jsonformat è stringa o non è fornito (valore predefinito stringa):

["my-schema-2","my-schema-4"]

Risposta di esempio quando jsonformat è oggetto:

{"artifacts":[{"id":"my-schema","state":"DISABLED"},{"id":"my-schema-2","state":"ENABLED"},{"id":"my-schema-3","state":"DISABLED"},{"id":"my-schema-4","state":"ENABLED"}],"count":4}

L'elenco degli schemi richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.

Stato ed eliminazione degli schemi

L'eliminazione dello schema è un processo a due fasi. La prima fase dell'eliminazione conserva lo schema nel registro, ma lo nasconde ad alcune operazioni. La seconda fase rimuove definitivamente lo schema, ma può essere applicata solo dopo la prima fase. Il processo di eliminazione in due fasi si applica a livello di risorsa utente e anche a livello di versione.

Le due fasi dell'eliminazione vengono eseguite con uno stato abilitato o disabilitato associato sia alle risorse utente che alle versioni (prima fase) ed eliminando le API per le risorse e le versioni (seconda fase).

Una risorsa utente o una versione disabilitata può essere rilevata utilizzando una proprietà 'state' restituita dalle operazioni che elencano le risorse utente o le versioni oppure richiamando i dettagli di una risorsa utente o versione. Gli schemi disabilitati vengono conteggiati nella quota di schema di 1000 schemi per istanza Enterprise.

Elimina uno schema

Gli schemi vengono eliminati dal registro immettendo una richiesta DELETE all'endpoint /artifacts/{schema-id} (dove {schema-id} è l'ID dello schema). Se l'operazione ha esito positivo, viene restituita una risposta vuota e un codice di stato di 204 (nessun contenuto).

Richiesta curl di esempio:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema

L'eliminazione di uno schema richiede almeno entrambi:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo gestore alla risorsa dello schema che corrisponde allo schema eliminato.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Creare una nuova versione di uno schema

Per creare una nuova versione di uno schema, effettuare una richiesta POST all'endpoint /artifacts/{schema-id}/versions (dove {schema-id} è l'ID dello schema). Il contenuto della richiesta deve contenere la nuova versione dello schema.

Se la richiesta ha esito positivo, il nuovo schema viene creato come la versione più recente dello schema, con un numero di versione appropriato e viene restituita una risposta con codice di stato 200 (OK) e viene restituito un payload che contiene i metadati che descrivono la nuova versione (incluso il numero di versione).

Richiesta curl di esempio:

curl -u token:$APIKEY -H 'Content-Type: application/json' $URL/artifacts/my-schema/versions -d '{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}'

Risposta di esempio:

{"id":"my-schema","type":"AVRO","version":2,"createdBy":"","createdOn": 1579267978382,"modifiedBy":"","modifiedOn":1579267978382,"globalId":83}

La creazione di una nuova versione di uno schema richiede almeno entrambi:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo writer alla risorsa dello schema che corrisponde allo schema che ottiene una nuova versione.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Ottenere la versione più recente di un schema

Per richiamare l'ultima versione di uno schema particolare, effettuare una richiesta GET all'endpoint /artifacts/{schema-id} (dove {schema-id} è l'ID dello schema). Se l'operazione ha esito positivo, l'ultima versione dello schema viene restituita nel payload della risposta.

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts/my-schema

Risposta di esempio:

{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}

Per ottenere la versione più recente di uno schema è necessario almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso del ruolo di lettore alla risorsa di schema che corrisponde allo schema richiamato.

Ottenimento di una versione specifica di uno schema

Per richiamare una versione specifica di uno schema, effettuare una richiesta GET all'endpoint /artifacts/{schema-id}/versions/{version} (dove {schema-id} è l'ID dello schema e {version} è il numero di versione della versione specifica che è necessario richiamare). Se l'operazione ha esito positivo, la versione specificata dello schema viene restituita nel payload della risposta.

Esempio di richiesta curl

curl -u token:$APIKEY $URL/artifacts/my-schema/versions/3

Risposta di esempio:

{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}

Per ottenere la versione più recente di uno schema è necessario almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso del ruolo di lettore alla risorsa di schema che corrisponde allo schema richiamato.

Elenco di tutte le versioni di un schema

Per elencare tutte le versioni di uno schema attualmente memorizzate nel registro, effettuare una richiesta GET all'endpoint /artifacts/{schema-id}/versions (dove {schema-id} è l'ID dello schema). Se l'operazione ha esito positivo, viene restituito un elenco di tutti i numeri di versione correnti per lo schema nel payload della risposta. È possibile formattare la risposta utilizzando il parametro jsonformat (sono supportati solo i formati number e object ). Se si specifica 'number' (valore predefinito), la risposta è un array di valori numerici che corrisponde alle versioni abilitate della risorsa utente (le versioni disabilitate vengono omesse). È lo stesso formato dell'endpoint attualmente generato. Se si specifica 'object', la risposta è un oggetto JSON che contiene un array di oggetti JSON che rappresentano le versioni della risorsa utente. Entrambe le versioni, abilitata e disabilitata, sono incluse nell'array.

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts/my-schema/versions

oppure

curl -u token:$APIKEY $URL/artifacts/my-schema/versions?jsonformat=number

oppure

curl -u token:$APIKEY $URL/artifacts/my-schema/versions?jsonformat=object

Risposta di esempio quando jsonformat è un numero o non viene fornito (valore predefinito: numero):

[1,3,4,6,7]

Risposta di esempio quando jsonformat è oggetto:

{"versions":[{"id":1,"state":"ENABLED"},{"id":2,"state":"DISABLED"},{"id":3,"state":"ENABLED"},{"id":4,"state":"ENABLED"},{"id":5,"state":"DISABLED"},{"id":6,"state":"ENABLED"},{"id":7,"state":"ENABLED"}],"count":7}

Il richiamo dell'elenco di versioni disponibili di uno schema richiede almeno entrambi:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso del ruolo di lettore alla risorsa di schema che corrisponde allo schema richiamato.

Eliminazione di una versione di uno schema

Le versioni dello schema vengono eliminate dal registro emettendo una richiesta DELETE all'endpoint /artifacts/{schema-id}/versions/{version} (dove {schema-id} è l'ID dello schema e {version} è il numero di versione della versione dello schema). Se l'operazione ha esito positivo, viene restituita una risposta vuota e un codice di stato di 204 (nessun contenuto). L'eliminazione dell'unica versione rimanente di uno schema elimina anche lo schema.

Richiesta curl di esempio:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema/versions/3

L'eliminazione di una versione dello schema richiede almeno entrambi:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo gestore alla risorsa dello schema che corrisponde allo schema eliminato.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Acquisizione dell'ID univoco globale specifico di una versione dello schema

Per richiamare un ID univoco globale specifico di una versione dello schema, effettuare una richiesta GET all'endpoint /artifacts/{artifactId}/versions/{version}/meta (dove {artifactId} è l'ID della risorsa e {version} è il numero di versione della versione specifica che è necessario richiamare). Se l'operazione ha esito positivo, l'ID univoco globale specifico di una versione dello schema viene restituito nel payload della risposta.

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts/9030f450-45fb-4750-bb37-771ad49ee0e8/versions/1/meta

Risposta di esempio:

{"id":"9030f450-45fb-4750-bb37-771ad49ee0e8","type":"AVRO","version":1,"createdOn":1682340169202,"modifiedOn":1682340169202,"globalId":1}

Il richiamo dell'ID univoco globale di una versione dello schema richiede almeno i seguenti tipi di accesso:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso del ruolo di lettore alla risorsa di schema che corrisponde allo schema richiamato.

Aggiornamento di una regola globale

Le regole di compatibilità globale possono essere aggiornate emettendo una richiesta PUT all'endpoint /rules/{rule-type} (dove {rule-type} identifica il tipo di regola globale da aggiornare - attualmente l'unico tipo supportato è COMPATIBILITY), con la nuova configurazione della regola nel corpo della richiesta. Se la richiesta ha esito positivo, la configurazione della regola appena aggiornata viene restituita nel payload della risposta, insieme al codice di stato 200 (OK).

Il documento JSON inviato nel corpo della richiesta deve avere le seguenti proprietà:

Nome proprietà Descrizione
Immettere Deve essere sempre impostato sul valore COMPATIBILITY.
Config Deve essere impostato su uno dei seguenti valori: NONE, BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL o FULL_TRANSITIVE (consultare la sezione relativa alle regole di compatibilità per i dettagli su ciascuno di questi valori).

Richiesta curl di esempio:

curl -u token:$APIKEY -X PUT $URL/rules/COMPATIBILITY -d '{"type":"COMPATIBILITY","config":"BACKWARD"}'

Risposta di esempio:

{"type":"COMPATIBILITY","config":"BACKWARD"}

L'aggiornamento di una configurazione di regole globali richiede almeno:

  • Accesso del ruolo di gestore al tipo di risorsa cluster Event Streams.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Acquisizione del valore corrente di una regola globale

Il valore corrente di una regola globale viene richiamato emettendo una richiesta GET all'endpoint /rules/{rule-type} (dove {rule-type} è il tipo di regola globale da richiamare - attualmente l'unico tipo supportato è COMPATIBILITY). Se la richiesta ha esito positivo, la configurazione della regola corrente viene restituita nel payload della risposta, insieme al codice di stato 200 (OK).

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/rules/COMPATIBILITY

Risposta di esempio:

{"type":"COMPATIBILITY","config":"BACKWARD"}

Il richiamo della configurazione della regola globale richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.

Creazione di una regola per schema

Le regole possono essere applicate a uno schema specifico, sovrascrivendo le regole globali impostate, effettuando una richiesta POST all'endpoint /artifacts/{schema-id}/rules (dove {schema-id} è l'ID dello schema), con il tipo e il valore della nuova regola contenuto nel corpo della richiesta (attualmente l'unico tipo supportato è COMPATIBILITY). Se l'operazione ha esito positivo, vengono restituiti una risposta vuota e un codice di stato di 204 (nessun contenuto).

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts/my-schema/rules -d '{"type":"COMPATIBILITY","config":"FORWARD"}'

La creazione di regole per schema richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo gestore alla risorsa dello schema per cui si applica la regola.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Richiamo di una regola per schema

Per richiamare il valore corrente di un tipo di regola applicato a uno schema specifico, viene effettuata una richiesta GET all'endpoint /artifacts/{schema-id}/rules/{rule-type} (dove {schema-id} è l'ID dello schema e {rule-type} è il tipo di regola globale da richiamare - attualmente l'unico tipo supportato è COMPATIBILITY). Se la richiesta ha esito positivo, il valore della regola corrente viene restituito nel payload della risposta, insieme al codice di stato 200 (OK).

Richiesta curl di esempio:

curl -u token:$APIKEY $URL/artifacts/my-schema/rules/COMPATIBILITY

Risposta di esempio:

{"type":"COMPATIBILITY","config":"FORWARD"}

L'ottenimento delle regole per schema richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo di lettore alla risorsa dello schema a cui si applica la regola.

Aggiornamento di una regola per schema

Le regole applicate a uno schema specifico vengono modificate effettuando una richiesta PUT all'endpoint /artifacts/{schema-id}/rules/{rule-type} (dove {schema-id} è l'ID dello schema e {rule-type} è il tipo di regola globale da richiamare - attualmente l'unico tipo supportato è COMPATIBILITY). Se la richiesta ha esito positivo, la configurazione della regola appena aggiornata viene restituita nel payload della risposta, insieme al codice di stato 200 (OK).

Richiesta curl di esempio:

curl -u token:$APIKEY -X PUT $URL/artifacts/my-schema/rules/COMPATIBILITY -d '{"type":"COMPATIBILITY","config":"BACKWARD"}'

Risposta di esempio:

{"type":"COMPATIBILITY","config":"BACKWARD"}

L'aggiornamento di una regola per schema richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo gestore alla risorsa schema a cui si applica la regola.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Eliminazione di una regola per schema

Le regole applicate a uno schema specifico vengono eliminate effettuando una richiesta DELETE all'endpoint /artifacts/{schema-id}/rules/{rule-type} (dove {schema-id} è l'ID dello schema e {rule-type} è il tipo di regola globale da richiamare - attualmente l'unico tipo supportato è COMPATIBILITY). Se la richiesta ha esito positivo, viene restituita una risposta vuota, con un codice di stato di 204 (nessun contenuto).

Richiesta curl di esempio:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema/rules/COMPATIBILITY

L'eliminazione di una regola per schema richiede almeno:

  • Accesso del ruolo lettore al tipo di risorsa cluster Event Streams.
  • Accesso ruolo gestore alla risorsa schema a cui si applica la regola.

Un evento del programma di traccia dell'attività viene generato per segnalare l'azione. Per ulteriori informazioni, vedi Eventi di Activity Tracker.

Applicazione delle regole di compatibilità alle nuove versioni degli schemi

Il registro dello schema supporta l'applicazione di regole di compatibilit ... quando si crea una nuova versione di uno schema. Se viene effettuata una richiesta di creazione di una nuova versione dello schema che non è conforme alla regola di compatibilità richiesta, il registro rifiuta la richiesta. Sono supportate le seguenti regole:

Regola di compatibilità Testato rispetto a Descrizione
Nessuna N/D Non viene eseguito alcun controllo di compatibilità quando viene creata una nuova versione dello schema.
Indietro Ultima versione dello schema Una nuova versione dello schema può omettere i campi presenti nella versione esistente dello schema.
BACKWARD_TRANSITIVE Tutte le versioni dello schema Una nuova versione dello schema può aggiungere campi facoltativi che non sono presenti nella versione esistente dello schema.
Avanti Ultima versione dello schema Una nuova versione dello schema può aggiungere campi non presenti nella versione esistente dello schema.
INOLTRA_TRANSITIVA Tutte le versioni dello schema Una nuova versione dello schema può omettere i campi facoltativi presenti nella versione esistente dello schema.
COMPLETO Ultima versione dello schema Una nuova versione dello schema può aggiungere campi facoltativi che non sono presenti nella versione esistente dello schema.
COMPLETA_TRANSITIVA Tutte le versioni dello schema Una nuova versione dello schema può omettere i campi facoltativi presenti nella versione esistente dello schema.

Queste regole possono essere applicate in due ambiti:

  1. In un ambito globale, che è il valore predefinito utilizzato quando si crea una nuova versione dello schema.
  2. A livello per schema. Se viene definita una regola di livello per schema, sovrascrive il valore predefinito globale per lo schema particolare.

Per impostazione predefinita, il registro ha un'impostazione della regola di compatibilità globale di NONE. È necessario definire le regole di livello per schema, altrimenti lo schema utilizza l'impostazione globale.

Descrizione API completa

Per una descrizione dell'API REST con esempi, consulta Event Streams schema - registry - rest.

Puoi scaricare la specifica completa per l'API dal file Event Streams Schema Registry REST API YAML. Per visualizzare il file Swagger, utilizzare gli strumenti Swagger, ad esempio Swagger Editor.

Per ulteriori informazioni sull'accesso a Schema Registry utilizzando un SDK, vedi Event Streams Schema Registry REST API.

Per informazioni sulle risorse e le origini dati Event Streams su Terraform, vedi risorse e origini dati.

Utilizzo del registro dello schema con i SerDes di terze parti

Schema Registry supporta l'uso delle seguenti terze parti SerDes:

  • SerDes confluenti

Per configurare i SerDes confluenti per utilizzare il registro dello schema, devi specificare due proprietà nella configurazione del tuo client Kafka:

Nome proprietà Valore
SCHEMA_REGISTRY_URL_CONFIG Imposta questo valore come URL del Registro di sistema, includendo le tue credenziali come autenticazione di base e con un percorso di /confluent. Ad esempio, se $APIKEY è la chiave API da utilizzare e $HOST è l'host dal campo kafka_http_url nella scheda Credenziali del servizio , il valore ha il formato: https://token:{$APIKEY}@{$HOST}/{confluent}
BASIC_AUTH_CREDENTIALS_SOURCE Impostare su URL. Questo istruisce l' SerDes a a utilizzare un'autenticazione di base ( HTTP ) utilizzando le credenziali fornite nel Registro Schema ( URL ).

È anche possibile fornire le seguenti proprietà per controllare la selezione dello schema (strategia di denominazione dell'oggetto):

Nome proprietà Valore
STRATEGIA_NOME_SOGGETTO_VALORE TopicNameStrategy(predefinito), RecordNameStrategy e TopicRecordNameStrategy sono supportati. Ad esempio, per specificare che lo schema per il valore del messaggio viene selezionato utilizzando a TopicRecordNameStrategy, potresti utilizzare le seguenti proprietà client: configs.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
KEY_SUBJECT_NAME_STRATEGY TopicNameStrategy (predefinito), RecordNameStrategy e TopicRecordNameStrategy sono supportati. Vedere VALUE_SUBJECT_NAME_STRATEGY per un esempio.

Il seguente diagramma mostra un esempio delle proprietà richieste per creare un produttore Kafka che utilizza i SerDes confluenti e che può essere connesso al servizio Event Streams:

Kafka properties for Confluent Serdes
Kafka properties for Confluent Serdes

Se un messaggio viene inviato utilizzando uno schema che non si trova nel registro, SerDes tenta di creare il nuovo schema, o la versione dello schema, nel registro. Se questo comportamento non è richiesto, è possibile disabilitarlo rimuovendo l'autorizzazione di scrittura per le risorse dello schema dall'applicazione. Consultare Gestione dell'accesso al registro dello schema.

L'opzione normalize per le ricerche e la registrazione dello schema non è supportata.

Utilizzo del registro di schemi con strumenti che utilizzano l'API del registro di confluenza

Il registro dello schema supporta una sottoserie dell'API fornita dalla versione 7.2 del registro dello schema confluente. Ciò ha lo scopo di fornire una compatibilità limitata con gli strumenti progettati per funzionare con Confluent Schema Registry. Sono implementati solo l'endpoint REST dell' HTTP, con i seguenti percorsi:

  • compatibilità
  • configurazione
  • schemi
  • subjects

Per configurare un'applicazione per utilizzare questa API di compatibilit ..., specificare l'endpoint Registro schema nel seguente formato:

https://token:{$APIKEY}@{$HOST}/{confluent}

dove:

  • $APIKEY è la chiave API da utilizzare dalla scheda Credenziali del servizio
  • $HOST è l'host dal campo kafka_http_url nella scheda Credenziali del servizio.

Utilizzo del registro dello schema con strumenti di terze parti

Il registro dello schema può essere verificato con strumenti di terzi, come kafka-avro-console-producer.sh e kafka-avro-console-consumer.sh, che consentono il test di conformità allo schema utilizzando i SerDesconfluenti.

Per eseguire il producer o lo strumento consumer, è necessaria una proprietà comune con le opzioni di connessione per l'istanza Enterprise Event Streams.

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="apikey";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

Produttore e consumatore console Avro

Puoi utilizzare gli strumenti producer e consumer della console Kafka avro con Event Streams. È necessario fornire una proprietà client e, inoltre, il metodo di connessione e le credenziali per il registro di schema devono essere forniti come argomenti --property della riga comandi. Esistono due metodi di connessione utilizzando una fonte di credenziali di USER_INFO o di URL.

Per eseguire utilizzando il metodo source delle credenziali di URL, utilizzare il seguente codice.

    ./kafka-avro-console-[producer|consumer] --broker-list $BOOTSTRAP_ENDPOINTS --topic schema-test --property schema.registry.url=$SCHEMA_REGISTRY_URL --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property basic.auth.credentials.source=URL --producer.config $CONFIG_FILE

Sostituisci le seguenti variabili nell'esempio con i tuoi valori.

  • BOOTSTRAP_ENDPOINTS con il valore dalla tua console Event Streams Credenziali del servizio nella console IBM Cloud, sotto forma di elenco dei tuoi server di avvio.
  • SCHEMA_REGISTRY_URL con il valore kafka_http_url dalla console Event Streams Credenziali del servizio nella console IBM Cloud, con il nome utente token e apikey, insieme al percorso /confluent (ad esempio, https://{token}:{apikey}@{kafka_http_url}/{confluent}).
  • CONFIG_FILE con il percorso del file di configurazione.

Per eseguire utilizzando il metodo di origine credenziali USER_INFO, utilizzare il seguente codice.

    ./kafka-avro-console-[producer|consumer] --broker-list $BOOTSTRAP_ENDPOINTS --topic schema-test --property schema.registry.url=$SCHEMA_REGISTRY_URL --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=token:apikey --producer.config $CONFIG_FILE

Sostituisci le seguenti variabili nell'esempio con i tuoi valori.

  • BOOTSTRAP_ENDPOINTS con il valore dalla tua console Event Streams Credenziali del servizio nella console IBM Cloud, sotto forma di elenco dei tuoi server di avvio.
  • SCHEMA_REGISTRY_URLcon il valore kafka_http_url dalla console Event Streams Service Credentials nella console IBM Cloud, con il percorso /confluent (ad esempio https://{kafka_http_url}/{confluent}).
  • CONFIG_FILE con il percorso del file di configurazione.