Skip to content

Commit

Permalink
refactor(naming): use the better naming for pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed May 16, 2024
1 parent 9f4a6c6 commit cf2679a
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 96 deletions.
10 changes: 5 additions & 5 deletions src/meta-srv/src/handler/publish_heartbeat_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::{Message, PublishRef};
use crate::pubsub::{Message, PublisherRef};

pub struct PublishHeartbeatHandler {
publish: PublishRef,
publisher: PublisherRef,
}

impl PublishHeartbeatHandler {
pub fn new(publish: PublishRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publish }
pub fn new(publisher: PublisherRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publisher }
}
}

Expand All @@ -43,7 +43,7 @@ impl HeartbeatHandler for PublishHeartbeatHandler {
_: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let msg = Message::Heartbeat(Box::new(req.clone()));
self.publish.send_msg(msg).await;
self.publisher.publish(msg).await;

Ok(HandleControl::Continue)
}
Expand Down
16 changes: 8 additions & 8 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::handler::HeartbeatHandlerGroup;
use crate::lease::lookup_alive_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
Expand Down Expand Up @@ -256,7 +256,7 @@ pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
subscribe_manager: Option<SubscribeManagerRef>,
subscribe_manager: Option<SubscriptionManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
state: StateRef,
Expand Down Expand Up @@ -295,7 +295,7 @@ impl MetaStateHandler {

if let Some(sub_manager) = self.subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
if let Err(e) = sub_manager.unsubscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
Expand Down Expand Up @@ -351,7 +351,7 @@ impl Metasrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
let subscribe_manager = self.subscribe_manager();
let subscribe_manager = self.subscription_manager();
let mut rx = election.subscribe_leader_change();
let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
greptimedb_telemetry_task
Expand Down Expand Up @@ -540,12 +540,12 @@ impl Metasrv {
&self.region_migration_manager
}

pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
pub fn publish(&self) -> Option<PublisherRef> {
self.plugins.get::<PublisherRef>()
}

pub fn subscribe_manager(&self) -> Option<SubscribeManagerRef> {
self.plugins.get::<SubscribeManagerRef>()
pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
self.plugins.get::<SubscriptionManagerRef>()
}

pub fn plugins(&self) -> &Plugins {
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::metasrv::{
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublishRef;
use crate::pubsub::PublisherRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
Expand Down Expand Up @@ -320,7 +320,7 @@ impl MetasrvBuilder {

let publish_heartbeat_handler = plugins
.clone()
.and_then(|plugins| plugins.get::<PublishRef>())
.and_then(|plugins| plugins.get::<PublisherRef>())
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));

let region_lease_handler = RegionLeaseHandler::new(
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ mod subscriber;
#[cfg(test)]
mod tests;

pub use publish::{DefaultPublish, Publish, PublishRef};
pub use publish::{DefaultPublisher, Publisher, PublisherRef};
pub use subscribe_manager::{
AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery,
UnSubRequest,
DefaultSubscribeManager, SubscribeRequest, SubscriptionManager, SubscriptionManagerRef,
SubscriptionQuery, UnsubscribeRequest,
};
pub use subscriber::{Subscriber, SubscriberRef, Transport};

Expand Down
44 changes: 22 additions & 22 deletions src/meta-srv/src/pubsub/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,53 @@ use std::sync::Arc;

use common_telemetry::error;

use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};
use crate::pubsub::{Message, SubscriptionManager, Transport, UnsubscribeRequest};

/// This trait provides a `send_msg` method that can be used by other modules
/// This trait provides a `publish` method that can be used by other modules
/// of meta to publish [Message].
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
async fn send_msg(&self, message: Message);
pub trait Publisher: Send + Sync {
async fn publish(&self, message: Message);
}

pub type PublishRef = Arc<dyn Publish>;
pub type PublisherRef = Arc<dyn Publisher>;

/// The default implementation of [Publish]
pub struct DefaultPublish<M, T> {
subscribe_manager: Arc<M>,
/// The default implementation of [Publisher]
pub struct DefaultPublisher<M, T> {
subscription_manager: Arc<M>,
_transport: PhantomData<T>,
}

impl<M, T> DefaultPublish<M, T> {
pub fn new(subscribe_manager: Arc<M>) -> Self {
impl<M, T> DefaultPublisher<M, T> {
pub fn new(subscription_manager: Arc<M>) -> Self {
Self {
subscribe_manager,
subscription_manager,
_transport: PhantomData,
}
}
}

#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
impl<M, T> Publisher for DefaultPublisher<M, T>
where
M: SubscribeManager<T>,
M: SubscriptionManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
async fn publish(&self, message: Message) {
let subscribers = self
.subscription_manager
.subscribers_by_topic(&message.topic());

for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
for subscriber in subscribers {
if subscriber.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
let req = UnsubscribeRequest {
subscriber_id: subscriber.id(),
};

if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
error!(e; "failed to un_subscribe, req: {:?}", req);
if let Err(e) = self.subscription_manager.unsubscribe(req.clone()) {
error!(e; "failed to unsubscribe, req: {:?}", req);
}
}
}
Expand Down
58 changes: 28 additions & 30 deletions src/meta-srv/src/pubsub/subscribe_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,94 +21,92 @@ use tokio::sync::mpsc::Sender;
use crate::error::Result;
use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport};

pub trait SubscribeQuery<T>: Send + Sync {
pub trait SubscriptionQuery<T>: Send + Sync {
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>>;
}

pub trait SubscribeManager<T>: SubscribeQuery<T> {
fn subscribe(&self, req: AddSubRequest<T>) -> Result<()>;
pub trait SubscriptionManager<T>: SubscriptionQuery<T> {
fn subscribe(&self, req: SubscribeRequest<T>) -> Result<()>;

fn un_subscribe(&self, req: UnSubRequest) -> Result<()>;
fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()>;

fn un_subscribe_all(&self) -> Result<()>;
fn unsubscribe_all(&self) -> Result<()>;
}

pub type SubscribeManagerRef = Arc<dyn SubscribeManager<Sender<Message>>>;
pub type SubscriptionManagerRef = Arc<dyn SubscriptionManager<Sender<Message>>>;

pub struct AddSubRequest<T> {
pub topic_list: Vec<Topic>,
pub struct SubscribeRequest<T> {
pub topics: Vec<Topic>,
pub subscriber: Subscriber<T>,
}

#[derive(Debug, Clone)]
pub struct UnSubRequest {
pub struct UnsubscribeRequest {
pub subscriber_id: u32,
}

pub struct DefaultSubscribeManager<T> {
topic2sub: DashMap<Topic, Vec<Arc<Subscriber<T>>>>,
topic_to_subscribers: DashMap<Topic, Vec<Arc<Subscriber<T>>>>,
}

impl<T> Default for DefaultSubscribeManager<T> {
fn default() -> Self {
Self {
topic2sub: DashMap::new(),
topic_to_subscribers: DashMap::new(),
}
}
}

impl<T> SubscribeQuery<T> for DefaultSubscribeManager<T>
impl<T> SubscriptionQuery<T> for DefaultSubscribeManager<T>
where
T: Transport,
{
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>> {
self.topic2sub
self.topic_to_subscribers
.get(topic)
.map(|list_ref| list_ref.clone())
.unwrap_or_default()
}
}

impl<T> SubscribeManager<T> for DefaultSubscribeManager<T>
impl<T> SubscriptionManager<T> for DefaultSubscribeManager<T>
where
T: Transport,
{
fn subscribe(&self, req: AddSubRequest<T>) -> Result<()> {
let AddSubRequest {
topic_list,
subscriber,
} = req;
fn subscribe(&self, req: SubscribeRequest<T>) -> Result<()> {
let SubscribeRequest { topics, subscriber } = req;

info!(
"Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}",
"Add a subscriber, subscriber_id: {}, subscriber_name: {}, topics: {:?}",
subscriber.id(),
subscriber.name(),
topic_list
topics
);

let subscriber = Arc::new(subscriber);

for topic in topic_list {
let mut entry = self.topic2sub.entry(topic).or_default();
for topic in topics {
let mut entry = self.topic_to_subscribers.entry(topic).or_default();
entry.push(subscriber.clone());
}

Ok(())
}

fn un_subscribe(&self, req: UnSubRequest) -> Result<()> {
let UnSubRequest { subscriber_id } = req;
fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()> {
let UnsubscribeRequest { subscriber_id } = req;

info!("Add a un_subscription, subscriber_id: {}", subscriber_id);
info!("Remove a subscriber, subscriber_id: {}", subscriber_id);

for mut sub_list in self.topic2sub.iter_mut() {
sub_list.retain(|subscriber| subscriber.id() != subscriber_id)
for mut subscribers in self.topic_to_subscribers.iter_mut() {
subscribers.retain(|subscriber| subscriber.id() != subscriber_id)
}

Ok(())
}

fn un_subscribe_all(&self) -> Result<()> {
self.topic2sub.clear();
fn unsubscribe_all(&self) -> Result<()> {
self.topic_to_subscribers.clear();

Ok(())
}
Expand Down
Loading

0 comments on commit cf2679a

Please sign in to comment.