Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logproto: fixed state handling of the new internal handshake_in_progress flag #388

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ log_proto_client_validate_options(LogProtoClient *self)
return self->validate_options(self);
}

static inline gboolean
log_proto_client_needs_handshake(LogProtoClient *s)
{
return s->handshake != NULL;
}

static inline LogProtoStatus
log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished)
{
if (s->handshake)
{
return s->handshake(s, handshake_finished);
}
if (log_proto_client_needs_handshake(s))
return s->handshake(s, handshake_finished);

*handshake_finished = TRUE;
return LPS_SUCCESS;
}
Expand Down
13 changes: 9 additions & 4 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ log_proto_server_validate_options(LogProtoServer *self)
return self->validate_options(self);
}

static inline gboolean
log_proto_server_needs_handshake(LogProtoServer *s)
{
return s->handshake != NULL;
}

static inline LogProtoStatus
log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished)
{
if (s->handshake)
{
return s->handshake(s, handshake_finished);
}
if (log_proto_server_needs_handshake(s))
return s->handshake(s, handshake_finished);

*handshake_finished = TRUE;
return LPS_SUCCESS;
}
Expand Down
15 changes: 11 additions & 4 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,12 @@ log_reader_apply_proto_and_poll_events(LogReader *self, LogProtoServer *proto, P
self->proto = proto;

if (self->proto)
log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
{
log_proto_server_set_wakeup_cb(self->proto, (LogProtoServerWakeupFunc) log_reader_wakeup, self);
self->handshake_in_progress = log_proto_server_needs_handshake(self->proto);
}
else
self->handshake_in_progress = FALSE;

self->poll_events = poll_events;
}
Expand Down Expand Up @@ -501,11 +506,13 @@ log_reader_fetch_log(LogReader *self)

if ((self->options->flags & LR_IGNORE_AUX_DATA))
aux = NULL;

log_transport_aux_data_init(aux);

if (self->handshake_in_progress)
{
return log_reader_process_handshake(self);
gboolean succ = log_reader_process_handshake(self);
if (FALSE == succ || self->handshake_in_progress)
return FALSE;
}

/* NOTE: this loop is here to decrease the load on the main loop, we try
Expand Down Expand Up @@ -777,7 +784,7 @@ log_reader_new(GlobalConfig *cfg)
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
self->handshake_in_progress = FALSE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
g_cond_init(&self->pending_close_cond);
Expand Down
12 changes: 10 additions & 2 deletions lib/logwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,11 @@ log_writer_flush(LogWriter *self, LogWriterFlushMode flush_mode)
return FALSE;

if (self->handshake_in_progress)
return log_writer_process_handshake(self);
{
gboolean succ = log_writer_process_handshake(self);
if (FALSE == succ || self->handshake_in_progress)
return FALSE;
}

/* NOTE: in case we're reloading or exiting we flush all queued items as
* long as the destination can consume it. This is not going to be an
Expand Down Expand Up @@ -1720,7 +1724,11 @@ log_writer_set_proto(LogWriter *self, LogProtoClient *proto)

log_proto_client_set_client_flow_control(self->proto, &flow_control_funcs);
log_proto_client_set_options(self->proto, &self->options->proto_options.super);

self->handshake_in_progress = log_proto_client_needs_handshake(self->proto);
}
else
self->handshake_in_progress = FALSE;
}

static void
Expand Down Expand Up @@ -1923,7 +1931,7 @@ log_writer_new(guint32 flags, GlobalConfig *cfg)
self->flags = flags;
self->line_buffer = g_string_sized_new(128);
self->pollable_state = -1;
self->handshake_in_progress = TRUE;
self->handshake_in_progress = FALSE;
init_sequence_number(&self->seq_num);

log_writer_init_watches(self);
Expand Down