반응형
Notice
Recent Posts
Recent Comments
관리 메뉴

간단한 개발관련 내용

카프카 SMT란? 본문

Message Queue/Kafka

카프카 SMT란?

vincenzo.dev.82 2024. 12. 10. 16:05
반응형

SMTSingle Message Transform의 약자로, Apache KafkaKafka Connect 프레임워크에서 사용되는 단일 메시지 변환 기능입니다. SMT는 Kafka Connect가 데이터를 소스 커넥터를 통해 가져오거나 싱크 커넥터를 통해 내보낼 때, 각 개별 메시지에 대해 변환 또는 수정을 적용하는 간단한 처리 단계입니다.


SMT의 역할

Kafka Connect에서 SMT는 데이터 흐름의 중간에서 작동하며, 각 메시지에 대해 필터링, 변환, 필드 추가 또는 제거와 같은 작업을 수행할 수 있습니다.

SMT를 사용하는 목적

  1. 데이터 변환: 메시지의 포맷이나 필드를 변환합니다.
  2. 데이터 정제: 불필요한 필드를 제거하거나 새로운 필드를 추가합니다.
  3. 메타데이터 추가: 타임스탬프나 키 값 등 메타데이터를 삽입합니다.
  4. 필터링: 조건에 맞는 특정 메시지만 전달합니다.
  5. 간단한 데이터 수정: 메시지의 값이나 키를 변경합니다.

SMT 동작 위치

Kafka Connect에서 SMT는 커넥터Kafka 사이에 위치합니다. 즉:

  • 소스 커넥터에서 SMT는 데이터를 Kafka 토픽으로 전송하기 전에 변환합니다.
  • 싱크 커넥터에서 SMT는 Kafka 토픽에서 데이터를 읽은 후 변환을 적용합니다.

주요 SMT 종류

Kafka Connect는 내장된 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수도 있습니다.

1. 기본 SMT

  • ValueToKey: 메시지의 값을 키로 변환합니다.
  • ExtractField: 특정 필드를 추출하여 키 또는 값으로 사용합니다.
  • MaskField: 특정 필드의 데이터를 마스킹합니다 (예: 민감한 데이터).
  • TimestampRouter: 메시지의 타임스탬프를 기반으로 라우팅합니다.
  • RegexRouter: 정규식을 기반으로 토픽의 이름을 변경합니다.
  • ReplaceField: 필드를 추가하거나 제거합니다.
  • Flatten: 중첩된 구조를 평탄화합니다.
  • InsertField: 필드를 추가합니다 (예: 타임스탬프, 토픽 이름 등).

2. 사용자 정의 SMT

  • 필요한 기능이 기본 SMT에 없으면 커스텀 SMT를 개발할 수 있습니다.
  • Java로 SMT를 구현하고 Kafka Connect 플러그인으로 추가합니다.

SMT 예제

예를 들어, ExtractField SMT를 사용해서 메시지에서 특정 필드를 추출하고 키로 설정하는 예제입니다.

설정 예제

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "1234",
    "database.server.name": "mysql-db",
    "table.include.list": "test_db.test_table",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "transforms": "ExtractKey",
    "transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractKey.field": "id"
  }
}

설명

  1. transforms: 적용할 SMT 이름을 지정합니다. (여기서는 ExtractKey)
  2. transforms.ExtractKey.type: 사용할 SMT 유형을 명시합니다.
  3. transforms.ExtractKey.field: 추출할 필드의 이름입니다.

결과적으로 메시지의 특정 필드 idKafka 키로 설정합니다.


SMT의 장점

  • 간단함: 복잡한 코드 없이 설정 파일만으로 변환 작업이 가능합니다.
  • 데이터 정제: 소스 시스템에서 들어오는 데이터나 싱크로 보내는 데이터를 정제할 수 있습니다.
  • 재사용성: 여러 커넥터에서 동일한 변환을 사용할 수 있습니다.

커스텀 SMT 구현 예시

Java를 사용하여 커스텀 SMT를 구현할 수 있습니다.

커스텀 SMT 구현 코드

package com.example.kafka.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class UpperCaseValueTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() instanceof String) {
            String newValue = ((String) record.value()).toUpperCase();
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
                    record.key(), record.valueSchema(), newValue, record.timestamp());
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {}

    @Override
    public void configure(java.util.Map<String, ?> configs) {}
}

설정 예시

"transforms": "UpperCaseValue",
"transforms.UpperCaseValue.type": "com.example.kafka.transforms.UpperCaseValueTransform"

 


결론

  • SMT (Single Message Transform)는 Kafka Connect에서 단일 메시지를 변환하는 데 사용됩니다.
  • 다양한 기본 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수 있습니다.
  • SMT를 사용하면 데이터 흐름 중간에서 필터링, 변환, 정제 등을 쉽게 수행할 수 있습니다.

SMT는 복잡한 코드 없이 데이터 변환을 손쉽게 구현할 수 있는 강력한 도구입니다.

반응형