일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- nginx설치
- GCM 번역
- 카프카 트랜잭션
- notification
- Push
- kafka
- 웹사이트 성능
- 디자인패턴
- 성능
- git
- nginx설정
- JPA
- GCM
- 웹사이트최적화기법
- 푸시
- Design Pattern
- graphql
- 웹사이트성능
- nginx
- 푸시 번역
- 자바스크립트
- 도메인 주도 개발
- 카프카
- 페이스북 번역
- php
- ddd
- gcm 푸시 번역
- Java
- APNS
Archives
- Today
- Total
간단한 개발관련 내용
카프카 SMT란? 본문
반응형
SMT는 Single Message Transform의 약자로, Apache Kafka의 Kafka Connect 프레임워크에서 사용되는 단일 메시지 변환 기능입니다. SMT는 Kafka Connect가 데이터를 소스 커넥터를 통해 가져오거나 싱크 커넥터를 통해 내보낼 때, 각 개별 메시지에 대해 변환 또는 수정을 적용하는 간단한 처리 단계입니다.
SMT의 역할
Kafka Connect에서 SMT는 데이터 흐름의 중간에서 작동하며, 각 메시지에 대해 필터링, 변환, 필드 추가 또는 제거와 같은 작업을 수행할 수 있습니다.
SMT를 사용하는 목적
- 데이터 변환: 메시지의 포맷이나 필드를 변환합니다.
- 데이터 정제: 불필요한 필드를 제거하거나 새로운 필드를 추가합니다.
- 메타데이터 추가: 타임스탬프나 키 값 등 메타데이터를 삽입합니다.
- 필터링: 조건에 맞는 특정 메시지만 전달합니다.
- 간단한 데이터 수정: 메시지의 값이나 키를 변경합니다.
SMT 동작 위치
Kafka Connect에서 SMT는 커넥터와 Kafka 사이에 위치합니다. 즉:
- 소스 커넥터에서 SMT는 데이터를 Kafka 토픽으로 전송하기 전에 변환합니다.
- 싱크 커넥터에서 SMT는 Kafka 토픽에서 데이터를 읽은 후 변환을 적용합니다.
주요 SMT 종류
Kafka Connect는 내장된 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수도 있습니다.
1. 기본 SMT
- ValueToKey: 메시지의 값을 키로 변환합니다.
- ExtractField: 특정 필드를 추출하여 키 또는 값으로 사용합니다.
- MaskField: 특정 필드의 데이터를 마스킹합니다 (예: 민감한 데이터).
- TimestampRouter: 메시지의 타임스탬프를 기반으로 라우팅합니다.
- RegexRouter: 정규식을 기반으로 토픽의 이름을 변경합니다.
- ReplaceField: 필드를 추가하거나 제거합니다.
- Flatten: 중첩된 구조를 평탄화합니다.
- InsertField: 필드를 추가합니다 (예: 타임스탬프, 토픽 이름 등).
2. 사용자 정의 SMT
- 필요한 기능이 기본 SMT에 없으면 커스텀 SMT를 개발할 수 있습니다.
- Java로 SMT를 구현하고 Kafka Connect 플러그인으로 추가합니다.
SMT 예제
예를 들어, ExtractField SMT를 사용해서 메시지에서 특정 필드를 추출하고 키로 설정하는 예제입니다.
설정 예제
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "user",
"database.password": "password",
"database.server.id": "1234",
"database.server.name": "mysql-db",
"table.include.list": "test_db.test_table",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"
}
}
설명
- transforms: 적용할 SMT 이름을 지정합니다. (여기서는
ExtractKey
) - transforms.ExtractKey.type: 사용할 SMT 유형을 명시합니다.
- transforms.ExtractKey.field: 추출할 필드의 이름입니다.
결과적으로 메시지의 특정 필드 id
를 Kafka 키로 설정합니다.
SMT의 장점
- 간단함: 복잡한 코드 없이 설정 파일만으로 변환 작업이 가능합니다.
- 데이터 정제: 소스 시스템에서 들어오는 데이터나 싱크로 보내는 데이터를 정제할 수 있습니다.
- 재사용성: 여러 커넥터에서 동일한 변환을 사용할 수 있습니다.
커스텀 SMT 구현 예시
Java를 사용하여 커스텀 SMT를 구현할 수 있습니다.
커스텀 SMT 구현 코드
package com.example.kafka.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
public class UpperCaseValueTransform<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
if (record.value() instanceof String) {
String newValue = ((String) record.value()).toUpperCase();
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
record.key(), record.valueSchema(), newValue, record.timestamp());
}
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {}
@Override
public void configure(java.util.Map<String, ?> configs) {}
}
설정 예시
"transforms": "UpperCaseValue",
"transforms.UpperCaseValue.type": "com.example.kafka.transforms.UpperCaseValueTransform"
결론
- SMT (Single Message Transform)는 Kafka Connect에서 단일 메시지를 변환하는 데 사용됩니다.
- 다양한 기본 SMT를 제공하며, 필요에 따라 커스텀 SMT를 구현할 수 있습니다.
- SMT를 사용하면 데이터 흐름 중간에서 필터링, 변환, 정제 등을 쉽게 수행할 수 있습니다.
SMT는 복잡한 코드 없이 데이터 변환을 손쉽게 구현할 수 있는 강력한 도구입니다.
반응형