리전 또는 프로젝트가 서로 다른 Object Storage이지만 버킷 이름은 동일한 경우, 하나의 플로우에서 함께 사용할 수 없습니다.
불가능한 연결 설정 예제
플로우 실행에 필요한 DSL 정의입니다.
{{ executionTime }}{{ MINUTE }}{{ HOUR }}{{ DAY }}{{ MONTH }}{{ YEAR }}{{ time | startOf: unit }}unit으로 정의된 시간대의 시작 시간을 반환합니다.{{ time | endOf: unit }}unit으로 정의된 시간대의 마지막 시간을 반환합니다.{{ time | subTime: delta, unit }}unit으로 정의된 시간대의 delta만큼 뺀 시간을 반환합니다.{{ time | addTime: delta, unit }}unit으로 정의된 시간대의 delta만큼 더한 시간을 반환합니다.{{ time | format: formatStr }}formatStr 형태로 반환합니다.문자열을 입력합니다.
드롭다운 메뉴에서 TRUE 또는 FALSE를 선택합니다.
드롭다운 메뉴에서 항목을 선택합니다.
+ 버튼을 클릭하면 배열에 문자열이 삽입됩니다.["message" , "yyyy-MM-dd HH:mm:ssZ", "ISO8601"]를 입력하려면 message, yyyy-MM-dd HH:mm:ssZ, ISO8601의 순서로 배열에 문자열을 삽입합니다.JSON 형식의 문자열을 입력합니다.
| 데이터 타입 | 설명 |
|---|---|
| String | 문자열 |
| Integer | 32비트 정수 |
| Long | 64비트 정수 |
| Float | 32비트 부동 소수점 |
| Double | 64비트 부동 소수점 |
| Boolean | 참/거짓 |
| Timestamp | 날짜와 시간 |
| Array | 배열 |
message 필드에 고정 매핑되므로 해당 필드만 정의 가능합니다.플로우로 데이터를 인입할 엔드포인트를 정의하는 노드 유형입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 아이디 | - | string | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
조회 시작 시간 이후의 데이터를 계속해서 처리합니다.조회 시작 시간, 조회 종료 시간 사이에 해당하는 데이터를 모두 처리하고 플로우를 종료합니다.| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| Appkey | - | string | Log & Crash Search의 앱키를 입력합니다. | |
| SecretKey | - | string | Log & Crash Search의 시크릿키를 입력합니다. | |
| 조회 시작 시간 | {{executionTime}} | string | 로그 조회의 시작 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 조회 종료 시간 | - | string | 로그 조회의 종료 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 검색 쿼리 | * | string | Log & Crash Search 조회 요청 시 사용할 검색 쿼리를 입력합니다. 자세한 쿼리 작성 방법은 Log & Crash Search 서비스의 'Lucene 쿼리 가이드'를 참고하세요. |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱
조회 시작 시간 이후의 데이터를 계속해서 처리합니다.조회 시작 시간, 조회 종료 시간 사이에 해당하는 데이터를 모두 처리하고 플로우를 종료합니다.| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| Appkey | - | string | CloudTrail의 앱키를 입력합니다. | |
| User Access Key ID | - | string | 사용자 계정의 User Access Key ID를 입력합니다. | |
| Secret Access Key | - | string | 사용자 계정의 User Secret Key를 입력합니다. | |
| 조회 시작 시간 | {{executionTime}} | string | 데이터 조회의 시작 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 조회 종료 시간 | - | string | 데이터 조회의 종료 시간을 입력합니다. 오프셋이 포함된 ISO 8601 형식 또는 DSL 형식으로 입력해야 합니다. 예: 2025-07-23T11:23:00+09:00, {{ executionTime }} |
|
| 이벤트 타입 | * | string | 조회할 이벤트 ID를 입력합니다. |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱
리스트 갱신 주기마다 오브젝트 리스트를 갱신하며, 새롭게 추가된 오브젝트들을 읽어 데이터를 처리합니다.| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 버킷 | - | string | 데이터를 읽을 버킷 이름을 입력합니다. | |
| 리전 | - | string | 저장소에 설정된 리전 정보를 입력합니다. | |
| 비밀 키 | - | string | S3가 발급한 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | S3가 발급한 자격 증명 액세스 키를 입력합니다. | |
| 리스트 갱신 주기 | 60 | number | 버킷에 포함된 오브젝트 리스트 갱신 주기를 입력합니다. | |
| Prefix | - | string | 읽어 올 오브젝트의 접두사를 입력합니다. | |
| 제외할 키 패턴 | - | string | 읽지 않을 오브젝트의 패턴을 입력합니다. |
지원 코덱 * PLAIN 코덱 - 원본 데이터 문자열 저장 * JSON 코덱 - JSON 형식 데이터 파싱
리스트 갱신 주기마다 오브젝트 리스트를 갱신하며, 새롭게 추가된 오브젝트들을 읽어 데이터를 처리합니다.| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 버킷 | - | string | 데이터를 읽을 버킷 이름을 입력합니다. | |
| 리전 | - | string | 저장소에 설정된 리전 정보를 입력합니다. | |
| 비밀 키 | - | string | S3가 발급한 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | S3가 발급한 자격 증명 액세스 키를 입력합니다. | |
| 리스트 갱신 주기 | 60 | number | 버킷에 포함된 오브젝트 리스트 갱신 주기를 입력합니다. | |
| Prefix | - | string | 읽어 올 오브젝트의 접두사를 입력합니다. | |
| 제외할 키 패턴 | - | string | 읽지 않을 오브젝트의 패턴을 입력합니다. |
지원 코덱 * PLAIN 코덱 - 원본 데이터 문자열 저장 * JSON 코덱 - JSON 형식 데이터 파싱
리스트 갱신 주기마다 오브젝트 리스트를 갱신하며, 새롭게 추가된 오브젝트들을 읽어 데이터를 처리합니다.| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 엔드포인트 | - | string | S3 저장소 엔드포인트를 입력합니다. | HTTP, HTTPS URL 형태만 입력 가능합니다. |
| 버킷 | - | string | 데이터를 읽을 버킷 이름을 입력합니다. | |
| 리전 | - | string | 저장소에 설정된 리전 정보를 입력합니다. | |
| 비밀 키 | - | string | S3가 발급한 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | S3가 발급한 자격 증명 액세스 키를 입력합니다. | |
| 리스트 갱신 주기 | 60 | number | 버킷에 포함된 오브젝트 리스트 갱신 주기를 입력합니다. | |
| Prefix | - | string | 읽어 올 오브젝트의 접두사를 입력합니다. | |
| 제외할 키 패턴 | - | string | 읽지 않을 오브젝트의 패턴을 입력합니다. | |
| 경로 방식 요청 | false | boolean | 경로 방식 요청을 사용할지 여부를 결정합니다. |
주의
true로 설정해야 합니다.지원 코덱 * PLAIN 코덱 - 원본 데이터 문자열 저장 * JSON 코덱 - JSON 형식 데이터 파싱
NHN Cloud의 EasyQueue에서 데이터를 수신하는 노드입니다.
STREAMING: 큐에 새로운 메시지가 도착할 때마다 데이터를 처리합니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 앱키 | - | string | EasyQueue의 앱키를 입력합니다. | |
| User Access Key ID | - | string | 사용자 계정의 User Access Key ID를 입력합니다. | |
| Secret Access Key | - | string | 사용자 계정의 User Secret Key를 입력합니다. | |
| 브로커 서버 목록 | - | string | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고 예: 10.100.1.1:9092,10.100.1.2:9092 |
| 컨슈머 그룹 아이디 | dataflow | string | Kafka Consumer Group을 식별하는 ID를 입력합니다. | Kafka 공식 문서의 group.id 속성 참고 |
| 토픽 목록 | - | array of strings | 메시지를 수신할 Kafka 토픽 목록을 입력합니다. | |
| 토픽 패턴 | - | string | 메시지를 수신할 Kafka 토픽 패턴을 입력합니다. | 예: *-messages |
| 내부 토픽 제외 여부 | true | boolean | __consumer_offsets와 같은 내부 토픽을 제외합니다. | Kafka 공식 문서의 exclude.internal.topics 속성 참고 수신 대상에서 __consumer_offsets와 같은 내부 토픽을 제외합니다. |
| 클라이언트 아이디 | dataflow | string | Kafka Consumer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 격리 수준 | read_committed | enum | 컨슈머가 트랜잭션이 커밋되지 않은 메시지까지 읽을지, 커밋된 메시지만 읽을지를 결정합니다. | Kafka 공식 문서의 isolation.level 속성 참고read_uncommitted: 모든 메시지를 오프셋 순서대로 읽습니다. read_committed: 커밋된 트랜잭션의 메시지만 읽습니다. |
| 파티션 할당 정책 | ["RANGE", "COOPERATIVE_STICKY"] | array of strings | Kafka에서 메시지 수신 시 컨슈머 그룹에 어떻게 파티션을 할당할지 결정합니다. | Kafka 공식 문서의 partition.assignment.strategy 속성 참고 org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
| 오프셋 설정 | latest | enum | 컨슈머 그룹의 오프셋을 설정하는 기준을 입력합니다. | Kafka 공식 문서의 auto.offset.reset 속성 참고 아래 설정 모두 컨슈머 그룹이 이미 존재하는 경우 기존 오프셋을 유지합니다. none: 컨슈머 그룹이 없으면 오류를 반환합니다. earliest: 컨슈머 그룹이 없으면 파티션의 가장 오래된 오프셋으로 초기화합니다. latest: 컨슈머 그룹이 없으면 파티션의 가장 최근 오프셋으로 초기화합니다. |
| 키 역직렬화 유형 | STRING | enum | 수신하는 메시지의 키의 타입을 입력합니다. | Kafka 공식 문서의 key.deserializer 속성 참고 |
| 메타데이터 생성 여부 | false | boolean | 속성값이 true일 경우 메시지에 대한 메타데이터 필드를 생성합니다. 메타데이터는 kafka_metadata 필드에 생성됩니다. |
생성되는 필드는 다음과 같습니다. topic: 메시지를 수신한 토픽 groupId: 메시지를 수신하는 데 사용한 컨슈머 그룹 아이디 partition: 메시지를 수신한 토픽의 파티션 번호 offset: 메시지를 수신한 파티션의 오프셋 key: 메시지 키 |
| Fetch 최소 크기 | 1 | number | 한 번의 fetch 요청으로 가져올 데이터의 최소 크기(byte)를 입력합니다. | Kafka 공식 문서의 fetch.min.bytes 속성 참고 |
| 전송 버퍼 크기 | 131072 | number | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| 재시도 요청 주기 | 100 | number | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 순환 중복 검사 | true | boolean | 메시지의 CRC를 검사합니다. | Kafka 공식 문서의 check.crcs 속성 참고 |
| 서버 재연결 주기 | 50 | number | 브로커 서버에 연결이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| 파티션 당 Fetch 최대 크기 | 1048576 | number | 파티션 당 한 번의 fetch 요청으로 가져올 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 max.partition.fetch.bytes 속성 참고 |
| 서버 요청 타임아웃 | 30000 | number | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| TCP 수신 버퍼 크기 | 65536 | number | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 세션 타임아웃 | 45000 | number | 컨슈머의 세션 타임아웃(ms)을 입력합니다. 컨슈머가 해당 시간 안에 heartbeat를 보내지 못할 경우 컨슈머 그룹에서 제외합니다. |
Kafka 공식 문서의 session.timeout.ms 속성 참고 |
| 최대 poll 메시지 개수 | 500 | number | 한 번의 poll 요청으로 가져올 최대 메시지 개수를 입력합니다. | Kafka 공식 문서의 max.poll.records 속성 참고 |
| 최대 poll 주기 | 300000 | number | poll 요청 간 최대 주기(ms)를 입력합니다. | Kafka 공식 문서의 max.poll.interval.ms 속성 참고 |
| Fetch 최대 크기 | 52428800 | number | 한 번의 fetch 요청으로 가져올 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 fetch.max.bytes 속성 참고 |
| Fetch 최대 대기 시간 | 500 | number | Fetch 최소 크기 설정 만큼의 데이터가 모이지 않은 경우 fetch 요청을 보낼 대기 시간(ms)을 입력합니다. |
Kafka 공식 문서의 fetch.max.wait.ms 속성 참고 |
| 컨슈머 헬스체크 주기 | 3000 | number | 컨슈머가 heartbeat를 보내는 주기(ms)를 입력합니다. | Kafka 공식 문서의 heartbeat.interval.ms 속성 참고 |
| 메타데이터 갱신 주기 | 300000 | number | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| IDLE 타임아웃 | 540000 | number | 데이터 전송이 없는 커넥션을 닫을 대기 시간(ms)을 입력합니다. | Kafka 공식 문서의 connections.max.idle.ms 속성 참고 |
| 추가 설정 | - | hash | Kafka 연결에 사용할 추가 Consumer 설정을 입력합니다. | Kafka 공식 문서 참고 |
지원 코덱 * PLAIN 코덱 - 원본 데이터 문자열 저장 * JSON 코덱 - JSON 형식 데이터 파싱
Kafka에서 데이터를 수신하는 노드입니다.
STREAMING: 토픽에 새로운 메시지가 도착할 때마다 데이터를 처리합니다.
주의
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 브로커 서버 목록 | - | string | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고 예: 10.100.1.1:9092,10.100.1.2:9092 |
| 컨슈머 그룹 아이디 | dataflow | string | Kafka Consumer Group을 식별하는 ID를 입력합니다. | Kafka 공식 문서의 group.id 속성 참고 |
| 토픽 목록 | - | array of strings | 메시지를 수신할 Kafka 토픽 목록을 입력합니다. | |
| 토픽 패턴 | - | string | 메시지를 수신할 Kafka 토픽 패턴을 입력합니다. | 예: *-messages |
| 내부 토픽 제외 여부 | true | boolean | __consumer_offsets와 같은 내부 토픽을 제외합니다. | Kafka 공식 문서의 exclude.internal.topics 속성 참고 수신 대상에서 __consumer_offsets와 같은 내부 토픽을 제외합니다. |
| 클라이언트 아이디 | dataflow | string | Kafka Consumer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 파티션 할당 정책 | ["RANGE", "COOPERATIVE_STICKY"] | array of strings | Kafka에서 메시지 수신 시 컨슈머 그룹에 어떻게 파티션을 할당할지 결정합니다. | Kafka 공식 문서의 partition.assignment.strategy 속성 참고 org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
| 오프셋 설정 | latest | enum | 컨슈머 그룹의 오프셋을 설정하는 기준을 입력합니다. | Kafka 공식 문서의 auto.offset.reset 속성 참고 아래 설정 모두 컨슈머 그룹이 이미 존재하는 경우 기존 오프셋을 유지합니다. none: 컨슈머 그룹이 없으면 오류를 반환합니다. earliest: 컨슈머 그룹이 없으면 파티션의 가장 오래된 오프셋으로 초기화합니다. latest: 컨슈머 그룹이 없으면 파티션의 가장 최근 오프셋으로 초기화합니다. |
| 키 역직렬화 유형 | STRING | enum | 수신하는 메시지의 키의 타입을 입력합니다. | Kafka 공식 문서의 key.deserializer 속성 참고 |
| 메타데이터 생성 여부 | false | boolean | 속성값이 true일 경우 메시지에 대한 메타데이터 필드를 생성합니다. 메타데이터는 kafka_metadata 필드에 생성됩니다. |
생성되는 필드는 다음과 같습니다. topic: 메시지를 수신한 토픽 groupId: 메시지를 수신하는 데 사용한 컨슈머 그룹 아이디 partition: 메시지를 수신한 토픽의 파티션 번호 offset: 메시지를 수신한 파티션의 오프셋 key: 메시지 키 |
| Fetch 최소 크기 | 1 | number | 한 번의 fetch 요청으로 가져올 데이터의 최소 크기(byte)를 입력합니다. | Kafka 공식 문서의 fetch.min.bytes 속성 참고 |
| 전송 버퍼 크기 | 131072 | number | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| 재시도 요청 주기 | 100 | number | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 순환 중복 검사 | true | boolean | 메시지의 CRC를 검사합니다. | Kafka 공식 문서의 check.crcs 속성 참고 |
| 서버 재연결 주기 | 50 | number | 브로커 서버에 연결이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| 파티션 당 Fetch 최대 크기 | 1048576 | number | 파티션 당 한 번의 fetch 요청으로 가져올 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 max.partition.fetch.bytes 속성 참고 |
| 서버 요청 타임아웃 | 30000 | number | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| TCP 수신 버퍼 크기 | 65536 | number | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 세션 타임아웃 | 45000 | number | 컨슈머의 세션 타임아웃(ms)을 입력합니다. 컨슈머가 해당 시간 안에 heartbeat를 보내지 못할 경우 컨슈머 그룹에서 제외합니다. |
Kafka 공식 문서의 session.timeout.ms 속성 참고 |
| 최대 poll 메시지 개수 | 500 | number | 한 번의 poll 요청으로 가져올 최대 메시지 개수를 입력합니다. | Kafka 공식 문서의 max.poll.records 속성 참고 |
| 최대 poll 주기 | 300000 | number | poll 요청 간 최대 주기(ms)를 입력합니다. | Kafka 공식 문서의 max.poll.interval.ms 속성 참고 |
| Fetch 최대 크기 | 52428800 | number | 한 번의 fetch 요청으로 가져올 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 fetch.max.bytes 속성 참고 |
| Fetch 최대 대기 시간 | 500 | number | Fetch 최소 크기 설정 만큼의 데이터가 모이지 않은 경우 fetch 요청을 보낼 대기 시간(ms)을 입력합니다. |
Kafka 공식 문서의 fetch.max.wait.ms 속성 참고 |
| 컨슈머 헬스체크 주기 | 3000 | number | 컨슈머가 heartbeat를 보내는 주기(ms)를 입력합니다. | Kafka 공식 문서의 heartbeat.interval.ms 속성 참고 |
| 메타데이터 갱신 주기 | 300000 | number | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| IDLE 타임아웃 | 540000 | number | 데이터 전송이 없는 커넥션을 닫을 대기 시간(ms)을 입력합니다. | Kafka 공식 문서의 connections.max.idle.ms 속성 참고 |
| 격리 수준 | read_committed | enum | 컨슈머가 트랜잭션이 커밋되지 않은 메시지까지 읽을지, 커밋된 메시지만 읽을지를 결정합니다. | Kafka 공식 문서의 isolation.level 속성 참고read_uncommitted: 모든 메시지를 오프셋 순서대로 읽습니다. read_committed: 커밋된 트랜잭션의 메시지만 읽습니다. |
| 추가 설정 | - | hash | Kafka 연결에 사용할 추가 Consumer 설정을 입력합니다. | Kafka 공식 문서 참고 |
지원 코덱 * PLAIN 코덱 - 원본 데이터 문자열 저장 * JSON 코덱 - JSON 형식 데이터 파싱
인입된 데이터를 어떻게 처리할지 정의하는 노드 유형입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 아이디 | - | string | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 모드 | - | enum | 암호화 모드와 복호화 모드 중 선택합니다. | 목록 중에 하나를 선택합니다. |
| 앱키 | - | string | 암/복호화에 사용할 키를 저장한 SKM 앱키를 입력합니다. | |
| 키 ID | - | string | 암/복호화에 사용할 키를 저장한 SKM 키 ID를 입력합니다. | |
| 키 버전 | - | string | 암/복호화에 사용할 키를 저장한 SKM 키 버전을 입력합니다. | |
| 소스 필드 | - | string | 암/복호화할 필드명을 입력합니다. | 스키마 정의 시 드롭다운 제공 |
| 저장할 필드 | - | string | 암/복호화 결과를 저장할 필드명을 입력합니다. | |
| 덮어쓰기 | false | boolean | 지정한 대상 필드명에 값이 존재하는 경우 이를 덮어쓸지 여부를 선택합니다. |
encryptSKM 앱키SKM 대칭키 ID1{
"message": "this is plain message"
}
{
"message": "this is plain message",
"encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
decryptSKM 앱키SKM 대칭키 ID1{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e",
"decrypted_message": "this is plain message"
}
CSV 형식의 메시지를 파싱해 필드에 저장하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 저장할 필드 | - | string | CSV 파싱 결과를 저장할 필드명을 입력합니다. | |
| Quote | " | string | 칼럼 필드를 나누는 문자를 입력합니다. | |
| 첫 행 무시 여부 | false | boolean | 속성값이 true일 경우 읽은 데이터 중 첫 행에 입력된 칼럼 이름을 무시합니다. | |
| 구분자 | , | string | 칼럼을 구분할 문자열을 입력합니다. | |
| 소스 필드 | - | string | CSV 파싱할 필드명을 입력합니다. | |
| 스키마 | - | hash | 각 칼럼의 이름과 자료형을 dictionary 형태로 입력합니다. | 스키마 입력 방법 참고 |
| 덮어쓰기 | false | boolean | true일 경우 CSV 파싱 결과가 저장할 필드나 기존 필드와 겹치면 덮어씁니다. | |
| 원본 필드 삭제 | false | boolean | CSV 파싱이 완료되면 소스 필드를 삭제합니다. 파싱이 실패한다면 유지합니다. |
칼럼 타입을 지원하지 않고 전체 칼럼 및 자료형을 스키마로 입력받습니다.
message{"one": "string", "two": "string", "t hree": "string"}{
"message": "hey,foo,\"bar baz\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message{"one": "string", "two": "integer", "t hree": "boolean"}{
"message": "\"wow hello world!\", 2, false"
}
{
"message": "\"wow hello world!\", 2, false",
"one": "wow hello world!",
"t hree": false,
"two": 2
}
JSON 문자열을 파싱하여 지정된 필드에 저장하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | - | string | JSON 문자열을 파싱할 필드명을 입력합니다. | |
| 저장할 필드 | - | string | JSON 파싱 결과를 저장할 필드명을 입력합니다. 만약 속성값을 지정하지 않을 경우 root 필드에 결과를 저장합니다. |
|
| 덮어쓰기 | false | boolean | true일 경우 JSON 파싱 결과가 저장할 필드나 기존 필드와 겹치면 덮어씁니다. | |
| 원본 필드 삭제 | false | boolean | JSON 파싱이 완료되면 소스 필드를 삭제합니다. 파싱이 실패한다면 유지합니다. | |
| 스키마 | - | hash | 각 필드의 이름과 자료형을 dictionary 형태로 입력합니다. | 스키마 입력 방법 참고 |
칼럼 타입을 지원하지 않고 전체 칼럼 및 자료형을 스키마로 입력받습니다.
messagejson_parsed_message{
"message": "{\"json\": \"parse\", \"example\": \"string\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": "string"
},
"message": "{\"json\": \"parse\", \"example\": \"string\"}"
}
messagejson_parsed_message{"json": "string", "example": "integer"}{
"message": "{\"json\": \"parse\", \"example\": \"123\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": 123
},
"message": "{\"json\": \"parse\", \"example\": \"123\"}"
}
Date 문자열을 파싱하여 timestamp 형태로 저장하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | - | string | 문자열을 가져오기 위한 필드명을 입력합니다. | 스키마 정의 시 드롭다운 제공 |
| 형식 | - | array of strings | 문자열을 가져오기 위한 형식을 입력합니다. | 사전 정의된 형식은 다음과 같습니다. ISO8601, UNIX, UNIX_MS |
| Locale | ko_KR | string | Date 문자열 분석을 위해 사용할 Locale을 입력합니다. | 예: en, en-US, ko_KR |
| 저장할 필드 | - | string | Date 문자열 파싱 결과를 저장할 필드명을 입력합니다. | |
| 시간대 | Asia/Seoul | string | 날짜의 시간대를 입력합니다. | 예: Asia/Seoul |
message["yyyy-MM-dd HH:mm:ssZ", "ISO8601"]timeAsia/Seoul{
"message": "2017-03-16T17:40:00"
}
{
"message": "2017-03-16T17:40:00",
"time": 2022-04-04T09:08:01.222Z
}
UUID를 생성하여 필드에 저장하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| UUID 저장 필드 | - | string | UUID 생성 결과값을 저장할 필드명을 입력합니다. | |
| 덮어쓰기 | false | boolean | 지정된 필드명에 값이 존재할 경우 이를 덮어쓸지 여부를 선택합니다. |
UUID 저장 필드 → userId
{
"message": "uuid test message"
}
{
"userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
"message": "uuid test message"
}
특정 필드의 데이터 타입을 변환하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 대상 필드 | - | string | 데이터 타입을 변환할 대상 필드를 입력합니다. | 스키마 정의 시 드롭다운 제공 |
| 변환 타입 | - | enum | 변환할 데이터 타입을 선택합니다. * 제공 타입: STRING, INTEGER, FLOAT, DOUBLE, BOOLEAN |
messageINTEGER{
"message": "2025"
}
{
"message": 2025
}
null 값을 기본값으로 대체하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 대상 필드 | - | string | 기본값을 지정할 필드명을 입력합니다. | 스키마 정의 시 드롭다운 제공 |
| 기본값 | - | string | 기본값을 입력합니다. |
fieldnamedefault_value{
"fieldname": null
}
{
"fieldname": "default_value"
}
기존 필드를 다른 필드로 복사하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 대상 필드 | - | string | 복사할 소스 필드명을 입력합니다. | 스키마 정의 시 드롭다운 제공 |
| 저장할 필드 | - | string | 복사한 결과를 저장할 필드명을 입력합니다. | |
| 덮어쓰기 | false | boolean | true일 경우 저장할 필드가 이미 존재하면 덮어씁니다. |
source_fielddest_field{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
필드 이름을 변경하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | string | 이름을 변경할 소스 필드를 입력합니다. | 스키마 정의 시 드롭다운 제공 | |
| 대상 필드 | - | string | 변경할 필드명을 입력합니다. | |
| 덮어쓰기 | false | boolean | true일 경우 대상 필드가 이미 존재할 경우 덮어씁니다. |
fieldnamechanged_fieldname{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
필드의 문자열 앞뒤 공백을 제거하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 대상 필드 | - | array of strings | 공백을 제거할 대상 필드들을 입력합니다. | 스키마 정의 시 드롭다운 제공(복수 선택) |
대상 필드 → ["field1", "field2"]
{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
필드를 삭제하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 삭제할 필드 | - | array of strings | 삭제할 필드명 목록을 입력합니다. | 스키마 정의 시 드롭다운 제공(복수 선택) |
삭제할 필드 → ["field2", "field3"]
{
"field1": "value1",
"field2": "value2",
"field3": "value3",
"field4": "value4"
}
{
"field1": "value1",
"field4": "value4"
}
정규식을 이용해 문자열 필드를 토큰화하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | - | string | 토큰화할 소스 필드명을 입력합니다. | |
| 저장할 필드 | - | string | 토큰화 결과를 저장할 필드명을 입력합니다. | |
| 정규식 | \s+ | string | 토큰화에 사용될 정규식을 입력합니다. | |
| 모드 | SEPARATOR | enum | 토큰화 모드를 선택합니다. | SEPARATOR: 정규식을 구분자로 사용 MATCH: 정규식을 토큰 매칭에 사용 |
| 최소 토큰 길이 | 1 | number | 토큰의 최소 길이를 입력합니다. 최소 토큰 길이보다 짧은 토큰은 결과에서 제외됩니다. | |
| 덮어쓰기 | false | boolean | true일 경우 저장할 필드가 이미 존재하면 덮어씁니다. |
src_fieldtarget_field,SEPARATOR{
"src_field": "foo,bar,baz"
}
{
"src_field": "foo,bar,baz",
"target_field": ["foo", "bar", "baz"]
}
src_fieldtarget_field[^,]+MATCH{
"src_field": "foo,bar,baz"
}
{
"src_field": "foo,bar,baz",
"target_field": ["foo", "bar", "baz"]
}
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 비율 | - | number | 메시지를 다음 노드로 전달할 비율을 입력합니다. | |
| 시드 | - | number | 난수 생성 시 사용할 시드를 입력합니다. 시드가 같고 입력 메시지가 동일하다면 결과는 동일합니다. |
문자열 배열 필드에 포함된 Stop Word(불용어)를 제거하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | - | string | 불용어를 제거할 소스 필드명을 입력합니다. | |
| 저장할 필드 | - | string | 불용어 제거 결과를 저장할 필드명을 입력합니다. | |
| 기본 제공 불용어 사전 언어 | none | enum | 불용어 제거에 사용할 기본 제공 불용어 사전의 언어를 선택합니다. | |
| 불용어 사전 | string | 불용어 제거에 사용할 단어 목록을 입력합니다. 각 단어는 줄바꿈으로 구분됩니다. | ||
| 대소문자 구분 여부 | false | boolean | 대소문자 구분 여부를 선택합니다. | |
| 덮어쓰기 | false | boolean | true일 경우 저장할 필드가 이미 존재하면 덮어씁니다. |
src_fieldtarget_fieldis
a
{
"src_field": ["hello", "world", "this", "is", "a", "test"]
}
{
"src_field": ["hello", "world", "this", "is", "a", "test"],
"target_field": ["hello", "world", "this", "test"]
}
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 소스 필드 | - | string | 패턴을 추출할 원본 필드명을 입력합니다. | |
| 대상 필드 | - | string | 추출 결과를 저장할 필드명을 입력합니다. 입력하지 않으면 결과가 root에 바로 추가됩니다. | |
| 사용자 정의 패턴 | - | hash | 기본 제공 패턴 외에 추가로 사용할 패턴을 정의합니다. 패턴명과 정규표현식을 key-value 형태로 입력합니다. | 동일한 패턴명이 기본 제공 패턴에 존재할 경우, 사용자 정의 패턴이 우선 적용되어 기본 패턴을 재정의할 수 있습니다. |
| 패턴 표현식 | - | string | 데이터에서 추출할 필드와 패턴을 Grok 표현식으로 입력합니다. | |
| 덮어쓰기 | false | boolean | 대상 필드에 이미 값이 존재하는 경우 추출 결과로 덮어쓸지 여부를 설정합니다. |
기본 제공 패턴
자주 사용되는 패턴들을 사전 정의하여 제공합니다. 날짜/시간, IP 주소, URL, 로그 레벨 등 여러 가지 상황에 필요한 다양한 패턴이 포함되어 있습니다. 기본 제공 패턴은 내부적으로 다른 패턴들을 참조하는 계층 구조를 가지므로, 지정한 필드명 외에 추가 필드가 생성될 수 있습니다. 기본 제공 패턴 목록 참고
log_messageresult{"CUSTOM_PHONE_NUMBER": "01[016789]-\d{3,4}-\d{4}", "CUSTOM_EMPLOYEE_ID": "EMP-\d{6}", "CUSTOM_ORDER_ID": "ORD-[A-Z]{3}-\d{8}"}%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{CUSTOM_EMPLOYEE_ID:custom_emp_id} %{CUSTOM_PHONE_NUMBER:custom_phone_number} %{CUSTOM_ORDER_ID:custom_order_id} %{GREEDYDATA:message}{
"log_message": "2024-03-15T09:30:00.000Z INFO EMP-123456 010-1234-5678 ORD-ABC-12345678 Order processing started",
"created_by": "DataFlow"
}
{
"log_message": "2024-03-15T09:30:00.000Z INFO EMP-123456 010-1234-5678 ORD-ABC-12345678 Order processing started",
"created_by": "DataFlow",
"result": {
"YEAR": "2024",
"MONTHNUM": "03",
"ISO8601_TIMEZONE": "Z",
"MONTHDAY": "15",
"HOUR": [
"09",
null
],
"MINUTE": [
"30",
null
],
"SECOND": "00.000",
"timestamp": "2024-03-15T09:30:00.000Z",
"level": "INFO",
"custom_emp_id": "EMP-123456",
"custom_phone_number": "010-1234-5678",
"custom_order_id": "ORD-ABC-12345678",
"message": "Order processing started"
}
}
Filter 작업을 마친 데이터를 적재할 엔드포인트를 정의하는 노드 유형입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 아이디 | - | string | 노드의 아이디를 설정합니다. 이 속성에 정의된 값으로 차트보드에 노드 이름을 표기합니다. |
/{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/part-{uuid}-{file_counter} | 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 리전 | - | enum | Object Storage 상품의 리전을 입력합니다. | |
| 버킷 | - | string | 버킷 이름을 입력합니다. | |
| 비밀 키 | - | string | S3 API 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | S3 API 자격 증명 액세스 키를 입력합니다. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | 오브젝트 업로드 시 이름 앞에 붙일 접두사를 입력합니다. 필드 또는 시간 형식을 입력할 수 있습니다. |
사용 가능한 시간 형식 |
| Prefix 시간 필드 | - | string | Prefix에 적용할 시간 필드를 입력합니다. | |
| Prefix 시간 필드 타입 | DATE_FILTER_RESULT | enum | Prefix에 적용할 시간 필드의 타입을 입력합니다. | DATE_FILTER_RESULT 타입만 가능(추후 다른 타입 지원 예정) |
| Prefix 시간대 | UTC | string | Prefix에 적용할 시간 필드의 타임 존을 입력합니다. | |
| Prefix 시간 적용 fallback | _prefix_datetime_parse_failure | string | Prefix 시간 적용에 실패한 경우 대체할 Prefix를 입력합니다. | |
| 기준 시각 | 1 | number | 오브젝트를 분할할 기준이 될 시간을 설정합니다. | |
| 기준 오브젝트 크기 | 5242880 | number | 오브젝트를 분할할 기준이 될 크기(단위: byte)를 설정합니다. | |
| 비활성 간격 | 1 | number | 데이터 인입이 없는 상태가 지속될 때 오브젝트를 분할하는 기준 시간을 설정합니다. | 설정된 시간 동안 데이터 인입이 없으면 현재 오브젝트가 업로드되며, 이후 새로 인입되는 데이터는 새로운 오브젝트에 작성됩니다. |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱 * LINE 코덱 - 행 단위 메시지 처리 * Parquet 코덱 - 데이터를 Parquet 형식으로 압축
obs-test-container/dataflow/%{deployment}{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/production/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeISO8601Asia/Seoul{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeTIMESTAMP_SECAsia/Seoul_failure{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/_failure/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
/{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/part-{uuid}-{file_counter} | 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 리전 | - | enum | Data Lake Storage 상품의 리전을 입력합니다. | |
| 버킷 | - | string | 버킷 이름을 입력합니다. | |
| 비밀 키 | - | string | S3 API 자격 증명 비밀 키를 입력합니다. | |
| 액세스 키 | - | string | S3 API 자격 증명 액세스 키를 입력합니다. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | 오브젝트 업로드 시 이름 앞에 붙일 접두사를 입력합니다. 필드 또는 시간 형식을 입력할 수 있습니다. |
사용 가능한 시간 형식 |
| Prefix 시간 필드 | - | string | Prefix에 적용할 시간 필드를 입력합니다. | |
| Prefix 시간 필드 타입 | DATE_FILTER_RESULT | enum | Prefix에 적용할 시간 필드의 타입을 입력합니다. | DATE_FILTER_RESULT 타입만 가능(추후 다른 타입 지원 예정) |
| Prefix 시간대 | UTC | string | Prefix에 적용할 시간 필드의 타임 존을 입력합니다. | |
| Prefix 시간 적용 fallback | _prefix_datetime_parse_failure | string | Prefix 시간 적용에 실패한 경우 대체할 Prefix를 입력합니다. | |
| 기준 시각 | 1 | number | 오브젝트를 분할할 기준이 될 시간을 설정합니다. | |
| 기준 오브젝트 크기 | 5242880 | number | 오브젝트를 분할할 기준이 될 크기(단위: byte)를 설정합니다. | |
| 비활성 간격 | 1 | number | 데이터 인입이 없는 상태가 지속될 때 오브젝트를 분할하는 기준 시간을 설정합니다. | 설정된 시간 동안 데이터 인입이 없으면 현재 오브젝트가 업로드되며, 이후 새로 인입되는 데이터는 새로운 오브젝트에 작성됩니다. |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱 * LINE 코덱 - 행 단위 메시지 처리 * Parquet 코덱 - 데이터를 Parquet 형식으로 압축
dls-test-container/dataflow/%{deployment}{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/dls-test-container/dataflow/production/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
dls-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeISO8601Asia/Seoul{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/dls-test-container/dataflow/year=2022/month=11/day=21/hour=16/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
dls-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}logTimeISO8601Asia/Seoul_failure{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/dls-test-container/_failure/part-378be4d8-2c59-4014-aaeb-a9bc75af2653-0
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 리전 | - | enum | S3 상품의 리전을 입력합니다. | s3 region |
| 버킷 | - | string | 버킷 이름을 입력합니다. | |
| 액세스 키 | - | string | S3 API 자격 증명 액세스 키를 입력합니다. | |
| 비밀 키 | - | string | S3 API 자격 증명 비밀 키를 입력합니다. | |
| Prefix | - | string | 오브젝트 업로드 시 이름 앞에 붙일 접두사를 입력합니다. 필드 또는 시간 형식을 입력할 수 있습니다. |
사용 가능한 시간 형식 |
| Prefix 시간 필드 | - | string | Prefix에 적용할 시간 필드를 입력합니다. | |
| Prefix 시간 필드 타입 | DATE_FILTER_RESULT | enum | Prefix에 적용할 시간 필드의 타입을 입력합니다. | DATE_FILTER_RESULT 타입만 가능(추후 다른 타입 지원 예정) |
| Prefix 시간대 | UTC | string | Prefix에 적용할 시간 필드의 타임 존을 입력합니다. | |
| Prefix 시간 적용 fallback | _prefix_datetime_parse_failure | string | Prefix 시간 적용에 실패한 경우 대체할 Prefix를 입력합니다. | |
| 기준 시각 | 1 | number | 오브젝트를 분할할 기준이 될 시간을 설정합니다. | |
| 기준 오브젝트 크기 | 5242880 | number | 오브젝트를 분할할 기준이 될 크기를 설정합니다. | |
| 경로 방식 요청 | false | boolean | 경로 방식 요청을 사용할지 여부를 결정합니다. | |
| 비활성 간격 | 1 | number | 데이터 인입이 없는 상태가 지속될 때 오브젝트를 분할하는 기준 시간을 설정합니다. | 설정된 시간 동안 데이터 인입이 없으면 현재 오브젝트가 업로드되며, 이후 새로 인입되는 데이터는 새로운 오브젝트에 작성됩니다. |
주의
true로 설정해야 합니다.지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱 * LINE 코덱 - 행 단위 메시지 처리 * Parquet 코덱 - 데이터를 Parquet 형식으로 압축
NHN Cloud의 EasyQueue에 데이터를 전송하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 앱키 | - | string | EasyQueue의 앱키를 입력합니다. | |
| User Access Key ID | - | string | 사용자 계정의 User Access Key ID를 입력합니다. | |
| Secret Access Key | - | string | 사용자 계정의 User Secret Key를 입력합니다. | |
| 토픽 | - | string | 메시지를 전송할 Kafka 토픽 이름을 입력합니다. | |
| 브로커 서버 목록 | - | string | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고예: 10.100.1.1:9092,10.100.1.2:9092 |
| 클라이언트 아이디 | dataflow | string | Kafka Producer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 압축 유형 | none | enum | 전송하는 데이터를 압축할 방법을 입력합니다. | Kafka 공식 문서의 compression.type 속성 참고none, gzip, snappy, lz4, zstd 중 선택 |
| 메시지 키 | - | string | 메시지 키로 사용할 필드를 입력합니다. | |
| 메타데이터 갱신 주기 | 300000 | number | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| 최대 요청 크기 | 1048576 | number | 전송 요청당 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 max.request.size 속성 참고 |
| 서버 재연결 주기 | 50 | number | 브로커 서버에 연결이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| 배치 크기 | 16384 | number | 배치 요청으로 전송할 크기(byte)를 입력합니다. | Kafka 공식 문서의 batch.size 속성 참고 |
| 버퍼 메모리 | 33554432 | number | Kafka 전송에 사용하는 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 buffer.memory 속성 참고 |
| 수신 버퍼 크기 | 32768 | number | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 전송 지연 시간 | 0 | number | 메시지 전송을 지연할 시간을 입력합니다. 지연된 메시지는 배치 요청으로 한 번에 전송합니다. | Kafka 공식 문서의 linger.ms 속성 참고 |
| 서버 요청 타임아웃 | 30000 | number | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| 전송 버퍼 크기 | 131072 | number | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| ack 속성 | all | enum | 브로커 서버에서 메시지를 받았는지 확인하는 설정을 입력합니다. | Kafka 공식 문서의 acks 속성 참고0 - 메시지 수신 여부를 확인하지 않습니다. 1 - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다리지 않고 메시지를 수신했다는 응답을 합니다. all - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다린 뒤 메시지를 수신했다는 응답을 합니다. |
| 재시도 요청 주기 | 100 | number | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 재시도 횟수 | 2147483647 | number | 전송 요청이 실패했을 때 재시도할 최대 횟수를 입력합니다. | Kafka 공식 문서의 retries 속성 참고설정값을 초과하여 재시도하는 경우 데이터 유실이 발생할 수 있습니다. |
| 전달 보장 방식 | EXACTLY_ONCE | enum | 메시지 전달 보장 방식을 선택합니다. | AT_LEAST_ONCE: 메시지가 최소 한 번은 전달되지만, 장애 상황에서 중복이 발생할 수 있습니다. 중복 처리를 애플리케이션에서 직접 관리할 수 있거나, 중복이 허용되는 경우에 적합합니다. EXACTLY_ONCE: 메시지가 정확히 한 번만 처리됩니다. 중복이 허용되지 않는 결제·정산 등 핵심 트랜잭션에 적합하지만, 내부적으로 트랜잭션을 사용하므로 처리량이 다소 낮아질 수 있습니다. |
| 추가 설정 | - | hash | Kafka 연결에 사용할 추가 Producer 설정을 입력합니다. | Kafka 공식 문서 참고 |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 파싱 * LINE 코덱 - 행 단위 메시지 처리
Kafka에 데이터를 전송하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 토픽 | - | string | 메시지를 전송할 Kafka 토픽 이름을 입력합니다. | |
| 브로커 서버 목록 | string | Kafka 브로커 서버를 입력합니다. 서버가 여러 대일 경우 콤마(,)로 구분합니다. |
Kafka 공식 문서의 bootstrap.servers 속성 참고예: 10.100.1.1:9092,10.100.1.2:9092 |
|
| 클라이언트 아이디 | dataflow | string | Kafka Producer를 식별하는 ID를 입력합니다. | Kafka 공식 문서의 client.id 속성 참고 |
| 압축 유형 | none | enum | 전송하는 데이터를 압축할 방법을 입력합니다. | Kafka 공식 문서의 compression.type 속성 참고none, gzip, snappy, lz4, zstd 중 선택 |
| 메시지 키 | - | string | 메시지 키로 사용할 필드를 입력합니다. | |
| 메타데이터 갱신 주기 | 300000 | number | 파티션, 브로커 서버 상태 등을 갱신할 주기(ms)를 입력합니다. | Kafka 공식 문서의 metadata.max.age.ms 속성 참고 |
| 최대 요청 크기 | 1048576 | number | 전송 요청당 최대 크기(byte)를 입력합니다. | Kafka 공식 문서의 max.request.size 속성 참고 |
| 서버 재연결 주기 | 50 | number | 브로커 서버에 연결이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 reconnect.backoff.ms 속성 참고 |
| 배치 크기 | 16384 | number | 배치 요청으로 전송할 크기(byte)를 입력합니다. | Kafka 공식 문서의 batch.size 속성 참고 |
| 버퍼 메모리 | 33554432 | number | Kafka 전송에 사용하는 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 buffer.memory 속성 참고 |
| 수신 버퍼 크기 | 32768 | number | 데이터를 읽는 데 사용하는 TCP receive 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 receive.buffer.bytes 속성 참고 |
| 전송 지연 시간 | 0 | number | 메시지 전송을 지연할 시간을 입력합니다. 지연된 메시지는 배치 요청으로 한 번에 전송합니다. | Kafka 공식 문서의 linger.ms 속성 참고 |
| 서버 요청 타임아웃 | 30000 | number | 전송 요청에 대한 타임아웃(ms)을 입력합니다. | Kafka 공식 문서의 request.timeout.ms 속성 참고 |
| 전송 버퍼 크기 | 131072 | number | 데이터를 전송하는 데 사용하는 TCP send 버퍼의 크기(byte)를 입력합니다. | Kafka 공식 문서의 send.buffer.bytes 속성 참고 |
| ack 속성 | all | enum | 브로커 서버에서 메시지를 받았는지 확인하는 설정을 입력합니다. | Kafka 공식 문서의 acks 속성 참고0 - 메시지 수신 여부를 확인하지 않습니다. 1 - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다리지 않고 메시지를 수신했다는 응답을 합니다. all - 토픽의 leader가 follower가 데이터를 복사하는 것을 기다린 뒤 메시지를 수신했다는 응답을 합니다. |
| 재시도 요청 주기 | 100 | number | 전송 요청이 실패했을 때 재시도할 주기(ms)를 입력합니다. | Kafka 공식 문서의 retry.backoff.ms 속성 참고 |
| 재시도 횟수 | 2147483647 | number | 전송 요청이 실패했을 때 재시도할 최대 횟수를 입력합니다. | Kafka 공식 문서의 retries 속성 참고설정값을 초과하여 재시도하는 경우 데이터 유실이 발생할 수 있습니다. |
| 전달 보장 방식 | EXACTLY_ONCE | enum | 메시지 전달 보장 방식을 선택합니다. | AT_LEAST_ONCE: 메시지가 최소 한 번은 전달되지만, 장애 상황에서 중복이 발생할 수 있습니다. 중복 처리를 애플리케이션에서 직접 관리할 수 있거나, 중복이 허용되는 경우에 적합합니다. EXACTLY_ONCE: 메시지가 정확히 한 번만 처리됩니다. 중복이 허용되지 않는 결제·정산 등 핵심 트랜잭션에 적합하지만, 내부적으로 트랜잭션을 사용하므로 처리량이 다소 낮아질 수 있습니다. |
| 추가 설정 | - | hash | Kafka 연결에 사용할 추가 Producer 설정을 입력합니다. | Kafka 공식 문서 참고 |
지원 코덱 * JSON 코덱 - JSON 형식 데이터 출력 * LINE 코덱 - 행 단위 메시지 출력
지원 코덱 * JSON 코덱 - JSON 형식 데이터 출력 * LINE 코덱 - 행 단위 메시지 출력
인입된 데이터의 값에 따라 흐름 분기를 정의하는 노드 유형입니다.
조건문으로 메시지를 필터링하는 노드입니다.
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 조건문 | - | string | 메시지를 필터링할 조건을 입력합니다. | 아래 예제를 참고하세요. |
조건문 → logLevel == "ERROR"
{
"logLevel": "ERROR"
}
{
"logLevel": "INFO"
}
조건문 → response.status == 200 또는 response["status"] == 200
{
"response": {
"status": 200
}
}
{
"response": {
"status": 404
}
}
| 속성명 | 기본값 | 자료형 | 설명 | 비고 |
|---|---|---|---|---|
| 시드 | - | number | 난수 생성 시 사용할 시드를 입력합니다. 시드가 같고 입력 메시지가 동일하다면 결과는 동일합니다. | |
| 분할 설정 | - | hash | 분기 이름과 비율을 JSON 형식으로 입력합니다. 모든 비율의 합은 1.0이어야 합니다. |
예: {"train": 0.6, "test": 0.3, "sampling": 0.1} |
42{"train": 0.6, "test": 0.3, "sampling": 0.1}입력된 이벤트가 설정된 비율에 따라 각 분기로 전달됩니다.
train 분기에 연결된 하위 노드: 전체 이벤트의 약 60%가 전달됩니다.test 분기에 연결된 하위 노드: 전체 이벤트의 약 30%가 전달됩니다.sampling 분기에 연결된 하위 노드: 전체 이벤트의 약 10%가 전달됩니다.