| Node Category | V1 | V2 | Note |
|---|---|---|---|
| Source | O (provides all proven connectors, including Kafka and JDBC) | O (NHN Cloud/Object Storage connectors first, with Kafka and JDBC to be provided later) | Check the "Supported Engine Type" table in each node section for the definitive support status. |
| Filter | O (provides all existing plugins, including Alter, Grok, and Mutate) | O (includes common JSON/Date nodes plus V2-specific nodes such as Coerce/Copy/Rename/Strip) | Parameters and default values may vary even for the same node, so check the "Parameters by Engine Type" table. |
| Branch | O | O | |
| Sink | O (provides Object Storage, S3, Kafka, etc.) | O (provides most common connectors; advanced options such as Parquet are currently V1-first) | Check the "Supported Engine Type" column in the properties table for detailed limitations. |
V1 Dedicated or Priority Node/Feature
- Source > (Apache) Kafka and Source > JDBC currently only work on V1, with V2 support planned for the future.
- Some filters, such as Filter > (Logstash) Grok and Filter > Mutate, only work on V1.
- The Parquet compression/format advanced options for Sink > (NHN Cloud) Object Storage and (Amazon) S3 only support V1 settings, with V2 support planned for the future.
V2 Dedicated Node/Additional Feature
- Filter > Coerce, Filter > Copy, Filter > Rename, and Filter > Strip are management nodes provided in V2.
- Properties marked as Supported Engine Type: V2, such as Overwrite and Delete Original Field in Filter > JSON, can only be set in V2.
- Charts/options marked as V2 Available Later in the Monitoring and Templates sections will be automatically updated after release, and currently only display V1 execution information.
Nodes whose compatibility varies by engine type are always updated in the Supported Engine Types table and in Note/Caution blocks. When designing a new flow, first get a general idea of the scope from the summary above, then review the detailed support status and limitations in each node section.
For V2, multiple Object Storage instances cannot be used within the same flow if they share the same bucket name, even if they belong to different regions or projects.
Unsupported Configuration Examples
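The constraint above can be illustrated with a hypothetical flow (bucket and region names are invented): two Object Storage nodes that share a bucket name cannot coexist in one V2 flow, even across regions or projects.

// Hypothetical flow: NOT supported on Engine V2
// Source: (NHN Cloud) Object Storage - Bucket: logs, Region: KR1
// Sink:   (NHN Cloud) Object Storage - Bucket: logs, Region: KR2 (same bucket name "logs")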
The following predefined variables and filters are available in DSL format:

- {{ executionTime }}, {{ MINUTE }}, {{ HOUR }}, {{ DAY }}, {{ MONTH }}, {{ YEAR }}
- {{ time | startOf: unit }}: returns the start of the given unit from the given time.
- {{ time | endOf: unit }}: returns the end of the given unit from the given time.
- {{ time | subTime: delta, unit }}: subtracts delta, in the time unit defined by unit, from the given time.
- {{ time | addTime: delta, unit }}: adds delta, in the time unit defined by unit, to the given time.
- {{ time | format: formatStr }}: formats the given time according to formatStr.

For boolean properties, select TRUE or FALSE from the drop-down menu. For array properties, click + to insert each string into the array. For example, for ["message", "yyyy-MM-dd HH:mm:ssZ", "ISO8601"], insert the strings into the array in the following order: message, yyyy-MM-dd HH:mm:ssZ, ISO8601. (A usage sketch of the DSL follows the table below.)

| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Type | - | string | V1 | Creates a type field with the given value in each message. | |
| ID | - | string | V1, V2 | Sets the node ID. The node name on the chart board is marked with the value defined in this property. | |
| Tag | - | array of strings | V1 | Adds a tag with the given value to each message. | |
| Add Field | - | Hash | V1 | Adds custom fields. You can reference the value of an existing field with %{[depth1_field]}. | Example: {"my_custom_field": "%{[json_body][logType]}"} |
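A usage sketch of the DSL above, assuming the variables and filters compose with the pipe syntax as defined (the format string is illustrative):

{{ executionTime | subTime: 1, HOUR }}      // one hour before the execution time
{{ executionTime | startOf: DAY }}          // the start of the execution day
{{ executionTime | format: yyyy-MM-dd }}    // the execution time formatted as a date string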
Currently, session logs and crash logs are not supported.

| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
If only the Query Start time is entered, the flow continuously queries logs from the Query Start time. If both the Query Start time and the Query End time are entered, the flow queries logs between the Query Start time and the Query End time and then ends.

| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Appkey | - | string | V1, V2 | Enter the app key for Log & Crash Search. | |
| SecretKey | - | string | V1, V2 | Enter the secret key for Log & Crash Search. | |
| Query Start time | {{executionTime}} | string | V1, V2 | Enter the start time of the log query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Query End time | - | string | V1, V2 | Enter the end time of the log query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Number of retries | - | number | V1 | Enter the maximum number of times to retry when a log query fails. | |
| Search Query | * | string | V1, V2 | Enter the search query to use when requesting a Log & Crash Search query. For detailed query writing instructions, refer to the "Lucene Query Guide" of the Log & Crash Search service. | |
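For example, a sliding 10-minute query window can be expressed with the DSL variables above (a sketch; the subTime filter is assumed to accept MINUTE as the unit):

Query Start time: {{ executionTime | subTime: 10, MINUTE }}
Query End time: {{ executionTime }}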
Supported codec: * json codec - JSON data parsing
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
If only the Query Start time is entered, the flow continuously queries data from the Query Start time. If both the Query Start time and the Query End time are entered, the flow queries data between the Query Start time and the Query End time and then ends.

| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Appkey | - | string | V1, V2 | Enter the app key for CloudTrail. | |
| User Access Key ID | - | string | V2 | Enter the User Access Key ID of the user account. | |
| Secret Access Key | - | string | V2 | Enter the User Secret Key of the user account. | |
| Query Start time | {{executionTime}} | string | V1, V2 | Enter the start time of the data query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Query End time | - | string | V1, V2 | Enter the end time of the data query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Number of Retries | - | number | V1 | Enter the maximum number of times to retry when a data query fails. | |
| Event Type | * | string | V2 | Enter the event ID you want to search for. | |
Supported codec: * json codec - JSON data parsing
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O | Refer to [Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage) |
Updates the object list in the bucket at every list update cycle and processes data by reading newly added objects.

| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| Bucket | - | string | V1, V2 | Enter a bucket name to read data. | |
| Region | - | string | V1, V2 | Enter region information configured in the storage. | |
| Secret Key | - | string | V1, V2 | Enter the credential secret key issued by S3. | |
| Access key | - | string | V1, V2 | Enter the credential access key issued by S3. | |
| List update cycle | 60 | number | V1, V2 | Enter the update cycle for the list of objects in the bucket. | |
| Metadata included or not | false | boolean | V1 | Determines whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The fields to be created are as follows: last_modified (the last time the object was modified), content_length (object size), key (object name), content_type (object type), metadata (metadata), etag (ETag) |
| Prefix | - | string | V1, V2 | Enter a prefix of an object to read. | |
| Key pattern to exclude | - | string | V1, V2 | Enter the pattern of an object not to be read. | |
| Delete processed objects | false | boolean | V1 | If the property value is true, deletes the object after it is read. | |
When Metadata included or not is enabled, the metadata field is created but is not exposed by the Sink plugin unless it is injected as a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"key": "%{[@metadata][s3][key]}"
"content_type": "%{[@metadata][s3][content_type]}"
"metadata": "%{[@metadata][s3][metadata]}"
"etag": "%{[@metadata][s3][etag]}"
}
}
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"key": "{filename}"
"content_type": "text/plain"
"metadata": {}
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
// Metadata field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
Supported codec: * plain codec - Raw data string storage * json codec - JSON data parsing * line codec - Line-by-line message processing (Engine V1 only)
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O | Refer to [V2 Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage) |
Updates the object list in the bucket at every list update cycle and processes data by reading newly added objects.

| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| Endpoint | - | string | V1, V2 | Enter endpoint for S3 storage. | Only HTTP and HTTPS URL types can be entered. |
| Bucket | - | string | V1, V2 | Enter a bucket name to read data. | |
| Region | * V1: us-east-1 * V2: - | string | V1, V2 | Enter region information configured in the storage. | |
| Session token | - | string | V1 | Enter AWS session token. | |
| Secret Key | - | string | V1, V2 | Enter the credential secret key issued by S3. | |
| Access key | - | string | V1, V2 | Enter the credential access key issued by S3. | |
| List update cycle | 60 | number | V1, V2 | Enter the update cycle for the list of objects in the bucket. | |
| Metadata included or not | false | boolean | V1 | Determines whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The fields to be created are as follows: last_modified (the last time the object was modified), content_length (object size), key (object name), content_type (object type), metadata (metadata), etag (ETag) |
| Prefix | * V1: nil * V2: - | string | V1, V2 | Enter a prefix of an object to read. | |
| Key pattern to exclude | * V1: nil * V2: - | string | V1, V2 | Enter the pattern of an object not to be read. | |
| Delete | false | boolean | V1 | If the property value is true, deletes the object after it is read. | |
| Additional settings | - | hash | V1 | Enter additional settings to use when connecting to the S3 server. | Guide |
Caution
If path-style access is required for the endpoint, set {"force_path_style": true} in Additional settings. When force_path_style is true, requests use path-style URLs rather than virtual-hosted-style URLs.
When Metadata included or not is enabled, the metadata field is created but is not exposed by the Sink plugin unless it is injected as a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"server_side_encryption": "%{[@metadata][s3][server_side_encryption]}"
"etag": "%{[@metadata][s3][etag]}"
"content_type": "%{[@metadata][s3][content_type]}"
"key": "%{[@metadata][s3][key]}"
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"metadata": "%{[@metadata][s3][metadata]}"
}
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "file contents..."
"server_side_encryption": "AES256"
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
"content_type": "text/plain"
"key": "{filename}"
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"metadata": {}
// Metadata field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
Supported codec: * plain codec - Raw data string storage * json codec - JSON data parsing * line codec - Line-by-line message processing (Engine V1 only)
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | - | To be supported |
Caution
| Property Name | Default Value | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Broker Server List | localhost:9092 | string | V1 | Enter the Kafka broker server. Separate multiple servers with commas (,). | See the bootstrap.servers property in the Kafka official documentation. Example: 10.100.1.1:9092,10.100.1.2:9092 |
| Consumer Group ID | dataflow | string | V1 | Enter the ID that identifies the Kafka Consumer Group. | See the group.id property in the Kafka official documentation |
| Topic list | dataflow | array of strings | V1 | Enter a list of Kafka topics to receive messages from. | |
| Topic pattern | - | string | V1 | Enter the Kafka topic pattern to receive messages from. | Example: *-messages |
| Exclude internal topics | true | boolean | V1 | Excludes internal topics such as __consumer_offsets from the receiving destination. | See the exclude.internal.topics property in the Kafka official documentation |
| Client ID | dataflow | string | V1 | Enter the ID that identifies the Kafka Consumer. | See the client.id property in the Kafka official documentation |
| Partition assignment policy | - | string | V1 | Determines how Kafka assigns partitions to consumer groups when receiving messages. | See the partition.assignment.strategy property in the Kafka official documentation. Available values: org.apache.kafka.clients.consumer.RangeAssignor, org.apache.kafka.clients.consumer.RoundRobinAssignor, org.apache.kafka.clients.consumer.StickyAssignor, org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
| Offset setting | latest | enum | V1 | Enter the criteria for setting the offset of the consumer group. | See the auto.offset.reset property in the Kafka official documentation. All of the settings below keep the existing offset if the consumer group already exists. none: if there is no consumer group, returns an error. earliest: if there is no consumer group, initializes to the oldest offset in the partition. latest: if there is no consumer group, initializes to the most recent offset in the partition. |
| Key Deserialization Type | org.apache.kafka.common.serialization.StringDeserializer | string | V1 | Enter the method to deserialize the keys of incoming messages. | See the key.deserializer property in the Kafka official documentation |
| Message deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | V1 | Enter the method to deserialize the values of incoming messages. | See the value.deserializer property in the Kafka official documentation |
| Whether to generate metadata | false | boolean | V1 | If the property value is true, metadata fields for the message will be generated. To expose metadata fields to the Sink plugin, you must combine the filter node type (see the guide below). | The fields generated are as follows: topic (the topic that received the message), consumer_group (the consumer group ID used to receive the message), partition (the partition number of the topic that received the message), offset (the offset of the partition that received the message), key (a ByteBuffer containing the message key) |
| Fetch minimum size | - | number | V1 | Enter the minimum amount of data to retrieve in a single fetch request. | See the fetch.min.bytes property in the Kafka official documentation |
| Send buffer size | - | number | V1 | Enter the size (in bytes) of the TCP send buffer used to transmit data. | See the send.buffer.bytes property in the Kafka official documentation |
| Retry request interval | - | number | V1 | Enter the interval (ms) to retry when a send request fails. | See the retry.backoff.ms property in the Kafka official documentation |
| Cyclic redundancy check | true | boolean | V1 | Check the CRC of the message. | See the check.crcs property in the Kafka official documentation |
| Server Reconnection Interval | 50 | number | V1 | Enter the interval to retry when a connection to the broker server fails. | See the reconnect.backoff.ms property in the Kafka official documentation |
| Poll Timeout | 100 | number | V1 | Enter the timeout (in milliseconds) for requests to fetch new messages from the topic. | |
| Fetch Max Size Per Partition | - | number | V1 | Enter the maximum size to fetch in a single fetch request per partition. | See the max.partition.fetch.bytes property in the Kafka official documentation |
| Server Request Timeout | - | number | V1 | Enter the timeout (in milliseconds) for send requests. | See the request.timeout.ms property in the Kafka official documentation |
| TCP Receive Buffer Size | - | number | V1 | Enter the size (in bytes) of the TCP receive buffer used to read data. | See the receive.buffer.bytes property in the Kafka official documentation |
| Session Timeout | - | number | V1 | Enter the session timeout (in milliseconds) for the consumer. If a consumer fails to send a heartbeat within this time, it is excluded from the consumer group. | See the session.timeout.ms property in the Kafka official documentation |
| Maximum number of poll messages | - | number | V1 | Enter the maximum number of messages to retrieve in a single poll request. | See the max.poll.records property in the Kafka official documentation |
| Maximum poll interval | - | number | V1 | Enter the maximum interval (ms) between poll requests. | See the max.poll.interval.ms property in the Kafka official documentation |
| Maximum fetch size | - | number | V1 | Enter the maximum size to retrieve in a single fetch request. | See the fetch.max.bytes property in the Kafka official documentation |
| Maximum fetch wait time | - | number | V1 | Enter the wait time (ms) to send a fetch request if the amount of data equal to the Fetch Minimum Size setting has not been collected. | See the fetch.max.wait.ms property in the Kafka official documentation |
| Consumer Health Check Interval | - | number | V1 | Enter the interval (ms) at which the consumer sends a heartbeat. | See the heartbeat.interval.ms property in the Kafka official documentation |
| Metadata Update Interval | - | number | V1 | Enter the interval (ms) at which to update partitions, broker server status, etc. | See the metadata.max.age.ms property in the Kafka official documentation |
| IDLE Timeout | - | number | V1 | Enter the number of milliseconds to wait before closing a connection with no data transfer. | See the connections.max.idle.ms property in the Kafka official documentation |
When Whether to generate metadata is enabled, the metadata fields are created but are not exposed by the Sink plugin unless they are injected as regular fields.
{
// normal fields
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka topic message..."
// metadata field
// Cannot be exposed to the Sink plugin until user input data into normal fields
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"kafka_topic": "%{[@metadata][kafka][topic]}"
"kafka_consumer_group": "%{[@metadata][kafka][consumer_group]}"
"kafka_partition": "%{[@metadata][kafka][partition]}"
"kafka_offset": "%{[@metadata][kafka][offset]}"
"kafka_key": "%{[@metadata][kafka][key]}"
"kafka_timestamp": "%{[@metadata][kafka][timestamp]}"
}
{
// normal field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka topic message..."
"kafka_topic": "my-topic"
"kafka_consumer_group": "my_consumer_group"
"kafka_partition": "1"
"kafka_offset": "123"
"kafka_key": "my_key"
"kafka_timestamp": "-1"
// metadata field
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
Supported codec: * plain codec - Raw data string storage * json codec - JSON data parsing * line codec - Line-by-line message processing
Executes the query at every query execute frequency and creates messages from the query results.

| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | - | To be supported |
| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| User | - | string | V1 | Enter a DB user. | |
| Connection String | - | string | V1 | Enter the DB connection information. | Example: jdbc:mysql://my.sql.endpoint:3306/my_db_name |
| Password | - | string | V1 | Enter the user password. | |
| Query | - | string | V1 | Write a query to create a message. | |
| Whether to convert columns to lowercase | false | boolean | V1 | Determines whether to lowercase the column names obtained as a result of the query. | |
| Query execute frequency | * * * * * | string | V1 | Enter the execute frequency of the query as a cron-like expression. | |
| Tracking Columns | - | string | V1 | Select the columns you want to track. | The predefined parameter :sql_last_value allows you to use the value of the tracked column from the last query result. See how to write a query below. |
| Tracking column type | numeric | string | V1 | Select the type of data in the column you want to track. | Example: numeric or timestamp |
| Time zone | - | string | V1 | Define the time zone to use when converting a column of type timestamp to a human-readable string. | Example: Asia/Seoul |
| Whether to apply paging | false | boolean | V1 | Determines whether to apply paging to the query. | When paging is applied, the query is split into multiple executions, whose order is not guaranteed. |
| Page size | - | number | V1 | In a paged query, determines how many rows to query at once. | |

:sql_last_value allows you to use the value corresponding to the tracking column in the result of the last executed query (the default value is 0 if the tracking column type is numeric, or 1970-01-01 00:00:00 if it is timestamp).

SELECT * FROM MY_TABLE WHERE id > :sql_last_value

You can also add your own conditions alongside :sql_last_value:

SELECT * FROM MY_TABLE WHERE id > :sql_last_value AND id > custom_value ORDER BY id ASC
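Putting these pieces together, a minimal incremental-load setup might look like the following sketch (the table name, endpoint, and schedule are hypothetical):

Connection String: jdbc:mysql://my.sql.endpoint:3306/my_db_name
Query: SELECT * FROM MY_TABLE WHERE id > :sql_last_value ORDER BY id ASC
Tracking Columns: id
Tracking column type: numeric
Query execute frequency: */5 * * * *

With this setup, each run fetches only the rows added since the last execution, because :sql_last_value carries the latest id seen.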
Supported codec: * plain codec - Raw data string storage
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| ID | - | string | V1, V2 | Sets the node ID. The node name on the chart board is marked with the value defined in this property. | |
| Add Tag | - | array of strings | V1 | Adds a tag to each message. | |
| Delete Tag | - | array of strings | V1 | Deletes a tag that was given to each message. | |
| Delete Field | - | array of strings | V1 | Deletes a field of each message. | |
| Add Field | - | Hash | V1 | Adds custom fields. You can reference the value of an existing field with %{[depth1_field]}. | |
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | X |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Modify Field | - | array of strings | V1 | Compares the field value to a given value; if they are equal, changes the field value to the given new value. | |
| Overwrite Field | - | array of strings | V1 | Compares the field value to a given value; if they are equal, sets another field's value to the given value. | |
| Coalesce | - | array of strings | V1 | Assigns to the first field the first non-null value among the fields that follow it. | |
["logType", "ERROR", "FAIL"]["logType", "FAIL", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "FAIL",
"isBillingTarget": "false"
}
["logType", "ERROR", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "ERROR",
"isBillingTarget": "false"
}
["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]{
"reason": "CONNECTION_TIMEOUT"
}
{
"reason": "MONGODB_CONNECTION_TIMEOUT"
}
["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]{
"mongoReason": "COLLECTION_NOT_FOUND"
}
{
"reason": "COLLECTION_NOT_FOUND",
"mongoReason": "COLLECTION_NOT_FOUND"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | - | To be supported |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Mode | - | enum | V1 | Choose between encryption mode and decryption mode. | Select one from the list. |
| Appkey | - | string | V1 | Enter SKM app key that saves the key for encryption/decryption. | |
| Key ID | - | string | V1 | Enter SKM ID that saves the key for encryption/decryption. | |
| Key Version | - | string | V1 | Enter SKM key version that saves the key for encryption/decryption. | |
| Source Field | message | string | V1 | Enter the field name for encryption/decryption. | |
| Field to be stored | message | string | V1 | Enter the field name to save the encryption/decryption result. | |
Mode: encrypt, Appkey: (SKM app key), Key ID: (SKM symmetric key ID), Key Version: 1
{
"message": "this is plain message"
}
{
"message": "this is plain message",
"encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
Mode: decrypt, Appkey: (SKM app key), Key ID: (SKM symmetric key ID), Key Version: 1
{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
{
"decrypted_message": "this is plain message",
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | - | To be supported |
| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| Match | - | hash | V1 | Enter the information of the string to be parsed. | |
| Pattern definition | - | hash | V1 | Enter a custom pattern as a regular expression for the rule of tokens to be parsed. | Check the link below for system defined patterns. https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns |
| Failure tag | _grokparsefailure | array of strings | V1 | Enter the tag name to define if string parsing fails. | |
| Timeout | 30000 | number | V1 | Enter the amount of time to wait for string parsing. | |
| Timeout tag | _groktimeout | string | V1 | Enter the tag to register for an event when a timeout occurs. | |
| Overwrite | - | array of strings | V1 | When writing a value to a designated field after parsing, if a value is already defined in the field, enter the field names to be overwritten. | |
| Store only values with specified names | true | boolean | V1 | If the property value is true, does not store unnamed parsing results. | |
| Capture empty string | false | boolean | V1 | If the property value is true, stores empty strings in fields. | |
| Close or not when match | true | boolean | V1 | If the property value is true, the plugin terminates on the first grok match. | |
{ "message": "%{IP:clientip} %{HYPHEN} %{USER} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes}" }{ "HYPHEN": "-*" }{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326"
}
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326",
"timestamp": "10/Oct/2000:13:55:36 -0700",
"clientip": "127.0.0.1",
"verb": "GET",
"httpversion": "1.0",
"response": "200",
"bytes": "2326",
"request": "/apache_pb.gif"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
| Property Name | Default | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Field to Save | - | string | V1, V2 | Enter the field name to save the CSV parsing results. | |
| Quote | " | string | V1, V2 | Enter the character that separates the column fields. | |
| Ignore First Row | false | boolean | V1, V2 | If the property value is true, the column name entered in the first row of the read data is ignored. | |
| Column | - | array of strings | V1 | Enter the column name. | |
| Delimiter | , | string | V1, V2 | Enter the string that separates the columns. | |
| Source Field | * V1: message * V2: - |
string | V1, V2 | Enter the field name to parse the CSV. | |
| Schema | - | hash | V1, V2 | Enter the name and data type of each column in dictionary format. | See Schema Input Method by Engine Type |
| Overwrite | false | boolean | V2 | If true, overwrites the CSV parsing result with a field to be saved or an existing field. | |
| Delete original field | false | boolean | V2 | Deletes the source field when CSV parsing is complete. If parsing fails, keep it. |
message["one", "two", "t hree"]{"one": "string", "two": "string", "t hree": "string"}{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message["one", "two", "t hree"]{"two": "integer", "t hree": "boolean"}{"one": "string", "two": "integer", "t hree": "boolean"}{
"message": "\\\"wow hello world!\\\", 2, false"
}
{
"message": "\\\"wow hello world!\\\", 2, false",
"one": "wow hello world!",
"t hree": false,
"two": 2
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| Source field | * V1: message * V2: - | string | V1, V2 | Enter a field name to parse JSON strings from. | |
| Field to save | - | string | V1, V2 | Enter the field name to save the JSON parsing result. If no property value is specified, the result is stored in the root field. | |
| Overwrite | false | boolean | V2 | If true, overwrites the field to save (or an existing field) with the JSON parsing result. | |
| Delete original field | false | boolean | V2 | Deletes the source field when JSON parsing is complete. If parsing fails, the field is kept. | |
Source field: message, Field to save: json_parsed_message
{
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": "string"
},
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Source Field | - | string | V1, V2 | Enter a field name to get strings. | |
| Formats | - | array of strings | V1, V2 | Enter formats to get strings. | The pre-defined formats are as follows. ISO8601, UNIX, UNIX_MS, TAI64N (V2 unsupported) |
| Locale | * V1: - * V2: ko_KR | string | V1, V2 | Enter a locale to use for string analysis. | e.g. en, en-US, ko-KR |
| Field to be stored | * V1: @timestamp * V2: - | string | V1, V2 | Enter a field name to store the result of parsing date strings. | |
| Failure tag | _dateparsefailure | array of strings | V1 | Enter the tag name to define if date string parsing fails. | |
| Time zone | * V1: - * V2: Asia/Seoul | string | V1, V2 | Enter the time zone for the date. | e.g. Asia/Seoul |
message["yyyy-MM-dd HH:mm:ssZ", "ISO8601"]timeAsia/Seoul{
"message": "2017-03-16T17:40:00"
}
{
"message": "2017-03-16T17:40:00",
"time": 2022-04-04T09:08:01.222Z
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Field to store UUID | - | string | V1, V2 | Enter a field name to store UUID creation result. | |
| Overwrite | false | boolean | V1, V2 | Select whether to overwrite the value if it exists in the specified field name. | |
Field to store UUID: userId
{
"message": "uuid test message"
}
{
"userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
"message": "uuid test message"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | X | |
| V2 | O |
| Property name | Default value | Data type | Supported engine type | Description | Note |
|---|---|---|---|---|---|
| Target field | - | string | V2 | Enter the target field to convert the data type. | |
| Conversion type | - | enum | V2 | Select the data type to convert to. | Data types: STRING, INTEGER, FLOAT, DOUBLE, BOOLEAN |
Target field: message, Conversion type: INTEGER
{
"message": "2025"
}
{
"message": 2025
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | X |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Source field | message | string | V1 | Enter a field name whose messages are to be separated. | |
| Field to be stored | - | string | V1 | Enter a field name to store separated messages. | |
| Separator | \n | string | V1 | Enter the separator used to split messages. | |
Source field: message
{
"message": [
{"number": 1},
{"number": 2}
]
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 1
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 2
}
Source field: message, Separator: ,
{
"message": "1,2"
}
{
"message": "1"
}
{
"message": "2"
}
Source field: message, Field to be stored: target, Separator: ,
{
"message": "1,2"
}
{
"message": "1,2",
"target": "1"
}
{
"message": "1,2",
"target": "2"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | X |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Byte length | - | number | V1 | Enter the maximum byte length to represent a string. | |
| Source field | - | string | V1 | Enter a field name to truncate. | |
Byte length: 12, Source field: message
{
"message": "This message is too long."
}
{
"message": "This message"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | X |
| Property name | Default value | Data type | Supported engine type | Description | Remarks |
|---|---|---|---|---|---|
| Set Defaults | - | Hash | V1 | Replaces null values with defaults. | |
| Rename Field | - | Hash | V1 | Renames the field. | |
| Update field values | - | Hash | V1 | Replaces the field value with a new value. If the field does not exist, no action is taken. | |
| Replace Value | - | Hash | V1 | Replaces the field value with a new value. If the field does not exist, creates a new field. | |
| Convert Type | - | Hash | V1 | Converts the field value to another type. | The following types are supported: integer, integer_eu, float, float_eu, string, and boolean. |
| Replace String | - | array of strings | V1 | Replaces part of a string using a regular expression. | |
| Uppercase Letter | - | array of strings | V1 | Converts the string in the target field to uppercase. | |
| Capitalize First Letter | - | array of strings | V1 | Converts the first letter in the target field to uppercase and the rest to lowercase. | |
| Lowercase Letter | - | array of strings | V1 | Converts the string in the target field to lowercase. | |
| Strip Space | - | array of strings | V1 | Remove spaces before and after the string in the target field. | |
| Split String | - | Hash | V1 | Split strings using separators. | |
| Join Array | - | Hash | V1 | Joins array elements with a separator. | |
| Merge Field | - | Hash | V1 | Merges two fields. | |
| Copy Field | - | Hash | V1 | Copies an existing field to another field. If the target field already exists, it is overwritten. | |
| Failure Tag | _mutate_error | string | V1 | Enter a tag to define if an error occurs. | |
{"fieldname": "new value"}["fieldname"]{
"fieldname": "old value"
}
{
"fieldname": "NEW VALUE"
}
{
"fieldname": "default_value"
}
{
"fieldname": null
}
{
"fieldname": "default_value"
}
{
"fieldname": "changed_fieldname"
}
{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow",
"not_exist_fieldname": "DataFlow"
}
{
"message1": "integer",
"message2": "boolean"
}
{
"message1": "1000",
"message2": "true"
}
{
"message1": 1000,
"message2": true
}
Conversion rules by target type:
- integer: "1000" is converted to 1000; true is converted to 1, false to 0.
- float: "1000.5" is converted to 1000.5; true is converted to 1.0, false to 0.0.
- boolean: 1 is converted to true and 0 to false; 1.0 is converted to true and 0.0 to false; the strings "true", "t", "yes", "y", "1", and "1.0" are converted to true, and "false", "f", "no", "n", "0", and "0.0" are converted to false. Empty strings are converted to false.

Replace String: ["fieldname", "/", "_", "fieldname2", "[\\?#-]", "."]
Replaces / with _ in the string values of the fieldname field, and \, ?, #, and - with . in the string values of the fieldname2 field.
{
"fieldname": "Hello/World",
"fieldname2": "Hello\\?World#Test-123"
}
{
"fieldname": "Hello_World",
"fieldname2": "Hello.World.Test.123"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "HELLO WORLD"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "Hello world"
}
["fieldname"]
{
"fieldname": "HELLO WORLD"
}
{
"fieldname": "hello world"
}
["field1", "field2"]
{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
{
"fieldname": ","
}
{
"fieldname": "Hello,World"
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": ","
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": "Hello,World"
}
{
"array_data1": "string_data1",
"string_data2": "string_data1",
"json_data1": "json_data2"
}
{
"array_data1": ["array_data1"],
"string_data1": "string_data1",
"string_data2": "string_data2",
"json_data1": {"json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"array_data1": ["array_data1", "string_data1"],
"string_data1": "string_data1",
"string_data2": ["string_data2", "string_data1"],
"json_data1": {"json_field2" : "json_data2", "json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"source_field": "dest_field"
}
{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | X | |
| V2 | O |
| Property Name | Default Value | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Target Field | - | string | V2 | Enter the name of the field for which you want to specify a default value. | |
| Default Value | - | string | V2 | Enter a default value. |
Target Field: fieldname, Default Value: default_value
{
"fieldname": null
}
{
"fieldname": "default_value"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | X | |
| V2 | O |
| Property Name | Default | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Target Field | - | string | V2 | Enter the name of the source field to copy. | |
| Field to Save | - | string | V2 | Enter the name of the field to save the copied result. | |
| Overwrite | false | boolean | V2 | If true, the field to save will be overwritten if it already exists. | |
Target Field: source_field, Field to Save: dest_field
{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | X | |
| V2 | O |
| Property Name | Default | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Source Field | - | string | V2 | Enter the source field to be renamed. | |
| Target Field | - | string | V2 | Enter the new field name after renaming. | |
| Overwrite | false | boolean | V2 | If true, the target field will be overwritten if it already exists. | |
Source Field: fieldname, Target Field: changed_fieldname
{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | X | |
| V2 | O |
| Property Name | Default Value | Data Type | Supported Engine Type | Description | Note |
|---|---|---|---|---|---|
| Target Fields | - | array of strings | V2 | Enter the target fields from which to remove leading and trailing spaces. | |
["field1", "field2"]{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| ID | - | string | Sets the node ID. The node name on the chart board is marked with the value defined in this property. | |
The uploaded object path follows these formats:
- V1: /{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.txt
- V2: /{bucket_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/part-{uuid}-{file_counter}

Note
Output Path Examples by Prefix Input Condition are based on Engine V1.

| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O | Refer to V2 Important Notes on Object Storage Connectivity (Engine V2) |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| region | - | enum | V1, V2 | Enter the region of Object Storage product | |
| Bucket | - | string | V1, V2 | Enter bucket name | |
| Secret Key | - | string | V1, V2 | Enter S3 API Credential Secret Key. | |
| Access Key | - | string | V1, V2 | Enter S3 API Credential Access Key. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | V1, V2 | Enter a prefix to prepend to the name when uploading the object. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | * V1: @timestamp * V2: - | string | V1, V2 | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | V1, V2 | Enter a time field type to apply to the prefix. | Engine type V2 supports the DATE_FILTER_RESULT type only (other types will be supported later) |
| Prefix Time Zone | UTC | string | V1, V2 | Enter a time zone for the time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | V1, V2 | Enter a prefix to use instead if the prefix time application fails. | |
| Encoding | none | enum | V1 | Enter whether to encode or not. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | V1 | Determines object creation rules. | size_and_time: use object size and time to decide; size: use object size to decide; time: use time to decide. Engine type V2 supports size_and_time only |
| Reference Time | 1 | number | V1, V2 | Set the time to be the basis for object splitting. | Set if the object rotation policy is size_and_time or time |
| Object size | 5242880 | number | V1, V2 | Set the size (unit: byte) to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size |
| Inactivity Interval | 1 | number | V2 | Sets the reference time for splitting an object when there is no data inflow for a set period of time. | If there is no data inflow for the set period of time, the current object is uploaded, and any new data is written to a new object. |
Supported codec: * plain codec - Raw data string storage (Engine V1 only) * json codec - JSON data parsing * line codec - Line-by-line message processing * parquet codec - Compressed columnar storage format (Engine V1 only)
Prefix input: obs-test-container/dataflow/%{deployment}
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/production/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
Prefix input: obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}, Prefix Time Field: logTime, Prefix Time Field Type: ISO8601, Prefix Time Zone: Asia/Seoul
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
Prefix input: obs-test-container/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}, Prefix Time Field: logTime, Prefix Time Field Type: TIMESTAMP_SEC, Prefix Time Zone: Asia/Seoul, Prefix Time Application fallback: _failure
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/_failure/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O | Refer to [V2 Important Notes on Object Storage Connectivity (Engine V2)](node-config-guide/#v2-object-storage) |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| region | * V1: us-east-1 * V2: - | enum | V1, V2 | Enter the region of the S3 product. | S3 region |
| Bucket | - | string | V1, V2 | Enter bucket name | |
| Access Key | - | string | V1, V2 | Enter S3 API Credential Access Key. | |
| Secret Key | - | string | V1, V2 | Enter S3 API Credential Secret Key. | |
| Signature Version | v4 | enum | V1, V2 | Enter the version to use when signing AWS requests. | |
| Session Token | - | string | V1, V2 | Enter the session token for AWS temporary credentials. | Session Token Guide |
| Prefix | - | string | V1, V2 | Enter a prefix to prepend to the name when uploading the object. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | * V1: @timestamp * V2: - | string | V1, V2 | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | V1, V2 | Enter a time field type to apply to the prefix. | Engine type V2 supports the DATE_FILTER_RESULT type only (other types will be supported later) |
| Prefix Time Zone | UTC | string | V1, V2 | Enter a time zone for the time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | V1, V2 | Enter a prefix to use instead if the prefix time application fails. | |
| Storage Class | STANDARD | enum | V1 | Set the storage class when the object is uploaded. | Storage Class Guide |
| Encoding | none | enum | V1 | Enter whether to encode or not. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | V1 | Determines object creation rules. | size_and_time: use object size and time to decide; size: use object size to decide; time: use time to decide. Engine type V2 supports size_and_time only |
| Reference Time | 1 | number | V1, V2 | Set the time to be the basis for object splitting. | Set when the object rotation policy is size_and_time or time |
| Object size | 5242880 | number | V1, V2 | Set the size to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size |
| ACL | private | enum | V1 | Enter the ACL policy to set when the object is uploaded. | |
| Additional Settings | - | Hash | V1 | Enter additional settings to connect to S3. | Guide |
| Inactivity Interval | 1 | number | V2 | Sets the reference time for splitting an object when there is no data inflow for a set period of time. | If there is no data inflow for the set period of time, the current object is uploaded, and any new data is written to a new object. |
Caution
If path-style access is required for the endpoint, set {"force_path_style": true} in Additional Settings.

Supported codec: * plain codec - Raw data string storage (Engine V1 only) * json codec - JSON data parsing * line codec - Line-by-line message processing * parquet codec - Compressed columnar storage format (Engine V1 only)

Additional Settings examples:
- follow_redirects: if set to true, follows the redirect when AWS S3 returns a 307 response.
{
"follow_redirects": true
}
- retry_limit: the maximum number of retries.
{
"retry_limit": 5
}
- force_path_style: if set to true, the URL must be path-style, not virtual-hosted-style. Reference
{
"force_path_style": true
}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | - | To be supported |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Topic | - | string | V1 | Enter the name of the Kafka topic to which you want to send messages. | |
| Broker Server List | localhost:9092 | string | V1 | Enter the Kafka broker server. Separate multiple servers with commas (,). | See the bootstrap.servers property in the Kafka official documentation. Example: 10.100.1.1:9092,10.100.1.2:9092 |
| Client ID | dataflow | string | V1 | Enter the ID that identifies the Kafka producer. | See the client.id property in the Kafka official documentation |
| Message Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | V1 | Enter how to serialize the message value to send. | See the value.serializer property in the Kafka official documentation |
| Compression type | none | enum | V1 | Enter how to compress data to send. | See the compression.type property in the Kafka official documentation. Select one of none, gzip, snappy, lz4 |
| Message Key | - | string | V1 | Enter the field to use as the message key. | Example: %{FIELD} |
| Key Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | V1 | Enter how to serialize the message key to send. | See the key.serializer property in the Kafka official documentation |
| Metadata Update Cycle | 300000 | number | V1 | Enter the interval (ms) at which to update partitions, broker server status, etc. | See the metadata.max.age.ms property in the Kafka official documentation |
| Maximum Request Size | 1048576 | number | V1 | Enter the maximum size (bytes) per transfer request. | See the max.request.size property in the Kafka official documentation |
| Server Reconnection Cycle | 50 | number | V1 | Enter how often to retry when the connection to the broker server fails. | See the reconnect.backoff.ms property in the Kafka official documentation |
| Batch Size | 16384 | number | V1 | Enter the size (bytes) to send per batch request. | See the batch.size property in the Kafka official documentation |
| Buffer Memory | 33554432 | number | V1 | Enter the size (bytes) of the buffer used to send data to Kafka. | See the buffer.memory property in the Kafka official documentation |
| Receiving Buffer Size | 32768 | number | V1 | Enter the size (bytes) of the TCP receive buffer used to read data. | See the receive.buffer.bytes property in the Kafka official documentation |
| Transfer Delay Time | 0 | number | V1 | Enter the delay time for message sending. Delayed messages are sent together as a batch request. | See the linger.ms property in the Kafka official documentation |
| Server Request Timeout | 40000 | number | V1 | Enter the timeout (ms) for transfer requests. | See the request.timeout.ms property in the Kafka official documentation |
| Transfer Buffer Size | 131072 | number | V1 | Enter the size (bytes) of the TCP send buffer used to transfer data. | See the send.buffer.bytes property in the Kafka official documentation |
| Ack Property | 1 | enum | V1 | Enter the setting to verify that messages have been received by the broker server. | See the acks property in the Kafka official documentation. 0: does not check whether the message was received. 1: the leader of the topic responds that the message was received without waiting for followers to copy the data. all: the leader of the topic waits for followers to copy the data before responding that the message was received. |
| Retry Request Cycle | 100 | number | V1 | Enter the interval (ms) to retry when a transfer request fails. | See the retry.backoff.ms property in the Kafka official documentation |
| Retry times | - | number | V1 | Enter the maximum number of times to retry when a transfer request fails. | See the retries property in the Kafka official documentation. Retrying beyond the set value may result in data loss. |
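For instance, to key outgoing messages by a field so that records with the same key land on the same partition, the Message Key property can reference a field (userId is a hypothetical field name):

Message Key: %{userId}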
Supported codec: * plain codec - Raw data string storage * json codec - JSON data parsing * line codec - Line-by-line message processing
Plain codec output example (the %{message} pattern is kept literally when the message field does not exist):
{
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20.000Z e5ef7ece9bb0 %{message}
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
Supported codec:
* plain codec - Raw data string storage
* json codec - JSON data parsing
* line codec - Line-by-line message processing
* debug codec - Detailed output for debugging purposes (Engine V1 only)
| Engine Type | Support | Note |
|---|---|---|
| V1 | O | |
| V2 | O |
| Property name | Default value | Data type | Supported engine type | Description | Others |
|---|---|---|---|---|---|
| Conditional statement | - | string | V1, V2 | Enter the conditions for message filtering. | Conditional syntax varies depending on the engine type. See the examples below. |
[logLevel] == "ERROR"logLevel == "ERROR"{
"logLevel": "ERROR"
}
{
"logLevel": "INFO"
}
V1: [response][status] == 200 / V2: response.status == 200 or response["status"] == 200
{
  "response": {
    "status": 200
  }
}
{
  "response": {
    "status": 404
  }
}
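Conditions can also be combined; the following is a sketch assuming the usual and/or boolean operators are available in each syntax (field names reuse the examples above):

V1: [logLevel] == "ERROR" and [response][status] == 200
V2: logLevel == "ERROR" and response.status == 200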