
out_s3: Add parquet compression type #8837

Open · wants to merge 30 commits into master
Conversation

@cosmo0920 (Contributor) commented on May 20, 2024

With the columnify command, we're able to support the Parquet format in out_s3.


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    trace
    HTTP_Server  Off
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name dummy
    Tag  dummy.local
    dummy {"boolean": false, "int": 1, "long": 1, "float": 1.1, "double": 1.1, "bytes": "foo", "string": "foo"}

[OUTPUT]
    Name  s3
    Match dummy*
    Region us-east-2
    bucket fbit-parquet-s3
    Use_Put_object true
    compression parquet
    parquet.schema_file schema-dummy.avsc

schema-dummy.avsc

{
  "type": "record",
  "name": "DummyMessages",
  "fields" : [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}
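As a quick sanity check, the schema above can be parsed with Python's standard `json` module to confirm that every key emitted by the dummy input has a matching Avro field (a minimal sketch; the field names and types are taken verbatim from the schema shown):

```python
import json

# Contents of schema-dummy.avsc from above.
schema_text = """
{
  "type": "record",
  "name": "DummyMessages",
  "fields": [
    {"name": "boolean", "type": "boolean"},
    {"name": "int",     "type": "int"},
    {"name": "long",    "type": "long"},
    {"name": "float",   "type": "float"},
    {"name": "double",  "type": "double"},
    {"name": "bytes",   "type": "bytes"},
    {"name": "string",  "type": "string"}
  ]
}
"""

schema = json.loads(schema_text)
field_names = [f["name"] for f in schema["fields"]]

# Keys produced by the dummy input record in the configuration above.
dummy_record = {"boolean": False, "int": 1, "long": 1, "float": 1.1,
                "double": 1.1, "bytes": "foo", "string": "foo"}

# Every record key must be covered by a schema field, or columnify
# will fail to map the JSONL input onto the Parquet columns.
missing = set(dummy_record) - set(field_names)
print(missing)
```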
  • Debug log output from testing the change
Fluent Bit v3.0.4
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/05/23 15:33:05] [ info] Configuration:
[2024/05/23 15:33:05] [ info]  flush time     | 5.000000 seconds
[2024/05/23 15:33:05] [ info]  grace          | 5 seconds
[2024/05/23 15:33:05] [ info]  daemon         | 0
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  inputs:
[2024/05/23 15:33:05] [ info]      dummy
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  filters:
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  outputs:
[2024/05/23 15:33:05] [ info]      s3.0
[2024/05/23 15:33:05] [ info] ___________
[2024/05/23 15:33:05] [ info]  collectors:
[2024/05/23 15:33:05] [ info] [fluent bit] version=3.0.4, commit=7de2c45227, pid=952893
[2024/05/23 15:33:05] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/05/23 15:33:05] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/05/23 15:33:05] [ info] [cmetrics] version=0.9.0
[2024/05/23 15:33:05] [ info] [ctraces ] version=0.5.1
[2024/05/23 15:33:05] [ info] [input:dummy:dummy.0] initializing
[2024/05/23 15:33:05] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/05/23 15:33:05] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/05/23 15:33:05] [debug] [s3:s3.0] created event channels: read=23 write=24
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] Using upload size 100000000 bytes
[2024/05/23 15:33:05] [debug] [output:s3:s3.0] parquet.compression format is SNAPPY
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] parquet.record_type format is jsonl
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] parquet.schema_type format is avro
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized Env Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] creating profile (null) provider
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized AWS Profile Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] Not initializing EKS provider because AWS_ROLE_ARN was not set
[2024/05/23 15:33:05] [debug] [aws_credentials] Not initializing ECS Provider because AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set
[2024/05/23 15:33:05] [debug] [aws_credentials] Initialized EC2 Provider in standard chain
[2024/05/23 15:33:05] [debug] [aws_credentials] Sync called on the EC2 provider
[2024/05/23 15:33:05] [debug] [aws_credentials] Init called on the env provider
[2024/05/23 15:33:05] [debug] [aws_credentials] upstream_set called on the EC2 provider
[2024/05/23 15:33:05] [ info] [sp] stream processor started
[2024/05/23 15:33:05] [ info] [output:s3:s3.0] worker #0 started
[2024/05/23 15:33:05] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:06] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:07] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:08] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:09] [trace] [task 0x61437a0] created (id=0)
[2024/05/23 15:33:09] [debug] [task] created task=0x61437a0 id=0 OK
[2024/05/23 15:33:09] [debug] [output:s3:s3.0] task_id=0 assigned to thread #0
[2024/05/23 15:33:09] [debug] [output:s3:s3.0] Creating upload timer with frequency 60s
[2024/05/23 15:33:09] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:09] [debug] [out flush] cb_destroy coro_id=0
[2024/05/23 15:33:09] [trace] [coro] destroy coroutine=0x6143a40 data=0x6143a60
[2024/05/23 15:33:09] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/05/23 15:33:09] [debug] [task] destroy task=0x61437a0 (task_id=0)
[2024/05/23 15:33:10] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/05/23 15:33:11] [trace] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
^C[2024/05/23 15:33:12] [engine] caught signal (SIGINT)
[2024/05/23 15:33:12] [trace] [engine] flush enqueued data
[2024/05/23 15:33:12] [trace] [task 0x61bc460] created (id=0)
[2024/05/23 15:33:12] [debug] [out flush] cb_destroy coro_id=1
[2024/05/23 15:33:12] [debug] [task] created task=0x61bc460 id=0 OK
[2024/05/23 15:33:12] [trace] [coro] destroy coroutine=0x61bc700 data=0x61bc720
[2024/05/23 15:33:12] [debug] [output:s3:s3.0] task_id=0 assigned to thread #0
[2024/05/23 15:33:12] [ warn] [engine] service will shutdown in max 5 seconds
[2024/05/23 15:33:12] [ info] [input] pausing dummy.0
[2024/05/23 15:33:12] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/05/23 15:33:12] [debug] [task] destroy task=0x61bc460 (task_id=0)
[2024/05/23 15:33:12] [ info] [engine] service has stopped (0 pending tasks)
[2024/05/23 15:33:12] [ info] [input] pausing dummy.0
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] thread worker #0 stopping...
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] thread worker #0 stopped
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] Sending all locally buffered data to S3
[2024/05/23 15:33:12] [ info] [output:s3:s3.0] Pre-compression chunk size is 882, After compression, chunk is 981 bytes
[2024/05/23 15:33:12] [trace] [upstream] get new connection for s3.us-east-2.amazonaws.com:443, net setup:
net.connect_timeout        = 10 seconds
net.source_address         = any
net.keepalive              = enabled
net.keepalive_idle_timeout = 30 seconds
net.max_worker_connections = 0
[2024/05/23 15:33:13] [trace] [net] connection #27 in process to s3.us-east-2.amazonaws.com:443
[2024/05/23 15:33:13] [trace] [tls] connection and handshake OK
[2024/05/23 15:33:13] [trace] [io] connection OK
[2024/05/23 15:33:13] [debug] [upstream] KA connection #27 to s3.us-east-2.amazonaws.com:443 is connected
[2024/05/23 15:33:13] [debug] [http_client] not using http_proxy for header
[2024/05/23 15:33:13] [debug] [aws_credentials] Requesting credentials from the env provider..
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] trying 1608 bytes
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] ret=1608 total=1608/1608
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] trying 981 bytes
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_write] ret=981 total=981/981
[2024/05/23 15:33:13] [trace] [io coro=(nil)] [net_read] try up to 4095 bytes
[2024/05/23 15:33:14] [trace] [io coro=(nil)] [net_read] ret=299
[2024/05/23 15:33:14] [debug] [upstream] KA connection #27 to s3.us-east-2.amazonaws.com:443 is now available

Install columnify with:

$ go install github.com/reproio/columnify/cmd/columnify@latest
# ...
$ which columnify
/path/to/columnify
$ echo $?
0
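For reference, the conversion that out_s3 delegates to columnify can be approximated with a manual invocation. The sketch below builds the command line; the flag names (`-schemaType`, `-schemaFile`, `-recordType`) follow the columnify README and are assumptions here, not part of the Fluent Bit configuration surface:

```python
def build_columnify_cmd(schema_file, input_file):
    """Approximate the columnify invocation behind 'compression parquet'.

    Flag names are taken from the columnify README (assumptions, not
    Fluent Bit options); they mirror parquet.schema_type, parquet.schema_file,
    and parquet.record_type from the [OUTPUT] section above.
    """
    return [
        "columnify",
        "-schemaType", "avro",       # parquet.schema_type avro
        "-schemaFile", schema_file,  # parquet.schema_file schema-dummy.avsc
        "-recordType", "jsonl",      # parquet.record_type jsonl
        input_file,
    ]

cmd = build_columnify_cmd("schema-dummy.avsc", "buffer.jsonl")
print(" ".join(cmd))
```

Running this through `subprocess.run(cmd, stdout=...)` would write Parquet to stdout in the same way the plugin consumes it.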
  • Attached Valgrind output that shows no leaks or memory corruption was found
==35435== 
==35435== HEAP SUMMARY:
==35435==     in use at exit: 0 bytes in 0 blocks
==35435==   total heap usage: 20,529 allocs, 20,529 frees, 3,530,224 bytes allocated
==35435== 
==35435== All heap blocks were freed -- no leaks are possible
==35435== 
==35435== For lists of detected and suppressed errors, rerun with: -s
==35435== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1380

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Currently, this should be handled in out_s3, so this option is just used to specify the Parquet format.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
@cosmo0920 force-pushed the cosmo0920-add-parquet-compression-type-for-out_s3 branch from 31bb1c9 to 222ac4f on June 4, 2024 09:59
@cosmo0920 force-pushed the cosmo0920-add-parquet-compression-type-for-out_s3 branch from 92710e0 to 8ad498b on June 4, 2024 10:47
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
@cosmo0920 (Contributor, Author) commented:

> Fluent Bit S3 already has the store_dir for local buffer files. The columnify files should be in a new sub-dir of that user-configured dir or should have a separate user config option.

I added a config option to specify a separate nested directory used for processing Parquet compression.
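The directory layout that comment asks for can be sketched as follows. This is a hypothetical illustration only: the sub-directory name and the option semantics are defined by the PR itself, which is not shown here.

```python
import os
import tempfile

def parquet_process_dir(store_dir, tag):
    """Compose a nested staging directory under the configured store_dir.

    The 'parquet' sub-directory name is illustrative, not the PR's
    actual layout; the idea is to keep columnify's intermediate files
    separate from the plugin's regular local buffer files.
    """
    path = os.path.join(store_dir, "parquet", tag)
    os.makedirs(path, exist_ok=True)
    return path

store = tempfile.mkdtemp()  # stands in for the user-configured store_dir
p = parquet_process_dir(store, "dummy.local")
print(p)
```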

@edsiper edsiper added this to the Fluent Bit v3.1.0 milestone Jun 7, 2024