refactor: move some sink param determined in new (#17373)
st1page authored Jun 20, 2024
1 parent 7585609 commit af8f9a5
Showing 1 changed file with 58 additions and 45 deletions.

src/stream/src/executor/sink.rs
@@ -48,6 +48,9 @@ pub struct SinkExecutor<F: LogStoreFactory> {
     sink_writer_param: SinkWriterParam,
     chunk_size: usize,
     input_data_types: Vec<DataType>,
+    need_advance_delete: bool,
+    re_construct_with_sink_pk: bool,
+    compact_chunk: bool,
 }
 
 // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
@@ -107,46 +110,12 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
         }
 
-        Ok(Self {
-            actor_context,
-            info,
-            input,
-            sink,
-            input_columns: columns,
-            sink_param,
-            log_store_factory,
-            sink_writer_param,
-            chunk_size,
-            input_data_types,
-        })
-    }
-
-    fn execute_inner(self) -> BoxedMessageStream {
-        let sink_id = self.sink_param.sink_id;
-        let actor_id = self.actor_context.id;
-        let fragment_id = self.actor_context.fragment_id;
-
-        let stream_key = self.info.pk_indices.clone();
-        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
-            sink_id,
-            actor_id,
-            fragment_id,
-        );
-
+        let stream_key = info.pk_indices.clone();
         let stream_key_sink_pk_mismatch = {
             stream_key
                 .iter()
-                .any(|i| !self.sink_param.downstream_pk.contains(i))
+                .any(|i| !sink_param.downstream_pk.contains(i))
         };
-
-        let input = self.input.execute();
-
-        let input = input.inspect_ok(move |msg| {
-            if let Message::Chunk(c) = msg {
-                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
-            }
-        });
-
         // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
         // stream key: a,b
         // sink pk: a
@@ -178,27 +147,71 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         // after compacting with the stream key, the two event with the same user defined sink pk must have different stream key.
         // So the delete event is not to delete the inserted record in our internal streaming SQL semantic.
         let need_advance_delete =
-            stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly;
+            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
         // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
         let re_construct_with_sink_pk = need_advance_delete
-            && self.sink_param.sink_type == SinkType::Upsert
-            && !self.sink_param.downstream_pk.is_empty();
+            && sink_param.sink_type == SinkType::Upsert
+            && !sink_param.downstream_pk.is_empty();
         // Don't compact chunk for blackhole sink for better benchmark performance.
-        let compact_chunk = !self.sink.is_blackhole();
-        tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}",
-            sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk);
+        let compact_chunk = !sink.is_blackhole();
+
+        tracing::info!(
+            sink_id = sink_param.sink_id.sink_id,
+            actor_id = actor_context.id,
+            need_advance_delete,
+            re_construct_with_sink_pk,
+            compact_chunk,
+            "Sink executor info"
+        );
+
+        Ok(Self {
+            actor_context,
+            info,
+            input,
+            sink,
+            input_columns: columns,
+            sink_param,
+            log_store_factory,
+            sink_writer_param,
+            chunk_size,
+            input_data_types,
+            need_advance_delete,
+            re_construct_with_sink_pk,
+            compact_chunk,
+        })
+    }
+
+    fn execute_inner(self) -> BoxedMessageStream {
+        let sink_id = self.sink_param.sink_id;
+        let actor_id = self.actor_context.id;
+        let fragment_id = self.actor_context.fragment_id;
+
+        let stream_key = self.info.pk_indices.clone();
+        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
+            sink_id,
+            actor_id,
+            fragment_id,
+        );
+
+        let input = self.input.execute();
+
+        let input = input.inspect_ok(move |msg| {
+            if let Message::Chunk(c) = msg {
+                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
+            }
+        });
+
         let processed_input = Self::process_msg(
             input,
             self.sink_param.sink_type,
             stream_key,
-            need_advance_delete,
-            re_construct_with_sink_pk,
+            self.need_advance_delete,
+            self.re_construct_with_sink_pk,
             self.chunk_size,
             self.input_data_types,
             self.sink_param.downstream_pk.clone(),
             metrics.sink_chunk_buffer_size,
-            compact_chunk,
+            self.compact_chunk,
         );
 
         if self.sink.is_sink_into_table() {
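
The comment block in the second and third hunks (stream key (a, b), sink pk (a)) is the heart of the logic, and the hazard it describes can be reproduced with a toy model. The following is a minimal, self-contained sketch, not RisingWave code: Op, apply, and the integer rows are hypothetical stand-ins for stream-chunk operations and a downstream table keyed by the sink pk.

    use std::collections::HashMap;

    #[derive(Clone, Copy)]
    enum Op {
        Insert,
        Delete,
    }

    // Downstream table keyed by the sink pk `a` alone (row.0).
    fn apply(table: &mut HashMap<i32, (i32, i32)>, ops: &[(Op, (i32, i32))]) {
        for &(op, row) in ops {
            match op {
                Op::Insert => {
                    table.insert(row.0, row);
                }
                Op::Delete => {
                    table.remove(&row.0);
                }
            }
        }
    }

    fn main() {
        // An upstream UPDATE of (a=1, b=2) -> (a=1, b=3) whose DELETE half
        // arrives after the INSERT half: possible because the stream key
        // (a, b) distinguishes the two rows while the sink pk (a) does not.
        let out_of_order = [(Op::Insert, (1, 3)), (Op::Delete, (1, 2))];
        let mut table = HashMap::new();
        apply(&mut table, &out_of_order);
        assert!(table.is_empty()); // the fresh (1, 3) was wrongly deleted

        // Advancing the DELETE within the barrier restores the intended state.
        let advanced = [(Op::Delete, (1, 2)), (Op::Insert, (1, 3))];
        let mut table = HashMap::new();
        apply(&mut table, &advanced);
        assert_eq!(table.get(&1), Some(&(1, 3))); // row survives
    }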
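
What makes the refactor safe is that all three flags are pure functions of the constructor's arguments, so computing them once in new() and storing them on the struct cannot change behavior. Below is a standalone restatement of the derivation, under simplified stand-in types (SinkType here is trimmed to the two variants the logic inspects; the real enum has more):

    #[derive(PartialEq)]
    enum SinkType {
        AppendOnly,
        Upsert,
    }

    // Restatement of the flag derivation the commit moves into `new`;
    // `stream_key` and `downstream_pk` are column indices, as in the diff.
    fn derive_flags(
        stream_key: &[usize],
        downstream_pk: &[usize],
        sink_type: SinkType,
        sink_is_blackhole: bool,
    ) -> (bool, bool, bool) {
        // Mismatch: some stream-key column is absent from the sink pk.
        let stream_key_sink_pk_mismatch =
            stream_key.iter().any(|i| !downstream_pk.contains(i));

        // Append-only sinks never emit deletes, so there is nothing to advance.
        let need_advance_delete =
            stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly;

        // Reconstructing with the sink pk buffers a barrier's worth of data,
        // so it is only enabled together with the mismatch case.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_type == SinkType::Upsert
            && !downstream_pk.is_empty();

        // Blackhole sinks skip chunk compaction for benchmark throughput.
        let compact_chunk = !sink_is_blackhole;

        (need_advance_delete, re_construct_with_sink_pk, compact_chunk)
    }

    fn main() {
        // Stream key (a, b) = columns [0, 1]; sink pk (a) = column [0]:
        // column 1 is missing from the sink pk, so for an upsert sink
        // all three flags fire.
        let (advance, reconstruct, compact) =
            derive_flags(&[0, 1], &[0], SinkType::Upsert, false);
        assert!(advance && reconstruct && compact);
    }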