Message Queue/Kafka
카프카 SMT란?
vincenzo.dev.82
2024. 12. 10. 16:05
반응형
SMT는 Single Message Transform의 약자로, Apache Kafka의 Kafka Connect 프레임워크에서 사용되는 단일 메시지 변환 기능입니다. SMT는 Kafka Connect가 데이터를 소스 커넥터를 통해 가져오거나 싱크 커넥터를 통해 내보낼 때, 각 개별 메시지에 대해 변환 또는 수정을 적용하는 간단한 처리 단계입니다.
SMT의 역할
Kafka Connect에서 SMT는 데이터 흐름의 중간에서 작동하며, 각 메시지에 대해 필터링, 변환, 필드 추가 또는 제거와 같은 작업을 수행할 수 있습니다.
SMT를 사용하는 목적
- 데이터 변환: 메시지의 포맷이나 필드를 변환합니다.
- 데이터 정제: 불필요한 필드를 제거하거나 새로운 필드를 추가합니다.
- 메타데이터 추가: 타임스탬프나 키 값 등 메타데이터를 삽입합니다.
- 필터링: 조건에 맞는 특정 메시지만 전달합니다.
- 간단한 데이터 수정: 메시지의 값이나 키를 변경합니다.
SMT 동작 위치
Kafka Connect에서 SMT는 커넥터와 Kafka 사이에 위치합니다. 즉:
- 소스 커넥터에서 SMT는 데이터를 Kafka 토픽으로 전송하기 전에 변환합니다.
- 싱크 커넥터에서 SMT는 Kafka 토픽에서 데이터를 읽은 후 변환을 적용합니다.
주요 SMT 종류
Kafka Connect는 내장된 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수도 있습니다.
1. 기본 SMT
- ValueToKey: 메시지의 값을 키로 변환합니다.
- ExtractField: 특정 필드를 추출하여 키 또는 값으로 사용합니다.
- MaskField: 특정 필드의 데이터를 마스킹합니다 (예: 민감한 데이터).
- TimestampRouter: 메시지의 타임스탬프를 기반으로 라우팅합니다.
- RegexRouter: 정규식을 기반으로 토픽의 이름을 변경합니다.
- ReplaceField: 필드를 추가하거나 제거합니다.
- Flatten: 중첩된 구조를 평탄화합니다.
- InsertField: 필드를 추가합니다 (예: 타임스탬프, 토픽 이름 등).
2. 사용자 정의 SMT
- 필요한 기능이 기본 SMT에 없으면 커스텀 SMT를 개발할 수 있습니다.
- Java로 SMT를 구현하고 Kafka Connect 플러그인으로 추가합니다.
SMT 예제
예를 들어, ExtractField SMT를 사용해서 메시지에서 특정 필드를 추출하고 키로 설정하는 예제입니다.
설정 예제
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "user",
"database.password": "password",
"database.server.id": "1234",
"database.server.name": "mysql-db",
"table.include.list": "test_db.test_table",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"
}
}
설명
- transforms: 적용할 SMT 이름을 지정합니다. (여기서는
ExtractKey
) - transforms.ExtractKey.type: 사용할 SMT 유형을 명시합니다.
- transforms.ExtractKey.field: 추출할 필드의 이름입니다.
결과적으로 메시지의 특정 필드 id
를 Kafka 키로 설정합니다.
SMT의 장점
- 간단함: 복잡한 코드 없이 설정 파일만으로 변환 작업이 가능합니다.
- 데이터 정제: 소스 시스템에서 들어오는 데이터나 싱크로 보내는 데이터를 정제할 수 있습니다.
- 재사용성: 여러 커넥터에서 동일한 변환을 사용할 수 있습니다.
커스텀 SMT 구현 예시
Java를 사용하여 커스텀 SMT를 구현할 수 있습니다.
커스텀 SMT 구현 코드
package com.example.kafka.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
public class UpperCaseValueTransform<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
if (record.value() instanceof String) {
String newValue = ((String) record.value()).toUpperCase();
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
record.key(), record.valueSchema(), newValue, record.timestamp());
}
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {}
@Override
public void configure(java.util.Map<String, ?> configs) {}
}
설정 예시
"transforms": "UpperCaseValue",
"transforms.UpperCaseValue.type": "com.example.kafka.transforms.UpperCaseValueTransform"
결론
- SMT (Single Message Transform)는 Kafka Connect에서 단일 메시지를 변환하는 데 사용됩니다.
- 다양한 기본 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수 있습니다.
- SMT를 사용하면 데이터 흐름 중간에서 필터링, 변환, 정제 등을 쉽게 수행할 수 있습니다.
SMT는 복잡한 코드 없이 데이터 변환을 손쉽게 구현할 수 있는 강력한 도구입니다.
반응형