IBM Cloud Docs
메시지 생성

메시지 생성

생성자는 Kafka 토픽에 메시지 스트림을 공개하는 애플리케이션입니다. 이 정보는 Apache Kafka 프로젝트의 일부인 Java 프로그래밍 인터페이스에 초점을 맞추고 있습니다. 개념은 다른 언어에도 적용되지만 이름은 때때로 조금 다릅니다.

프로그래밍 인터페이스에서는 메시지를 레코드라고 합니다. 예를 들어, Java 클래스 org.apache.kafka.clients.producer.ProducerRecord는 생성자 API 관점에서 메시지를 나타내는 데 사용됩니다. 레코드메시지 용어는 같은 의미로 사용할 수 있지만, 기본적으로 레코드는 메시지를 표시하는 데 사용합니다.

생성자가 Kafka에 연결되면 초기 부트스트랩 연결이 이루어집니다. 이 연결은 클러스터에 있는 임의의 서버에 대한 연결일 수 있습니다. 생성자는 공개할 토픽에 대한 파티션 및 리더십 정보를 요청합니다. 그런 다음 프로듀서는 파티션 리더에 대한 또 다른 연결을 설정하고 메시지를 게시할 수 있습니다. 생성자가 Kafka 클러스터에 연결되면 이러한 조치가 내부적으로 자동으로 이루어집니다.

가용성을 보장하기 위해 Kafka 브로커는 하나의 브로커가 사용 불가능한 경우 다른 브로커가 여전히 생성자로부터 메시지를 수신하여 이용자에게 보낼 수 있도록 메시지를 복제합니다. Event Streams 는 복제 인수 3을 사용하며, 이는 각 메시지가 세 개의 브로커에 저장됨을 의미합니다. 메시지가 파티션 리더에게 전송되는 경우 이용자가 바로 해당 메시지를 사용할 수는 없습니다. 리더는 파티션에 메시지의 레코드를 추가하며 여기에 해당 파티션의 다음 오프셋 숫자를 지정합니다. 동기화 중인 레플리카의 모든 팔로워가 레코드를 복제하고 자신이 자신의 레플리카에 레코드를 작성했음을 인정하면 이제 레코드가 커밋되어 소비자들이 사용할 수 있게 됩니다.

각 메시지는 키와 값의 두 부분으로 구성된 레코드로 표시됩니다. 키는 일반적으로 메시지에 대한 데이터에 사용되며 값은 메시지의 본문입니다. 다른 시스템에 대한 커넥터와 같은 Kafka 에코시스템의 많은 도구는 값만 사용하고 키를 무시하므로, 모든 메시지 데이터를 값에 넣고 파티셔닝이나 로그 압축에 키를 사용하는 것이 가장 좋습니다. Kafka 읽은 모든 내용에 의존하여 키를 사용하지 마세요.

기타 많은 메시징 시스템에도 메시지와 함께 다른 정보를 전달하는 방법이 있습니다. Kafka 버전 0.11 이를 위해 레코드 헤더를 도입했습니다.

Event Streams에서 메시지 이용 과 함께 이 정보를 읽는 것이 유용할 수 있습니다.

구성 설정

생산자에 대한 많은 구성 설정이 존재합니다. 일괄 처리, 재시도, 메시지 승인 등 프로듀서의 여러 측면을 제어할 수 있습니다. 다음은 가장 중요한 내용입니다.

프로듀서 구성 설정
이름 설명 유효한 값 기본
key.serializer 키를 직렬화하는 데 사용되는 클래스. Serializer 인터페이스를 구현하는 Java 클래스(예: org.apache.kafka.common.serialization.StringSerializer)입니다. 기본값 없음 - 값을 지정해야 합니다.
value.serializer 값을 직렬화하는 데 사용되는 클래스. Serializer 인터페이스를 구현하는 Java 클래스(예: org.apache.kafka.common.serialization.StringSerializer)입니다. 기본값 없음 - 값을 지정해야 합니다.
acks 공개된 각 메시지를 수신확인하는 데 필요한 서버의 수. 이는 생성자에게 필요한 지속성 보장을 제어합니다. 0, 1, all(또는 -1) 모두 (Kafka 3.0 이상) 1 ( Kafka 3.0이전)
retries 전송 시 오류가 발생할 때 클라이언트가 메시지를 재전송하는 횟수. 0, ... 0
max.block.ms 전송 또는 메타데이터 요청이 대기를 차단할 수 있는 시간(밀리초). 0, ... 60000(1분)
max.in.flight.requests.per.connection 클라이언트가 추가 요청을 차단하기 전에 연결에서 보내는 승인되지 않은 요청의 최대 수입니다. 1, ... 5
request.timeout.ms 생성자가 요청에 대한 응답을 기다리는 최대 시간. 제한 시간이 경과하기 전에 응답이 수신되지 않으면 요청이 재시도되거나 재시도 횟수가 모두 소진되면 요청이 실패합니다. 0, ... 30000(30초)

