| 노드 카테고리 | V1 | V2 | 비고 |
|---|---|---|---|
| Source | O Kafka, JDBC 등 검증된 커넥터 전체 제공 |
O NHN Cloud/오브젝트 스토리지 커넥터 우선, Kafka·JDBC는 추후 제공 예정 |
각 노드 섹션의 지원 엔진 타입 표로 최종 지원 여부를 확인하세요. |
| Filter | O Alter, Grok, Mutate 등 기존 플러그인 전부 제공 |
O JSON/Date 공통 노드 + Coerce/Copy/Rename/Strip 등 V2 전용 노드 포함 |
동일 노드라도 파라미터/기본값이 다를 수 있으니 “엔진 타입별 파라미터” 표를 확인하세요. |
| Branch | O | O | |
| Sink | O Object Storage, S3, Kafka 등 전체 제공 |
O 대부분 공통 제공. Parquet 고급 옵션 등은 현재 V1 우선 |
속성 표의 지원 엔진 타입 열에서 세부 제한을 확인하세요. |
V1 전용 또는 우선 제공 노드/기능
- Source > (Apache) Kafka, Source > JDBC는 현재 V1에서만 실행되며 V2는 추후 지원 예정입니다.
- Filter > (Logstash) Grok, Filter > Mutate 등 일부 필터는 V1에서만 동작합니다.
- Sink > (NHN Cloud) Object Storage, (Amazon) S3의 Parquet 압축/포맷 고급 옵션은 V1 설정만 허용되며, V2는 추후 지원 예정입니다.
V2 전용 노드/추가 기능
- Filter > Coerce, Filter > Copy, Filter > Rename, Filter > Strip는 V2에서 제공되는 관리용 노드입니다.
- Filter > JSON의 덮어쓰기, 원본 필드 삭제와 같이 지원 엔진 타입: V2로 표시된 속성은 V2에서만 설정할 수 있습니다.
- 모니터링, 템플릿 섹션에 V2는 추후 제공으로 표기된 차트/옵션은 출시 후 자동으로 업데이트되며, 현재는 V1 실행 정보만 노출됩니다.
엔진 타입에 따라 호환 여부가 갈리는 노드는 항상 ### 지원 엔진 타입 표와 비고/주의 블록에 최신 상태가 기재됩니다. 새 플로우를 설계할 때는 위 요약으로 대략적인 범위를 파악한 뒤, 실제 사용할 노드 섹션에서 세부 지원 현황과 제한 사항을 다시 확인하세요.
V2는 리전 또는 프로젝트가 서로 다른 Object Storage이지만 버킷명은 동일한 경우, 하나의 플로우에서 함께 사용이 불가능합니다.
불가능한 연결 설정 예제
{{ executionTime }}{{ MINUTE }}{{ HOUR }}{{ DAY }}{{ MONTH }}{{ YEAR }}{{ time | startOf: unit }}unit으로 정의된 시간대의 시작 시간을 리턴합니다.{{ time | endOf: unit }}unit으로 정의된 시간대의 마지막 시간을 리턴합니다.{{ time | subTime: delta, unit }}unit으로 정의된 시간대의 delta만큼 뺀 시간을 리턴합니다.{{ time | addTime: delta, unit }}unit으로 정의된 시간대의 delta만큼 더한 시간을 리턴합니다.{{ time | format: formatStr }}formatStr 형태로 리턴합니다.TRUE 혹은 FALSE를 선택합니다.+ 버튼을 클릭하면 배열에 문자열이 삽입됩니다.["message" , "yyyy-MM-dd HH:mm:ssZ", "ISO8601"]를 입력하고자 하는 경우 message, yyyy-MM-dd HH:mm:ssZ, ISO8601의 순서로 배열에 문자열을 삽입합니다.| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 유형 | - | string | V1 | 각 메시지에 주어진 값으로 type 필드를 생성합니다. |
|
| 아이디 | - | string | V1, V2 | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
|
| 태그 | - | array of strings | V1 | 각 메시지에 주어진 값의 태그를 추가합니다. | |
| 필드 추가 | - | hash | V1 | 커스텀 필드를 추가할 수 있습니다.%{[depth1_field]}로 각 필드의 값을 가져와 필드를 추가할 수 있습니다. |
{
"my_custom_field": "%{[json_body][logType]}"
}
현재 세션 로그와 크래시 로그는 지원하지 않습니다.| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
조회 시작 시간 이후의 데이터를 계속해서 처리합니다.조회 시작 시간, 조회 종료 시간 사이에 해당하는 데이터를 모두 처리하고 플로우를 종료합니다.| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| Appkey | - | string | V1, V2 | Log & Crash Search의 앱키를 입력합니다. | |
| SecretKey | - | string | V1, V2 | Log & Crash Search의 시크릿키를 입력합니다. | |
| 조회 시작 시간 | {{executionTime}} |
string | V1, V2 | 로그 조회의 시작 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 조회 종료 시간 | - | string | V1, V2 | 로그 조회의 종료 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 재시도 횟수 | - | number | V1 | 로그 조회가 실패했을 때 재시도할 최대 횟수를 입력합니다. | |
| 검색 쿼리 | * |
string | V1, V2 | Log & Crash Search 조회 요청 시 사용할 검색 쿼리를 입력합니다. 자세한 쿼리 작성 방법은 Log & Crash Search 서비스의 'Lucene 쿼리 가이드'를 참고하세요. |
지원 코덱: * json 코덱 - JSON 형식 데이터 파싱
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
조회 시작 시간 이후의 데이터를 계속해서 처리합니다.조회 시작 시간, 조회 종료 시간 사이에 해당하는 데이터를 모두 처리하고 플로우를 종료합니다.| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| Appkey | - | string | V1, V2 | CloudTrail의 앱키를 입력합니다. | |
| User Access Key ID | - | string | V2 | 사용자 계정의 User Access Key ID를 입력합니다. | |
| Secret Access Key | - | string | V2 | 사용자 계정의 User Secret Key를 입력합니다. | |
| 조회 시작 시간 | {{executionTime}} |
string | V1, V2 | 데이터 조회의 시작 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 조회 종료 시간 | - | string | V1, V2 | 데이터 조회의 종료 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 재시도 횟수 | - | number | V1 | 데이터 조회가 실패했을 때 재시도할 최대 횟수를 입력합니다. | |
| 이벤트 타입 | * |
string | V2 | 조회할 이벤트 ID를 입력합니다. |
지원 코덱: * json 코덱 - JSON 형식 데이터 파싱
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O | V2 엔진 타입에서 Object Storage 연결 시 주의점 참고 |
리스트 갱신 주기마다 오브젝트 리스트를 갱신하며, 새롭게 추가된 오브젝트들을 읽어 데이터를 처리합니다.| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 버킷 | - | string | V1, V2 | 데이터를 읽을 버킷 이름을 입력합니다. | |
| 리전 | - | string | V1, V2 | 저장소에 설정된 리전 정보를 입력합니다. | |
| 비밀 키 | - | string | V1, V2 | S3가 발급한 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | V1, V2 | S3가 발급한 자격 증명 액세스 키를 입력합니다. | |
| 리스트 갱신 주기 | 60 |
number | V1, V2 | 버킷에 포함된 오브젝트 리스트 갱신 주기를 입력합니다. | |
| 메타데이터 포함 여부 | false |
boolean | V1 | S3 오브젝트의 메타데이터를 키로 포함할지 여부를 결정합니다. 메타데이터 필드를 Sink 플러그인에 노출하기 위해서는 filter 노드 유형을 조합해야 합니다(하단 가이드 참조). | 생성되는 필드는 다음과 같습니다. last_modified: 오브젝트가 마지막으로 수정된 시간 content_length: 오브젝트 크기 key: 오브젝트 이름 content_type: 오브젝트 형식 metadata: 메타데이터 etag: etag |
| Prefix | - | string | V1, V2 | 읽어 올 오브젝트의 접두사를 입력합니다. | |
| 제외할 키 패턴 | - | string | V1, V2 | 읽지 않을 오브젝트의 패턴을 입력합니다. | |
| 처리 완료 오브젝트 삭제 | false |
boolean | V1 | 속성값이 true일 경우 읽기 완료한 오브젝트를 삭제합니다. |
메타데이터 포함 여부 설정 활성화 시 메타데이터 필드가 생성되나, 별도로 일반 필드로 주입하는 작업을 거치지 않는다면 Sink 플러그인에서 노출하지 않습니다.{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "오브젝트 내용..."
// 메타데이터 필드
// 사용자가 일반 필드로 주입하기 전까지 Sink 플러그인에 노출할 수 없음
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"key": "%{[@metadata][s3][key]}"
"content_type": "%{[@metadata][s3][content_type]}"
"metadata": "%{[@metadata][s3][metadata]}"
"etag": "%{[@metadata][s3][etag]}"
}
{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "오브젝트 내용..."
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"key": "{filename}"
"content_type": "text/plain"
"metadata": {}
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
// 메타데이터 필드
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장 * json 코덱 - JSON 형식 데이터 파싱 * line 코덱 - 행 단위 메시지 처리(엔진 V1만 지원)
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O | V2 엔진 타입에서 Object Storage 연결 시 주의점 참고 |
리스트 갱신 주기마다 오브젝트 리스트를 갱신하며, 새롭게 추가된 오브젝트들을 읽어 데이터를 처리합니다.| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 엔드포인트 | - | string | V1, V2 | S3 저장소 엔드포인트를 입력합니다. | HTTP, HTTPS URL 형태만 입력 가능합니다. |
| 버킷 | - | string | V1, V2 | 데이터를 읽을 버킷 이름을 입력합니다. | |
| 리전 | * V1: us-east-1* V2: - |
string | V1, V2 | 저장소에 설정된 리전 정보를 입력합니다. | |
| 세션 토큰 | - | string | V1 | AWS 세션 토큰을 입력합니다. | |
| 비밀 키 | - | string | V1, V2 | S3가 발급한 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | V1, V2 | S3가 발급한 자격 증명 액세스 키를 입력합니다. | |
| 리스트 갱신 주기 | 60 |
number | V1, V2 | 버킷에 포함된 오브젝트 리스트 갱신 주기를 입력합니다. | |
| 메타데이터 포함 여부 | false |
boolean | V1 | S3 오브젝트의 메타데이터를 키로 포함할지 여부를 결정합니다. 메타데이터 필드를 Sink 플러그인에 노출하기 위해서는 filter 노드 유형을 조합해야 합니다(하단 가이드 참조). | 생성되는 필드는 다음과 같습니다. server_side_encryption: 서버 측 암호화 알고리즘 last_modified: 오브젝트가 마지막으로 수정된 시간 content_length: 오브젝트 크기 key: 오브젝트 이름 content_type: 오브젝트 형식 metadata: 메타데이터 etag: etag |
| Prefix | * V1: nil * V2: - |
string | V1, V2 | 읽어 올 오브젝트의 접두사를 입력합니다. | |
| 제외할 키 패턴 | * V1: nil * V2: - |
string | V1, V2 | 읽지 않을 오브젝트의 패턴을 입력합니다. | |
| 처리 완료 오브젝트 삭제 | false |
boolean | V1 | 속성값이 true일 경우 읽기 완료한 오브젝트를 삭제합니다. | |
| 추가 설정 | - | hash | V1 | S3 서버와 연결할 때 사용할 추가적인 설정을 입력합니다. | 가이드 |
| 경로 방식 요청 | false |
boolean | V2 | 경로 방식 요청을 사용할지 여부를 결정합니다. |
주의
{"force_path_style" : true}true로 설정메타데이터 포함 여부 설정 활성화 시 메타데이터 필드가 생성되나, 별도로 일반 필드로 주입하는 작업을 거치지 않는다면 Sink 플러그인에서 노출하지 않습니다.{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "오브젝트 내용..."
// 메타데이터 필드
// 사용자가 일반 필드로 주입하기 전까지 Sink 플러그인에 노출할 수 없음
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"server_side_encryption": "%{[@metadata][s3][server_side_encryption]}"
"etag": "%{[@metadata][s3][etag]}"
"content_type": "%{[@metadata][s3][content_type]}"
"key": "%{[@metadata][s3][key]}"
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"metadata": "%{[@metadata][s3][metadata]}"
}
{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "오브젝트 내용..."
"server_side_encryption": "AES256"
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
"content_type": "text/plain"
"key": "{filename}"
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"metadata": {}
// 메타데이터 필드
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장 * json 코덱 - JSON 형식 데이터 파싱 * line 코덱 - 행 단위 메시지 처리(엔진 V1만 지원)
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | - | 추후 지원 예정 |
주의
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 브로커 서버 목록 | localhost:9092 |
string | V1 | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고 예: 10.100.1.1:9092,10.100.1.2:9092 |
| 컨슈머 그룹 아이디 | dataflow |
string | V1 | Kafka Consumer Group을 식별하는 ID를 입력합니다. | Kafka 공식 문서의 group.id 속성 참고 |
| 토픽 목록 | dataflow |
array of strings | V1 | 메시지를 수신할 Kafka 토픽 목록을 입력합니다. | |
| 토픽 패턴 | - | string | V1 | 메시지를 수신할 Kafka 토픽 패턴을 입력합니다. | 예: *-messages |
| 내부 토픽 제외 여부 | true |
boolean | V1 | __consumer_offsets와 같은 내부 토픽을 제외합니다. | Kafka 공식 문서의 exclude.internal.topics 속성 참고 수신 대상에서 __consumer_offsets와 같은 내부 토픽을 제외합니다. |
| 클라이언트 아이디 | dataflow |
string | V1 | Kafka Consumer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 파티션 할당 정책 | - | string | V1 | Kafka에서 메시지 수신 시 컨슈머 그룹에 어떻게 파티션을 할당할지 결정합니다. | Kafka 공식 문서의 partition.assignment.strategy 속성 참고 org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
| 오프셋 설정 | latest |
enum | V1 | 컨슈머 그룹의 오프셋을 설정하는 기준을 입력합니다. | Kafka 공식 문서의 auto.offset.reset 속성 참고 아래 설정 모두 컨슈머 그룹이 이미 존재하는 경우 기존 오프셋을 유지합니다. none: 컨슈머 그룹이 없으면 오류를 반환합니다. earliest: 컨슈머 그룹이 없으면 파티션의 가장 오래된 오프셋으로 초기화합니다. latest: 컨슈머 그룹이 없으면 파티션의 가장 최근 오프셋으로 초기화합니다. |
| 오프셋 커밋 주기 | 5000 |
number | V1 | 컨슈머 그룹의 오프셋을 갱신할 주기를 입력합니다. | Kafka 공식 문서의 auto.commit.internal.ms 속성 참고 |
| 오프셋 자동 커밋 여부 | true |
boolean | V1 | 컨슈머 오프셋을 자동으로 갱신할지를 결정합니다. | Kafka 공식 문서의 enable.auto.commit 속성 참고 |
| 키 역직렬화 유형 | org.apache.kafka.common.serialization.StringDeserializer |
string | V1 | 수신하는 메시지의 키를 직렬화할 방법을 입력합니다. | Kafka 공식 문서의 key.deserializer 속성 참고 |
| 메시지 역직렬화 유형 | org.apache.kafka.common.serialization.StringDeserializer |
string | V1 | 수신하는 메시지의 값을 직렬화할 방법을 입력합니다. | Kafka 공식 문서의 value.deserializer 속성 참고 |
| 메타데이터 생성 여부 | false |
boolean | V1 | 속성값이 true일 경우 메시지에 대한 메타데이터 필드를 생성합니다. 메타데이터 필드를 Sink 플러그인에 노출하기 위해서는 filter 노드 유형을 조합해야 합니다(하단 가이드 참조). | 생성되는 필드는 다음과 같습니다. topic: 메시지를 수신한 토픽 consumer_group: 메시지를 수신하는 데 사용한 컨슈머 그룹 아이디 partition: 메시지를 수신한 토픽의 파티션 번호 offset: 메시지를 수신한 파티션의 오프셋 key: 메시지 키를 포함하는 ByteBuffer |
| Fetch 최소 크기 | - | number | V1 | 한 번의 fetch 요청으로 가져올 데이터의 최소 크기를 입력합니다. | Kafka 공식 문서의 fetch.min.bytes 속성 참고 |
| 전송 버퍼 크기 | - | number | V1 | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| 재시도 요청 주기 | - | number | V1 | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 순환 중복 검사 | true |
boolean | V1 | 메시지의 CRC를 검사합니다. | Kafka 공식 문서의 check.crcs 속성 참고 |
| 서버 재연결 주기 | - | number | V1 | 브로커 서버에 연결이 실패했을 때 재시도할 주기를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| Poll 타임아웃 | 100 |
number | V1 | 토픽에서 새로운 메시지를 가져오는 요청에 대한 타임아웃(ms)을 입력합니다. | |
| 파티션당 Fetch 최대 크기 | - | number | V1 | 파티션당 한 번의 fetch 요청으로 가져올 최대 크기를 입력합니다. | Kafka 공식 문서의 max.partition.fetch.bytes 속성 참고 |
| 서버 요청 타임아웃 | - | number | V1 | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| TCP 수신 버퍼 크기 | - | number | V1 | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 세션 타임아웃 | - | number | V1 | 컨슈머의 세션 타임아웃(ms)을 입력합니다. 컨슈머가 해당 시간 안에 heartbeat를 보내지 못할 경우 컨슈머 그룹에서 제외합니다. |
Kafka 공식 문서의 session.timeout.ms 속성 참고 |
| 최대 poll 메시지 개수 | - | number | V1 | 한 번의 poll 요청으로 가져올 최대 메시지 개수를 입력합니다. | Kafka 공식 문서의 max.poll.records 속성 참고 |
| 최대 poll 주기 | - | number | V1 | poll 요청 간 최대 주기(ms)를 입력합니다. | Kafka 공식 문서의 max.poll.interval.ms 속성 참고 |
| Fetch 최대 크기 | - | number | V1 | 한 번의 fetch 요청으로 가져올 최대 크기를 입력합니다. | Kafka 공식 문서의 fetch.max.bytes 속성 참고 |
| Fetch 최대 대기 시간 | - | number | V1 | Fetch 최소 크기 설정 만큼의 데이터가 모이지 않은 경우 fetch 요청을 보낼 대기 시간(ms)을 입력합니다. |
Kafka 공식 문서의 fetch.max.wait.ms 속성 참고 |
| 컨슈머 헬스체크 주기 | - | number | V1 | 컨슈머가 heartbeat를 보내는 주기(ms)를 입력합니다. | Kafka 공식 문서의 heartbeat.interval.ms 속성 참고 |
| 메타데이터 갱신 주기 | - | number | V1 | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| IDLE 타임아웃 | - | number | V1 | 데이터 전송이 없는 커넥션을 닫을 대기 시간(ms)을 입력합니다. | Kafka 공식 문서의 connections.max.idle.ms 속성 참고 |
메타데이터 생성 여부 설정 활성화 시 메타데이터 필드가 생성되나, 별도로 일반 필드로 주입하는 작업을 거치지 않는다면 Sink 플러그인에서 노출하지 않습니다.{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka 토픽 메시지..."
// 메타데이터 필드
// 사용자가 일반 필드로 주입하기 전까지 Sink 플러그인에 노출할 수 없음
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"kafka_topic": "%{[@metadata][kafka][topic]}"
"kafka_consumer_group": "%{[@metadata][kafka][consumer_group]}"
"kafka_partition": "%{[@metadata][kafka][partition]}"
"kafka_offset": "%{[@metadata][kafka][offset]}"
"kafka_key": "%{[@metadata][kafka][key]}"
"kafka_timestamp": "%{[@metadata][kafka][timestamp]}"
}
{
// 일반 필드
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka 토픽 메시지..."
"kafka_topic": "my-topic"
"kafka_consumer_group": "my_consumer_group"
"kafka_partition": "1"
"kafka_offset": "123"
"kafka_key": "my_key"
"kafka_timestamp": "-1"
// 메타데이터 필드
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장 * json 코덱 - JSON 형식 데이터 파싱 * line 코덱 - 행 단위 메시지 처리
쿼리 실행 주기마다 쿼리를 실행하여 데이터를 처리합니다.| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | - | 추후 지원 예정 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 사용자 | - | string | V1 | DB 사용자를 입력합니다. | |
| 연결 문자열 | - | string | V1 | DB 연결 정보를 입력합니다. | 예: jdbc:mysql://my.sql.endpoint:3306/my_db_name |
| 비밀번호 | - | string | V1 | 사용자 비밀번호를 입력합니다. | |
| 쿼리 | - | string | V1 | 메시지를 생성할 쿼리를 작성합니다. | |
| 컬럼 소문자화 변환 여부 | false |
boolean | V1 | 쿼리 결과로 얻는 컬럼명을 소문자화할지를 결정합니다. | |
| 쿼리 실행 주기 | * * * * * |
string | V1 | 쿼리의 실행 주기를 cron-like 표현으로 입력합니다. | |
| 트래킹 컬럼 | - | string | V1 | 추적할 컬럼을 선택합니다. | 사전 정의된 파라미터 :sql_last_value로 마지막 쿼리 결과에서 추적할 컬럼에 해당하는 값을 사용할 수 있습니다.아래 쿼리 작성법을 참고하세요. |
| 트래킹 컬럼 종류 | numeric |
string | V1 | 추적할 컬럼의 데이터 종류를 선택합니다. | 예: numeric or timestamp |
| 시간대 | - | string | V1 | timestamp 타입의 컬럼을 human-readable 문자열로 변환할 때 사용하는 시간대를 정의합니다. | 예: Asia/Seoul |
| 페이징 적용 여부 | false |
boolean | V1 | 쿼리에 페이징을 적용할지 여부를 결정합니다. | 페이징이 적용되면 쿼리가 여러 개로 쪼개져서 실행되며, 순서는 보장되지 않습니다. |
| 페이지 크기 | - | number | V1 | 페이징이 적용된 쿼리에서, 한 번에 쿼리 할 페이지 크기를 결정합니다. |
:sql_last_value 를 통해 가장 마지막에 실행된 쿼리 결과에서 트래킹 컬럼에 해당하는 값을 사용할 수 있습니다(초깃값은 트래킹 컬럼 종류가 numeric이라면 0, timestamp라면 1970-01-01 00:00:00).SELECT * FROM MY_TABLE WHERE id > :sql_last_value
:sql_last_value를 추가합니다.SELECT * FROM MY_TABLE WHERE id > :sql_last_value and id > custom_value order by id ASC
지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 아이디 | - | string | V1, V2 | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
|
| 태그 추가 | - | array of strings | V1 | 각 메시지에 태그를 추가합니다. | |
| 태그 삭제 | - | array of strings | V1 | 각 메시지에 주어진 태그를 삭제합니다. | |
| 필드 삭제 | - | array of strings | V1 | 각 메시지의 필드를 삭제합니다. | |
| 필드 추가 | - | hash | V1 | 커스텀 필드를 추가할 수 있습니다.%{[depth1_field]}로 각 필드의 값을 가져와 필드를 추가할 수 있습니다. |
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | X |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 필드 변경 | - | array of strings | V1 | 필드 값을 주어진 값과 비교하여 같을 경우 해당 필드의 값을 주어진 값으로 수정합니다. | |
| 필드 덮어쓰기 | - | array of strings | V1 | 필드 값을 주어진 값과 비교하여 같을 경우 다른 필드의 값을 주어진 값으로 수정합니다. | |
| Coalesce | - | array of strings | V1 | 하나의 필드에 뒤이어 오는 필드 중 처음으로 null이 아닌 값을 할당합니다. |
["logType", "ERROR", "FAIL"]["logType", "FAIL", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "FAIL",
"isBillingTarget": "false"
}
["logType", "ERROR", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "ERROR",
"isBillingTarget": "false"
}
["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]{
"reason": "CONNECTION_TIMEOUT"
}
{
"reason": "MONGODB_CONNECTION_TIMEOUT"
}
["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]{
"mongoReason": "COLLECTION_NOT_FOUND"
}
{
"reason": "COLLECTION_NOT_FOUND",
"mongoReason": "COLLECTION_NOT_FOUND"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | - | 추후 지원 예정 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 모드 | - | enum | V1 | 암호화 모드와 복호화 모드 중 선택합니다. | 목록 중에 하나를 선택합니다. |
| 앱키 | - | string | V1 | 암/복호화에 사용할 키를 저장한 SKM 앱키를 입력합니다. | |
| 키 ID | - | string | V1 | 암/복호화에 사용할 키를 저장한 SKM 키 ID를 입력합니다. | |
| 키 버전 | - | string | V1 | 암/복호화에 사용할 키를 저장한 SKM 키 버전을 입력합니다. | |
| 소스 필드 | message |
string | V1 | 암/복호화할 필드명을 입력합니다. | |
| 저장할 필드 | message |
string | V1 | 암/복호화 결과를 저장할 필드명을 입력합니다. |
encryptSKM 앱키SKM 대칭키 ID116{
"message": "this is plain message"
}
{
"message": "this is plain message",
"encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
decryptSKM 앱키SKM 대칭키 ID116{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
{
"decrypted_message": "this is plain message",
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | - | 추후 지원 예정 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| Match | - | hash | V1 | 파싱할 문자열의 정보를 입력합니다. | |
| 패턴 정의 | - | hash | V1 | 파싱할 토큰의 규칙의 사용자 정의 패턴을 정규표현식으로 입력합니다. | 시스템 정의 패턴은 아래 링크를 확인하세요. https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns |
| 실패 태그 | _grokparsefailure |
array of strings | V1 | 문자열 파싱에 실패할 경우 정의할 태그명을 입력합니다. | |
| 타임아웃 | 30000 |
number | V1 | 문자열 파싱이 될 때까지 기다리는 시간을 입력합니다. | |
| 타임아웃 태그 | _groktimeout |
string | V1 | 타임아웃 시 설정할 태그를 입력합니다. | |
| 덮어쓰기 | - | array of strings | V1 | 파싱 후 지정된 필드에 값을 쓸 때 해당 필드에 이미 값이 정의되어 있을 경우 덮어쓸 필드명들을 입력합니다. | |
| 이름이 지정된 값만 저장 | true |
boolean | V1 | 속성값이 true일 경우 이름이 지정되지 않은 파싱 결과를 저장하지 않습니다. | |
| 빈 문자열 캡처 | false |
boolean | V1 | 속성값이 true일 경우 빈 문자열도 필드에 저장합니다. | |
| Match 시 종료 여부 | true |
boolean | V1 | 속성값이 true일 경우 grok match 결과가 참이면 플러그인을 종료합니다. |
{ "message": "%{IP:clientip} %{HYPHEN} %{USER} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:bytes}" }{ "HYPHEN": "-*" }{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326"
}
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326",
"timestamp": "10/Oct/2000:13:55:36 -0700",
"clientip": "127.0.0.1",
"verb": "GET",
"httpversion": "1.0",
"response": "200",
"bytes": "2326",
"request": "/apache_pb.gif"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 저장할 필드 | - | string | V1, V2 | CSV 파싱 결과를 저장할 필드명를 입력합니다. | |
| Quote | " |
string | V1, V2 | 컬럼 필드를 나누는 문자를 입력합니다. | |
| 첫 행 무시 여부 | false |
boolean | V1, V2 | 속성값이 true일 경우 읽은 데이터 중 첫 행에 입력된 컬럼 이름을 무시합니다. | |
| 컬럼 | - | array of strings | V1 | 컬럼 이름을 입력합니다. | |
| 구분자 | , | string | V1, V2 | 컬럼을 구분할 문자열을 입력합니다. | |
| 소스 필드 | * V1:message* V2: - |
string | V1, V2 | CSV 파싱할 필드명을 입력합니다. | |
| 스키마 | - | hash | V1, V2 | 각 컬럼의 이름과 자료형을 dictionary 형태로 입력합니다. | 엔진 타입에 따른 스키마 입력 방법 참고 |
| 덮어쓰기 | false |
boolean | V2 | true일 경우 CSV 파싱 결과가 저장할 필드나 기존 필드와 겹치면 덮어씌웁니다. | |
| 원본 필드 삭제 | false |
boolean | V2 | CSV 파싱이 완료되면 소스 필드를 삭제합니다. 파싱이 실패한다면 유지합니다. |
message["one", "two", "t hree"]{"one": "string", "two": "string", "t hree": "string"}{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message["one", "two", "t hree"]{"two": "integer", "t hree": "boolean"}{"one": "string", "two": "integer", "t hree": "boolean"}{
"message": "\\\"wow hello world!\\\", 2, false"
}
{
"message": "\\\"wow hello world!\\\", 2, false",
"one": "wow hello world!",
"t hree": false,
"two": 2
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 소스 필드 | * V1: message* V2: - |
string | V1, V2 | JSON 문자열을 파싱할 필드명을 입력합니다. | |
| 저장할 필드 | - | string | V1, V2 | JSON 파싱 결과를 저장할 필드명을 입력합니다. 만약 속성값을 지정하지 않을 경우 root 필드에 결과를 저장합니다. |
|
| 덮어쓰기 | false |
boolean | V2 | true일 경우 JSON 파싱 결과가 저장할 필드나 기존 필드와 겹치면 덮어씌웁니다. | |
| 원본 필드 삭제 | false |
boolean | V2 | JSON 파싱이 완료되면 소스 필드를 삭제합니다. 파싱이 실패한다면 유지합니다. |
messagejson_parsed_messsage{
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": "string"
},
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 소스 필드 | - | string | V1, V2 | 문자열을 가져오기 위한 필드명을 입력합니다. | |
| 형식 | - | array of strings | V1, V2 | 문자열을 가져오기 위한 형식을 입력합니다. | 사전 정의된 형식은 다음과 같습니다. ISO8601, UNIX, UNIX_MS, TAI64N(V2 미지원) |
| Locale | * V1: - * V2: ko_KR |
string | V1, V2 | Date 문자열 분석을 위해 사용할 Locale을 입력합니다. | 예: en, en-US, ko_KR |
| 저장할 필드 | * V1: @timestamp* V2: - |
string | V1, V2 | Date 문자열 파싱 결과를 저장할 필드명을 입력합니다. | |
| 실패 태그 | _dateparsefailure |
array of strings | V1 | Date 문자열 파싱에 실패했을 경우 정의할 태그명을 입력합니다. | |
| 시간대 | * V1: - * V2: Asia/Seoul |
string | V1, V2 | 날짜의 시간대를 입력합니다. | 예: Asia/Seoul |
message["yyyy-MM-dd HH:mm:ssZ", "ISO8601"]timeAsia/Seoul{
"message": "2017-03-16T17:40:00"
}
{
"message": "2017-03-16T17:40:00",
"time": 2022-04-04T09:08:01.222Z
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| UUID 저장 필드 | - | string | V1, V2 | UUID 생성 결과값을 저장할 필드명을 입력합니다. | |
| 덮어쓰기 | false |
boolean | V1, V2 | 지정된 필드명에 값이 존재할 경우 이를 덮어쓸지 여부를 선택합니다. |
userId{
"message": "uuid test message"
}
{
"userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
"message": "uuid test message"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | X | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 대상 필드 | - | string | V2 | 데이터 타입을 변환할 대상 필드를 입력합니다. | |
| 변환 타입 | - | enum | V2 | 변환할 데이터 타입을 선택합니다. * 제공 타입: STRING, INTEGER, FLOAT, DOUBLE, BOOLEAN |
messageINTEGER{
"message": "2025"
}
{
"message": 2025
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | X |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 소스 필드 | message |
string | V1 | 메시지를 분리할 필드명을 입력합니다. | |
| 저장할 필드 | - | string | V1 | 분리된 메시지를 저장할 필드명을 입력합니다. | |
| 구분자 | \n |
string | V1 | 분할할 문자열의 구분자를 입력합니다. 만약 array 객체라면 이 설정은 무시됩니다. |
message{
"message": [
{"number": 1},
{"number": 2}
]
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 1
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 2
}
message,{
"message": "1,2"
}
{
"message": "1"
}
{
"message": "2"
}
messagetarget,{
"message": "1,2"
}
{
"message": "1,2",
"target": "1"
}
{
"message": "1,2",
"target": "2"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | X |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| Byte 길이 | - | number | V1 | 문자열을 표현할 최대 byte 길이를 입력합니다. | |
| 소스 필드 | - | string | V1 | truncate 대상 필드명을 입력합니다. |
message{
"message": "이 메시지는 너무 깁니다."
}
{
"message": "이 메세"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | X |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 기본값 설정 | - | hash | V1 | null 값을 기본값으로 대체합니다. | |
| 필드명 변경 | - | hash | V1 | 필드 이름을 변경합니다. | |
| 필드값 갱신 | - | hash | V1 | 필드 값을 새 값으로 교체합니다. 필드가 없다면 아무런 동작도 하지 않습니다. | |
| 값 대체 | - | hash | V1 | 필드 값을 새 값으로 교체. 필드가 없다면 새로 생성합니다. | |
| 타입 변환 | - | hash | V1 | 필드 값을 다른 타입으로 변환합니다. | 지원하는 타입은 integer, interger_eu, float, float_eu, string, boolean입니다. |
| 문자열 치환 | - | array of strings | V1 | 정규식으로 문자열 일부를 교체합니다. | |
| 대문자 변환 | - | array of strings | V1 | 필드의 문자열을 대문자로 변경합니다. | |
| 첫 글자 대문자화 | - | array of strings | V1 | 필드의 첫 글자를 대문자로 변환하고 나머지는 소문자로 변환합니다. | |
| 소문자 변환 | - | array of strings | V1 | 대상 필드의 문자열을 소문자로 변경합니다. | |
| 공백 제거 | - | array of strings | V1 | 필드의 문자열 앞뒤 공백을 제거합니다. | |
| 문자열 분할 | - | hash | V1 | 구분자를 이용해 문자열을 배열로 분할합니다. | |
| 배열 결합 | - | hash | V1 | 구분자를 이용하여 배열의 요소를 하나의 문자열로 합칩니다. | |
| 필드 병합 | - | hash | V1 | 두 필드를 합칩니다. | |
| 필드 복사 | - | hash | V1 | 기존 필드를 다른 필드로 복사합니다. 필드가 존재한다면 덮어씁니다. | |
| 실패 태그 | _mutate_error |
string | V1 | 오류가 발생한 경우 정의할 태그를 입력합니다. |
{"fieldname": "new value"}["fieldname"]{
"fieldname": "old value"
}
{
"fieldname": "NEW VALUE"
}
{
"fieldname": "default_value"
}
{
"fieldname": null
}
{
"fieldname": "default_value"
}
{
"fieldname": "changed_fieldname"
}
{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow",
"not_exist_fieldname": "DataFlow"
}
{
"message1": "integer",
"message2": "boolean"
}
{
"message1": "1000",
"message2": "true"
}
{
"message1": 1000,
"message2": true
}
1000true는 1, false는 0으로 변환됩니다.10001000.5true는 1.0, false는 0.0으로 변환됩니다.1000.51은 true, 0은 false로 변환됩니다.1.0은 true, 0.0은 false로 변환됩니다."true", "t", "yes", "y", "1", "1.0"은 true, "false", "f", "no", "n", "0", "0.0"은 false로 변환됩니다. 빈 문자열은 false로 변환됩니다.["fieldname", "/", "_", "fieldname2", "[\\?#-]", "."]
fieldname 필드의 문자열 값에서 /를 _로, fieldname2 필드의 문자열 값에서 \, ?, #, -를 .으로 치환합니다.{
"fieldname": "Hello/World",
"fieldname2": "Hello\\?World#Test-123"
}
{
"fieldname": "Hello_World",
"fieldname2": "Hello.World.Test.123"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "HELLO WORLD"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "Hello world"
}
["fieldname"]
{
"fieldname": "HELLO WORLD"
}
{
"fieldname": "hello world"
}
["field1", "field2"]
{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
{
"fieldname": ","
}
{
"fieldname": "Hello,World"
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": ","
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": "Hello,World"
}
{
"array_data1": "string_data1",
"string_data2": "string_data1",
"json_data1": "json_data2"
}
{
"array_data1": ["array_data1"],
"string_data1": "string_data1",
"string_data2": "string_data2",
"json_data1": {"json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"array_data1": ["array_data1", "string_data1"],
"string_data1": "string_data1",
"string_data2": ["string_data2", "string_data1"],
"json_data1": {"json_field2" : "json_data2", "json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"source_field": "dest_field"
}
{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | X | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 대상 필드 | - | string | V2 | 기본값을 지정할 필드명을 입력합니다. | |
| 기본 값 | - | string | V2 | 기본값을 입력합니다. |
fieldnamedefault_value{
"fieldname": null
}
{
"fieldname": "default_value"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | X | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 대상 필드 | - | string | V2 | 복사할 소스 필드명을 입력합니다. | |
| 저장할 필드 | - | string | V2 | 복사한 결과를 저장할 필드명을 입력합니다. | |
| 덮어쓰기 | false |
boolean | V2 | true일 경우 저장할 필드가 이미 존재하면 덮어씌웁니다. |
source_fielddest_field{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | X | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 소스 필드 | string | V2 | 이름을 변경할 소스 필드를 입력합니다. | ||
| 대상 필드 | - | string | V2 | 변경할 필드명을 입력합니다. | |
| 덮어쓰기 | false |
boolean | V2 | true일 경우 대상 필드가 이미 존재할 경우 덮어씌웁니다. |
fieldnamechanged_fieldname{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | X | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 대상 필드 | - | array of strings | V2 | 공백을 제거할 대상 필드들을 입력합니다. |
["field1", "field2"]{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 아이디 | - | string | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
/{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.txt/{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/part-{uuid}-{file_counter} 참고
Prefix 입력 조건에 따른 출력 경로 예시는 엔진 V1을 기준으로 작성되었습니다.| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O | V2 엔진 타입에서 Object Storage 연결 시 주의점 참고 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 리전 | - | enum | V1, V2 | Object Storage 상품의 리전을 입력합니다. | |
| 버킷 | - | string | V1, V2 | 버킷 이름을 입력합니다. | |
| 비밀 키 | - | string | V1, V2 | S3 API 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | V1, V2 | S3 API 자격 증명 액세스 키를 입력합니다. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} |
string | V1, V2 | 오브젝트 업로드 시 이름 앞에 붙일 접두사를 입력합니다. 필드 또는 시간 형식을 입력할 수 있습니다. |
사용 가능한 시간 형식 |
| Prefix 시간 필드 | * V1: @timestamp * V2: - |
string | V1, V2 | Prefix에 적용할 시간 필드를 입력합니다. | |
| Prefix 시간 필드 타입 | DATE_FILTER_RESULT |
enum | V1, V2 | Prefix에 적용할 시간 필드의 타입을 입력합니다. | 엔진 타입 V2는 DATE_FILTER_RESULT 타입만 가능(추후 다른 타입 지원 예정) |
| Prefix 시간대 | UTC |
string | V1, V2 | Prefix에 적용할 시간 필드의 타임 존을 입력합니다. | |
| Prefix 시간 적용 fallback | _prefix_datetime_parse_failure |
string | V1, V2 | Prefix 시간 적용에 실패한 경우 대체할 Prefix를 입력합니다. | |
| 인코딩 | none |
enum | V1 | 인코딩 여부를 입력합니다. gzip 인코딩을 사용할 수 있습니다. | |
| 오브젝트 로테이션 정책 | size_and_time |
enum | V1 | 오브젝트의 생성 규칙을 결정합니다. | size_and_time: 오브젝트의 크기와 시간을 이용하여 결정 size: 오브젝트의 크기를 이용하여 결정 time: 시간을 이용하여 결정 엔진 타입 V2는 size_and_time만 지원 |
| 기준 시각 | 1 |
number | V1, V2 | 오브젝트를 분할할 기준이 될 시간을 설정합니다. | 오브젝트 로테이션 정책이 size_and_time 또는 time인 경우 설정 |
| 기준 오브젝트 크기 | 5242880 |
number | V1, V2 | 오브젝트를 분할할 기준이 될 크기(단위: byte)를 설정합니다. | 오브젝트 로테이션 정책이 size_and_time 또는 size인 경우 설정 |
| 비활성 간격 | 1 |
number | V2 | 데이터 인입이 없는 상태가 지속될 때 오브젝트를 분할하는 기준 시간을 설정합니다. | 설정된 시간 동안 데이터 인입이 없으면 현재 오브젝트가 업로드되며, 이후 새로 인입되는 데이터는 새로운 오브젝트에 작성됩니다. |
지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장(엔진 V1만 지원) * json 코덱 - JSON 형식 데이터 파싱 * line 코덱 - 행 단위 메시지 처리 * parquet 코덱 - 압축된 컬럼형 저장 형식(엔진 V1만 지원)
obs-test-container/dataflow/%{deployment}{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/production/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeISO8601Asia/Seoul{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeTIMESTAMP_SECAsia/Seoul_failure{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/_failure/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O | V2 엔진 타입에서 Object Storage 연결 시 주의점 참고 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 리전 | * V1: us-east-1* V2: - |
enum | V1, V2 | S3 상품의 리전을 입력합니다. | s3 region |
| 버킷 | - | string | V1, V2 | 버킷 이름을 입력합니다. | |
| 액세스 키 | - | string | V1, V2 | S3 API 자격 증명 액세스 키를 입력합니다. | |
| 비밀 키 | - | string | V1, V2 | S3 API 자격 증명 비밀 키를 입력합니다. | |
| 서명 버전 | v4 |
enum | V1 | AWS 요청을 서명할 때 사용할 버전을 입력합니다. | |
| 세션 토큰 | - | string | V1 | AWS 임시 자격 증명을 위한 세션 토큰을 입력합니다. | 세션 토큰 가이드 |
| Prefix | - | string | V1, V2 | 오브젝트 업로드 시 이름 앞에 붙일 접두사를 입력합니다. 필드 또는 시간 형식을 입력할 수 있습니다. |
사용 가능한 시간 형식 |
| Prefix 시간 필드 | * V1: @timestamp* v2: - |
string | V1, V2 | Prefix에 적용할 시간 필드를 입력합니다. | |
| Prefix 시간 필드 타입 | DATE_FILTER_RESULT |
enum | V1, V2 | Prefix에 적용할 시간 필드의 타입을 입력합니다. | 엔진 타입 V2는 DATE_FILTER_RESULT 타입만 가능 (추후 다른 타입 지원 예정) |
| Prefix 시간대 | UTC |
string | V1, V2 | Prefix에 적용할 시간 필드의 타임 존을 입력합니다. | |
| Prefix 시간 적용 fallback | _prefix_datetime_parse_failure |
string | V1, V2 | Prefix 시간 적용에 실패한 경우 대체할 Prefix를 입력합니다. | |
| 스토리지 클래스 | STANDARD |
enum | V1 | 오브젝트를 업로드할 때 사용할 스토리지 클래스를 설정합니다. | 스토리지 클래스 가이드 |
| 인코딩 | none |
enum | V1 | 인코딩 여부를 입력합니다. gzip 인코딩을 사용할 수 있습니다. | |
| 오브젝트 로테이션 정책 | size_and_time |
enum | V1 | 오브젝트의 생성 규칙을 결정합니다. | size_and_time: 오브젝트의 크기와 시간을 이용하여 결정 size: 오브젝트의 크기를 이용하여 결정 time: 시간을 이용하여 결정 엔진 타입 V2는 size_and_time만 지원 |
| 기준 시각 | 1 |
number | V1, V2 | 오브젝트를 분할할 기준이 될 시간을 설정합니다. | 오브젝트 로테이션 정책이 size_and_time 또는 time인 경우 설정 |
| 기준 오브젝트 크기 | 5242880 |
number | V1, V2 | 오브젝트를 분할할 기준이 될 크기를 설정합니다. | 오브젝트 로테이션 정책이 size_and_time 또는 size인 경우 설정 |
| ACL | private |
enum | V1 | 오브젝트를 업로드했을 때 설정할 ACL 정책을 입력합니다. | |
| 추가 설정 | - | hash | V1 | S3에 연결하기 위한 추가 설정을 입력합니다. | 가이드 |
| 경로 방식 요청 | false |
boolean | V2 | 경로 방식 요청을 사용할지 여부를 결정합니다. | |
| 비활성 간격 | 1 |
number | V2 | 데이터 인입이 없는 상태가 지속될 때 오브젝트를 분할하는 기준 시간을 설정합니다. | 설정된 시간 동안 데이터 인입이 없으면 현재 오브젝트가 업로드되며, 이후 새로 인입되는 데이터는 새로운 오브젝트에 작성됩니다. |
주의
{"force_path_style" : true}true로 설정지원 코덱: * plain 코덱 - 원본 데이터 문자열 저장(엔진 V1만 지원) * json 코덱 - JSON 형식 데이터 파싱 * line 코덱 - 행 단위 메시지 처리 * parquet 코덱 - 압축된 컬럼형 저장 형식(엔진 V1만 지원)
true로 설정하는 경우 AWS S3에서 307 응답을 리턴하는 경우 redirect 경로를 추적{
follow_redirects → true
}
{
retry_limit → 5
}
true일 경우 URL이 virtual-hosted-style이 아닌 path-style이어야 합니다. 참조{
force_path_style → true
}
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | - | 추후 지원 예정 |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 토픽 | - | string | V1 | 메시지를 전송할 Kafka 토픽 이름을 입력합니다. | |
| 브로커 서버 목록 | localhost:9092 |
string | V1 | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고예: 10.100.1.1:9092,10.100.1.2:9092 |
| 클라이언트 아이디 | dataflow |
string | V1 | Kafka Producer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 메시지 직렬화 유형 | org.apache.kafka.common.serialization.StringSerializer |
string | V1 | 전송하는 메시지의 값을 직렬화할 방법을 입력합니다. | Kafka 공식 문서의 value.serializer 속성 참고 |
| 압축 유형 | none |
enum | V1 | 전송하는 데이터를 압축할 방법을 입력합니다. | Kafka 공식 문서의 compression.type 속성 참고none, gzip, snappy, lz4 중 선택 |
| 메시지 키 | - | string | V1 | 메시지 키로 사용할 필드를 입력합니다. 형태는 다음과 같습니다.예: %{FIELD} | |
| 키 직렬화 유형 | org.apache.kafka.common.serialization.StringSerializer |
string | V1 | 전송하는 메시지의 키를 직렬화할 방법을 입력합니다. | Kafka 공식 문서의 key.serializer 속성 참고 |
| 메타데이터 갱신 주기 | 300000 |
number | V1 | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| 최대 요청 크기 | 1048576 |
number | V1 | 전송 요청당 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 max.request.size 속성 참고 |
| 서버 재연결 주기 | 50 |
number | V1 | 브로커 서버에 연결이 실패했을 때 재시도할 주기를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| 배치 크기 | 16384 |
number | V1 | 배치 요청으로 전송할 크기(byte)를 입력합니다. | Kafka 공식 문서의 batch.size 속성 참고 |
| 버퍼 메모리 | 33554432 |
number | V1 | Kafka 전송에 사용하는 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 buffer.memory 속성 참고 |
| 수신 버퍼 크기 | 32768 |
number | V1 | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 전송 지연 시간 | 0 |
number | V1 | 메시지 전송을 지연할 시간을 입력합니다. 지연된 메시지는 배치 요청으로 한번에 전송합니다. | Kafka 공식 문서의 linger.ms 속성 참고 |
| 서버 요청 타임아웃 | 40000 |
number | V1 | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| 전송 버퍼 크기 | 131072 |
number | V1 | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| ack 속성 | 1 |
enum | V1 | 브로커 서버에서 메시지를 받았는지 확인하는 설정을 입력합니다. | Kafka 공식 문서의 acks 속성 참고0 - 메시지 수신 여부를 확인하지 않습니다. 1 - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다리지 않고 메시지를 수신했다는 응답을 합니다. all - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다린 뒤 메시지를 수신했다는 응답을 합니다. |
| 재시도 요청 주기 | 100 |
number | V1 | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 재시도 횟수 | - | number | V1 | 전송 요청이 실패했을 때 재시도할 최대 횟수를 입력합니다. | Kafka 공식 문서의 retries 속성 참고설정값을 초과하여 재시도하는 경우 데이터 유실이 발생할 수 있습니다. |
지원 코덱: * plain 코덱 - 원본 데이터 문자열 출력 * json 코덱 - JSON 형식 데이터 출력 * line 코덱 - 행 단위 메시지 출력
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
지원 코덱: * json 코덱 - JSON 형식 데이터 출력 * plain 코덱 - 원본 데이터 문자열 출력 * line 코덱 - 행 단위 메시지 출력 * debug 코덱 - 디버깅용 상세 출력(엔진 V1만 지원)
| 엔진 타입 | 지원 여부 | 비고 |
|---|---|---|
| V1 | O | |
| V2 | O |
| 속성명 | 기본값 | 자료형 | 지원 엔진 타입 | 설명 | 비고 |
|---|---|---|---|---|---|
| 조건문 | - | string | V1, V2 | 메시지를 필터링할 조건을 입력합니다. | 엔진 타입에 따라 조건 문법이 다릅니다. 아래 예제를 참고하세요. |
[logLevel] == "ERROR"logLevel == "ERROR"{
"logLevel": "ERROR"
}
{
"logLevel": "INFO"
}
[response][status] == 200response.status == 200 또는 response["status"] == 200{
"resposne": {
"status": 200
}
}
{
"resposne": {
"status": 404
}
}