output: scheduled timer callbacks with coroutines #7981

Open
wants to merge 5 commits into
base: master

PettitWesley
Contributor

Implements #7466


Enter [N/A] in the box, if an item is not applicable to your change.

Testing

In order to test the change, some output needs to use timer coroutines.

The plan is for the S3 output to use them. I have code that works and is undergoing extensive testing here: https://github.com/PettitWesley/fluent-bit/pull/27/commits

  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@edsiper
Member

edsiper commented Oct 10, 2023

@PettitWesley it seems the code breaks something: the majority of the unit tests are failing. Can you check, please?

@PettitWesley
Contributor Author

The failure is due to a bug in my code:

(gdb) bt
#0  0x00000000004639dd in mk_list_size (head=0x16efda0) at /home/ec2-user/fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h:165
#1  0x0000000000467760 in flb_output_timer_coros_size (ins=0x16efad0) at /home/ec2-user/fluent-bit/include/fluent-bit/flb_output.h:1187
#2  0x0000000000468991 in flb_running_count (config=0x16ce7a0) at /home/ec2-user/fluent-bit/src/flb_engine.c:493
#3  0x00000000004699ba in flb_engine_start (config=0x16ce7a0) at /home/ec2-user/fluent-bit/src/flb_engine.c:1003
#4  0x0000000000418789 in flb_lib_worker (data=0x16b2460) at /home/ec2-user/fluent-bit/src/flb_lib.c:638
#5  0x00007f173dc9f7f2 in start_thread () from /lib64/libc.so.6
#6  0x00007f173dc3f450 in clone3 () from /lib64/libc.so.6
(gdb) bt full
#0  0x00000000004639dd in mk_list_size (head=0x16efda0) at /home/ec2-user/fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h:165
        ret = 0
        it = 0x0
#1  0x0000000000467760 in flb_output_timer_coros_size (ins=0x16efad0) at /home/ec2-user/fluent-bit/include/fluent-bit/flb_output.h:1187
        size = 0
#2  0x0000000000468991 in flb_running_count (config=0x16ce7a0) at /home/ec2-user/fluent-bit/src/flb_engine.c:493
        tasks = 0
        timers = 0
        n = 0
        head = 0x16efc90
        tmp = 0x16cea50
        o_ins = 0x16efad0
#3  0x00000000004699ba in flb_engine_start (config=0x16ce7a0) at /home/ec2-user/fluent-bit/src/flb_engine.c:1003
        __flb_event_priority_live_foreach_iter = 1
        __flb_event_priority_live_foreach_n_events = 5
        ret = 4
        count = 32535
        ts = 2180270372444040704
        tmp = "24.0K\000\000\000\000\000\000\000\000\000\000"
        rb_flush_flag = 0
        t_flush = {tm = {tv_sec = 1, tv_nsec = 0}}
        event = 0x16ce8f0
        evl = 0x7f1738001790
        evl_bktq = 0x7f1738005fc0
        sched = 0x7f173800c900
        dns_ctx = {lookups = {prev = 0x7f173dbf5e30, next = 0x7f173dbf5e30}, lookups_drop = {prev = 0x7f173dbf5e40, next = 0x7f173dbf5e40}}
        rb_ms = 250
        rb_env = 0x0
#4  0x0000000000418789 in flb_lib_worker (data=0x16b2460) at /home/ec2-user/fluent-bit/src/flb_lib.c:638
        ret = 0
        ctx = 0x16b2460
        config = 0x16ce7a0
#5  0x00007f173dc9f7f2 in start_thread () from /lib64/libc.so.6
No symbol table info available.
#6  0x00007f173dc3f450 in clone3 () from /lib64/libc.so.6

@PettitWesley
Contributor Author

I think the root cause of the unit test failure is that I forgot to initialize the timer coro list for the zero worker case.
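
A minimal sketch of this failure mode, assuming a circular doubly linked list similar in shape to `mk_list`. The types and function names below (`list_node`, `list_size`, `output_instance_init`) are hypothetical stand-ins, not the Fluent Bit or Monkey API. A zeroed head has `next == NULL`, which is consistent with `it = 0x0` in the backtrace above, so walking it dereferences a null pointer. Initializing the head on every path, including the zero worker one, makes the size query return 0 instead.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a circular doubly linked list head/node. */
struct list_node {
    struct list_node *prev;
    struct list_node *next;
};

/* An empty circular list points at itself in both directions. */
static void list_init(struct list_node *head)
{
    head->prev = head;
    head->next = head;
}

/* Link a node just before the head, i.e. at the tail of the list. */
static void list_add_tail(struct list_node *node, struct list_node *head)
{
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

/*
 * Walk the list until we are back at the head. On a zeroed head that was
 * never initialized, head->next is NULL and the loop would dereference it.
 */
static int list_size(const struct list_node *head)
{
    int count = 0;
    const struct list_node *it;

    assert(head->next != NULL); /* catches a missing list_init() early */
    for (it = head->next; it != head; it = it->next) {
        count++;
    }
    return count;
}

/* Hypothetical output instance that owns a timer coroutine list. */
struct output_instance {
    int worker_count;
    struct list_node timer_coro_list;
};

static void output_instance_init(struct output_instance *ins, int worker_count)
{
    ins->worker_count = worker_count;
    /* Initialize unconditionally so the zero worker path is covered too. */
    list_init(&ins->timer_coro_list);
}
```

With the list initialized in the shared setup path, a size query on an instance with zero workers simply reports an empty list.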

@PettitWesley PettitWesley temporarily deployed to pr October 11, 2023 23:45 — with GitHub Actions Inactive
@PettitWesley PettitWesley temporarily deployed to pr October 12, 2023 00:12 — with GitHub Actions Inactive
@@ -333,7 +334,7 @@ static void output_thread(void *data)
 flb_sched_timer_cleanup(sched);

 /* Check if we should stop the event loop */
-if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0) {
+if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->timer_coro_list) == 0) {
Member

Why is mk_list_size() for timer_coro_list not protected by a mutex in this case, as it is in the other parts of the code?

Contributor Author

The flush_list isn't protected either. I thought that in this control code it's guaranteed that this is the only code running in this thread and accessing the thread instance list. Is that not true? Why is the flush_list not protected?

Contributor Author

I need to think on this some more.

Contributor Author

Matt is helping me realize that the problem is that the engine looks at these lists too, which means there needs to be a lock that the engine acquires to ensure it's allowed to safely review the list that the output thread owns.

Contributor Author

 * If the main engine (parent thread) needs to query the number of active
 * 'flushes' running by a threaded instance, then the access to the 'flush_list'
 * must be protected: we use 'flush_mutex for that purpose.

https://github.com/fluent/fluent-bit/blob/master/include/fluent-bit/flb_output_thread.h#L85

I think I get it now. The engine needs to acquire the lock to read the list, and the output thread needs to acquire the lock for writes/deletes, so that the engine won't read the list while it is being changed. If the output thread is just reading, then that's always safe, assuming only the output thread modifies the list.
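
The locking rule described above can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `thread_instance`, `timer_coro_mutex`, and the `owner_*` / `engine_*` helpers are hypothetical names, and the list is a simplified stand-in for `mk_list`. The owning output thread locks around every insert and delete, the engine locks around its read, and the owner's own reads stay unlocked because no other thread modifies the list.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for a circular doubly linked list node. */
struct list_node {
    struct list_node *prev;
    struct list_node *next;
};

/* Hypothetical thread instance; the field names are illustrative only. */
struct thread_instance {
    pthread_mutex_t timer_coro_mutex;
    struct list_node timer_coro_list;
};

static int list_size(const struct list_node *head)
{
    int count = 0;
    const struct list_node *it;

    for (it = head->next; it != head; it = it->next) {
        count++;
    }
    return count;
}

/* Returns 0 on success, like pthread_mutex_init(). */
static int thread_instance_init(struct thread_instance *th)
{
    th->timer_coro_list.prev = &th->timer_coro_list;
    th->timer_coro_list.next = &th->timer_coro_list;
    return pthread_mutex_init(&th->timer_coro_mutex, NULL);
}

/* Owner thread: writes take the lock so the engine never sees a half-linked node. */
static void owner_add(struct thread_instance *th, struct list_node *node)
{
    assert(node != NULL);
    pthread_mutex_lock(&th->timer_coro_mutex);
    node->prev = th->timer_coro_list.prev;
    node->next = &th->timer_coro_list;
    th->timer_coro_list.prev->next = node;
    th->timer_coro_list.prev = node;
    pthread_mutex_unlock(&th->timer_coro_mutex);
}

/* Owner thread: deletes take the lock for the same reason. */
static void owner_del(struct thread_instance *th, struct list_node *node)
{
    assert(node != NULL);
    pthread_mutex_lock(&th->timer_coro_mutex);
    node->prev->next = node->next;
    node->next->prev = node->prev;
    node->prev = NULL;
    node->next = NULL;
    pthread_mutex_unlock(&th->timer_coro_mutex);
}

/* Owner thread: an unlocked read is safe because only this thread modifies the list. */
static int owner_count(const struct thread_instance *th)
{
    return list_size(&th->timer_coro_list);
}

/* Engine thread: reads take the lock so they cannot overlap an owner write. */
static int engine_count(struct thread_instance *th)
{
    int count;

    pthread_mutex_lock(&th->timer_coro_mutex);
    count = list_size(&th->timer_coro_list);
    pthread_mutex_unlock(&th->timer_coro_mutex);
    return count;
}
```

The unlocked owner read is only valid while the single-writer assumption holds. If any other thread ever inserts or removes entries, every access would need the lock.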

Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Feb 29, 2024
@github-actions github-actions bot removed the Stale label Aug 16, 2024
@github-actions github-actions bot added the Stale label Dec 14, 2024