Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl pubsub in metasrv #2045

Merged
merged 5 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::Code;

use crate::pubsub::Message;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -461,6 +463,12 @@ pub enum Error {

#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest { err_msg: String, location: Location },

#[snafu(display("Failed to publish message: {:?}", source))]
PublishMessage {
source: SendError<Message>,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -506,6 +514,7 @@ impl ErrorExt for Error {
| Error::UpdateTableMetadata { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::ConvertGrpcExpr { .. }
| Error::PublishMessage { .. }
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub mod node_stat;
mod on_leader_start_handler;
mod persist_stats_handler;
pub(crate) mod region_lease_handler;
pub mod report_handler;
mod response_header_handler;

#[async_trait::async_trait]
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let handler = PersistStatsHandler::default();
Expand Down
45 changes: 45 additions & 0 deletions src/meta-srv/src/handler/report_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::Message;

pub struct ReportHandler;
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved

#[async_trait]
impl HeartbeatHandler for ReportHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_: &mut HeartbeatAccumulator,
) -> Result<()> {
let req = Box::new(req.clone());

if let Some(publish) = ctx.publish.as_ref() {
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved
publish.send_msg(Message::Heartbeat(req)).await;
}

Ok(())
}
}
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let req = HeartbeatRequest {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod selector;
mod sequence;
pub mod service;
Expand Down
22 changes: 22 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -117,6 +118,7 @@ pub struct Context {
pub meta_peer_client: MetaPeerClientRef,
pub mailbox: MailboxRef,
pub election: Option<ElectionRef>,
pub publish: Option<PublishRef>,
pub skip_all: Arc<AtomicBool>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
Expand Down Expand Up @@ -177,6 +179,8 @@ pub struct MetaSrv {
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
publish: Option<PublishRef>,
subscribe_manager: Option<SubscribeManagerRef>,
}

impl MetaSrv {
Expand All @@ -196,6 +200,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager.clone();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemerty_task.clone();
let _handle = common_runtime::spawn_bg(async move {
Expand All @@ -219,6 +224,12 @@ impl MetaSrv {
});
}
LeaderChangeMessage::StepDown(leader) => {
if let Some(sub_manager) = subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
error!("Leader :{:?} step down", leader);
let _ = task_handler.stop().await.map_err(|e| {
debug!(
Expand Down Expand Up @@ -329,6 +340,14 @@ impl MetaSrv {
&self.table_metadata_manager
}

pub fn publish(&self) -> Option<&PublishRef> {
self.publish.as_ref()
}

pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
self.subscribe_manager.as_ref()
}

#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().server_addr.clone();
Expand All @@ -339,6 +358,8 @@ impl MetaSrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let skip_all = Arc::new(AtomicBool::new(false));
let publish = self.publish.clone();
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved

Context {
server_addr,
in_memory,
Expand All @@ -350,6 +371,7 @@ impl MetaSrv {
skip_all,
is_infancy: false,
table_metadata_manager: self.table_metadata_manager.clone(),
publish,
}
}
}
15 changes: 15 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ use common_grpc::channel_manager::ChannelConfig;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use tokio::sync::mpsc::Sender;

use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::report_handler::ReportHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
Expand All @@ -40,6 +42,9 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{
DefaultPublish, DefaultSubscribeManager, Message, PublishRef, SubscribeManagerRef,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -169,6 +174,7 @@ impl MetaSrvBuilder {
&table_metadata_manager,
);
let _ = ddl_manager.try_start();
let (publish, sub_manager) = build_publish();

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down Expand Up @@ -215,6 +221,7 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(ReportHandler).await;
group
}
};
Expand All @@ -237,6 +244,8 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
publish: Some(publish),
subscribe_manager: Some(sub_manager),
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
Expand Down Expand Up @@ -315,6 +324,12 @@ fn build_ddl_manager(
))
}

fn build_publish() -> (PublishRef, SubscribeManagerRef) {
let sub_manager = Arc::new(DefaultSubscribeManager::<Sender<Message>>::default());
let publish = Arc::new(DefaultPublish::new(sub_manager.clone()));
(publish, sub_manager)
}

impl Default for MetaSrvBuilder {
fn default() -> Self {
Self::new()
Expand Down
44 changes: 44 additions & 0 deletions src/meta-srv/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::HeartbeatRequest;

mod publish;
mod subscribe_manager;
mod subscriber;
#[cfg(test)]
mod tests;

pub use publish::*;
pub use subscribe_manager::*;
pub use subscriber::*;
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved

/// Subscribed topic type, determined by the ability of meta.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Topic {
Heartbeat,
}

#[derive(Clone, Debug)]
pub enum Message {
Heartbeat(Box<HeartbeatRequest>),
}

impl Message {
pub fn topic(&self) -> Topic {
match self {
Message::Heartbeat(_) => Topic::Heartbeat,
}
}
}
72 changes: 72 additions & 0 deletions src/meta-srv/src/pubsub/publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use common_telemetry::error;

use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};

/// This trait provides a `send_msg` method that can be used by other modules
/// of meta to publish [Message].
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
async fn send_msg(&self, message: Message);
}

pub type PublishRef = Arc<dyn Publish>;

/// The default implementation of [Publish]
pub struct DefaultPublish<M, T> {
subscribe_manager: Arc<M>,
_transport: PhantomData<T>,
}

impl<M, T> DefaultPublish<M, T> {
pub fn new(subscribe_manager: Arc<M>) -> Self {
Self {
subscribe_manager,
_transport: PhantomData,
}
}
}

#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
where
M: SubscribeManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
.subscriber_list_by_topic(&message.topic());

for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
};
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved

if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved
error!("failed to un_subscribe, req: {:?}, error: {:?}", req, e);
fengys1996 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}
Loading