The following DSL variables and filters are available:

- {{ executionTime }}: the execution time of the flow
- {{ MINUTE }}, {{ HOUR }}, {{ DAY }}, {{ MONTH }}, {{ YEAR }}: time units
- {{ time | startOf: unit }}: returns the start of the given unit from the given time.
- {{ time | endOf: unit }}: returns the end of the given unit from the given time.
- {{ time | subTime: delta, unit }}: subtracts delta in the unit defined by unit from the given time.
- {{ time | addTime: delta, unit }}: adds delta in the unit defined by unit from the given time.
- {{ time | format: formatStr }}: formats the given time according to formatStr.

For boolean properties, select TRUE or FALSE from the drop-down menu. For array properties, click + to insert each string into the array. For example, to enter ["message", "yyyy-MM-dd HH:mm:ssZ", "ISO8601"], insert the strings in the following order: message, yyyy-MM-dd HH:mm:ssZ, ISO8601.

| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Type | - | string | Create the type field with the value given in each message. | |
| ID | - | string | Sets the Node ID. Marks the node name on the chart board with the values defined in this property. | |
| Tag | - | array of strings | Add the tag of the given value to each message. | |
| Add Field | - | Hash | Add a custom field. You can reference the value of each field with %{[depth1_field]}. | |
{
"my_custom_field": "%{[json_body][logType]}"
}
Currently, session logs and crash logs are not supported. If only the Query Start time is entered, the node continuously reads logs created after the Query Start time. If the Query End time is also entered, the node reads logs between the Query Start time and the Query End time and then ends the flow.

| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Appkey | - | string | Enter the app key for Log & Crash Search. | |
| SecretKey | - | string | Enter the secret key for Log & Crash Search. | |
| Query Start time | - | string | Enter the start time of the log query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Query End time | - | string | Enter the end time of the log query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Number of retries | - | number | Enter the maximum number of times to retry when a log query fails. | |
| Search Query | * | string | Enter the search query to use when requesting a Log & Crash Search query. For detailed query writing instructions, refer to the "Lucene Query Guide" of the Log & Crash Search service. | |
The queried result is stored in the message field.
{
"message":"{\\\"log\\\":\\\"&\\\", \\\"Crash\\\": \\\"Search\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"log":"&", "Crash": "Search", "Result": "Data"}
If only the Query Start time is entered, the node continuously reads data created after the Query Start time. If the Query End time is also entered, the node reads data between the Query Start time and the Query End time and then ends the flow.

| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Appkey | - | string | Enter the app key for CloudTrail. | |
| Query Start time | - | string | Enter the start time of the data query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Query End time | - | string | Enter the end time of the data query. Must be entered in ISO 8601 format with offset or in DSL format. Example: 2025-07-23T11:23:00+09:00, {{ executionTime }} | |
| Number of retries | - | number | Enter the maximum number of times to retry when a data query fails. | |
The queried result is stored in the message field.
{
"message":"{\\\"log\\\":\\\"CloudTrail\\\", \\\"Result\\\": \\\"Data\\\", \\\"@timestamp\\\": \\\"2023-12-06T08:09:24.887Z\\\", \\\"@version\\\": \\\"1\\\"}"
}
{"log":"CloudTrail", "Result": "Data", "@timestamp": "2023-12-06T08:09:24.887Z", "@version":"1"}
Updates the object list in the bucket at every list update cycle and processes data by reading newly added objects.

| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Bucket | - | string | Enter a bucket name to read data. | |
| Region | - | string | Enter region information configured in the storage. | |
| Secret Key | - | string | Enter the credential secret key issued by S3. | |
| Access key | - | string | Enter the credential access key issued by S3. | |
| List update cycle | - | number | Enter the object list update cycle included in the bucket. | |
| Metadata included or not | - | boolean | Determine whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The fields to be created are as follows. last_modified: the last time the object was modified, content_length: object size, key: object name, content_type: object type, metadata: metadata, etag: ETag |
| Prefix | - | string | Enter a prefix of an object to read. | |
| Key pattern to exclude | - | string | Enter the pattern of an object not to be read. | |
| Delete processed objects | false | boolean | If the property value is true, delete the object after it is read. | |
If Metadata included or not is enabled, the metadata field is created, but it is not exposed by the Sink plugin unless you inject it into a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"key": "%{[@metadata][s3][key]}"
"content_type": "%{[@metadata][s3][content_type]}"
"metadata": "%{[@metadata][s3][metadata]}"
"etag": "%{[@metadata][s3][etag]}"
}
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"key": "{filename}"
"content_type": "text/plain"
"metadata": {}
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
// Metadata field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"message":"{\\\"S3\\\":\\\"Storage\\\", \\\"Read\\\": \\\"Object\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"S3":"Storage", "Read": "Object", "Result": "Data"}
Updates the object list in the bucket at every list update cycle and processes data by reading newly added objects.

| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Endpoint | - | string | Enter endpoint for S3 storage. | Only HTTP and HTTPS URL types can be entered. |
| Bucket | - | string | Enter a bucket name to read data. | |
| Region | - | string | Enter region information configured in the storage. | |
| Session token | - | string | Enter AWS session token. | |
| Secret Key | - | string | Enter the credential secret key issued by S3. | |
| Access key | - | string | Enter the credential access key issued by S3. | |
| List update cycle | - | number | Enter the object list update cycle included in the bucket. | |
| Metadata included or not | - | boolean | Determine whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The fields to be created are as follows. last_modified: the last time the object was modified, content_length: object size, key: object name, content_type: object type, metadata: metadata, etag: ETag |
| Prefix | - | string | Enter a prefix of an object to read. | |
| Key pattern to exclude | - | string | Enter the pattern of an object not to be read. | |
| Delete | false | boolean | If the property value is true, delete the object after it is read. | |
| Additional settings | - | hash | Enter additional settings to use when connecting to the S3 server. | See the following link for a full list of available settings. https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Aws/S3/Client.html Example: { "force_path_style": true } |
If Metadata included or not is enabled, the metadata field is created, but it is not exposed by the Sink plugin unless you inject it into a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"server_side_encryption": "%{[@metadata][s3][server_side_encryption]}"
"etag": "%{[@metadata][s3][etag]}"
"content_type": "%{[@metadata][s3][content_type]}"
"key": "%{[@metadata][s3][key]}"
"last_modified": "%{[@metadata][s3][last_modified]}"
"content_length": "%{[@metadata][s3][content_length]}"
"metadata": "%{[@metadata][s3][metadata]}"
}
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "파일 내용..."
"server_side_encryption": "AES256"
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
"content_type": "text/plain"
"key": "{filename}"
"last_modified": 2024-01-05T01:35:50.000Z
"content_length": 220
"metadata": {}
// Metadata field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"message":"{\\\"S3\\\":\\\"Storage\\\", \\\"Read\\\": \\\"Object\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"S3":"Storage", "Read": "Object", "Result": "Data"}
Caution
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Broker server list | localhost:9092 | string | Enter the Kafka broker server. Separate multiple servers with commas ( , ). | bootstrap.servers ex: 10.100.1.1:9092,10.100.1.2:9092 |
| Consumer group ID | dataflow | string | Enter an ID that identifies the Kafka Consumer Group. | group.id |
| Internal topic excluded or not | true | boolean | Exclude internal topics such as __consumer_offsets from consumption. | exclude.internal.topics |
| Topic pattern | - | string | Enter a Kafka topic pattern to receive messages. | ex: *-messages |
| Client ID | dataflow | string | Enter an ID to identify the Kafka Consumer. | client.id |
| Partition allocation policy | - | string | Determines how Kafka assigns partitions to consumer groups when receiving messages. | partition.assignment.strategy: org.apache.kafka.clients.consumer.RangeAssignor, org.apache.kafka.clients.consumer.RoundRobinAssignor, org.apache.kafka.clients.consumer.StickyAssignor, org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
| Offset settings | none | enum | Enter the offset behavior to use when there is no initial offset. | auto.offset.reset. All of the settings below preserve the existing offset if the consumer group already exists. none: return an error when there is no consumer group. earliest: initialize to the partition's oldest offset if there is no consumer group. latest: initialize to the partition's most recent offset if there is no consumer group. |
| Offset commit cycle | 5000 | number | Enter a cycle to update the consumer group offset. | auto.commit.interval.ms |
| Offset auto commit or not | true | boolean | Determine whether to commit the consumer offset automatically. | enable.auto.commit |
| Key deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | Enter how to deserialize the keys of incoming messages. | key.deserializer |
| Message deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | Enter how to deserialize the values of incoming messages. | value.deserializer |
| Metadata created or not | false | boolean | If the property value is true, creates a metadata field for the message. You need to combine filter node types to expose metadata fields to the Sink plugin (see the guide below). | The fields to be created are as follows. topic: the topic from which the message was received, consumer_group: the consumer group ID used to receive messages, partition: the partition number from which the message was received, offset: the partition offset of the message, key: a ByteBuffer containing the message key |
| Minimum Fetch size | - | number | Enter the minimum size of data to be imported in one fetch request. | fetch.min.bytes |
| Transfer buffer size | - | number | Enter size (byte) of TCP send buffer used to transfer data. | send.buffer.bytes |
| Retry request cycle | 100 | number | Enter the retry cycle (ms) when a transfer request fails. | retry.backoff.ms |
| Cyclic redundancy check | true | boolean | Check the message CRC. | check.crcs |
| Server reconnection cycle | 50 | number | Enter a retry cycle when connecting to broker server fails. | reconnect.backoff.ms |
| Poll timeout | 100 | number | Enter the timeout (ms) for requests to fetch new messages from the topic. | |
| Maximum fetch size per partition | - | number | Enter the maximum size to be imported in one fetch request per partition. | max.partition.fetch.bytes |
| Server request timeout | 30000 | number | Enter the timeout (ms) for sent request. | request.timeout.ms |
| TCP receive buffer size | - | number | Enter the size in bytes of the TCP receive buffer used to read data. | receive.buffer.bytes |
| Session timeout | - | number | Enter the session timeout (ms) of the consumer. If a consumer fails to send a heartbeat within that time, it is excluded from the consumer group. | session.timeout.ms |
| Maximum poll message count | - | number | Enter the maximum number of messages to retrieve in one poll request. | max.poll.records |
| Maximum poll cycle | - | number | Enter the maximum cycle (ms) between poll requests. | max.poll.interval.ms |
| Maximum Fetch size | - | number | Enter the maximum size to be imported in one fetch request. | fetch.max.bytes |
| Maximum Fetch wait time | - | number | Enter the wait time (ms) to send a fetch request when data is not gathered as much as the minimum fetch size setting. | fetch.max.wait.ms |
| Consumer health check cycle | - | number | Enter a cycle of consumer sending heartbeat. | heartbeat.interval.ms |
| Metadata update cycle | - | number | Enter the cycle (ms) to update the partition, broker server status, etc. | metadata.max.age.ms |
| IDLE timeout | - | number | Enter the wait time (ms) to close a connection without data transmission. | connections.max.idle.ms |
If Metadata created or not is enabled, the metadata field is created, but it is not exposed by the Sink plugin unless you inject it into a regular field.
{
// normal fields
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka topic message..."
// metadata field
// Cannot be exposed to the Sink plugin until the user injects it into a regular field
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"kafka_topic": "%{[@metadata][kafka][topic]}"
"kafka_consumer_group": "%{[@metadata][kafka][consumer_group]}"
"kafka_partition": "%{[@metadata][kafka][partition]}"
"kafka_offset": "%{[@metadata][kafka][offset]}"
"kafka_key": "%{[@metadata][kafka][key]}"
"kafka_timestamp": "%{[@metadata][kafka][timestamp]}"
}
{
// normal field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z"
"message": "kafka topic message..."
"kafka_topic": "my-topic"
"kafka_consumer_group": "my_consumer_group"
"kafka_partition": "1"
"kafka_offset": "123"
"kafka_key": "my_key"
"kafka_timestamp": "-1"
// metadata field
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"hello": "world!",
"hey": "foo"
}
{
"message": "{\"hello\":\"world\",\"hey\":\"foo\"}"
}
{
"hello": "world!",
"hey": "foo"
}
{
"hello": "world!",
"hey": "foo"
}
Executes the query repeatedly at every query execute frequency.

| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| User | - | string | Enter a DB user. | |
| Connection String | - | string | Enter the DB connection information. | Example: jdbc:mysql://my.sql.endpoint:3306/my_db_name |
| Password | - | string | Enter the user password. | |
| Query | - | string | Write a query to create a message. | |
| Whether to convert columns to lowercase | true | boolean | Determine whether to lowercase the column names you get as a result of the query. | |
| Query execute frequency | * * * * * | string | Enter the execution frequency of the query as a cron-like expression. | |
| Tracking Columns | - | string | Select the columns you want to track. | The predefined parameter :sql_last_value allows you to use the value corresponding to the tracked column from the last query result. See how to write a query below. |
| Tracking column type | - | string | Select the type of data in the column you want to track. | Example: numeric or timestamp |
| Time zone | - | string | Define the time zone to use when converting a column of type timestamp to a human-readable string. | Example: Asia/Seoul |
| Whether to apply paging | true | boolean | Determines whether to apply paging to the query. | When paging is applied, the query is split into multiple executions, the order of which is not guaranteed. |
| Page size | - | number | In a paged query, determines the number of rows queried in each page. | |
:sql_last_value allows you to use the value corresponding to the tracking column from the result of the last executed query (the default value is 0 if the tracking column type is numeric, or 1970-01-01 00:00:00 if it is timestamp).

SELECT * FROM MY_TABLE WHERE id > :sql_last_value

You can also write your own conditions in addition to :sql_last_value.

SELECT * FROM MY_TABLE WHERE id > :sql_last_value and id > custom_value order by id ASC
{
"id": 1,
"name": "dataflow",
"deleted": false
}
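A minimal tracking setup might look like the following (illustrative values, assuming a table MY_TABLE with a numeric id column):

Tracking Columns: id
Tracking column type: numeric
Query: SELECT * FROM MY_TABLE WHERE id > :sql_last_value ORDER BY id ASC

On the first execution :sql_last_value is 0, so all rows are read; on later executions only rows with an id greater than the last tracked value are read.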
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| ID | - | string | Sets the Node ID. Marks the node name on the chart board with the values defined in this property. | |
| Add Tag | - | array of strings | Add a tag to each message. | |
| Delete Tag | - | array of strings | Delete a tag that was given to each message. | |
| Delete Field | - | array of strings | Delete a field of each message. | |
| Add Field | - | Hash | Add a custom field. You can reference the value of each field with %{[depth1_field]}. | |
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Modify Field | - | array of strings | Compares the field value with a given value; if they are equal, changes the field value to the given value. | |
| Overwrite Field | - | array of strings | Compares the field value with a given value; if they are equal, changes another field's value to the given value. | |
| Coalesce | - | array of strings | Assigns the first non-null value among the subsequent fields to the given field. | |
["logType", "ERROR", "FAIL"]["logType", "FAIL", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "FAIL",
"isBillingTarget": "false"
}
["logType", "ERROR", "isBillingTarget", "false"]{
"logType": "ERROR",
"isBillingTarget": "true"
}
{
"logType": "ERROR",
"isBillingTarget": "false"
}
["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]{
"reason": "CONNECTION_TIMEOUT"
}
{
"reason": "MONGODB_CONNECTION_TIMEOUT"
}
["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]{
"mongoReason": "COLLECTION_NOT_FOUND"
}
{
"reason": "COLLECTION_NOT_FOUND",
"mongoReason": "COLLECTION_NOT_FOUND"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Mode | - | enum | Choose between encryption mode and decryption mode. | Select one from the list. |
| Appkey | - | string | Enter the SKM app key that stores the key for encryption/decryption. | |
| Key ID | - | string | Enter the SKM key ID that stores the key for encryption/decryption. | |
| Key Version | - | string | Enter the SKM key version that stores the key for encryption/decryption. | |
| Encryption/decryption key length | 16 | number | Enter the encryption/decryption key length. | |
| IV Random Length | - | number | Enter the random byte length of the Initialization Vector. | |
| Source Field | - | string | Enter the field name for encryption/decryption. | |
| Field to be stored | - | string | Enter the field name to save the encryption/decryption result. | |
Mode: encrypt
Appkey: SKM appkey
Key ID: SKM Symmetric key ID
Key Version: 1
Encryption/decryption key length: 16
{
"message": "this is plain message"
}
{
"message": "this is plain message",
"encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
Mode: decrypt
Appkey: SKM appkey
Key ID: SKM Symmetric key ID
Key Version: 1
Encryption/decryption key length: 16
{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
{
"decrypted_message": "this is plain message",
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Match | - | hash | Enter the information of the string to be parsed. | |
| Pattern definition | - | hash | Enter a custom pattern as a regular expression for the rule of tokens to be parsed. | Check the link below for system defined patterns. https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns |
| Failure tag | - | array of strings | Enter the tag name to define if string parsing fails. | |
| Timeout | 30000 | number | Enter the amount of time to wait for string parsing. | |
| Overwrite | - | array of strings | When writing a value to a designated field after parsing, if a value is already defined in the field, enter the field names to be overwritten. | |
| Store only values with specified names | true | boolean | If the property value is true, do not store unnamed parsing results. | |
| Capture empty string | false | boolean | If the property value is true, store empty strings in fields. | |
| Close or not when match | true | boolean | If the property value is true, stop matching remaining patterns after the first successful grok match. | |
{ "message": "%{IP:clientip} %{HYPHEN} %{USER} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes}" }{ "HYPHEN": "-*" }{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326"
}
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326",
"timestamp": "10/Oct/2000:13:55:36 -0700",
"clientip": "127.0.0.1",
"verb": "GET",
"httpversion": "1.0",
"response": "200",
"bytes": "2326",
"request": "/apache_pb.gif"
}
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Field to save | - | string | Enter the field name to save the CSV parsing result. | |
| Quote | " | string | Enter the character that divides the column fields. | |
| First line ignored or not | false | boolean | If the property value is true, the column name entered in the first row of the read data is ignored. | |
| Column | - | array of strings | Enter the column name. | |
| Separator | , | string | Enter a string to separate columns. | |
| Source field | message | string | Enter the field name to parse CSV. | |
| Schema | - | hash | Enter the name and data type of each column in the form of a dictionary. | Register separately from the fields defined in Column. The data type is string by default; to convert to another data type, use the Schema setting. Possible data types: integer, float, date, date_time, boolean |
message["one", "two", "t hree"]{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message["one", "two", "t hree"]{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message["one", "two", "t hree"]{"two": "integer", "t hree": "boolean"}{
"message": "\\\"wow hello world!\\\", 2, false"
}
{
"message": "\\\"wow hello world!\\\", 2, false",
"one": "wow hello world!",
"t hree": false,
"two": 2
}
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| Source field | message | string | Enter a field name to parse JSON strings. | |
| Field to save | - | string | Enter the field name to save the JSON parsing result. If no property value is specified, the result is stored in the root field. | |
Source field: message
Field to save: json_parsed_message
{
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": "string"
},
"message": "uuid test message"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Source Field | - | string | Enter a field name to get strings. | |
| Formats | - | array of strings | Enter formats to get strings. | The pre-defined formats are as follows. ISO8601, UNIX, UNIX_MS, TAI64N |
| Locale | - | string | Enter a locale to use for string analysis. | ex) en, en-US, ko-kr |
| Field to be stored | - | string | Enter a field name to store the result of parsing data strings. | |
| Failure tag | - | array of strings | Enter the tag name to define if data string parsing fails. | |
| Time zone | - | string | Enter the time zone for the date. |
["message" , "yyyy-MM-dd HH:mm:ssZ", "ISO8601"]timeAsia/Seoul{
"message": "2017-03-16T17:40:00"
}
{
"message": "2017-03-16T17:40:00",
"time": 2022-04-04T09:08:01.222Z
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Field to store UUID | - | string | Enter a field name to store UUID creation result. | |
| Overwrite | - | boolean | Select whether to overwrite the value if it exists in the specified field name. | |
Field to store UUID: userId
{
"message": "uuid test message"
}
{
"userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
"message": "uuid test message"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Source field | - | string | Enter a field name to separate messages. | |
| Field to be stored | - | string | Enter a field name to store separated messages. | |
| Separator | \n | string | Enter a separator string to split messages. | |
Source field: message
{
"message": [
{"number": 1},
{"number": 2}
]
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 1
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 2
}
Source field: message
Separator: ,
{
"message": "1,2"
}
{
"message": "1"
}
{
"message": "2"
}
Source field: message
Field to be stored: target
Separator: ,
{
"message": "1,2"
}
{
"message": "1,2",
"target": "1"
}
{
"message": "1,2",
"target": "2"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Byte length | - | number | Enter the maximum byte length to represent a string. | |
| Source field | - | string | Enter a field name for truncate. |
Byte length: 12
Source field: message
{
"message": "This message is too long."
}
{
"message": "This message"
}
| Property name | Default value | Data type | Description | Remarks |
|---|---|---|---|---|
| Set Defaults | - | Hash | Replace null with defaults. | |
| Rename Field | - | Hash | Rename the field. | |
| Update field values | - | Hash | Replaces the field value with the new value. If the field does not exist, no action is taken. | |
| Replace Value | - | Hash | Replace the field value with a new value. If there is no field, create a new field. | |
| Convert Type | - | Hash | Convert the field value to another type. | The following types are supported: integer, integer_eu, float, float_eu, string, and boolean. |
| Replace String | - | array | Replace parts of a string using a regular expression. | |
| Uppercase Letter | - | array | Convert the string in the target field to uppercase. | |
| Capitalize First Letter | - | array | Convert the first letter in the target field to uppercase and the rest to lowercase. | |
| Lowercase Letter | - | array | Convert the string in the target field to lowercase. | |
| Strip Space | - | array | Remove spaces before and after the string in the target field. | |
| Split String | - | Hash | Split strings using separators. | |
| Join Array | - | Hash | Join array elements with a separator. | |
| Merge Field | - | Hash | Merge the two fields. | |
| Copy Field | - | Hash | Copy an existing field to another field. If the destination field exists, it is overwritten. | |
| Failure Tag | _mutate_error | string | Enter a tag to define if an error occurs. |
{"fieldname": "new value"}["fieldname"]{
"fieldname": "old value"
}
{
"fieldname": "NEW VALUE"
}
{
"fieldname": "default_value"
}
{
"fieldname": null
}
{
"fieldname": "default_value"
}
{
"fieldname": "changed_fieldname"
}
{
"fieldname": "Hello World!"
}
{
"changed_fieldname": "Hello World!"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "%{other_fieldname}: %{fieldname}",
"not_exist_fieldname": "DataFlow"
}
{
"fieldname": "Hello World!",
"other_fieldname": "DataFlow"
}
{
"fieldname": "DataFlow: Hello World!",
"other_fieldname": "DataFlow",
"not_exist_fieldname": "DataFlow"
}
{
"message1": "integer",
"message2": "boolean"
}
{
"message1": "1000",
"message2": "true"
}
{
"message1": 1000,
"message2": true
}
Convert Type conversion rules:
- integer: a string such as "1000" is converted to 1000; true is converted to 1, false to 0.
- float: a string such as "1000.5" is converted to 1000.5; true is converted to 1.0, false to 0.0.
- boolean: 1 is converted to true and 0 to false; 1.0 is converted to true and 0.0 to false; "true", "t", "yes", "y", "1", "1.0" are converted to true; "false", "f", "no", "n", "0", "0.0" are converted to false; empty strings are converted to false.

Replace String: ["fieldname", "/", "_", "fieldname2", "[\\?#-]", "."]
Replaces / with _ in the string values of the fieldname field, and \, ?, #, and - with . in the string values of the fieldname2 field.
{
"fieldname": "Hello/World",
"fieldname2": "Hello\\?World#Test-123"
}
{
"fieldname": "Hello_World",
"fieldname2": "Hello.World.Test.123"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "HELLO WORLD"
}
["fieldname"]
{
"fieldname": "hello world"
}
{
"fieldname": "Hello world"
}
["fieldname"]
{
"fieldname": "HELLO WORLD"
}
{
"fieldname": "hello world"
}
["field1", "field2"]
{
"field1": "Hello World! ",
"field2": " Hello DataFlow!"
}
{
"field1": "Hello World!",
"field2": "Hello DataFlow!"
}
{
"fieldname": ","
}
{
"fieldname": "Hello,World"
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": ","
}
{
"fieldname": ["Hello", "World"]
}
{
"fieldname": "Hello,World"
}
{
"array_data1": "string_data1",
"string_data2": "string_data1",
"json_data1": "json_data2"
}
{
"array_data1": ["array_data1"],
"string_data1": "string_data1",
"string_data2": "string_data2",
"json_data1": {"json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"array_data1": ["array_data1", "string_data1"],
"string_data1": "string_data1",
"string_data2": ["string_data2", "string_data1"],
"json_data1": {"json_field2" : "json_data2", "json_field1": "json_data1"},
"json_data2": {"json_field2": "json_data2"}
}
{
"source_field": "dest_field"
}
{
"source_field": "Hello World!"
}
{
"source_field": "Hello World!",
"dest_field": "Hello World!"
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| ID | - | string | Sets the Node ID. Marks the node name on the chart board with the values defined in this property. | |
Objects are uploaded with the following path and name format: /{container_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.txt

| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| region | - | enum | Enter the region of the Object Storage product. | |
| Bucket | - | string | Enter the bucket name. | |
| Secret Key | - | string | Enter S3 API Credential Secret Key. | |
| Access Key | - | string | Enter S3 API Credential Access Key. | |
| Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | @timestamp | string | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | Enter a time field type to apply to the prefix. | |
| Prefix Time Zone | UTC | string | Enter a time zone for the Time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | Enter a prefix to replace if the prefix time application fails. | |
| Encoding | none | enum | Select whether to encode. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | Determines object creation rules. | size_and_time: use object size and time to decide; size: use object size to decide; time: use time to decide |
| Reference Time | 15 | number | Set the time to be the basis for object splitting. | Set if object rotation policy is size_and_time or time |
| Object size | 5242880 | number | Set the size to be the basis for object splitting. | Set when object rotation policy is size_and_time or size |
Region: KR1
Bucket: obs-test-container
Access Key / Secret Key: ************
{"hidden":"Hello Dataflow!","message":"Hello World", "@timestamp": "2022-11-21T07:49:20Z"}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T07.49.part0.txt
{"@timestamp":"2022-11-21T07:49:20.000Z","host":"755b65d82bd0","hidden":"Hello Dataflow!","@version":"1","sequence":0,"message":"Hello World"}
Region: KR1
Bucket: obs-test-container
Access Key / Secret Key: ************
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T07.49.part0.txt
2022-11-21T07:49:20.000Z e0e40e03dd94 Hello World
Region: KR1
Bucket: obs-test-container
Access Key / Secret Key: ************
{
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
2022-11-21T07:49:20.000Z f207c24a122e %{message}
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| parquet compression codec | SNAPPY | enum | Enter the compression codec to use when converting PARQUET files. | Reference |
Bucket: obs-test-container
Prefix: /dataflow/%{deployment}
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/production/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
Bucket: obs-test-container
Prefix: /dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
Prefix Time Field: logTime
Prefix Time Field Type: ISO8601
Prefix Time Zone: Asia/Seoul
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
Bucket: obs-test-container
Prefix: /dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
Prefix Time Field: logTime
Prefix Time Field Type: TIMESTAMP_SEC
Prefix Time Zone: Asia/Seoul
Prefix Time Application fallback: _failure
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/_failure/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| region | - | enum | Enter the region of the S3 product. | s3 region |
| Bucket | - | string | Enter the bucket name. | |
| Access Key | - | string | Enter S3 API Credential Access Key. | |
| Secret Key | - | string | Enter S3 API Credential Secret Key. | |
| Signature Version | - | enum | Enter the version to use when signing AWS requests. | |
| Session Token | - | string | Enter the Session Token for AWS temporary Credentials. | Session Token Guide |
| Prefix | - | string | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
| Prefix Time Field | @timestamp | string | Enter a time field to apply to the prefix. | |
| Prefix Time Field Type | DATE_FILTER_RESULT | enum | Enter a time field type to apply to the prefix. | |
| Prefix Time Zone | UTC | string | Enter a time zone for the Time field to apply to the prefix. | |
| Prefix Time Application fallback | _prefix_datetime_parse_failure | string | Enter a prefix to replace if the prefix time application fails. | |
| Storage Class | STANDARD | enum | Set Storage Class when object is uploaded. | Storage Class Guide |
| Encoding | none | enum | Select whether to encode. gzip encoding is available. | |
| Object Rotation Policy | size_and_time | enum | Determines object creation rules. | size_and_time: use object size and time to decide; size: use object size to decide; time: use time to decide |
| Reference Time | 15 | number | Set the time to be the basis for object splitting. | Set when the object rotation policy is size_and_time or time |
| Object size | 5242880 | number | Set the size to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size |
| ACL | private | enum | Enter ACL policy to set when object is uploaded. | |
| Additional Settings | { } | Hash | Enter additional settings to connect to S3. | Guide |
| Property name | Default value | Data type | Description | Note |
|---|---|---|---|---|
| parquet compression codec | SNAPPY | enum | Enter the compression codec to use when converting PARQUET files. | Reference |
Follows the redirect when set to true and AWS S3 returns a 307 response:
{
follow_redirects → true
}
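retry_limit presumably sets the maximum number of retry attempts for failed requests (a reading of the setting name; see the AWS SDK settings link above):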
{
retry_limit → 5
}
When set to true, the URL must be path-style, not virtual-hosted-style. Reference
{
force_path_style → true
}
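Taken together, a combined Additional Settings hash might look like this (illustrative only; include only the keys you need):

{
"follow_redirects": true,
"retry_limit": 5,
"force_path_style": true
}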
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Topic | - | string | Enter the name of the Kafka topic to which you want to send messages. | |
| Broker Server List | localhost:9092 | string | Enter the Kafka Broker server. Separate multiple servers with commas (,). | bootstrap.servers ex: 10.100.1.1:9092,10.100.1.2:9092 |
| Client ID | dataflow | string | Enter ID that identifies Kafka Producer. | client.id |
| Message Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | Enter how to serialize message value to send. | value.serializer |
| Compression type | none | enum | Enter how to compress data to send. | compression.type Select out of none, gzip, snappy, lz4 |
| Key Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | Enter how to serialize message key to send. | key.serializer |
| Meta data upload cycle | 300000 | number | Enter the interval (ms) at which to update the partition, broker server status, etc. | metadata.max.age.ms |
| Maximum Request Size | 1048576 | number | Enter the maximum size (byte) per transfer request. | max.request.size |
| Server Reconnection Cycle | 50 | number | Enter how often to retry when the connection to the broker server fails. | reconnect.backoff.ms |
| Batch Size | 16384 | number | Enter size (byte) to send to Batch Request. | batch.size |
| Buffer Memory | 33554432 | number | Enter size (byte) of buffer used to transfer Kafka. | buffer.memory |
| Receiving Buffer Size | 32768 | number | Enter size (byte) of TCP receive buffer used to read data. | receive.buffer.bytes |
| Transfer Delay Time | 0 | number | Enter delay time for message sending. Delayed messages are sent as batch requests at once. | linger.ms |
| Server Request Timeout | 40000 | number | Enter timeout (ms) for Transfer Request. | request.timeout.ms |
| Meta data Query Timeout | - | number | Enter the timeout (ms) for metadata queries. | https://kafka.apache.org/documentation/#upgrade_1100_notable |
| Transfer Buffer Size | 131072 | number | Enter size (byte) of TCP send buffer used to transfer data. | send.buffer.bytes |
| Ack Property | 1 | enum | Enter settings to verify that messages have been received by the Broker server. | acks. 0: does not check whether the message was received. 1: the topic leader responds that the message was received without waiting for followers to copy the data. all: the topic leader waits for followers to copy the data before responding that the message was received. |
| Request Reconnection Cycle | 100 | number | Enter the interval (ms) to retry when transfer request fails. | retry.backoff.ms |
| Retry times | - | number | Enter the maximum number of times to retry when a transfer request fails. | retries. Requests that still fail after the set number of retries may result in data loss. |
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
{"host":"0bc501d89f8c","message":"Hello World","hidden":"Hello Dataflow!","sequence":0,"@timestamp":"2022-11-21T07:49:20.000Z","@version":"1"}
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20.000Z 2898d492114d Hello World
{
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20.000Z e5ef7ece9bb0 %{message}
{
"host": "data-flow-01",
"message": "Hello World!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20Z data-flow-01 Hello World!
{"host": "data-flow-01", "message": "Hello World!", "@timestamp": "2022-11-21T07:49:20Z"}
%{message} %{host}
Hello World! data-flow-01
{
"host" => "data-flow-01",
"message" => "Hello World!",
"@timestamp" => 2022-11-21T07:49:20Z
}
| Property name | Default value | Data type | Description | Others |
|---|---|---|---|---|
| Conditional sentence | - | string | Enter the conditions for message filtering. | |
[logLevel] == "ERROR"{
"logLevel": "ERROR"
}
{
"logLevel": "INFO"
}
Conditional sentence: [response][status] == 200
{
"resposne": {
"status": 200
}
}
{
"resposne": {
"status": 404
}
}
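Conditions can also be combined into a single conditional sentence (a sketch assuming the same comparison syntax as the examples above, with and joining two comparisons):

Conditional sentence: [logLevel] == "ERROR" and [response][status] == 200

Only messages that satisfy both comparisons are passed on.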