더 많은 구성 설정을 사용할 수 있지만, 실험하기 전에 Apache Kafka 설명서를 꼼꼼히 읽어보세요.

파티셔닝

Kafka에서 파티션은 확장 단위입니다. 따라서 파티셔닝은 토픽 데이터가 여러 개의 병렬 스트림으로 흐르도록 하기 때문에 처리량을 늘리는 데 효과적인 방법입니다.

생성자가 토픽에 대한 메시지를 공개하는 경우 사용할 파티션을 선택할 수 있습니다. 순서가 중요한 경우, 파티션은 레코드의 정렬된 순서이지만 토픽은 하나 이상의 파티션으로 구성되어 있다는 점에 유의하세요. 메시지 세트를 순서대로 전달하려는 경우 이러한 메시지가 모두 동일한 파티션으로 이동해야 합니다. 이를 수행하는 가장 간단한 방법은 모든 메시지에 동일한 키를 제공하는 것입니다.

생성자는 메시지를 공개할 때 파티션 번호를 명시적으로 지정할 수 있습니다. 이렇게 하면 직접적으로 제어할 수 있지만, 파티션 선택사항을 관리해야 하므로 생성자 코드가 복잡해집니다. 자세한 정보는 메소드 호출 Producer.partitionsFor를 참조하십시오. 예를 들어, 이 호출은 다음에 대해 설명합니다 Kafka 버전 2.2.0.

생성자가 파티션 번호를 지정하지 않으면 파티셔너가 파티션을 선택합니다. Kafka 생성자에 빌드된 기본 파티셔너는 다음과 같이 작동합니다.

  • 레코드에 키가 없는 경우 라운드 로빈 방식으로 파티션을 선택합니다.
  • 레코드에 키가 있는 경우 키의 해시 값을 계산하여 파티션을 선택합니다. 이렇게 하면 동일한 키를 가진 모든 메시지에 대해 동일한 파티션을 선택하게 됩니다.

또한 고유 사용자 정의 파티셔너를 작성할 수 있습니다. 사용자 정의 파티셔너는 임의의 스킴을 선택하여 파티션에 레코드를 지정할 수 있습니다. 예를 들어, 키의 정보 서브세트 또는 애플리케이션 특정 ID를 사용하십시오.

메시지 순서 지정

Kafka는 일반적으로 생성자가 전송한 순서대로 메시지를 씁니다. 그러나 특정 상황에서는 재시도로 인해 메시지가 중복되거나 순서가 변경될 수 있습니다. 일련의 메시지를 순서대로 보내려면 메시지 순서를 보장할 수 있는 유일한 방법이므로 모두 같은 파티션에 쓰도록 하는 것이 중요합니다.

프로듀서는 자동으로 메시지 전송을 다시 시도할 수도 있습니다. 이 재시도 기능을 활성화하는 것이 좋은데, 애플리케이션 코드가 직접 재시도를 수행해야 하기 때문입니다. Kafka의 일괄처리와 자동 재시도의 조합으로 인해 메시지가 복제되고 다시 정렬될 수 있습니다.

예를 들어, 주제에서 세 개의 메시지 <M1, M2, M3>의 시퀀스를 공개하는 경우입니다. 레코드가 모두 같은 배치에 속할 수 있으므로 파티션 리더에게 모두 함께 전송됩니다. 그러면 리더는 파티션에 모든 레코드를 쓰고 개별 레코드로 복제합니다. 장애가 발생하면 M1 및 M2 가 파티션에 추가되지만 M3 는 추가되지 않을 수 있습니다. 프로듀서는 확인을 받지 못했으므로 <M1, M2, M3> 전송을 다시 시도합니다. 새 리더가 M1, M2, M3 파티션에 쓰면 이제 <M1, M2, M1, M2, M3> 포함되며, 복제된 M1 원래 M2 따라오게 됩니다. 각 브로커로 전달되는 요청의 수를 하나로 제한하면 다시 정렬을 방지할 수 있습니다. <M1, M2, M2, M3>와 같은 단일 레코드가 여전히 중복되지만 순서가 잘못된 시퀀스를 가져오지 않을 수 있습니다. Kafka 버전 0.11 이상에서는 M2 중복을 방지하기 위해 무능력한 프로듀서 기능을 사용할 수도 있습니다.

