Data & Analytics > DataFlow > Node Type Guide

  • Node types are pre-defined templates for easy flow creation.
  • Types of node types are Source, Filter, Branch, and Sink.
  • It is recommended to test Source and Sink node types to verify that the endpoint information is valid.
  • When connecting to data sources with access control enabled, you must use the DataFlow static IP feature.
    • To use a static DataFlow IP, please contact customer service.
  • Each node type may have different support, properties, and behavior depending on the engine type. For more information, refer to the description for each node.

Engine Type Summary

| Node Category | V1 | V2 | Note |
| --- | --- | --- | --- |
| Source | O: provides all proven connectors, including Kafka and JDBC | O: (NHN Cloud) Object Storage connectors first, with Kafka and JDBC to be provided later | Check the "Supported Engine Type" table in each node section for final support. |
| Filter | O: provides all existing plugins, including Alter, Grok, and Mutate | O: includes common JSON/Date nodes plus V2-specific nodes such as Coerce/Copy/Rename/Strip | Parameters and default values may vary even for the same node, so check the "Parameters by Engine Type" table. |
| Branch | O | O | |
| Sink | O: provides Object Storage, S3, Kafka, etc. | O: provides most common connectors; Parquet advanced options, etc. are currently V1-first | Check the "Supported Engine Type" column in the properties table for detailed limitations. |

V1 Dedicated or Priority Nodes/Features

  • Source > (Apache) Kafka and Source > JDBC currently work only on V1, with V2 support planned.
  • Some filters, such as Filter > (Logstash) Grok and Filter > Mutate, work only on V1.
  • The Parquet compression/format advanced options for Sink > (NHN Cloud) Object Storage and Sink > (Amazon) S3 currently support V1 settings only, with V2 support planned.

V2 Dedicated Nodes/Additional Features

  • Filter > Coerce, Filter > Copy, Filter > Rename, and Filter > Strip are nodes provided in V2 only.
  • Properties marked as Supported Engine Type: V2, such as Overwrite and Delete Original Field in Filter > JSON, can only be set in V2.
  • Charts and options marked as "V2 Available Later" in the Monitoring and Templates sections will be updated automatically after release; they currently display V1 execution information only.

Nodes whose compatibility varies by engine type are always kept up to date in the Supported Engine Type tables and the Note/Caution blocks. When designing a new flow, first get a general idea of the scope from the summary above, then review the detailed support status and limitations in each node's section.

Important Notes on Object Storage Connectivity (Engine V2)

For V2, multiple Object Storage instances cannot be used within the same flow if they share the same bucket name, even if they belong to different regions or projects.

Unsupported Configuration Examples

  • Example 1
    • First Object Storage connection details
      • Region: KR1
      • Bucket name: Data
      • Project: TEST
    • Second Object Storage connection details
      • Region: JP1
      • Bucket name: Data
      • Project: TEST
    • Although the buckets are distinct due to being in different regions, they cannot be used together in a single DataFlow if they share the same name.
  • Example 2
    • First Object Storage connection details
      • Region: KR1
      • Bucket name: Data
      • Project: TEST_1
    • Second Object Storage connection details
      • Region: KR1
      • Bucket name: Data
      • Project: TEST_2
    • Although the buckets are distinct due to being in different projects, they cannot be used together in a single DataFlow if they share the same name.

Domain-Specific Language (DSL) Definition

  • DSL definition is required to execute the flow.

Variable

  • {{ executionTime }}
    • Flow execution time
  • Time unit
    • MINUTE - {{ MINUTE }}
    • HOUR - {{ HOUR }}
    • DAY - {{ DAY }}
    • MONTH - {{ MONTH }}
    • YEAR - {{ YEAR }}

Filter

  • {{ time | startOf: unit }}
    • Returns the start of the time unit defined by unit for the given time.
    • Calculated based on Korean Standard Time (KST).
    • ex) {{ executionTime | startOf: MINUTE }}
    • ex) {{ "2022-11-04T13:31:28Z" | startOf: MINUTE }}
      • → 2022-11-04T13:31:00Z
    • ex) {{ "2022-11-04T13:31:28Z" | startOf: HOUR }}
      • → 2022-11-04T13:00:00Z
    • ex) {{ "2022-11-04T13:31:28Z" | startOf: DAY }}
      • → 2022-11-04T00:00:00Z
    • ex) {{ "2022-11-04T13:31:28Z" | startOf: MONTH }}
      • → 2022-11-01T00:00:00Z
    • ex) {{ "2022-11-04T13:31:28Z" | startOf: YEAR }}
      • → 2022-01-01T00:00:00Z
  • {{ time | endOf: unit }}
    • Returns the end of the time unit defined by unit for the given time.
    • Calculated based on Korean Standard Time (KST).
    • ex) {{ executionTime | endOf: MINUTE }}
    • ex) {{ "2022-11-04T13:31:28Z" | endOf: MINUTE }}
      • → 2022-11-04T13:31:59.999999999Z
    • ex) {{ "2022-11-04T13:31:28Z" | endOf: HOUR }}
      • → 2022-11-04T13:59:59.999999999Z
    • ex) {{ "2022-11-04T13:31:28Z" | endOf: DAY }}
      • → 2022-11-04T23:59:59.999999999Z
    • ex) {{ "2022-11-04T13:31:28Z" | endOf: MONTH }}
      • → 2022-11-30T23:59:59.999999999Z
    • ex) {{ "2022-11-04T13:31:28Z" | endOf: YEAR }}
      • → 2022-12-31T23:59:59.999999999Z
  • {{ time | subTime: delta, unit }}
    • Returns the given time minus delta units, where the unit is defined by unit.
    • ex) {{ executionTime | subTime: 10, MINUTE }}
    • ex) {{ "2022-11-04T13:31:28Z" | subTime: 10, MINUTE }}
      • → 2022-11-04T13:21:28Z
  • {{ time | addTime: delta, unit }}
    • Returns the given time plus delta units, where the unit is defined by unit.
    • ex) {{ executionTime | addTime: 10, MINUTE }}
    • ex) {{ "2022-11-04T13:31:28Z" | addTime: 10, MINUTE }}
      • → 2022-11-04T13:41:28Z
  • {{ time | format: formatStr }}
    • Returns the given time formatted according to formatStr. The supported format tokens are as follows.
      • iso8601
      • yyyy
      • yy
      • MM
      • M
      • dd
      • d
      • mm
      • m
      • ss
      • s
    • ex) {{ executionTime | format: 'yyyy' }}
    • ex) {{ "2022-11-04T13:31:28Z" | format: 'yyyy' }}
      • → 2022
  • nested filter example
    • DSL expression for 03:00 on the day the flow was executed
      • → {{ executionTime | startOf: DAY | addTime: 3, HOUR }}
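  • DSL usage in node properties
    • The filters above can be combined in any property that accepts DSL, such as the Query Start time and Query End time of a Source node. A minimal sketch of a daily batch window follows (hypothetical property values; see the Source node sections below for the actual properties).

{
    "Query Start time": "{{ executionTime | startOf: DAY }}",
    "Query End time": "{{ executionTime }}"
}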

Input by Data Type

string

  • Enter a string.

number

  • Enter a number greater than or equal to 0.
  • Use the arrow to the right of the input box to adjust the value by 1.

boolean

  • Select TRUE or FALSE from the drop-down menu.

enum

  • Select an item from the drop-down menu.

array of strings

  • Enter the strings that will go into the array one by one.
  • After entering the string, click + to insert the string into the array.
  • ex) To enter ["message", "yyyy-MM-dd HH:mm:ssZ", "ISO8601"], insert the strings into the array in the following order: message, yyyy-MM-dd HH:mm:ssZ, ISO8601.

Hash

  • Enter a string in JSON format.
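  • ex) A hash value is entered as a JSON object, for example (illustrative key and value):

{
    "field_name": "field_value"
}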

Source

  • Node type that defines an endpoint that imports data to the flow.

Execution Mode

  • The Source node has two execution modes, BATCH and STREAMING.
    • STREAMING mode: Processes data in real time without exiting the flow.
    • BATCH mode: Processes a set amount of data and then terminates the flow.
  • The supported execution modes differ by Source node.

Common Settings on Source Node

Property name Default value Data type Supported engine type Description Others
Type - string V1 Creates a type field with the given value in each message.
ID - string V1, V2 Sets the node ID.
The node name on the chart board is marked with the value defined in this property.
Tag - array of strings V1 Adds a tag with the given value to each message.
Add Field - Hash V1 Adds custom fields.
When adding a field, you can reference the value of another field with %{[depth1_field]}.

Example of adding fields

{ 
    "my_custom_field": "%{[json_body][logType]}" 
}

Source > (NHN Cloud) Log & Crash Search

Node Description

  • (NHN Cloud) Log & Crash Search is a node that reads logs from Log & Crash Search.
  • You can set the log query start time for a node. If not set, the log is read from the start of the flow.
  • If no end time is entered in the node, logs are read in streaming format. If an end time is entered, logs are read up to the end time and the flow ends.
  • Currently, session logs and crash logs are not supported.
  • This node is affected by the token quota of the Log & Crash Search Log Search API.
    • If you do not have enough tokens, contact the Log & Crash Search service.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Execution Mode

  • STREAMING: Continues processing data after the Query Start time.
  • BATCH: Processes all data that falls between the Query Start time and the Query End time and ends the flow.

Property Description

Property name Default value Data type Supported engine type Description Others
Appkey - string V1, V2 Enter the app key for Log & Crash Search.
SecretKey - string V1, V2 Enter the secret key for Log & Crash Search.
Query Start time {{executionTime}} string V1, V2 Enter the start time of log query. Must be entered in ISO 8601 format with offset or DSL format.
Example: 2025-07-23T11:23:00+09:00, {{ executionTime }}
Query End time - string V1, V2 Enter the end time of log query. Must be entered in ISO 8601 format with offset or DSL format.
Example: 2025-07-23T11:23:00+09:00, {{ executionTime }}
Number of retries - number V1 Enter the maximum number of times to retry when a log query fails.
Search Query * string V1, V2 Enter the search query to use when requesting a Log & Crash Search query. For detailed query writing instructions, please refer to the "Lucene Query Guide" of the Log & Crash Search service.
  • Set the number of retries
    • If all retries fail, no further log queries are attempted and the flow ends.
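
Property setting example

  • A minimal sketch with hypothetical values; the Search Query is written in Lucene syntax, and the field name logLevel is only an illustration.

{
    "Appkey": "my-log-crash-search-appkey",
    "SecretKey": "my-log-crash-search-secret",
    "Query Start time": "{{ executionTime | startOf: DAY }}",
    "Query End time": "{{ executionTime }}",
    "Search Query": "logLevel:ERROR"
}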

Message imported by codec

  • Log & Crash Search is designed to process data in JSON format by default.
  • If you want to use each field in the Log & Crash Search log, we recommend using the json codec.

Supported codecs:
  • json codec - JSON data parsing

Source > (NHN Cloud) CloudTrail

Node Description

  • (NHN Cloud) CloudTrail is a node that reads data from CloudTrail.
  • You can set the data query start time for a node. If not set, data is read from the start of the flow.
  • If no end time is entered in the node, data is read in streaming format. If an end time is entered, the data up to the end time is read and the flow ends.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Execution Mode

  • STREAMING: Continues processing data after the Query Start time.
  • BATCH: Processes all data that falls between the Query Start time and the Query End time and ends the flow.

Property Description

Property name Default value Data type Supported engine type Description Others
Appkey - string V1, V2 Enter the app key for CloudTrail.
User Access Key ID - string V2 Enter the User Access Key ID of the user account.
Secret Access Key - string V2 Enter the User Secret Key of the user account.
Query Start time {{executionTime}} string V1, V2 Enter the start time of data query. Must be entered in ISO 8601 format with offset or DSL format.
Example: 2025-07-23T11:23:00+09:00, {{ executionTime }}
Query End time - string V1, V2 Enter the end time of data query. Must be entered in ISO 8601 format with offset or DSL format.
Example: 2025-07-23T11:23:00+09:00, {{ executionTime }}
Number of Retries - number V1 Enter the maximum number of times to retry when a data query fails.
Event Type * string V2 Enter the event ID you want to search for.
  • Set the number of retries
    • If all retries fail, no further data queries are attempted and the flow ends.

Message imported by codec

  • CloudTrail is designed to process data in JSON format by default.
  • If you want to use each field in CloudTrail data, we recommend using the json codec.

Supported codecs:
  • json codec - JSON data parsing

Source > (NHN Cloud) Object Storage

Node Description

  • Node that receives data from Object Storage of NHN Cloud.
  • Based on the object creation time, data is read from the object created the earliest.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O Refer to [Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage)

Execution Mode

  • STREAMING: Updates the object list on each list update cycle and processes data by reading newly added objects.
  • BATCH: Fetches the object list once at the beginning of the flow, reads the objects, processes the data, and ends the flow.

Property Description

Property name Default value Data type Supported engine type Description Note
Bucket - string V1, V2 Enter a bucket name to read data.
Region - string V1, V2 Enter region information configured in the storage.
Secret Key - string V1, V2 Enter the credential secret key issued by S3.
Access key - string V1, V2 Enter the credential access key issued by S3.
List update cycle 60 number V1, V2 Enter the object list update cycle included in the bucket.
Metadata included or not false boolean V1 Determines whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine Filter node types (see the guide below). The fields to be created are as follows.
last_modified: The last time the object was modified
content_length: Object size
key: Object name
content_type: Object type
metadata: Metadata
etag: etag
Prefix - string V1, V2 Enter a prefix of an object to read.
Key pattern to exclude - string V1, V2 Enter the pattern of an object not to be read.
Delete processed objects false boolean V1 If the property value is true, delete the object read.
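
Property setting example

  • A minimal sketch with hypothetical values (credentials omitted; the bucket name and prefix are only illustrations):

{
    "Bucket": "my-bucket",
    "Region": "KR1",
    "List update cycle": 60,
    "Prefix": "logs/2025/"
}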

Metadata Field Usage

  • When Metadata included or not is enabled, metadata fields are created but are not exposed to the Sink plugin unless they are injected as regular fields.
  • Example message after the (NHN Cloud) Object Storage plugin when the setting is enabled
{
    // General field
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "Object contents..."

    // Metadata fields
    // Cannot be exposed to the Sink plugin until the user injects it as a regular field
    // "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
    // "[@metadata][s3][content_length]": 220
    // "[@metadata][s3][key]": "{filename}"
    // "[@metadata][s3][content_type]": "text/plain"
    // "[@metadata][s3][metadata]": {}
    // "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
  • Although this (NHN Cloud) Object Storage Source plugin has an Add Field option, fields cannot be added at the same time the data is imported.
  • Add them as regular fields via the Add Field option in the common settings of any Filter plugin.
  • Example of the Add Field option
{
    "last_modified": "%{[@metadata][s3][last_modified]}"
    "content_length": "%{[@metadata][s3][content_length]}"
    "key": "%{[@metadata][s3][key]}"
    "content_type": "%{[@metadata][s3][content_type]}"
    "metadata": "%{[@metadata][s3][metadata]}"
    "etag": "%{[@metadata][s3][etag]}"
}
  • Example message after the alter (add field option) plugin
{
    // General field
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "Object contents..."
    "last_modified": 2024-01-05T01:35:50.000Z
    "content_length": 220
    "key": "{filename}"
    "content_type": "text/plain"
    "metadata": {}
    "etag": "\"56ad65461e0abb907465bacf6e4f96cf\""

    // Metadata field
    // "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
    // "[@metadata][s3][content_length]": 220
    // "[@metadata][s3][key]": "{filename}"
    // "[@metadata][s3][content_type]": "text/plain"
    // "[@metadata][s3][metadata]": {}
    // "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}

Message imported by codec

Supported codecs:
  • plain codec - Raw data string storage
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing (Engine V1 only)
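
As a rough sketch of the difference between the plain and json codecs (assumed behavior based on the descriptions above), the same object content could be ingested as follows:

// Object content: {"level": "INFO", "msg": "hello"}

// With the plain codec, the raw string is kept in the message field
{
    "message": "{\"level\": \"INFO\", \"msg\": \"hello\"}"
}

// With the json codec, the JSON is parsed into individual fields
{
    "level": "INFO",
    "msg": "hello"
}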

Source > (Amazon) S3

Node Description

  • Node that reads data from Amazon S3.
  • Based on the object creation time, data is read from the object created the earliest.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O Refer to [Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage)

Execution Mode

  • STREAMING: Updates the object list on each list update cycle and processes data by reading newly added objects.
  • BATCH: Updates the object list once at the start of the flow, then reads the objects, processes the data, and ends the flow.

Property Description

Property name Default value Data type Supported engine type Description Note
Endpoint - string V1, V2 Enter endpoint for S3 storage. Only HTTP and HTTPS URL types can be entered.
Bucket - string V1, V2 Enter a bucket name to read data.
Region * V1: us-east-1
* V2: -
string V1, V2 Enter region information configured in the storage.
Session token - string V1 Enter AWS session token.
Secret Key - string V1, V2 Enter the credential secret key issued by S3.
Access key - string V1, V2 Enter the credential access key issued by S3.
List update cycle 60 number V1, V2 Enter the object list update cycle included in the bucket.
Metadata included or not false boolean V1 Determines whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine Filter node types (see the guide below). The fields to be created are as follows.
last_modified: The last time the object was modified
content_length: Object size
key: Object name
content_type: Object type
metadata: Metadata
etag: etag
Prefix * V1: nil
* V2: -
string V1, V2 Enter a prefix of an object to read.
Key pattern to exclude * V1: nil
* V2: -
string V1, V2 Enter the pattern of an object not to be read.
Delete false boolean V1 If the property value is true, delete the object read.
Additional settings - hash V1 Enter additional settings to use when connecting to the S3 server. Guide

Caution

  • When connecting to NHN Cloud Object Storage using an (Amazon) S3 node, you must configure the properties as follows:
  • If the engine type is V1
    • Set the force_path_style value to true using the Additional Settings
    • Input example: {"force_path_style" : true}
  • If the engine type is V2
    • Enable Path-Style Requests by setting it to true

Metadata Field Usage

  • When Metadata included or not is enabled, metadata fields are created but are not exposed to the Sink plugin unless they are injected as regular fields.
  • Example message after the (Amazon) S3 plugin when the setting is enabled
{
    // General field
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "Object contents..."

    // Metadata fields
    // Cannot be exposed to the Sink plugin until the user injects it as a regular field
    // "[@metadata][s3][server_side_encryption]": "AES256"
    // "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
    // "[@metadata][s3][content_type]": "text/plain"
    // "[@metadata][s3][key]": "{filename}"
    // "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
    // "[@metadata][s3][content_length]": 220
    // "[@metadata][s3][metadata]": {}
}
  • Although this (Amazon) S3 Source plugin has an Add Field option, fields cannot be added at the same time the data is imported.
  • Add them as regular fields via the Add Field option in the common settings of any Filter plugin.
  • Example of the Add Field option
{
    "server_side_encryption": "%{[@metadata][s3][server_side_encryption]}"
    "etag": "%{[@metadata][s3][etag]}"
    "content_type": "%{[@metadata][s3][content_type]}"
    "key": "%{[@metadata][s3][key]}"
    "last_modified": "%{[@metadata][s3][last_modified]}"
    "content_length": "%{[@metadata][s3][content_length]}"
    "metadata": "%{[@metadata][s3][metadata]}"
}
  • Example message after the alter (add field option) plugin
{
    // General field
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "file contents..."
    "server_side_encryption": "AES256"
    "etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
    "content_type": "text/plain"
    "key": "{filename}"
    "last_modified": 2024-01-05T01:35:50.000Z
    "content_length": 220
    "metadata": {}

    // Metadata field
    // "[@metadata][s3][server_side_encryption]": "AES256"
    // "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
    // "[@metadata][s3][content_type]": "text/plain"
    // "[@metadata][s3][key]": "{filename}"
    // "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
    // "[@metadata][s3][content_length]": 220
    // "[@metadata][s3][metadata]": {}
}

Message imported by codec

Supported codecs:
  • plain codec - Raw data string storage
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing (Engine V1 only)

Source > (Apache) Kafka

Node Description

  • Node that receives data from Kafka.

Supported Engine Type

Engine Type Support Note
V1 O
V2 - To be supported

Execution Mode

  • STREAMING: Processes data every time a new message arrives in a topic.

Caution

  • Kafka nodes do not support BATCH mode.

Property Description

| Property Name | Default Value | Data Type | Supported Engine Type | Description | Note |
| --- | --- | --- | --- | --- | --- |
| Broker Server List | localhost:9092 | string | V1 | Enter the Kafka broker servers. Separate multiple servers with commas (,). | See the bootstrap.servers property in the Kafka official documentation. Example: 10.100.1.1:9092,10.100.1.2:9092 |
| Consumer Group ID | dataflow | string | V1 | Enter the ID that identifies the Kafka consumer group. | See the group.id property in the Kafka official documentation |
| Topic list | dataflow | array of strings | V1 | Enter a list of Kafka topics to receive messages from. | |
| Topic pattern | - | string | V1 | Enter the Kafka topic pattern to receive messages from. | Example: *-messages |
| Exclude internal topics | true | boolean | V1 | Excludes internal topics such as __consumer_offsets from the receiving destination. | See the exclude.internal.topics property in the Kafka official documentation |
| Client ID | dataflow | string | V1 | Enter the ID that identifies the Kafka consumer. | See the client.id property in the Kafka official documentation |
| Partition assignment policy | - | string | V1 | Determines how Kafka assigns partitions to consumer groups when receiving messages. | See the partition.assignment.strategy property in the Kafka official documentation. Available values: org.apache.kafka.clients.consumer.RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor |
| Offset setting | latest | enum | V1 | Enter the criteria for setting the offset of the consumer group. | See the auto.offset.reset property in the Kafka official documentation. All of the settings below keep the existing offset if the consumer group already exists. none: if there is no consumer group, return an error. earliest: if there is no consumer group, initialize to the oldest offset in the partition. latest: if there is no consumer group, initialize to the most recent offset in the partition. |
| Key Deserialization Type | org.apache.kafka.common.serialization.StringDeserializer | string | V1 | Enter the method used to deserialize the keys of incoming messages. | See the key.deserializer property in the Kafka official documentation |
| Message deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | V1 | Enter the method used to deserialize the values of incoming messages. | See the value.deserializer property in the Kafka official documentation |
| Whether to generate metadata | false | boolean | V1 | If the property value is true, metadata fields for the message are generated. To expose metadata fields to the Sink plugin, you must combine Filter node types (see the guide below). | The fields generated are as follows. topic: the topic that received the message. consumer_group: the consumer group ID used to receive the message. partition: the partition number of the topic that received the message. offset: the offset of the partition that received the message. key: a ByteBuffer containing the message key |
| Fetch minimum size | - | number | V1 | Enter the minimum amount of data to retrieve in a single fetch request. | See the fetch.min.bytes property in the Kafka official documentation |
| Send buffer size | - | number | V1 | Enter the size (in bytes) of the TCP send buffer used to transmit data. | See the send.buffer.bytes property in the Kafka official documentation |
| Retry request interval | - | number | V1 | Enter the interval (ms) to retry when a send request fails. | See the retry.backoff.ms property in the Kafka official documentation |
| Cyclic redundancy check | true | boolean | V1 | Checks the CRC of the message. | See the check.crcs property in the Kafka official documentation |
| Server Reconnection Interval | 50 | number | V1 | Enter the interval to retry when a connection to the broker server fails. | See the reconnect.backoff.ms property in the Kafka official documentation |
| Poll Timeout | 100 | number | V1 | Enter the timeout (in milliseconds) for requests to fetch new messages from the topic. | |
| Fetch Max Size Per Partition | - | number | V1 | Enter the maximum size to fetch in a single fetch request per partition. | See the max.partition.fetch.bytes property in the Kafka official documentation |
| Server Request Timeout | - | number | V1 | Enter the timeout (in milliseconds) for send requests. | See the request.timeout.ms property in the Kafka official documentation |
| TCP Receive Buffer Size | - | number | V1 | Enter the size (in bytes) of the TCP receive buffer used to read data. | See the receive.buffer.bytes property in the Kafka official documentation |
| Session Timeout | - | number | V1 | Enter the session timeout (in milliseconds) for the consumer. If a consumer fails to send a heartbeat within this time, it is excluded from the consumer group. | See the session.timeout.ms property in the Kafka official documentation |
| Maximum number of poll messages | - | number | V1 | Enter the maximum number of messages to retrieve in a single poll request. | See the max.poll.records property in the Kafka official documentation |
| Maximum poll interval | - | number | V1 | Enter the maximum interval (ms) between poll requests. | See the max.poll.interval.ms property in the Kafka official documentation |
| Maximum fetch size | - | number | V1 | Enter the maximum size to retrieve in a single fetch request. | See the fetch.max.bytes property in the Kafka official documentation |
| Maximum fetch wait time | - | number | V1 | Enter the wait time (ms) before sending a fetch request if the amount of data specified by Fetch minimum size has not been collected. | See the fetch.max.wait.ms property in the Kafka official documentation |
| Consumer Health Check Interval | - | number | V1 | Enter the interval (ms) at which the consumer sends a heartbeat. | See the heartbeat.interval.ms property in the Kafka official documentation |
| Metadata Update Interval | - | number | V1 | Enter the interval (ms) at which to update partitions, broker server status, etc. | See the metadata.max.age.ms property in the Kafka official documentation |
| IDLE Timeout | - | number | V1 | Enter the number of milliseconds to wait before closing a connection with no data transfer. | See the connections.max.idle.ms property in the Kafka official documentation |
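
Property setting example

  • A minimal sketch with hypothetical values (the broker addresses, group ID, and topic name are only illustrations):

{
    "Broker Server List": "10.100.1.1:9092,10.100.1.2:9092",
    "Consumer Group ID": "dataflow-consumer",
    "Topic list": ["my-topic"],
    "Offset setting": "earliest"
}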

Metadata Field Usage

  • When Whether to generate metadata is enabled, metadata fields are created but are not exposed to the Sink plugin unless they are injected as regular fields.
  • Example message after the Kafka plugin when the setting is enabled
{
    // normal fields
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "kafka topic message..."

    // metadata field
    // Cannot be exposed to the Sink plugin until the user injects it as a regular field
    // "[@metadata][kafka][topic]": "my-topic"
    // "[@metadata][kafka][consumer_group]": "my_consumer_group"
    // "[@metadata][kafka][partition]": "1"
    // "[@metadata][kafka][offset]": "123"
    // "[@metadata][kafka][key]": "my_key"
    // "[@metadata][kafka][timestamp]": "-1"
}
  • Although this Kafka Source plugin has an Add Field option, fields cannot be added at the same time the data is imported.
  • Add them as regular fields via the Add Field option in the common settings of any Filter plugin.
  • Example of the Add Field option
{
    "kafka_topic": "%{[@metadata][kafka][topic]}"
    "kafka_consumer_group": "%{[@metadata][kafka][consumer_group]}"
    "kafka_partition": "%{[@metadata][kafka][partition]}"
    "kafka_offset": "%{[@metadata][kafka][offset]}"
    "kafka_key": "%{[@metadata][kafka][key]}"
    "kafka_timestamp": "%{[@metadata][kafka][timestamp]}"
}
  • Message examples after alter (additional field option) plugin
{
    // normal field
    "@version": "1",
    "@timestamp": "2022-04-11T00:01:23Z"
    "message": "kafka topic message..."
    "kafka_topic": "my-topic"
    "kafka_consumer_group": "my_consumer_group"
    "kafka_partition": "1"
    "kafka_offset": "123"
    "kafka_key": "my_key"
    "kafka_timestamp": "-1"

    // metadata field
    // "[@metadata][kafka][topic]": "my-topic"
    // "[@metadata][kafka][consumer_group]": "my_consumer_group"
    // "[@metadata][kafka][partition]": "1"
    // "[@metadata][kafka][offset]": "123"
    // "[@metadata][kafka][key]": "my_key"
    // "[@metadata][kafka][timestamp]": "-1"
}

Message ingestion by codec type

Supported codecs:
  • plain codec - Raw data string storage
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing

Source > JDBC

Node Description

  • JDBC is a node that executes queries to the DB at a given interval to retrieve results.
  • Supports MySQL, MS-SQL, PostgreSQL, MariaDB, and Oracle drivers.

Execution Mode

  • STREAMING: Processes data by executing a query at every query execute frequency.
  • BATCH: Executes the query once at the beginning of the flow, processes the data, and ends the flow.

Supported Engine Type

Engine Type Support Note
V1 O
V2 - To be supported

Property Description

Property name Default value Data type Supported engine type Description Note
User - string V1 Enter a DB user.
Connection String - string V1 Enter the DB connection information. Example: jdbc:mysql://my.sql.endpoint:3306/my_db_name
Password - string V1 Enter the user password.
Query - string V1 Write a query to create a message.
Whether to convert columns to lowercase false boolean V1 Determine whether to lowercase the column names you get as a result of the query.
Query execute frequency * * * * * string V1 Enter the execute frequency of the query in a cron-like expression.
Tracking Columns - string V1 Select the columns you want to track. The predefined parameter :sql_last_value allows you to use the value of the tracked column from the last query result.
See how to write a query below.
Tracking column type numeric string V1 Select the type of data in the column you want to track. Example: numeric or timestamp
Time zone - string V1 Define the time zone to use when converting a column of type timestamp to a human-readable string. Example: Asia/Seoul
Whether to apply paging false boolean V1 Determines whether to apply paging to the query. When paging is applied, the query is split into multiple executions, the order of which is not guaranteed.
Page size - number V1 In a paged query, determines the number of rows to query at once (the page size).
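
Property setting example

  • A minimal sketch with hypothetical values (the connection string format follows the example above; the cron-like expression */10 * * * * runs the query every 10 minutes):

{
    "Connection String": "jdbc:mysql://my.sql.endpoint:3306/my_db_name",
    "User": "dataflow_user",
    "Query": "SELECT * FROM MY_TABLE WHERE id > :sql_last_value",
    "Query execute frequency": "*/10 * * * *",
    "Tracking Columns": "id",
    "Tracking column type": "numeric"
}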

How to write a query

  • :sql_last_value allows you to use the value corresponding to the tracking column in the result of the last executed query (the default value is 0 if the tracking column type is numeric, or 1970-01-01 00:00:00 if it is timestamp).
SELECT * FROM MY_TABLE WHERE id > :sql_last_value
  • If you want to add a specific condition, include :sql_last_value in addition to the condition.
SELECT * FROM MY_TABLE WHERE id > :sql_last_value and id > custom_value order by id ASC

Message imported by codec

Supported codecs:
  • plain codec - Raw data string storage

Filter

  • Node type that defines how to handle imported data.

Common Settings on Filter Node

Property name Default value Data type Supported engine type Description Others
ID - string V1, V2 Sets the node ID.
The node name on the chart board is marked with the value defined in this property.
Add Tag - array of strings V1 Adds a tag to each message.
Delete Tag - array of strings V1 Deletes a tag that was given to each message.
Delete Field - array of strings V1 Deletes a field from each message.
Add Field - Hash V1 Adds custom fields (see the example below).
When adding a field, you can reference the value of another field with %{[depth1_field]}.
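
Example of common settings on Filter node

  • A minimal sketch combining the settings above (the tag and field names are hypothetical; the Add Field value reuses the syntax shown in the Source example):

{
    "Add Tag": ["parsed"],
    "Delete Field": ["temp_field"],
    "Add Field": {
        "my_custom_field": "%{[json_body][logType]}"
    }
}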

Filter > Alter

Node Description

  • Node that changes message field values to other values.
  • You can only change the top-level fields.

Supported Engine Type

Engine Type Support Note
V1 O
V2 X

Property Description

Property name Default value Data type Supported engine type Description Others
Modify Field - array of strings V1 Compares the field value to a given value; if they are equal, changes the field value to the given new value.
Overwrite Field - array of strings V1 Compares the field value to a given value; if they are equal, sets another field to the given value.
Coalesce - array of strings V1 Assigns to the first field the first non-null value among the fields that follow it.

Order of applying settings

  • Each setting is applied in the order listed in the property description.

Condition

  • Modify Field → ["logType", "ERROR", "FAIL"]
  • Overwrite Field → ["logType", "FAIL", "isBillingTarget", "false"]

Input message

{
    "logType": "ERROR",
    "isBillingTarget": "true"
}

Output message

{
    "logType": "FAIL",
    "isBillingTarget": "false"
}

Example of Overwrite Field

Condition

  • Overwrite Field → ["logType", "ERROR", "isBillingTarget", "false"]

Input message

{
    "logType": "ERROR",
    "isBillingTarget": "true"
}

Output message

{
    "logType": "ERROR",
    "isBillingTarget": "false"
}

Example of Modify Field

Condition

  • Modify Field → ["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]

Input message

{
    "reason": "CONNECTION_TIMEOUT"
}

Output message

{
    "reason": "MONGODB_CONNECTION_TIMEOUT"
}

Coalesce Example

Condition

  • Coalesce → ["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]

Input message

{
    "mongoReason": "COLLECTION_NOT_FOUND"
}

Output message

{
    "reason": "COLLECTION_NOT_FOUND",
    "mongoReason": "COLLECTION_NOT_FOUND"
}

Filter > Cipher

Node Description

  • Node for encrypting or decrypting message field values.
  • The encryption key is a symmetric key from Secure Key Manager.
    • Secure Key Manager symmetric keys can be created through the Secure Key Manager web console or the Add Key API of Secure Key Manager.
    • Even if a flow contains multiple Cipher nodes, all Cipher nodes can reference only one Secure Key Manager key.

Supported Engine Type

Engine Type Support Note
V1 O
V2 - To be supported

Property Description

Property name Default value Data type Supported engine type Description Others
Mode - enum V1 Choose between encryption mode and decryption mode. Select one from the list.
Appkey - string V1 Enter the SKM appkey that stores the key used for encryption/decryption.
Key ID - string V1 Enter the SKM key ID of the key used for encryption/decryption.
Key Version - string V1 Enter the SKM key version of the key used for encryption/decryption.
Source Field message string V1 Enter the field name to encrypt/decrypt.
Field to be stored message string V1 Enter the field name in which to save the encryption/decryption result.

Encryption Example

Condition

  • mode → encrypt
  • Appkey → SKM appkey
  • Key ID → SKM Symmetric key ID
  • Key Version → 1
  • IV Random Length → 16
  • Source Field → message
  • Field to be stored → encrypted_message

Input message

{ 
    "message": "this is plain message" 
}

Output message

{ 
    "message": "this is plain message", 
    "encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e" 
}

Decryption Example

Condition

  • mode → decrypt
  • Appkey → SKM appkey
  • Key ID → SKM Symmetric key ID
  • Key Version → 1
  • IV Random Length → 16
  • Source Field → message
  • Field to be stored → decrypted_message

Input message

{ 
    "message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e" 
}

Output message

{ 
    "decrypted_message": "this is plain message", 
    "message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e" 
}

Filter > (Logstash) Grok

Node Description

  • Node that parses a string according to a set rule and stores it in each set field.

Supported Engine Type

Engine Type Support Note
V1 O
V2 - To be supported

Property Description

Property name Default value Data type Supported engine type Description Note
Match - hash V1 Enter the information of the string to be parsed.
Pattern definition - hash V1 Enter a custom pattern as a regular expression for the rule of tokens to be parsed. Check the link below for system defined patterns.
https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns
Failure tag _grokparsefailure array of strings V1 Enter the tag name to define if string parsing fails.
Timeout 30000 number V1 Enter the amount of time to wait for string parsing.
Timeout tag _groktimeout string V1 Enter the tag to register for an event when a timeout occurs.
Overwrite - array of strings V1 When writing a value to a designated field after parsing, if a value is already defined in the field, enter the field names to be overwritten.
Store only values with specified names true boolean V1 If the property value is true, unnamed parsing results are not stored.
Capture empty string false boolean V1 If the property value is true, store empty strings in fields.
Close or not when match true boolean V1 If the property value is true, the plugin stops matching after the first successful grok match.

Grok parsing examples

Condition

  • Match → { "message": "%{IP:clientip} %{HYPHEN} %{USER} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes}" }
  • Pattern definition → { "HYPHEN": "-*" }

Input messages

{
    "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"
}

Output message

{
    "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326",
    "timestamp": "10/Oct/2000:13:55:36 -0700",
    "clientip": "127.0.0.1",
    "verb": "GET",
    "httpversion": "1.0",
    "response": "200",
    "bytes": "2326",
    "request": "/apache_pb.gif"
}

Filter > CSV

Node Description

  • Node that parses a message in CSV format and stores it in a field.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Property Description

Property Name Default Data Type Supported Engine Type Description Note
Field to Save - string V1, V2 Enter the field name to save the CSV parsing results.
Quote " string V1, V2 Enter the character used to quote column values.
Ignore First Row false boolean V1, V2 If the property value is true, the column name entered in the first row of the read data is ignored.
Column - array of strings V1 Enter the column name.
Delimiter , string V1, V2 Enter the string that separates the columns.
Source Field * V1: message
* V2: -
string V1, V2 Enter the field name to parse the CSV.
Schema - hash V1, V2 Enter the name and data type of each column in dictionary format. See Schema Input Method by Engine Type
Overwrite false boolean V2 If true, overwrites the CSV parsing result with a field to be saved or an existing field.
Delete original field false boolean V2 Deletes the source field when CSV parsing is complete. If parsing fails, the field is kept.

How to Input Schema per Engine Type

  • If the engine type is V1
    • Register separately from the fields defined in the column.
    • The default data type is string. If conversion to a different data type is required, use the schema settings.
    • The following data types are available:
      • integer, float, date, date_time, boolean
  • If the engine type is V2
    • V2 does not use the Column property; enter every column and its data type via the Schema property.
    • The following data types are available:
      • string, integer, long, float, double, boolean

Examples of CSV parsing without data type

Condition

  • Source field → message
  • If the engine type is V1
    • Column → ["one", "two", "t hree"]
  • If the engine type is V2
    • Schema → {"one": "string", "two": "string", "t hree": "string"}

Input messages

{
    "message": "hey,foo,\"bar baz\""
}

Output message

{
    "message": "hey,foo,\"bar baz\"",
    "one": "hey",
    "t hree": "bar baz",
    "two": "foo"
}

Examples of CSV parsing that requires data type conversion

Condition

  • Source field → message
  • If the engine type is V1
    • Column → ["one", "two", "t hree"]
    • Schema → {"two": "integer", "t hree": "boolean"}
  • If the engine type is V2
    • Schema → {"one": "string", "two": "integer", "t hree": "boolean"}

Input messages

{
    "message": "\"wow hello world!\", 2, false"
}

Output message

{
    "message": "\"wow hello world!\", 2, false",
    "one": "wow hello world!",
    "t hree": false,
    "two": 2
}

Filter > JSON

Node Description

  • Node that parses a JSON string and stores it in a specified field.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Property Description

Property name Default value Data type Supported engine type Description Note
Source field * V1: message
* V2: -
string V1, V2 Enter a field name to parse JSON strings.
Field to save - string V1, V2 Enter the field name to save the JSON parsing result.
If no property value is specified, the result is stored in the root field.
Overwrite false boolean V2 If true, the JSON parsing result overwrites the field to be saved or an existing field.
Delete original field false boolean V2 Deletes the source field when JSON parsing is complete. If parsing fails, the field is kept.

JSON Parsing Example

Condition

  • Source field → message
  • Field to store → json_parsed_message

Input messages

{
    "message": "{\"json\": \"parse\", \"example\": \"string\"}"
}

Output message

{
    "json_parsed_message": {
        "json": "parse",
        "example": "string"
    },
    "message": "{\"json\": \"parse\", \"example\": \"string\"}"
}

Filter > Date

Node Description

  • A node that parses a date string and stores it in timestamp format.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Property Description

Property name Default value Data type Supported engine type Description Others
Source Field - string V1, V2 Enter a field name to get strings.
Formats - array of strings V1, V2 Enter the formats used to parse the strings. The pre-defined formats are as follows.
ISO8601, UNIX, UNIX_MS, TAI64N (V2 unsupported)
Locale * V1: -
* V2: ko_KR
string V1, V2 Enter a locale to use for string analysis. e.g. en, en-US, ko-KR
Field to be stored * V1: @timestamp
* V2: -
string V1, V2 Enter a field name to store the result of parsing date strings.
Failure tag _dateparsefailure array of strings V1 Enter the tag name to define if date string parsing fails.
Time zone * V1: -
* V2: Asia/Seoul
string V1, V2 Enter the time zone for the date. e.g. Asia/Seoul

Examples of Date String Parsing

Condition

  • Source Field -> message
  • Formats -> ["yyyy-MM-dd HH:mm:ssZ", "ISO8601"]
  • Field to be stored → time
  • Time zone → Asia/Seoul

Input message

{
    "message": "2017-03-16T17:40:00"
}

Output message

{
    "message": "2017-03-16T17:40:00",
    "time": 2022-04-04T09:08:01.222Z
}

Filter > UUID

Node Description

  • A node that creates UUIDs and stores them in a field.

Supported Engine Type

Engine Type Support Note
V1 O
V2 O

Property Description

Property name Default value Data type Supported engine type Description Others
Field to store UUID - string V1, V2 Enter a field name to store UUID creation result.
Overwrite false boolean V1, V2 Select whether to overwrite the value if it exists in the specified field name.

Example of UUID Creation

Condition

  • Field to store UUID → userId

Input message

{
    "message": "uuid test message"
}

Output message

{
    "userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
    "message": "uuid test message"
}

Filter > Convert

Node Description

  • A node to convert a data type in a certain field.

Supported Engine Type

Engine Type Support Note
V1 X
V2 O

Property Description

Property name Default value Data type Supported engine type Description Note
Target field - string V2 Enter the target field to convert the data type.
Conversion type - enum V2 Select the data type to convert.
* Data type: STRING, INTEGER, FLOAT, DOUBLE, BOOLEAN

Data Conversion Example

Condition

  • Target field -> message
  • Conversion type -> INTEGER

Input Message

{
    "message": "2025"
}

Output Message

{
    "message": 2025
}

Filter > Split

Node Description

  • A node that splits a single message into multiple messages.
  • Splits the message based on the result of parsing according to the settings.

Supported Engine Type

Engine Type Support Note
V1 O
V2 X

Property Description

Property name Default value Data type Supported engine type Description Others
Source field message string V1 Enter a field name to separate messages.
Field to be stored - string V1 Enter a field name to store separated messages.
Separator \n string V1 Enter the separator used to split the string.

Example of Default Message Split

Condition

  • Source field → message

Input message

{
    "message": [
        {"number": 1},
        {"number": 2}
    ]
}

Output message

{
    "message": [
        {"number": 1},
        {"number": 2}
    ],
    "number": 1
}
{
    "message": [
        {"number": 1},
        {"number": 2}
    ],
    "number": 2
}

Example of Split after String Parsing

Condition

  • Source field → message
  • Separator → ,

Input message

{
    "message": "1,2"
}

Output message

{
    "message": "1"
}
{
    "message": "2"
}

Example of String Parsing and Splitting Into Different Fields

Condition

  • Source field → message
  • Field to be stored → target
  • Separator → ,

Input message

{
    "message": "1,2"
}

Output message

{
    "message": "1,2",
    "target": "1"
}
{
    "message": "1,2",
    "target": "2"
}

Filter > Truncate

Node Description

  • A node that truncates strings that exceed the specified byte length.

Supported Engine Type

Engine Type Support Note
V1 O
V2 X

Property Description

Property name Default value Data type Supported engine type Description Others
Byte length - number V1 Enter the maximum byte length to represent a string.
Source field - string V1 Enter a field name for truncate.

String Removal Example

Condition

  • Byte length → 10
  • Source field → message

Input message

{
    "message": "This message is too long."
}

Output message

{
    "message": "This message"
}

Filter > Mutate

Node Description

  • A node that transforms the value of a field.

Supported Engine Type

Engine Type Support Note
V1 O
V2 X

Property Description

Property name Default value Data type Supported engine type Description Remarks
Set Defaults - Hash V1 Replaces null values with default values.
Rename Field - Hash V1 Renames the field.
Update field values - Hash V1 Replaces the field value with a new value. If the field does not exist, no action is taken.
Replace Value - Hash V1 Replaces the field value with a new value. If the field does not exist, a new field is created.
Convert Type - Hash V1 Converts the field value to another type. The following types are supported: integer, integer_eu, float, float_eu, string, and boolean.
Replace String - array of strings V1 Replaces part of a string using a regular expression.
Uppercase Letter - array of strings V1 Converts the string in the target field to uppercase.
Capitalize First Letter - array of strings V1 Converts the first letter of the string in the target field to uppercase and the rest to lowercase.
Lowercase Letter - array of strings V1 Converts the string in the target field to lowercase.
Strip Space - array of strings V1 Removes spaces before and after the string in the target field.
Split String - Hash V1 Splits strings using a separator.
Join Array - Hash V1 Joins array elements with a separator.
Merge Field - Hash V1 Merges two fields.
Copy Field - Hash V1 Copies an existing field to another field. If the destination field exists, it is overwritten.
Failure Tag _mutate_error string V1 Enter a tag to define if an error occurs.

Order of applying settings

  • Each setting is applied in the order listed in the property description.

Condition

  • Update Field Value → {"fieldname": "new value"}
  • Uppercase Letter → ["fieldname"]

Input message

{
    "fieldname": "old value"
}

Output message

{
    "fieldname": "NEW VALUE"
}

Example of setting default

Condition

{
  "fieldname": "default_value"
}

Input message

{
  "fieldname": null
}

Output message

{
  "fieldname": "default_value"
}

Example of renaming a field

Condition

{
  "fieldname": "changed_fieldname"
}

Input message

{
  "fieldname": "Hello World!"
}

Output message

{
  "changed_fieldname": "Hello World!"
}

Example of updating field values

Condition

{
  "fieldname": "%{other_fieldname}: %{fieldname}",
  "not_exist_fieldname": "DataFlow"
}

Input message

{
  "fieldname": "Hello World!",
  "other_fieldname": "DataFlow"
}

Output message

{
  "fieldname": "DataFlow: Hello World!",
  "other_fieldname": "DataFlow"
}

Example of replacing values

Condition

{
  "fieldname": "%{other_fieldname}: %{fieldname}",
  "not_exist_fieldname": "DataFlow"
}

Input message

{
  "fieldname": "Hello World!",
  "other_fieldname": "DataFlow"
}

Output message

{
  "fieldname": "DataFlow: Hello World!",
  "other_fieldname": "DataFlow",
  "not_exist_fieldname": "DataFlow"
}

Example of converting types

Condition

{
  "message1": "integer",
  "message2": "boolean"
}

Input message

{
  "message1": "1000",
  "message2": "true"
}

Output message

{
    "message1": 1000,
    "message2": true
}

Type Description

  • integer
    • Converts strings to integers. Supports comma-separated strings. Fractional parts are discarded.
      • Example: "1,000.5" -> 1000
    • Converts floats to integers. Fractional parts are discarded.
    • Converts boolean values to integers: true is converted to 1, false to 0.
  • integer_eu
    • Converts data to integers. Supports dot-separated strings. Fractional parts are discarded.
      • Example: "1.000,5" -> 1000
    • Float and boolean values are treated the same as integers.
  • float
    • Converts integers to floats.
    • Converts strings to floats. Supports comma-separated strings.
      • Example: "1,000.5" -> 1000.5
    • Converts boolean values to floats: true is converted to 1.0, false to 0.0.
  • float_eu
    • Converts data to floats. Supports dot-delimited strings.
      • Example: "1.000,5" -> 1000.5
    • Float and boolean values are treated the same as floats.
  • string
    • Converts data to strings with UTF-8 encoding.
  • boolean
    • Converts integers to boolean values: 1 is converted to true and 0 to false.
    • Converts floats to boolean values: 1.0 is converted to true and 0.0 to false.
    • Converts strings to boolean values: "true", "t", "yes", "y", "1", and "1.0" are converted to true; "false", "f", "no", "n", "0", and "0.0" are converted to false. Empty strings are converted to false.
  • Array data elements are converted as described above.
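
As a further sketch of the rules above (hypothetical field names), an integer_eu conversion and a boolean conversion combined would behave as follows:

// Convert Type condition
{
    "amount": "integer_eu",
    "flag": "boolean"
}

// Input message
{
    "amount": "1.234,5",
    "flag": "yes"
}

// Output message
{
    "amount": 1234,
    "flag": true
}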

Example of replacing string

Condition

["fieldname", "/", "_", "fieldname2", "[\\?#-]", "."]
  • Replace / with _ in the string values in the fieldname field and \, ?, #, and -with . in the string values in the fieldname2 field.

Input message

{
  "fieldname": "Hello/World",
  "fieldname2": "Hello\\?World#Test-123"
}

Output message

{
  "fieldname": "Hello_World",
  "fieldname2": "Hello.World.Test.123"
}

Example of uppercase letter

Condition

["fieldname"]

Input message

{
  "fieldname": "hello world"
}

Output message

{
  "fieldname": "HELLO WORLD"
}

Example of capitalizing the first letter

Condition

["fieldname"]

Input message

{
  "fieldname": "hello world"
}

Output message

{
  "fieldname": "Hello world"
}

Example of lowercase letter

Condition

["fieldname"]

Input message

{
  "fieldname": "HELLO WORLD"
}

Output message

{
  "fieldname": "hello world"
}

Example of stripping spaces

Condition

["field1", "field2"]

Input message

{
  "field1": "Hello World!   ",
  "field2": "   Hello DataFlow!"
}

Output message

{
  "field1": "Hello World!",
  "field2": "Hello DataFlow!"
}

Example of splitting strings

Condition

{
  "fieldname": ","
}

Input message

{
  "fieldname": "Hello,World"
}

Output message

{
  "fieldname": ["Hello", "World"]
}

Example of joining arrays

Condition

{
  "fieldname": ","
}

Input Data

{
  "fieldname": ["Hello", "World"]
}

Output message

{
  "fieldname": "Hello,World"
}

Example of merging fields

Condition

{
  "array_data1": "string_data1",
  "string_data2": "string_data1",
  "json_data1": "json_data2"
}

Input message

{
  "array_data1": ["array_data1"],
  "string_data1": "string_data1",
  "string_data2": "string_data2",
  "json_data1": {"json_field1": "json_data1"},
  "json_data2": {"json_field2": "json_data2"}
}

Output message

{
  "array_data1": ["array_data1", "string_data1"],
  "string_data1": "string_data1",
  "string_data2": ["string_data2", "string_data1"],
  "json_data1": {"json_field2" : "json_data2", "json_field1": "json_data1"},
  "json_data2": {"json_field2": "json_data2"}
}
  • array + array = merge two arrays
  • array + string = add string to array
  • string + string = convert two strings to an array with two strings as elements
  • json + json = json merge

Example of copying fields

Condition

{
  "source_field": "dest_field"
}

Input message

{
  "source_field": "Hello World!"
}

Output message

{
  "source_field": "Hello World!",
  "dest_field": "Hello World!"
}

Filter > Coerce

Node Description

  • A node that replaces null values with default values.

Supported Engine Type

Engine Type Support Note
V1 X
V2 O

Property Description

Property Name Default Value Data Type Supported Engine Type Description Note
Target Field - string V2 Enter the name of the field for which you want to specify a default value.
Default Value - string V2 Enter a default value.

Default Setting Example

Condition

  • Target field → fieldname
  • Default value → default_value

Input Message

{
    "fieldname": null
}

Output Message

{
    "fieldname": "default_value"
}

Filter > Copy

Node Description

  • A node that copies an existing field to another field.

Supported Engine Type

Engine Type Support Note
V1 X
V2 O

Property Description

Property Name Default Data Type Supported Engine Type Description Note
Target Field - string V2 Enter the name of the source field to copy.
Field to Save - string V2 Enter the name of the field to save the copied result.
Overwrite false boolean V2 If true, the field to save will be overwritten if it already exists.

Default Setting Example

Condition

  • Source field → source_field
  • Field to be saved → dest_field

Input Message

{
    "source_field": "Hello World!"
}

Output Message

{
    "source_field": "Hello World!",
    "dest_field": "Hello World!"
}

Filter > Rename

Node Description

  • A node that changes the field name.

Supported Engine Type

Engine Type Support Note
V1 X
V2 O

Property Description

Property Name Default Data Type Supported Engine Type Description Note
Source Field - string V2 Enter the source field to be renamed.
Target Field - string V2 Enter the new field name.
Overwrite false boolean V2 If true, the target field will be overwritten if it already exists.

Default Setting Example

Condition

  • Source field → fieldname
  • Target field → changed_fieldname

Input Message

{
    "fieldname": "Hello World!"
}

Output Message

{
    "changed_fieldname": "Hello World!"
}

Filter > Strip

Node Description

  • A node that removes leading and trailing spaces from a string in a field.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | X | |
| V2 | O | |

Property Description

| Property Name | Default Value | Data Type | Supported Engine Type | Description | Note |
| --- | --- | --- | --- | --- | --- |
| Target Fields | - | array of strings | V2 | Enter the target fields from which to remove leading and trailing whitespace. | |

Default Setting Example

Condition

  • Target field → ["field1", "field2"]

Input Message

{
    "field1": "Hello World!   ",
    "field2": "   Hello DataFlow!"
}

Output Message

{
    "field1": "Hello World!",
    "field2": "Hello DataFlow!"
}

Sink

  • A node type that defines the endpoint where data that has completed filter operations is loaded.

Common Settings on Sink Node

| Property name | Default value | Data type | Description | Others |
| --- | --- | --- | --- | --- |
| ID | - | string | Sets the node ID. The node name on the chart board is marked with the value defined in this property. | |

Sink > (NHN Cloud) Object Storage

Node Description

  • Node for uploading data to Object Storage in NHN Cloud.
  • When created using default settings without additional configuration, objects are output according to the following path format.
    • Engine V1: /{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.txt
    • Engine V2: /{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/part-{uuid}-{file_counter}
  • If the engine type is V2, the provided codecs are JSON and LINE. Plain and parquet codecs will be supported later.

Note

  • The following Output Path Examples by Prefix Input Condition are based on Engine V1.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | O | |
| V2 | O | Refer to Important Notes on Object Storage Connectivity (Engine V2) |

Property Description

| Property name | Default value | Data type | Supported engine type | Description | Others |
| --- | --- | --- | --- | --- | --- |
| region | - | enum | V1, V2 | Enter the region of the Object Storage product. | |
| Bucket | - | string | V1, V2 | Enter the bucket name. | |
| Secret Key | - | string | V1, V2 | Enter the S3 API Credential Secret Key. | |
| Access Key | - | string | V1, V2 | Enter the S3 API Credential Access Key. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | V1, V2 | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | V1: @timestamp / V2: - | string | V1, V2 | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | V1, V2 | Enter a time field type to apply to the prefix. | Engine type V2 supports the DATE_FILTER_RESULT type only (other types will be supported later). |
| Prefix Time Zone | UTC | string | V1, V2 | Enter a time zone for the time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | V1, V2 | Enter a prefix to use instead if applying the prefix time fails. | |
| Encoding | none | enum | V1 | Enter whether to encode or not. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | V1 | Determines object creation rules. size_and_time – use object size and time to decide; size – use object size to decide; time – use time to decide. | Engine type V2 supports size_and_time only. |
| Reference Time | 1 | number | V1, V2 | Set the time to be the basis for object splitting. | Set when the object rotation policy is size_and_time or time. |
| Object size | 5242880 | number | V1, V2 | Set the size (unit: byte) to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size. |
| Inactivity Interval | 1 | number | V2 | Sets the reference time for splitting an object when there is no data inflow for a set period of time. If no data arrives for the set period, the current object is uploaded and any new data is written to a new object. | |
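
As a rough illustration of how the rotation-related properties interact, consider the sketch below. It is an assumption-laden sketch rather than the engine's implementation, and since the table does not state the unit of the time values, seconds are used here purely for demonstration.

```python
# Illustrative sketch only: with the size_and_time policy, the current object is
# closed and uploaded once either the size or the time threshold is reached; the
# Inactivity Interval (V2 only) also closes it when no data has arrived for the
# configured period. Time units are assumed to be seconds for this sketch.
import time

def should_rotate(bytes_written, opened_at, last_write_at,
                  object_size=5_242_880,    # "Object size" default (bytes)
                  reference_time=60,        # "Reference Time" (illustrative seconds)
                  inactivity_interval=60,   # "Inactivity Interval" (illustrative seconds)
                  now=None):
    now = time.time() if now is None else now
    return (bytes_written >= object_size                      # size threshold reached
            or now - opened_at >= reference_time              # time threshold reached
            or now - last_write_at >= inactivity_interval)    # no data inflow (V2)
```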

Output examples by codec type

Supported codecs:

  • plain codec - Raw data string storage (Engine V1 only)
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing
  • parquet codec - Compressed columnar storage format (Engine V1 only)

Prefix Example - Field

Condition

  • Bucket → obs-test-container
  • Prefix → /dataflow/%{deployment}

Input Message

{
    "deployment": "production",
    "message": "example",
    "logTime": "2022-11-21T07:49:20Z"
}

Output Path

/obs-test-container/dataflow/production/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt

Prefix Example - Hour

Condition

  • Bucket → obs-test-container
  • Prefix → /dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
  • Prefix Time Field → logTime
  • Prefix Time Field Type → ISO8601
  • Prefix Time Zone → Asia/Seoul

Input Message

{
    "deployment": "production",
    "message": "example",
    "logTime": "2022-11-21T07:49:20Z"
}

Output Path

/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
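
The hour=16 in this path comes from converting the UTC logTime value to the Prefix Time Zone. A minimal sketch of that conversion:

```python
# 07:49 UTC on 2022-11-21 is 16:49 in Asia/Seoul (UTC+9), so the prefix renders hour=16.
from datetime import datetime
from zoneinfo import ZoneInfo

log_time = datetime.fromisoformat("2022-11-21T07:49:20+00:00")
local = log_time.astimezone(ZoneInfo("Asia/Seoul"))
print(local.strftime("/dataflow/year=%Y/month=%m/day=%d/hour=%H"))
# /dataflow/year=2022/month=11/day=21/hour=16
```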

Prefix Example - When failed to apply time

Condition

  • Bucket → obs-test-container
  • Prefix → /dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
  • Prefix Time Field → logTime
  • Prefix Time Field Type → TIMESTAMP_SEC
  • Prefix Time Zone → Asia/Seoul
  • Prefix Time Application fallback → _failure

Input Message

{
    "deployment": "production",
    "message": "example",
    "logTime": "2022-11-21T07:49:20Z"
}

Output Path

/obs-test-container/_failure/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
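
A short sketch of why the fallback is applied here, assuming TIMESTAMP_SEC expects an epoch-seconds number: the ISO 8601 string in logTime cannot be parsed as such a number, so the fallback prefix (_failure) is used instead.

```python
# Illustrative only: logTime holds an ISO 8601 string, but TIMESTAMP_SEC is
# assumed to expect epoch seconds, so parsing fails and the fallback applies.
value = "2022-11-21T07:49:20Z"
try:
    epoch_seconds = float(value)   # would succeed for e.g. "1669016960"
    parsed = True
except ValueError:
    parsed = False                 # parse failure -> prefix becomes "_failure"
print(parsed)  # False
```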

Sink > (Amazon) S3

Node Description

  • Node for uploading data to Amazon S3.
  • If the engine type is V2, the provided codecs are JSON and LINE. Plain and parquet codecs will be supported later.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | O | |
| V2 | O | Refer to [Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage) |

Property Description

| Property name | Default value | Data type | Supported engine type | Description | Others |
| --- | --- | --- | --- | --- | --- |
| region | V1: us-east-1 / V2: - | enum | V1, V2 | Enter the region of the S3 product. | s3 region |
| Bucket | - | string | V1, V2 | Enter the bucket name. | |
| Access Key | - | string | V1, V2 | Enter the S3 API Credential Access Key. | |
| Secret Key | - | string | V1, V2 | Enter the S3 API Credential Secret Key. | |
| Signature Version | v4 | enum | V1, V2 | Enter the version to use when signing AWS requests. | |
| Session Token | - | string | V1, V2 | Enter the Session Token for AWS temporary credentials. | Session Token Guide |
| Prefix | - | string | V1, V2 | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | V1: @timestamp / V2: - | string | V1, V2 | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | V1, V2 | Enter a time field type to apply to the prefix. | Engine type V2 supports the DATE_FILTER_RESULT type only (other types will be supported later). |
| Prefix Time Zone | UTC | string | V1, V2 | Enter a time zone for the time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | V1, V2 | Enter a prefix to use instead if applying the prefix time fails. | |
| Storage Class | STANDARD | enum | V1 | Set the Storage Class when the object is uploaded. | Storage Class Guide |
| Encoding | none | enum | V1 | Enter whether to encode or not. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | V1 | Determines object creation rules. size_and_time – use object size and time to decide; size – use object size to decide; time – use time to decide. | Engine type V2 supports size_and_time only. |
| Reference Time | 1 | number | V1, V2 | Set the time to be the basis for object splitting. | Set when the object rotation policy is size_and_time or time. |
| Object size | 5242880 | number | V1, V2 | Set the size (unit: byte) to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size. |
| ACL | private | enum | V1 | Enter the ACL policy to set when the object is uploaded. | |
| Additional Settings | - | Hash | V1 | Enter additional settings to connect to S3. | Guide |
| Inactivity Interval | 1 | number | V2 | Sets the reference time for splitting an object when there is no data inflow for a set period of time. If no data arrives for the set period, the current object is uploaded and any new data is written to a new object. | |

Caution

  • When connecting to NHN Cloud Object Storage using an (Amazon) S3 node, you must configure the properties as follows:
  • If the engine type is V1
    • Set the force_path_style value to true using the Additional Settings
    • Input example: {"force_path_style" : true}
  • If the engine type is V2
    • Set the Path-Style Requests property to true

Output examples by codec type

Supported codecs:

  • plain codec - Raw data string storage (Engine V1 only)
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing
  • parquet codec - Compressed columnar storage format (Engine V1 only)

Additional Settings example

follow_redirects

  • When set to true, follows the redirect path if AWS S3 returns a 307 response.
{
    "follow_redirects": true
}

retry_limit

  • Maximum number of retries for 4xx and 5xx responses.
{
    "retry_limit": 5
}

force_path_style

  • If true, requests use path-style URLs instead of virtual-hosted-style URLs. Reference
{
    "force_path_style": true
}
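
For reference, the difference between the two URL styles can be sketched as follows; the endpoint and object names are placeholders, not actual NHN Cloud values.

```python
# Placeholder values for illustration only.
endpoint = "https://object-storage.example.com"
bucket, key = "obs-test-container", "dataflow/example.txt"

virtual_hosted_style = f"https://{bucket}.object-storage.example.com/{key}"  # force_path_style false
path_style = f"{endpoint}/{bucket}/{key}"                                    # force_path_style true

print(path_style)  # https://object-storage.example.com/obs-test-container/dataflow/example.txt
```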

Sink > (Apache) Kafka

Node Description

  • Node for sending data to Kafka.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | O | |
| V2 | - | To be supported |

Property Description

| Property name | Default value | Data type | Supported engine type | Description | Others |
| --- | --- | --- | --- | --- | --- |
| Topic | - | string | V1 | Enter the name of the Kafka topic to which you want to send messages. | |
| Broker Server List | localhost:9092 | string | V1 | Enter the Kafka broker servers. Separate multiple servers with commas (,). Example: 10.100.1.1:9092,10.100.1.2:9092 | Kafka official documentation - see the bootstrap.servers property |
| Client ID | dataflow | string | V1 | Enter an ID that identifies the Kafka producer. | Kafka official documentation - see the client.id property |
| Message Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | V1 | Enter how to serialize the message value to send. | Kafka official documentation - see the value.serializer property |
| Compression type | none | enum | V1 | Enter how to compress data to send. Select one of none, gzip, snappy, lz4. | Kafka official documentation - see the compression.type property |
| Message Key | - | string | V1 | Enter the field to use as the message key, in the following format. Example: %{FIELD} | |
| Key Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | V1 | Enter how to serialize the message key to send. | Kafka official documentation - see the key.serializer property |
| Metadata Update Cycle | 300000 | number | V1 | Enter the interval (ms) at which to update partition and broker server status, etc. | Kafka official documentation - see the metadata.max.age.ms property |
| Maximum Request Size | 1048576 | number | V1 | Enter the maximum size (byte) per transfer request. | Kafka official documentation - see the max.request.size property |
| Server Reconnection Cycle | 50 | number | V1 | Enter how often (ms) to retry when the connection to the broker server fails. | Kafka official documentation - see the reconnect.backoff.ms property |
| Batch Size | 16384 | number | V1 | Enter the size (byte) to send per batch request. | Kafka official documentation - see the batch.size property |
| Buffer Memory | 33554432 | number | V1 | Enter the size (byte) of the buffer used to send to Kafka. | Kafka official documentation - see the buffer.memory property |
| Receiving Buffer Size | 32768 | number | V1 | Enter the size (byte) of the TCP receive buffer used to read data. | Kafka official documentation - see the receive.buffer.bytes property |
| Transfer Delay Time | 0 | number | V1 | Enter the delay time (ms) for message sending. Delayed messages are sent together as a batch request. | Kafka official documentation - see the linger.ms property |
| Server Request Timeout | 40000 | number | V1 | Enter the timeout (ms) for a transfer request. | Kafka official documentation - see the request.timeout.ms property |
| Transfer Buffer Size | 131072 | number | V1 | Enter the size (byte) of the TCP send buffer used to transfer data. | Kafka official documentation - see the send.buffer.bytes property |
| Ack Property | 1 | enum | V1 | Enter the setting to verify that messages have been received by the broker server. 0 – does not check whether the message is received. 1 – the topic leader responds that the message was received without waiting for followers to copy the data. all – the topic leader waits for followers to copy the data before responding that the message was received. | Kafka official documentation - see the acks property |
| Retry Request Cycle | 100 | number | V1 | Enter the interval (ms) to retry when a transfer request fails. | Kafka official documentation - see the retry.backoff.ms property |
| Retry times | - | number | V1 | Enter the maximum number of retries when a transfer request fails. A request that still fails after the set number of retries may result in data loss. | Kafka official documentation - see the retries property |
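
DataFlow configures the producer from the table above for you; purely for orientation, the sketch below shows how a few of those settings would look on a standalone client. The kafka-python library, broker addresses, and topic name are illustrative assumptions, not values from this guide.

```python
from kafka import KafkaProducer  # pip install kafka-python (illustrative choice of client)

producer = KafkaProducer(
    bootstrap_servers=["10.100.1.1:9092", "10.100.1.2:9092"],  # Broker Server List
    client_id="dataflow",                                      # Client ID
    value_serializer=lambda v: v.encode("utf-8"),              # Message Serialization Type
    compression_type=None,                                     # Compression type (none)
    batch_size=16384,                                          # Batch Size
    buffer_memory=33554432,                                    # Buffer Memory
    linger_ms=0,                                               # Transfer Delay Time
    request_timeout_ms=40000,                                  # Server Request Timeout
    acks=1,                                                    # Ack Property
    retry_backoff_ms=100,                                      # Retry Request Cycle
)
producer.send("example-topic", value="Hello DataFlow!")        # Topic (placeholder name)
producer.flush()
```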

Output examples by codec type

Supported codecs:

  • plain codec - Raw data string storage
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing

plain codec output example – when the message field does not exist

Input message

{ 
    "hidden": "Hello Dataflow!", 
    "@timestamp": "2022-11-21T07:49:20Z" 
}

Output message

2022-11-21T07:49:20.000Z e5ef7ece9bb0 %{message}
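
The plain codec substitutes %{field} placeholders from the event; because the input has no message field, the %{message} token is left as-is (the timestamp and host ID in the line come from the event itself). A minimal sketch of that placeholder behavior, assuming Logstash-style %{...} substitution and showing only the placeholder part of the line:

```python
import re

def render(template, event):
    # Replace %{field} with the field's value; keep the token if the field is missing.
    return re.sub(r"%\{(\w+)\}", lambda m: str(event.get(m.group(1), m.group(0))), template)

event = {"hidden": "Hello Dataflow!", "@timestamp": "2022-11-21T07:49:20Z"}
print(render("%{message}", event))  # %{message}  (field does not exist)
print(render("%{hidden}", event))   # Hello Dataflow!
```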

Sink > Stdout

Node Description

  • Node that outputs messages to standard output.
  • This is useful for checking the data processed by the Source and Filter nodes.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | O | |
| V2 | O | |

Example output by codec

Supported codecs:

  • plain codec - Raw data string storage
  • json codec - JSON data parsing
  • line codec - Line-by-line message processing
  • debug codec - Detailed output for debugging purposes (Engine V1 only)

Branch

  • A node type that branches the flow according to the values of incoming data.

IF

Node Description

  • A node that filters messages using a conditional expression.

Supported Engine Type

| Engine Type | Support | Note |
| --- | --- | --- |
| V1 | O | |
| V2 | O | |

Property Description

| Property name | Default value | Data type | Supported engine type | Description | Others |
| --- | --- | --- | --- | --- | --- |
| Conditional | - | string | V1, V2 | Enter the condition for message filtering. The conditional syntax varies depending on the engine type; see the examples below. | |

Available operators

  • If the engine type is V1
    • Comparison: ==, !=, <, >, <=, >=
    • Regular expression: =~, !~ (tests the string on the left with the pattern given on the right)
    • Inclusion: in, not in
    • Logical operators: and, or, nand, xor
    • Negation operators: !
  • If the engine type is V2
    • Comparison: ==, !=, <, >, <=, >=
    • Regular expression: =~ (tests the string on the left with the pattern given on the right)
    • Inclusion: =~, !~, .contains()
    • Logical operators: &&, ||, not
    • Negation operators: !, not

Filtering example - first-depth field reference

Condition

  • If the engine type is V1
    • Conditional → [logLevel] == "ERROR"
  • If the engine type is V2
    • Conditional → logLevel == "ERROR"

Passed message

{ 
    "logLevel": "ERROR"
}

Missed message

{ 
    "logLevel": "INFO"
}

Filtering example - second-depth field reference

Condition

  • If the engine type is V1
    • Conditional → [response][status] == 200
  • If the engine type is V2
    • Conditional → response.status == 200 or response["status"] == 200

Passed message

{ 
    "resposne": { 
        "status": 200 
    } 
}

Missed message

{ 
    "resposne": { 
        "status": 404 
    } 
}