{{ executionTime }}
{{ MINUTE }}
{{ HOUR }}
{{ DAY }}
{{ MONTH }}
{{ YEAR }}
{{ time | startOf: unit }}
unit
from the given time.
{{ time | endOf: unit }}
unit
from the given time.
{{ time | subTime: delta, unit }}
delta
in the time unit defined by unit
from the given time.
{{ time | addTime: delta, unit }}
delta
in the time unit defined by unit
from the given time.
{{ time | format: formatStr }}
formatStr
.TRUE
or FALSE
from the drop-down menu.
+
to insert the string into the array.
["message" , "yyyy-MM-dd HH:mm:ssZ", "ISO8601"]
, insert the string into the array in the following order: message
, yyyy-MM-dd HH:mm:ssZ
, ISO8601.
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Type | - | string | Creates a type field with the given value in each message. | |
ID | - | string | Sets the node ID. The node name is shown on the chart board with the value defined in this property. | |
Tag | - | array of strings | Adds the given tags to each message. | |
Add Field | - | Hash | Adds custom fields. A field value can reference another field's value with %{[depth1_field]}. | |
{
"my_custom_field": "%{[json_body][logType]}"
}
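The Add Field reference syntax can be illustrated with a minimal Python sketch. It is not the product's implementation: `resolve` and `add_fields` are hypothetical helper names, and leaving an unresolved reference as written is an assumption.

```python
import re

# Matches a %{...} placeholder whose body is a chain of [segment] keys.
PLACEHOLDER = re.compile(r"%\{((?:\[[^\[\]]+\])+)\}")
SEGMENT = re.compile(r"\[([^\[\]]+)\]")


def resolve(template, event):
    """Replace each %{[a][b]} placeholder with the value at event["a"]["b"]."""

    def lookup(match):
        value = event
        for key in SEGMENT.findall(match.group(1)):
            if not isinstance(value, dict) or key not in value:
                # Assumption: an unresolved reference is left as written.
                return match.group(0)
            value = value[key]
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


def add_fields(event, fields):
    """Return a copy of the event with every configured field resolved and added."""
    result = dict(event)
    for name, template in fields.items():
        result[name] = resolve(template, event)
    return result


event = {"json_body": {"logType": "ERROR"}}
print(add_fields(event, {"my_custom_field": "%{[json_body][logType]}"}))
```

With the sample event, `my_custom_field` receives the value stored under `json_body.logType`.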
Currently, session logs and crash logs are not supported.
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Appkey | - | string | Enter the app key for Log & Crash Search. | |
SecretKey | - | string | Enter the secret key for Log & Crash Search. | |
Query Start time | - | string | Enter the start time of the log query. | Note |
Query End time | - | string | Enter the end time of the log query. | |
Number of retries | - | number | Enter the maximum number of times to retry when a log query fails. | |
message
.
{
"message":"{\\\"log\\\":\\\"&\\\", \\\"Crash\\\": \\\"Search\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"log":"&", "Crash": "Search", "Result": "Data"}
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Appkey | - | string | Enter the app key for CloudTrail. | |
Query Start time | - | string | Enter the start time of the data query. | Note |
Query End time | - | string | Enter the end time of the data query. | |
Number of retries | - | number | Enter the maximum number of times to retry when a data query fails. | |
message
.
{
"message":"{\\\"log\\\":\\\"CloudTrail\\\", \\\"Result\\\": \\\"Data\\\", \\\"@timestamp\\\": \\\"2023-12-06T08:09:24.887Z\\\", \\\"@version\\\": \\\"1\\\"}"
}
{"log":"CloudTrail", "Result": "Data", "@timestamp": "2023-12-06T08:09:24.887Z", "@version":"1"}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Bucket | - | string | Enter a bucket name to read data. | |
Region | - | string | Enter region information configured in the storage. | |
Secret Key | - | string | Enter the credential secret key issued by S3. | |
Access key | - | string | Enter the credential access key issued by S3. | |
List update cycle | - | number | Enter the object list update cycle included in the bucket. | |
Metadata included or not | - | boolean | Determine whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The following fields are created. last_modified: the last time the object was modified; content_length: object size; key: object name; content_type: object type; metadata: metadata; etag: etag |
Prefix | - | string | Enter a prefix of an object to read. | |
Key pattern to exclude | - | string | Enter the pattern of an object not to be read. | |
Delete processed objects | false | boolean | If the property value is true, objects are deleted after they are read. | |
Metadata included or not
is enabled, the metadata field is created, but is not exposed by the Sink plugin without injecting it as a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"last_modified": "%{[@metadata][s3][last_modified]}",
"content_length": "%{[@metadata][s3][content_length]}",
"key": "%{[@metadata][s3][key]}",
"content_type": "%{[@metadata][s3][content_type]}",
"metadata": "%{[@metadata][s3][metadata]}",
"etag": "%{[@metadata][s3][etag]}"
}
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "Object contents...",
"last_modified": "2024-01-05T01:35:50.000Z",
"content_length": 220,
"key": "{filename}",
"content_type": "text/plain",
"metadata": {},
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\""
// Metadata field
// "[@metadata][s3][last_modified]": 2024-01-05T01:35:50.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][metadata]": {}
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
}
{
"message":"{\\\"S3\\\":\\\"Storage\\\", \\\"Read\\\": \\\"Object\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"S3":"Storage", "Read": "Object", "Result": "Data"}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Endpoint | - | string | Enter endpoint for S3 storage. | Only HTTP and HTTPS URL types can be entered. |
Bucket | - | string | Enter a bucket name to read data. | |
Region | - | string | Enter region information configured in the storage. | |
Session token | - | string | Enter AWS session token. | |
Secret Key | - | string | Enter the credential secret key issued by S3. | |
Access key | - | string | Enter the credential access key issued by S3. | |
List update cycle | - | number | Enter the object list update cycle included in the bucket. | |
Metadata included or not | - | boolean | Determine whether to include metadata from the S3 object as a key. To expose metadata fields to the Sink plugin, you need to combine filter node types (see the guide below). | The following fields are created. last_modified: the last time the object was modified; content_length: object size; key: object name; content_type: object type; metadata: metadata; etag: etag |
Prefix | - | string | Enter a prefix of an object to read. | |
Key pattern to exclude | - | string | Enter the pattern of an object not to be read. | |
Delete | false | boolean | If the property value is true, delete the object read. | |
Additional settings | - | hash | Enter additional settings to use when connecting to the S3 server. | See the following link for a full list of available settings. https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Aws/S3/Client.html Example) { "force_path_style": true } |
Metadata included or not
is enabled, the metadata field is created, but is not exposed by the Sink plugin without injecting it as a regular field.
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "Object contents..."
// Metadata fields
// Cannot be exposed to the Sink plugin until the user injects it as a regular field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"server_side_encryption": "%{[@metadata][s3][server_side_encryption]}",
"etag": "%{[@metadata][s3][etag]}",
"content_type": "%{[@metadata][s3][content_type]}",
"key": "%{[@metadata][s3][key]}",
"last_modified": "%{[@metadata][s3][last_modified]}",
"content_length": "%{[@metadata][s3][content_length]}",
"metadata": "%{[@metadata][s3][metadata]}"
}
{
// General field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "Object contents...",
"server_side_encryption": "AES256",
"etag": "\"56ad65461e0abb907465bacf6e4f96cf\"",
"content_type": "text/plain",
"key": "{filename}",
"last_modified": "2024-01-05T02:27:26.000Z",
"content_length": 220,
"metadata": {}
// Metadata field
// "[@metadata][s3][server_side_encryption]": "AES256"
// "[@metadata][s3][etag]": "\"56ad65461e0abb907465bacf6e4f96cf\""
// "[@metadata][s3][content_type]": "text/plain"
// "[@metadata][s3][key]": "{filename}"
// "[@metadata][s3][last_modified]": 2024-01-05T02:27:26.000Z
// "[@metadata][s3][content_length]": 220
// "[@metadata][s3][metadata]": {}
}
{
"message":"{\\\"S3\\\":\\\"Storage\\\", \\\"Read\\\": \\\"Object\\\", \\\"Result\\\": \\\"Data\\\"}"
}
{"S3":"Storage", "Read": "Object", "Result": "Data"}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Broker server list | localhost:9092 | string | Enter the Kafka broker server. Separate multiple servers with commas ( , ). | bootstrap.servers ex) 10.100.1.1:9092,10.100.1.2:9092 |
Consumer group ID | dataflow | string | Enter an ID that identifies the Kafka Consumer Group. | group.id |
Internal topic excluded or not | true | boolean | Excludes internal topics such as __consumer_offsets from the topics to receive. | exclude.internal.topics |
Topic pattern | - | string | Enter a Kafka topic pattern to receive messages. | ex) *-messages |
Client ID | dataflow | string | Enter an ID to identify Kafka Consumer. | client.id |
Partition allocation policy | - | string | Determines how Kafka assigns partitions to consumer groups when receiving messages. | partition.assignment.strategy org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor |
Offset settings | none | enum | Select how the offset is initialized when the consumer group has no existing offset. | auto.offset.reset All of the settings below preserve the existing offset if the consumer group already exists. none: Return an error when there is no consumer group. earliest: Initialize to the partition’s oldest offset if there is no consumer group. latest: Initialize to the partition’s most recent offset if there is no consumer group. |
Offset commit cycle | 5000 | number | Enter a cycle (ms) to update the consumer group offset. | auto.commit.interval.ms |
Offset auto commit or not | true | boolean | Determine whether to commit the consumer group offset automatically. | enable.auto.commit |
Key deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | Enter how to deserialize the keys of incoming messages. | key.deserializer |
Message deserialization type | org.apache.kafka.common.serialization.StringDeserializer | string | Enter how to deserialize the values of incoming messages. | value.deserializer |
Metadata created or not | false | boolean | If the property value is true, creates a metadata field for the message. You need to combine filter node types to expose metadata fields to the Sink plugin (see the guide below). | The following fields are created. topic: topic that receives the message; consumer_group: consumer group ID used to receive messages; partition: topic partition number that receives messages; offset: partition offset that receives messages; key: ByteBuffer that includes message keys |
Minimum Fetch size | - | number | Enter the minimum size of data to be imported in one fetch request. | fetch.min.bytes |
Transfer buffer size | - | number | Enter size (byte) of TCP send buffer used to transfer data. | send.buffer.bytes |
Retry request cycle | 100 | number | Enter the retry cycle (ms) when a transfer request fails. | retry.backoff.ms |
Cyclic redundancy check | true | enum | Check the message CRC. | check.crcs |
Server reconnection cycle | 50 | number | Enter a retry cycle when connecting to broker server fails. | reconnect.backoff.ms |
Poll timeout | 100 | number | Enter the timeout (ms) for requests to fetch new messages from the topic. | |
Maximum fetch size per partition | - | number | Enter the maximum size to be imported in one fetch request per partition. | max.partition.fetch.bytes |
Server request timeout | 30000 | number | Enter the timeout (ms) for sent request. | request.timeout.ms |
TCP receive buffer size | - | number | Enter the size in bytes of the TCP receive buffer used to read data. | receive.buffer.bytes |
session_timeout_ms | - | number | Enter the session timeout (ms) of the consumer. If a consumer fails to send a heartbeat within that time, it is excluded from the consumer group. | session.timeout.ms |
Maximum poll message count | - | number | Enter the maximum number of messages to retrieve in one poll request. | max.poll.records |
Maximum poll cycle | - | number | Enter the maximum cycle (ms) between poll requests. | max.poll.interval.ms |
Maximum Fetch size | - | number | Enter the maximum size to be imported in one fetch request. | fetch.max.bytes |
Maximum Fetch wait time | - | number | Enter the wait time (ms) to send a fetch request when data is not gathered as much as the minimum fetch size setting. | fetch.max.wait.ms |
Consumer health check cycle | - | number | Enter a cycle of consumer sending heartbeat. | heartbeat.interval.ms |
Metadata update cycle | - | number | Enter the cycle (ms) to update the partition, broker server status, etc. | metadata.max.age.ms |
IDLE timeout | - | number | Enter the wait time (ms) to close a connection without data transmission. | connections.max.idle.ms |
Metadata created or not
is enabled, the metadata field is created, but is not exposed by the Sink plugin without injecting it as a regular field.
{
// normal fields
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "kafka topic message..."
// metadata field
// Cannot be exposed to the Sink plugin until user input data into normal fields
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"kafka_topic": "%{[@metadata][kafka][topic]}",
"kafka_consumer_group": "%{[@metadata][kafka][consumer_group]}",
"kafka_partition": "%{[@metadata][kafka][partition]}",
"kafka_offset": "%{[@metadata][kafka][offset]}",
"kafka_key": "%{[@metadata][kafka][key]}",
"kafka_timestamp": "%{[@metadata][kafka][timestamp]}"
}
{
// normal field
"@version": "1",
"@timestamp": "2022-04-11T00:01:23Z",
"message": "kafka topic message...",
"kafka_topic": "my-topic",
"kafka_consumer_group": "my_consumer_group",
"kafka_partition": "1",
"kafka_offset": "123",
"kafka_key": "my_key",
"kafka_timestamp": "-1"
// metadata field
// "[@metadata][kafka][topic]": "my-topic"
// "[@metadata][kafka][consumer_group]": "my_consumer_group"
// "[@metadata][kafka][partition]": "1"
// "[@metadata][kafka][offset]": "123"
// "[@metadata][kafka][key]": "my_key"
// "[@metadata][kafka][timestamp]": "-1"
}
{
"hello": "world!",
"hey": "foo"
}
{
"message": "{\"hello\":\"world\",\"hey\":\"foo\"}"
}
{
"hello": "world!",
"hey": "foo"
}
{
"hello": "world!",
"hey": "foo"
}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
User | - | string | Enter a DB user. | |
Connection String | - | string | Enter the DB connection information. | Example) jdbc:mysql://my.sql.endpoint:3306/my_db_name |
Password | - | string | Enter the user password. | |
Query | - | string | Write a query to create a message. | |
Whether to convert columns to lowercase | true | boolean | Determine whether to lowercase the column names you get as a result of the query. | |
Query execute frequency | * * * * * | string | Enter the execution frequency of the query as a cron-like expression. | |
Tracking Columns | - | string | Select the columns you want to track. | The predefined parameter :sql_last_value lets you use the tracked column's value from the last query result. See how to write a query below. |
Tracking column type | array of strings | string | Select the type of data in the column you want to track. | Example) numeric or timestamp |
Time zone | - | string | Define the time zone to use when converting a column of type timestamp to a human-readable string. | Example) Asia/Seoul |
Whether to apply paging | true | boolean | Determines whether to apply paging to the query. | When paging is applied, the query is split into multiple executions, the order of which is not guaranteed. |
Page size | - | number | In a paged query, it determines how many pages to query at once. | |
:sql_last_value
allows you to use the value corresponding to the tracking column
in the result of the last executed query (the default value is 0
, if the tracking column type
is numeric
, or 1970-01-01 00:00:00
, if it is timestamp
).
SELECT * FROM MY_TABLE WHERE id > :sql_last_value
:sql_last_value
in addition to the condition.
SELECT * FROM MY_TABLE WHERE id > :sql_last_value AND id > custom_value ORDER BY id ASC
{
"id": 1,
"name": "dataflow",
"deleted": false
}
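How a tracking column drives incremental reads can be sketched with Python's built-in `sqlite3`. This only illustrates the idea and is not the plugin's implementation: the in-memory table, the `poll` helper, and the `state` dictionary are assumptions made for the example.

```python
import sqlite3

# The tracking column is id, so each run only reads rows newer than the last one seen.
QUERY = "SELECT * FROM MY_TABLE WHERE id > :sql_last_value ORDER BY id ASC"


def poll(conn, state):
    """Run the query once and advance the tracked value to the last row's id."""
    rows = conn.execute(QUERY, {"sql_last_value": state["sql_last_value"]}).fetchall()
    if rows:
        state["sql_last_value"] = rows[-1]["id"]
    return [dict(row) for row in rows]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE MY_TABLE (id INTEGER PRIMARY KEY, name TEXT, deleted INTEGER)")
conn.execute("INSERT INTO MY_TABLE VALUES (1, 'dataflow', 0)")

state = {"sql_last_value": 0}  # default for a numeric tracking column
first = poll(conn, state)      # reads the row with id 1
second = poll(conn, state)     # reads nothing: no id is greater than 1
conn.execute("INSERT INTO MY_TABLE VALUES (2, 'dataflow-2', 0)")
third = poll(conn, state)      # reads only the newly inserted row
```

Ordering by the tracking column keeps the last row of each result the largest value seen so far, which is why the sketch can take `rows[-1]` as the next starting point.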
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
ID | - | string | Sets the node ID. The node name is shown on the chart board with the value defined in this property. | |
Add Tag | - | array of strings | Adds tags to each message. | |
Delete Tag | - | array of strings | Deletes tags from each message. | |
Delete Field | - | array of strings | Deletes fields from each message. | |
Add Field | - | Hash | Adds custom fields. A field value can reference another field's value with %{[depth1_field]}. | |
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Overwrite Field | - | array of strings | Compares a field value to a given value and, if they are equal, sets another field to the given value. | |
Modify Field | - | array of strings | Compares a field value to a given value and, if they are equal, changes that field to the given value. | |
Coalesce | - | array of strings | Assigns to a field the first non-null value among the fields listed after it. | |
["logType", "ERROR", "isBillingTarget", "false"]
{
"logType": "ERROR"
}
{
"logType": "ERROR",
"isBillingTarget": "false"
}
["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]
{
"reason": "CONNECTION_TIMEOUT"
}
{
"reason": "MONGODB_CONNECTION_TIMEOUT"
}
["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]
{
"mongoReason": "COLLECTION_NOT_FOUND"
}
{
"reason": "COLLECTION_NOT_FOUND",
"mongoReason": "COLLECTION_NOT_FOUND"
}
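The three rule shapes above can be read as the following Python sketch. It is an interpretation inferred from the sample inputs and outputs, not the node's implementation. The function names are hypothetical, and treating a non-`%{...}` candidate in Coalesce as a literal value is an assumption.

```python
import re

# A candidate written as %{name} reads the value of another field.
REFERENCE = re.compile(r"^%\{(\w+)\}$")


def overwrite_field(event, rule):
    """[compare_field, expected, target_field, new_value]: set target when compare matches."""
    compare_field, expected, target_field, new_value = rule
    result = dict(event)
    if result.get(compare_field) == expected:
        result[target_field] = new_value
    return result


def modify_field(event, rule):
    """[field, expected, new_value]: replace the field's own value when it matches."""
    field, expected, new_value = rule
    result = dict(event)
    if result.get(field) == expected:
        result[field] = new_value
    return result


def coalesce(event, rule):
    """[target, candidate, ...]: assign the first candidate that is not null."""
    target, *candidates = rule
    result = dict(event)
    for candidate in candidates:
        match = REFERENCE.match(candidate)
        value = result.get(match.group(1)) if match else candidate
        if value is not None:
            result[target] = value
            break
    return result


print(overwrite_field({"logType": "ERROR"}, ["logType", "ERROR", "isBillingTarget", "false"]))
print(modify_field({"reason": "CONNECTION_TIMEOUT"},
                   ["reason", "CONNECTION_TIMEOUT", "MONGODB_CONNECTION_TIMEOUT"]))
print(coalesce({"mongoReason": "COLLECTION_NOT_FOUND"},
               ["reason", "%{webClientReason}", "%{mongoReason}", "%{redisReason}"]))
```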
Even if a flow contains multiple Cipher nodes, all Cipher nodes must reference the same single SKM key.
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Mode | - | enum | Choose between encryption mode and decryption mode. | Select one from the list. |
Appkey | - | string | Enter the SKM app key that stores the key for encryption/decryption. | |
Key ID | - | string | Enter the SKM key ID that stores the key for encryption/decryption. | |
Key Version | - | string | Enter the SKM key version that stores the key for encryption/decryption. | |
Encryption/decryption key length | 16 | number | Enter the encryption/decryption key length. | |
IV Random Length | - | number | Enter the random byte length of the initialization vector (IV). | |
Source Field | - | string | Enter the name of the field to encrypt/decrypt. | |
Field to be stored | - | string | Enter the name of the field to store the encryption/decryption result. | |
encrypt
SKM appkey
SKM Symmetric key ID
1
16
{
"message": "this is plain message"
}
{
"message": "this is plain message",
"encrypted_message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
decrypt
SKM appkey
SKM Symmetric key ID
1
16
{
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
{
"decrypted_message": "this is plain message",
"message": "oZA6CHd4OwjPuS+MW0ydCU9NqbPQHGbPf4rll2ELzB8y5pyhxF6UhWZq5fxrt0/e"
}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Match | - | hash | Enter the information of the string to be parsed. | |
Pattern definition | - | hash | Enter a custom pattern as a regular expression for the rule of tokens to be parsed. | Check the link below for system defined patterns. https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns |
Failure tag | - | array of strings | Enter the tag name to define if string parsing fails. | |
Timeout | 30000 | number | Enter the amount of time to wait for string parsing. | |
Overwrite | - | array of strings | When writing a value to a designated field after parsing, if a value is already defined in the field, enter the field names to be overwritten. | |
Store only values with specified names | true | boolean | If the property value is true, do not store unnamed parsing results. | |
Capture empty string | false | boolean | If the property value is true, store empty strings in fields. | |
Close or not when match | true | boolean | If the property value is true, the plugin finishes at the first successful grok match. | |
{ "message": "%{IP:clientip} %{HYPHEN} %{USER} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:bytes}" }
{ "HYPHEN": "-*" }
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326"
}
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326",
"timestamp": "10/Oct/2000:13:55:36 -0700",
"clientip": "127.0.0.1",
"verb": "GET",
"httpversion": "1.0",
"response": "200",
"bytes": "2326",
"request": "/apache_pb.gif"
}
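To show what the match pattern extracts, here is a rough Python equivalent using named groups. The sub-expressions are simplified stand-ins for the grok patterns (IP, USER, HTTPDATE, NUMBER) rather than their exact definitions, and the `parse_failed` tag name is hypothetical.

```python
import re

# Simplified stand-ins for the grok patterns used in the match setting.
LINE = re.compile(
    r'(?P<clientip>\d{1,3}(?:\.\d{1,3}){3}) '   # %{IP:clientip}
    r'-* '                                      # %{HYPHEN}, the custom pattern "-*"
    r'[a-zA-Z0-9._-]+ '                         # %{USER}, unnamed so not stored
    r'\[(?P<timestamp>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>\d+(?:\.\d+)?)" '
    r'(?P<response>\d+(?:\.\d+)?) (?P<bytes>\d+(?:\.\d+)?)'
)


def parse_access_log(event, failure_tag="parse_failed"):
    """Copy the event and add the named captures, or add a failure tag if nothing matches."""
    result = dict(event)
    match = LINE.match(event["message"])
    if match:
        result.update(match.groupdict())
    else:
        result["tags"] = list(event.get("tags", [])) + [failure_tag]
    return result


line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'
print(parse_access_log({"message": line}))
```

Only named captures become fields, which mirrors the behavior of storing only values with specified names. Captured values stay strings, as in the sample output.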
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Field to save | - | string | Enter the field name to save the CSV parsing result. | |
Quote | " | string | Enter the character used to quote column fields. | |
First line ignored or not | false | boolean | If the property value is true, the column name entered in the first row of the read data is ignored. | |
Column | - | array of strings | Enter the column name. | |
Separator | , | string | Enter a string to separate columns. | |
Source field | message | string | Enter the field name to parse CSV. | |
Schema | - | hash | Enter the name and data type of each column in the form of a dictionary. | Register separately from the fields defined in the column. The data type is basically a string, and if you need to convert to another data type, use the schema setting. Possible data types are as follows. integer, float, date, date_time, boolean |
message
["one", "two", "t hree"]
{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message
["one", "two", "t hree"]
{
"message": "hey,foo,\\\"bar baz\\\""
}
{
"message": "hey,foo,\"bar baz\"",
"one": "hey",
"t hree": "bar baz",
"two": "foo"
}
message
["one", "two", "t hree"]
{"two": "integer", "t hree": "boolean"}
{
"message": "\\\"wow hello world!\\\", 2, false"
}
{
"message": "\\\"wow hello world!\\\", 2, false",
"one": "wow hello world!",
"t hree": false,
"two": 2
}
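The column and schema settings can be approximated with Python's `csv` module. This is a sketch under assumptions, not the node's implementation: `parse_csv` is a hypothetical helper, only the integer, float, and boolean types are converted, and whitespace after the separator is skipped.

```python
import csv
import io

# Converters for a subset of the schema types; date and date_time are left out.
CONVERTERS = {
    "integer": int,
    "float": float,
    "boolean": lambda text: text.strip().lower() == "true",
}


def parse_csv(event, columns, schema=None, source="message", separator=",", quote='"'):
    """Split event[source] into columns and convert the ones listed in the schema."""
    schema = schema or {}
    reader = csv.reader(
        io.StringIO(event[source]),
        delimiter=separator,
        quotechar=quote,
        skipinitialspace=True,  # assumption: ignore the space after each separator
    )
    values = next(reader)
    result = dict(event)
    for name, text in zip(columns, values):
        convert = CONVERTERS.get(schema.get(name), str)
        result[name] = convert(text)
    return result


event = {"message": '"wow hello world!", 2, false'}
print(parse_csv(event, ["one", "two", "t hree"], {"two": "integer", "t hree": "boolean"}))
```

Columns that do not appear in the schema stay strings, which matches the statement that the default data type is a string.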
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
Source field | message | string | Enter a field name to parse JSON strings. | |
Field to save | - | string | Enter the field name to save the JSON parsing result. If no property value is specified, the result is stored in the root field. | |
message
json_parsed_message
{
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
{
"json_parsed_message": {
"json": "parse",
"example": "string"
},
"message": "{\\\"json\\\": \\\"parse\\\", \\\"example\\\": \\\"string\\\"}"
}
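The effect of the two properties can be sketched in Python as follows. `parse_json` is a hypothetical helper, and ignoring a non-object result when no target field is set is an assumption.

```python
import json


def parse_json(event, source="message", target=None):
    """Parse event[source] as JSON and store it under target, or at the root if target is unset."""
    parsed = json.loads(event[source])
    result = dict(event)
    if target:
        result[target] = parsed
    elif isinstance(parsed, dict):
        # No target field: merge the parsed keys into the root of the event.
        result.update(parsed)
    return result


event = {"message": '{"json": "parse", "example": "string"}'}
print(parse_json(event, target="json_parsed_message"))
print(parse_json(event))
```

The source field is kept in both cases; only the location of the parsed keys changes.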
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Match | - | json | Enter the information of the string to be parsed. | |
Pattern definition | - | json | Enter a custom pattern as a regular expression for the rule of tokens to be parsed. | Check the link below for system defined patterns. http://grokdebug.herokuapp.com/patterns |
Failure tag | - | array of strings | Enter the tag name to define if string parsing fails. | |
Timeout | 30000 | number | Enter the amount of time to wait for string parsing. | |
Overwrite | - | array of strings | When writing a value to a designated field after parsing, if a value is already defined in the field, enter the field names to be overwritten. | |
Store only values with specified names | - | boolean | Select whether to store unnamed parsing results. | |
Capture empty string | - | boolean | Select whether to store empty strings in fields. |
{ "message": "%{IP:clientip} %{HYPHEN} %{USER} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:bytes}" }
{ "HYPHEN": "-*" }
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326"
}
{
"message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \\\"GET /apache_pb.gif HTTP/1.0\\\" 200 2326",
"timestamp": "10/Oct/2000:13:55:36 -0700",
"clientip": "127.0.0.1",
"verb": "GET",
"httpversion": "1.0",
"response": "200",
"bytes": "2326",
"request": "/apache_pb.gif"
}
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Source Field | - | string | Enter a field name to get strings. | |
Formats | - | array of strings | Enter formats to get strings. | The pre-defined formats are as follows. ISO8601, UNIX, UNIX_MS, TAI64N |
Locale | - | string | Enter a locale to use for string analysis. | ex) en, en-US, ko-kr |
Field to be stored | - | string | Enter a field name to store the result of parsing date strings. | |
Failure tag | - | array of strings | Enter the tag name to define if date string parsing fails. | |
Time zone | - | string | Enter the time zone for the date. | |
["message" , "yyyy-MM-dd HH:mm:ssZ", "ISO8601"]
time
Asia/Seoul
{
"message": "2017-03-16T17:40:00"
}
{
"message": "2017-03-16T17:40:00",
"time": 2017-03-16T08:40:00.000Z
}
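The interaction between the format list and the time zone can be illustrated with a short Python sketch. It is not the node's implementation: `parse_date` is a hypothetical helper, the first format is written in `strptime` syntax instead of the pattern syntax used by the node, and Asia/Seoul is modeled as a fixed UTC+9 offset.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Seoul as a fixed UTC+9 offset (Korea currently observes no daylight saving time).
SEOUL = timezone(timedelta(hours=9))


def parse_date(event, source, formats, target, tz):
    """Try each format in order and store the first successful parse as a UTC timestamp."""
    result = dict(event)
    for fmt in formats:
        try:
            if fmt == "ISO8601":
                parsed = datetime.fromisoformat(event[source])
            else:
                parsed = datetime.strptime(event[source], fmt)
        except ValueError:
            continue  # this format does not fit, try the next one
        if parsed.tzinfo is None:
            # The configured time zone applies only when the string carries no offset.
            parsed = parsed.replace(tzinfo=tz)
        utc = parsed.astimezone(timezone.utc)
        result[target] = utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")
        break
    return result


event = {"message": "2017-03-16T17:40:00"}
print(parse_date(event, "message", ["%Y-%m-%d %H:%M:%S%z", "ISO8601"], "time", SEOUL))
```

The first format fails because the sample uses a `T` separator and has no offset, so the ISO8601 format is applied and the local time is shifted by nine hours to UTC.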
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Field to store UUID | - | string | Enter a field name to store UUID creation result. | |
Overwrite | - | boolean | Select whether to overwrite the value if it exists in the specified field name. | |
userId
{
"message": "uuid test message"
}
{
"userId": "70186b1e-bdec-43d6-8086-ed0481b59370",
"message": "uuid test message"
}
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Source field | - | string | Enter a field name to separate messages. | |
Field to be stored | - | string | Enter a field name to store separated messages. | |
Separator | \n | string | | |
message
{
"message": [
{"number": 1},
{"number": 2}
]
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 1
}
{
"message": [
{"number": 1},
{"number": 2}
],
"number": 2
}
message
,
{
"message": "1,2"
}
{
"message": "1"
}
{
"message": "2"
}
message
target
,
{
"message": "1,2"
}
{
"message": "1,2",
"target": "1"
}
{
"message": "1,2",
"target": "2"
}
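The two string examples above can be summarized with a small Python sketch. `split_event` is a hypothetical helper that covers only string sources; the array case is not modeled here.

```python
def split_event(event, source, target=None, separator="\n"):
    """Return one copy of the event per piece of event[source]."""
    events = []
    for piece in event[source].split(separator):
        clone = dict(event)
        # Without a target field the source field itself is replaced by the piece.
        clone[target or source] = piece
        events.append(clone)
    return events


print(split_event({"message": "1,2"}, "message", separator=","))
print(split_event({"message": "1,2"}, "message", target="target", separator=","))
```

Setting a target field keeps the original string in the source field and writes each piece to the target instead.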
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Byte length | - | number | Enter the maximum byte length to represent a string. | |
Source field | - | string | Enter the name of the field to truncate. | |
message
{
"message": "This message is too long."
}
{
"message": "This messa"
}
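Truncating by byte length rather than by character count matters for multi-byte text. The sketch below assumes a byte length of 10, which is inferred from the sample output, and UTF-8 encoding. `truncate_field` is a hypothetical helper, and dropping a character that is cut in half is an assumption about the desired behavior.

```python
def truncate_field(event, source, byte_length):
    """Cut event[source] so that its UTF-8 encoding is at most byte_length bytes."""
    result = dict(event)
    encoded = result[source].encode("utf-8")[:byte_length]
    # errors="ignore" drops a multi-byte character that was cut at the boundary.
    result[source] = encoded.decode("utf-8", errors="ignore")
    return result


print(truncate_field({"message": "This message is too long."}, "message", 10))
```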
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
ID | - | string | Sets the node ID. The node name is shown on the chart board with the value defined in this property. | |
/{container_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.txt
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
region | - | enum | Enter the region of Object Storage product | OBS Region in Detail |
Bucket | - | string | Enter bucket name | |
Secret Key | - | string | Enter S3 API Credential Secret Key. | |
Access Key | - | string | Enter S3 API Credential Access Key. | |
Prefix | /year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH} | string | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
Prefix Time Field | @timestamp | string | Enter a time field to apply to the prefix. | |
Prefix Time Field Type | DATE_FILTER_RESULT | enum | Enter a time field type to apply to the prefix. | |
Prefix Time Zone | UTC | string | Enter a time zone for the Time field to apply to the prefix. | |
Prefix Time Application fallback | _prefix_datetime_parse_failure | string | Enter a prefix to replace if the prefix time application fails. | |
Encoding | none | enum | Enter whether to encode or not. gzip encoding is available. | |
Object Rotation Policy | size_and_time | enum | Determines object creation rules. | size_and_time: use object size and time to decide. size: use object size to decide. time: use time to decide. |
Reference Time | 15 | number | Set the time to be the basis for object splitting. | Set if object rotation policy is size_and_time or time |
Object size | 5242880 | number | Set the size to be the basis for object splitting. | Set when object rotation policy is size_and_time or size |
KR1
obs-test-container
******
******
{"hidden":"Hello Dataflow!","message":"Hello World", "@timestamp": "2022-11-21T07:49:20Z"}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T07.49.part0.txt
{"@timestamp":"2022-11-21T07:49:20.000Z","host":"755b65d82bd0","hidden":"Hello Dataflow!","@version":"1","sequence":0,"message":"Hello World"}
KR1
obs-test-container
******
******
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T07.49.part0.txt
2022-11-21T07:49:20.000Z e0e40e03dd94 Hello World
KR1
obs-test-container
******
******
{
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
/obs-test-container/year=2022/month=11/day=21/hour=07/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
2022-11-21T07:49:20.000Z f207c24a122e %{message}
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
parquet compression codec | SNAPPY | enum | Enter the compression codec to use when converting PARQUET files. | Reference |
obs-test-container
/dataflow/%{deployment}
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/production/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
obs-test-container
/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
logTime
ISO8601
Asia/Seoul
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/dataflow/year=2022/month=11/day=21/hour=16/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
obs-test-container
/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}
logTime
TIMESTAMP_SEC
Asia/Seoul
_failure
{
"deployment": "production",
"message": "example",
"logTime": "2022-11-21T07:49:20Z"
}
/obs-test-container/_failure/ls.s3.d53c090b-9718-4833-926a-725b20c85974.2022-11-21T00.47.part0.txt
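The three prefix examples (field reference, time format, and fallback) can be traced with the following Python sketch. It is an interpretation of the examples, not the node's implementation: `render_prefix` and `parse_time` are hypothetical helpers, only the ISO8601 and TIMESTAMP_SEC field types are handled, only the four time tokens from the examples are mapped, and Asia/Seoul is modeled as a fixed UTC+9 offset.

```python
import re
from datetime import datetime, timedelta, timezone

TIME_REF = re.compile(r"%\{\+(\w+)\}")   # time format reference such as %{+YYYY}
FIELD_REF = re.compile(r"%\{(\w+)\}")    # field reference such as %{deployment}
TIME_TOKENS = {"YYYY": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}
SEOUL = timezone(timedelta(hours=9))     # assumption: fixed offset standing in for Asia/Seoul


def parse_time(value, field_type):
    """Parse the prefix time field according to its configured type."""
    if field_type == "ISO8601":
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    if field_type == "TIMESTAMP_SEC":
        return datetime.fromtimestamp(int(value), tz=timezone.utc)
    raise ValueError(f"unsupported time field type: {field_type}")


def render_prefix(prefix, event, time_field, field_type, tz, fallback):
    """Resolve time and field references; use the fallback if the time cannot be parsed."""
    if TIME_REF.search(prefix):
        try:
            moment = parse_time(event[time_field], field_type).astimezone(tz)
        except (KeyError, ValueError):
            return "/" + fallback
        prefix = TIME_REF.sub(lambda m: moment.strftime(TIME_TOKENS[m.group(1)]), prefix)
    return FIELD_REF.sub(lambda m: str(event.get(m.group(1), m.group(0))), prefix)


event = {"deployment": "production", "message": "example", "logTime": "2022-11-21T07:49:20Z"}
time_prefix = "/dataflow/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/hour=%{+HH}"
print(render_prefix("/dataflow/%{deployment}", event, "logTime", "ISO8601", SEOUL, "_failure"))
print(render_prefix(time_prefix, event, "logTime", "ISO8601", SEOUL, "_failure"))
print(render_prefix(time_prefix, event, "logTime", "TIMESTAMP_SEC", SEOUL, "_failure"))
```

In the last call the field holds an ISO 8601 string while the type says seconds since the epoch, so parsing fails and the whole prefix is replaced by the fallback.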
/{container_name}/year={yyyy}/month={MM}/day={dd}/hour={HH}/ls.s3.{uuid}.{yyyy}-{MM}-{dd}T{HH}.{mm}.part{seq_id}.parquet
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
region | - | enum | Enter Region of S3 product. | s3 region |
Bucket | - | string | Enter bucket name | |
Access Key | - | string | Enter S3 API Credential Access Key. | |
Secret Key | - | string | Enter S3 API Credential Secret Key. | |
Signature Version | - | enum | Enter the version to use when signing AWS requests. | |
Session Token | - | string | Enter the Session Token for AWS temporary Credentials. | Session Token Guide |
Prefix | - | string | Enter a prefix to prepend to the object name when uploading. You can enter a field or time format. | Available Time Format |
Prefix Time Field | @timestamp | string | Enter a time field to apply to the prefix. | |
Prefix Time Field Type | DATE_FILTER_RESULT | enum | Enter a time field type to apply to the prefix. | |
Prefix Time Zone | UTC | string | Enter a time zone for the Time field to apply to the prefix. | |
Prefix Time Application fallback | _prefix_datetime_parse_failure | string | Enter the prefix to use instead when applying the time to the prefix fails. | |
Storage Class | STANDARD | enum | Set Storage Class when object is uploaded. | Storage Class Guide |
Encoding | none | enum | Enter whether to encode the object. gzip encoding is available. | |
Object Rotation Policy | size_and_time | enum | Determine object creation rules. | size_and_time – Use object size and time to decide; size – Use object size to decide; time – Use time to decide |
Reference Time | 15 | number | Set the time to be the basis for object splitting. | Set when the object rotation policy is size_and_time or time |
Object size | 5242880 | number | Set the size to be the basis for object splitting. | Set when the object rotation policy is size_and_time or size |
ACL | private | enum | Enter ACL policy to set when object is uploaded. | |
Additional Settings | { } | Hash | Enter additional settings to connect to S3. | Guide |
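The three object rotation policies in the table can be sketched as a single decision function. This is an illustration only: `should_rotate` is a hypothetical helper, the reference time is assumed to be in minutes, and `size_and_time` is assumed to start a new object when either threshold is reached. Neither assumption is stated in the table.

```python
DEFAULT_OBJECT_SIZE = 5242880   # default "Object size" from the table (bytes)
DEFAULT_REFERENCE_TIME = 15     # default "Reference Time"; minutes assumed

def should_rotate(policy, size_bytes, age_minutes,
                  max_size=DEFAULT_OBJECT_SIZE,
                  max_age=DEFAULT_REFERENCE_TIME):
    """Hypothetical helper: decide whether to start a new object."""
    size_hit = size_bytes >= max_size
    time_hit = age_minutes >= max_age
    if policy == "size_and_time":
        # Assumption: either threshold alone triggers a rotation.
        return size_hit or time_hit
    if policy == "size":
        return size_hit
    if policy == "time":
        return time_hit
    raise ValueError("unknown object rotation policy: " + policy)

print(should_rotate("size_and_time", size_bytes=1024, age_minutes=20))
print(should_rotate("size", size_bytes=1024, age_minutes=20))
```

Under these assumptions, a small object that is 20 minutes old is rotated with `size_and_time` or `time`, but not with `size`.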
Property name | Default value | Data type | Description | Note |
---|---|---|---|---|
parquet compression codec | SNAPPY | enum | Enter the compression codec to use when converting PARQUET files. | Reference |
true
and when AWS S3 returns 307 response
{
follow_redirects → true
}
{
retry_limit → 5
}
true
, the URL must be path-style, not virtual-hosted-style. Reference
{
force_path_style → true
}
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Topic | - | string | Enter the name of the Kafka topic to send messages to. | |
Broker Server List | localhost:9092 | string | Enter the Kafka broker servers. Separate multiple servers with commas (,). | bootstrap.servers ex) 10.100.1.1:9092,10.100.1.2:9092 |
Client ID | dataflow | string | Enter ID that identifies Kafka Producer. | client.id |
Message Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | Enter how to serialize message value to send. | value.serializer |
Compression type | none | enum | Enter how to compress data to send. | compression.type Select from none, gzip, snappy, lz4 |
Key Serialization Type | org.apache.kafka.common.serialization.StringSerializer | string | Enter how to serialize message key to send. | key.serializer |
Meta data upload cycle | 300000 | number | Enter the interval (ms) at which to update metadata such as partition and broker server status. | metadata.max.age.ms |
Maximum Request Size | 1048576 | number | Enter maximum size (byte) per transfer request. | max.request.size |
Server Reconnection Cycle | 50 | number | Enter the interval (ms) to wait before reconnecting when the connection to the broker server fails. | reconnect.backoff.ms |
Batch Size | 16384 | number | Enter the size (byte) of a batch request. | batch.size |
Buffer Memory | 33554432 | number | Enter the size (byte) of the buffer used for sending to Kafka. | buffer.memory |
Receiving Buffer Size | 32768 | number | Enter size (byte) of TCP receive buffer used to read data. | receive.buffer.bytes |
Transfer Delay Time | 0 | number | Enter the delay time (ms) for sending messages. Delayed messages are sent together as a batch request. | linger.ms |
Server Request Timeout | 40000 | number | Enter timeout (ms) for Transfer Request. | request.timeout.ms |
Meta data Query Timeout | | number | | https://kafka.apache.org/documentation/#upgrade_1100_notable |
Transfer Buffer Size | 131072 | number | Enter size (byte) of TCP send buffer used to transfer data. | send.buffer.bytes |
Ack Property | 1 | enum | Enter the setting used to verify that the broker server has received messages. | acks 0 - Does not check whether the message is received. 1 - The leader of the topic responds that the message was received without waiting for followers to copy the data. all - The leader of the topic waits for followers to copy the data before responding that the message was received. |
Request Reconnection Cycle | 100 | number | Enter the interval (ms) to retry when transfer request fails. | retry.backoff.ms |
Retry times | - | number | Enter the maximum number of times to retry when a transfer request fails. | retries If the number of retries exceeds the set value, data may be lost. |
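The last column of the table names the Kafka producer setting behind each property. As a rough illustration, the defaults can be collected into a producer configuration as follows. `producer_config` is a hypothetical helper, and the validation rules are taken only from the allowed values listed in the table.

```python
STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"

# Defaults copied from the table, keyed by Kafka producer setting name.
DEFAULTS = {
    "bootstrap.servers": "localhost:9092",
    "client.id": "dataflow",
    "value.serializer": STRING_SERIALIZER,
    "key.serializer": STRING_SERIALIZER,
    "compression.type": "none",
    "metadata.max.age.ms": 300000,
    "max.request.size": 1048576,
    "reconnect.backoff.ms": 50,
    "batch.size": 16384,
    "buffer.memory": 33554432,
    "receive.buffer.bytes": 32768,
    "linger.ms": 0,
    "request.timeout.ms": 40000,
    "send.buffer.bytes": 131072,
    "acks": "1",
    "retry.backoff.ms": 100,
}
ALLOWED_COMPRESSION = {"none", "gzip", "snappy", "lz4"}
ALLOWED_ACKS = {"0", "1", "all"}

def producer_config(broker_list, overrides=None):
    """Hypothetical helper: merge overrides into the table defaults."""
    # The broker list is a comma-separated string of host:port entries.
    servers = [s.strip() for s in broker_list.split(",") if s.strip()]
    if not servers:
        raise ValueError("at least one broker server is required")
    config = dict(DEFAULTS)
    config.update(overrides or {})
    config["bootstrap.servers"] = ",".join(servers)
    if config["compression.type"] not in ALLOWED_COMPRESSION:
        raise ValueError("unsupported compression type")
    if str(config["acks"]) not in ALLOWED_ACKS:
        raise ValueError("acks must be 0, 1, or all")
    return config

config = producer_config("10.100.1.1:9092, 10.100.1.2:9092", {"acks": "all"})
print(config["bootstrap.servers"], config["acks"])
```

Setting `acks` to `all` trades latency for durability, since the leader waits for followers to copy the data before acknowledging.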
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
{"host":"0bc501d89f8c","message":"Hello World!","hidden":"Hello Dataflow!","sequence":0,"@timestamp":"2022-11-21T07:49:20.000Z","@version":"1"}
{
"message": "Hello World!",
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20.000Z 2898d492114d Hello World!
{
"hidden": "Hello Dataflow!",
"@timestamp": "2022-11-21T07:49:20Z"
}
2022-11-21T07:49:20.000Z e5ef7ece9bb0 %{message}
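The outputs above differ in how the message is encoded: the JSON form carries every field, while the plain form prints only the timestamp, host, and `message`, and shows a literal `%{message}` when that field is absent. A minimal sketch of both encodings follows. `encode_json`, `encode_plain`, and the format string are hypothetical and inferred from the sample outputs, not taken from the product. The `host` value is assumed to be added by the node.

```python
import json
import re

FIELD_REF = re.compile(r"%\{([^}]+)\}")

def encode_json(event):
    """Hypothetical helper: serialize every field as compact JSON."""
    return json.dumps(event, separators=(",", ":"))

def encode_plain(event, fmt="%{@timestamp} %{host} %{message}"):
    """Hypothetical helper: fill the format string from the message."""
    def substitute(match):
        name = match.group(1)
        # A reference to a missing field stays in the output unchanged.
        return str(event[name]) if name in event else match.group(0)
    return FIELD_REF.sub(substitute, fmt)

with_message = {
    "@timestamp": "2022-11-21T07:49:20.000Z",
    "host": "2898d492114d",
    "message": "Hello World!",
    "hidden": "Hello Dataflow!",
}
without_message = {
    "@timestamp": "2022-11-21T07:49:20.000Z",
    "host": "e5ef7ece9bb0",
    "hidden": "Hello Dataflow!",
}

print(encode_json(with_message))
print(encode_plain(with_message))
print(encode_plain(without_message))
```

Note that the `hidden` field appears only in the JSON encoding; the plain encoding drops any field that the format string does not reference.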
Property name | Default value | Data type | Description | Others |
---|---|---|---|---|
Conditional statement | - | string | Enter the condition for message filtering. | |
[logLevel] == "ERROR"
{
"logLevel": "ERROR"
}
{
"logLevel": "INFO"
}
[response][status] == 200
{
"response": {
"status": 200
}
}
{
"response": {
"status": 404
}
}
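The conditions above compare a field, addressed by one or more bracketed keys, with a literal value. The sketch below shows how such an equality condition can be evaluated against a message. `matches` is a hypothetical helper that supports only the `==` form shown in these examples, and it says nothing about what the node does with a message once the condition is evaluated.

```python
import json
import re

# One or more [key] segments, then ==, then a JSON-style literal.
CONDITION = re.compile(r"^\s*((?:\[[^\]]+\])+)\s*==\s*(.+?)\s*$")

def matches(condition, event):
    """Hypothetical helper: evaluate a '[a][b] == value' condition."""
    parsed = CONDITION.match(condition)
    if parsed is None:
        raise ValueError("unsupported condition: " + condition)
    path = re.findall(r"\[([^\]]+)\]", parsed.group(1))
    # "ERROR" becomes a string and 200 becomes an integer.
    expected = json.loads(parsed.group(2))
    value = event
    for key in path:
        # A missing key anywhere on the path means no match.
        if not isinstance(value, dict) or key not in value:
            return False
        value = value[key]
    return value == expected

print(matches('[logLevel] == "ERROR"', {"logLevel": "ERROR"}))
print(matches('[logLevel] == "ERROR"', {"logLevel": "INFO"}))
print(matches("[response][status] == 200", {"response": {"status": 200}}))
print(matches("[response][status] == 200", {"response": {"status": 404}}))
```

A nested reference such as `[response][status]` walks one level of the message per bracket, so a misspelled key in the message simply fails to match.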