요청이 한 번만 발생해도 성능에 미치는 영향이 크기 때문에 가끔 발생하는 메시지 중복을 처리하도록 애플리케이션을 작성하는 것이 Kafka 일반적인 관행입니다.

메시지 수신확인

메시지를 게시할 때 acks 프로듀서 구성을 사용하여 필요한 승인 수준을 선택할 수 있습니다. 선택사항은 처리량과 신뢰성 간의 밸런스를 나타냅니다. 다음 세 가지 레벨이 존재합니다.

acks=0(신뢰성이 가장 낮음)
메시지는 네트워크에 쓰여지는 즉시 전송된 것으로 간주됩니다. 파티션 리더로부터 수신확인은 없습니다. 따라서 파티션 리더십이 변경되면 메시지가 유실될 수 있습니다. 이 수준의 승인은 빠르지만 일부 상황에서는 메시지가 손실될 가능성이 있습니다.
acks=1(기본값)
파티션 리더가 파티션에 레코드를 성공적으로 작성하는 즉시 프로듀서에게 메시지가 승인됩니다. 레코드가 동기화 중인 복제본에 도달하기 전에 승인이 이루어지기 때문에 리더가 실패하지만 팔로워가 아직 메시지를 받지 못한 경우 메시지가 손실될 수 있습니다. 파티션 리더십이 변경되면 이전 리더가 프로듀서에게 알리고, 프로듀서는 오류를 처리하고 새 리더에게 메시지 전송을 다시 시도할 수 있습니다. 메시지는 모든 복제본에서 수신을 확인하기 전에 승인되기 때문에 파티션 리더십이 변경되면 승인되었지만 아직 완전히 복제되지 않은 메시지가 손실될 수 있습니다.
acks=all(신뢰성이 가장 높음)
파티션 리더가 레코드를 성공적으로 작성하고 동기화된 모든 복제본이 동일한 작업을 수행하면 프로듀서에게 이 메시지가 표시됩니다. 하나 이상의 동기화 복제본을 사용할 수 있는 경우 파티션 리더십이 변경되어도 메시지가 유실되지 않습니다.

생성자에게 메시지가 수신확인될 때까지 기다리지 않더라도 커미트된 메시지는 이용 가능하며 이는 동기화 복제본으로 복제되었음을 의미합니다. 즉, 생성자의 관점에서 메시지 전송 대기 시간은 메시지를 전송하는 생성자로부터 메시지를 수신하는 이용자까지 측정되는 엔드-투-엔드 대기 시간보다 짧습니다.

가능하면 다음 메시지를 게시하기 전에 메시지가 승인될 때까지 기다리지 마세요. 대기하면 생성자가 메시지를 함께 일괄처리할 수 없으며 메시지를 공개하는 비율이 네트워크의 라운드트립 대기 시간 아래로 줄어듭니다.

일괄처리, 조절 및 압축

효율성을 위해 제작자는 서버로 전송하기 위해 레코드 배치를 한꺼번에 수집합니다. 압축을 사용하는 경우, 생성자는 각 일괄처리를 압축하며 이로 인해 네트워크를 통해 전송해야 하는 데이터가 줄어들어 성능이 향상될 수 있습니다.

서버에 전송할 때보다 빨리 메시지를 공개하려는 경우 생성자가 메시지를 일괄처리된 요청으로 자동 버퍼링합니다. 생성자는 각 파티션에 대해 미전송 레코드의 버퍼를 유지합니다. 일괄 처리로도 원하는 속도를 달성할 수 없는 시점이 있습니다.

영향을 미치는 다른 요인도 있습니다. 개별 생성자 또는 이용자가 클러스터에 너무 많지 않도록 Event Streams가 처리량 할당을 적용합니다. 각 생성자가 데이터를 전송하는 비율이 계산되고 해당 할당량을 초과하려는 생성자는 조절됩니다. 생성자에 대한 응답 전송을 약간 지연하여 조절이 적용됩니다. 대개 이렇게 하면 자연스럽게 제어됩니다.

처리량 안내에 대한 자세한 정보는 한계 및 할당량 을 참조하십시오.

요약하면 메시지가 공개되면 해당 레코드가 먼저 생성자의 버퍼에 기록됩니다. 백그라운드에서 생성자는 레코드를 일괄처리하고 서버에 전송합니다. 그런 다음 서버는 생성자에게 응답하며, 생성자가 너무 빨리 공개하는 경우 조절 지연을 적용합니다. 프로듀서의 버퍼가 가득 차면 프로듀서의 전송 호출이 지연되지만 결국 예외를 제외하고는 실패할 수 있습니다.

전달 시맨틱

Kafka는 다음과 같은 다양한 다중 메시지 전달 시맨틱을 제공합니다.

  • 최대 한 번만: 메시지가 손실되어 다시 전달되지 않을 수 있습니다.
  • 적어도 한 번은: 메시지는 손실되지 않지만 중복될 수 있습니다.
  • 정확히 한 번입니다: 메시지는 절대 손실되지 않으며 중복되지 않습니다.

전달 시맨틱은 다음 설정으로 결정됩니다.

  • acks
  • retries
  • enable.idempotence

기본적으로 Kafka 시맨틱을 한 번 이상 사용합니다.

정확히 한 번만 의미론을 활성화하려면 무능력한 또는 트랜잭션 생산자를 사용해야 합니다. 멱등 생성자는 enable.idempotencetrue로 설정하여 사용으로 설정되며, 재시도한 경우에도 각 메시지의 정확한 1개 사본이 Kafka에 쓰여지도록 보장합니다. 트랜잭션 생성자는 다중 파티션으로 데이터의 전송을 가능하게 합니다(예: 모든 메시지가 전달 완료되거나 어떤 메시지도 전달되지 않음). 즉 트랜잭션이 모두 커미트되거나 모두 버려집니다. 트랜잭션에 오프셋을 포함시켜 Kafka 메시지를 읽고, 처리하고, 쓰는 애플리케이션을 구축할 수도 있습니다.

코드 스니펫

이러한 코드 스니펫은 관련된 개념을 설명하기 위해 상위 레벨에 있습니다. 전체 예제는 GitHub의 Event Streams 샘플을 참조하십시오.

이용자를 Event Streams에 연결하려면 서비스 인증 정보를 작성해야 합니다. 자세한 정보는 Event Streams에 연결을 참조하십시오.

생성자 코드에서 먼저 구성 특성 세트를 빌드해야 합니다. Event Streams 대한 모든 연결은 TLS와 사용자 및 비밀번호 인증을 사용하여 보호되므로 최소한 이러한 속성이 필요합니다. 부트스트랩_엔디포인트, 사용자, 비밀번호를 자신의 서비스 자격 증명에 있는 비밀번호로 바꿉니다:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

메시지를 보내려면 키와 값에 대한 직렬화기(예: 직렬화기)도 지정해야 합니다:

 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

이러한 시리얼라이저는 이용자가 사용하는 디시리얼라이저와 일치해야 합니다.

그런 다음 KafkaProducer 사용하여 메시지를 전송하고, 각 메시지는 ProducerRecord 표현됩니다. 완료되면 KafkaProducer를 닫으십시오. 이 코드는 메시지를 전송하지만 전송이 성공했는지 여부를 확인하기 위해 기다리지는 않습니다. 메시지는 문자열 key 를 키로 하고 문자열 value 에 값을 지정하여 T1 토픽으로 송신됩니다.

 Producer<String, String> producer = new KafkaProducer<>(props);
 producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
 producer.close();

send() 메소드는 비동기식이며 완료를 확인하는 데 사용할 수 있는 Future를 리턴합니다.

 Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
 RecordMetadata rm = f.get();
 long offset = rm.offset;

또는 메시지를 보낼 때 콜백을 제공할 수도 있습니다:

producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception exception) {
                     // This is called when the send completes, either successfully or with an exception
          }
});

자세한 정보는 Kafka 클라이언트의 Javadoc 을 참조하십시오.{: external}.