From 4e9c34bdf42f1213402e3638656273659f87e37e Mon Sep 17 00:00:00 2001 From: youyuanwu <48816116+youyuanwu@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:53:46 -0800 Subject: [PATCH] Impl and expose IFabricStatefulServicePartition3 apis (#95) Expose IFabricStatefulServicePartition3 apis in Rust. Some complicated apis are left out in this PR and will be added in the future. #92 --- crates/libs/core/src/runtime/stateful.rs | 31 +----- .../libs/core/src/runtime/stateful_bridge.rs | 15 ++- .../libs/core/src/runtime/stateful_proxy.rs | 104 +++++++++++++++++- crates/libs/core/src/types/common/metrics.rs | 39 ++++++- crates/libs/core/src/types/common/mod.rs | 35 +++++- .../libs/core/src/types/common/partition.rs | 65 +++++++++++ crates/samples/echomain-stateful2/src/echo.rs | 2 +- .../echomain-stateful2/src/statefulstore.rs | 6 +- crates/samples/kvstore/src/kvstore.rs | 7 +- 9 files changed, 252 insertions(+), 52 deletions(-) diff --git a/crates/libs/core/src/runtime/stateful.rs b/crates/libs/core/src/runtime/stateful.rs index d2b726c0..f3bbf489 100644 --- a/crates/libs/core/src/runtime/stateful.rs +++ b/crates/libs/core/src/runtime/stateful.rs @@ -4,13 +4,14 @@ // ------------------------------------------------------------ // stateful contains rs definition of stateful traits that user needs to implement -use mssf_com::FabricRuntime::IFabricStatefulServicePartition; use crate::sync::CancellationToken; -use crate::types::{LoadMetric, LoadMetricListRef, ReplicaRole}; +use crate::types::ReplicaRole; use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode}; +use super::stateful_proxy::StatefulServicePartition; + /// Represents a stateful service factory that is responsible for creating replicas /// of a specific type of stateful service. Stateful service factories are registered with /// the FabricRuntime by service hosts via register_stateful_service_factory(). @@ -64,32 +65,6 @@ pub trait LocalStatefulServiceReplica: Send + Sync + 'static { fn abort(&self); } -#[derive(Debug, Clone)] -pub struct StatefulServicePartition { - com_impl: IFabricStatefulServicePartition, -} - -impl StatefulServicePartition { - pub fn get_com(&self) -> &IFabricStatefulServicePartition { - &self.com_impl - } - - /// Reports load for the current replica in the partition. - pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> { - let metrics_ref = LoadMetricListRef::from_slice(metrics); - let raw = metrics_ref.as_raw_slice(); - unsafe { self.com_impl.ReportLoad(raw) } - } -} - -impl From<&IFabricStatefulServicePartition> for StatefulServicePartition { - fn from(e: &IFabricStatefulServicePartition) -> Self { - StatefulServicePartition { - com_impl: e.clone(), - } - } -} - /// TODO: replicator has no public documentation #[trait_variant::make(Replicator: Send)] pub trait LocalReplicator: Send + Sync + 'static { diff --git a/crates/libs/core/src/runtime/stateful_bridge.rs b/crates/libs/core/src/runtime/stateful_bridge.rs index 4abf34bb..d6ab3832 100644 --- a/crates/libs/core/src/runtime/stateful_bridge.rs +++ b/crates/libs/core/src/runtime/stateful_bridge.rs @@ -10,7 +10,7 @@ use std::sync::Arc; -use crate::{Interface, HSTRING}; +use crate::{runtime::stateful_proxy::StatefulServicePartition, Interface, HSTRING}; use tracing::info; use windows_core::implement; @@ -20,8 +20,8 @@ use mssf_com::{ IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum, IFabricReplicatorCatchupSpecificQuorum_Impl, IFabricReplicator_Impl, IFabricStatefulServiceFactory, IFabricStatefulServiceFactory_Impl, - IFabricStatefulServicePartition, IFabricStatefulServiceReplica, - IFabricStatefulServiceReplica_Impl, + IFabricStatefulServicePartition, IFabricStatefulServicePartition3, + IFabricStatefulServiceReplica, IFabricStatefulServiceReplica_Impl, }, FabricTypes::{ FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE, @@ -30,7 +30,6 @@ use mssf_com::{ }; use crate::{ - runtime::stateful::StatefulServicePartition, strings::HSTRINGWrap, sync::BridgeContext3, types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig}, @@ -542,14 +541,18 @@ where let inner = self.inner.clone(); let rt_cp = self.rt.clone(); let openmode2: OpenMode = openmode.into(); - let partition2: StatefulServicePartition = partition.unwrap().into(); + let com_partition = partition + .unwrap() + .cast::() + .expect("cannot query interface"); + let partition = StatefulServicePartition::from(&com_partition); info!( "IFabricStatefulReplicaBridge::BeginOpen: mode {:?}", openmode2 ); let (ctx, token) = BridgeContext3::make(callback); ctx.spawn(&self.rt, async move { - inner.open(openmode2, &partition2, token).await.map(|s| { + inner.open(openmode2, &partition, token).await.map(|s| { let bridge: IFabricPrimaryReplicator = IFabricPrimaryReplicatorBridge::create(s, rt_cp).into(); bridge.clone().cast::().unwrap() diff --git a/crates/libs/core/src/runtime/stateful_proxy.rs b/crates/libs/core/src/runtime/stateful_proxy.rs index c6a27360..686f2daa 100644 --- a/crates/libs/core/src/runtime/stateful_proxy.rs +++ b/crates/libs/core/src/runtime/stateful_proxy.rs @@ -10,20 +10,22 @@ use std::ffi::c_void; use mssf_com::FabricRuntime::{ IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum, - IFabricStatefulServiceReplica, + IFabricStatefulServicePartition3, IFabricStatefulServiceReplica, }; use tracing::info; use windows_core::{Interface, HSTRING}; use crate::{ + error::FabricErrorCode, strings::HSTRINGWrap, sync::{fabric_begin_end_proxy2, CancellationToken}, - types::ReplicaRole, + types::{ + FaultType, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole, + ServicePartitionAccessStatus, ServicePartitionInformation, + }, }; -use super::stateful::{ - PrimaryReplicator, Replicator, StatefulServicePartition, StatefulServiceReplica, -}; +use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica}; use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode}; pub struct StatefulServiceReplicaProxy { @@ -306,3 +308,95 @@ impl PrimaryReplicator for PrimaryReplicatorProxy { unsafe { self.com_impl.RemoveReplica(replicaid) } } } + +/// Proxy COM object IFabricStatefulServicePartition3 +#[derive(Debug, Clone)] +pub struct StatefulServicePartition { + com_impl: IFabricStatefulServicePartition3, +} + +impl StatefulServicePartition { + pub fn get_com(&self) -> &IFabricStatefulServicePartition3 { + &self.com_impl + } + + /// Provides access to the ServicePartitionInformation of the service, which contains the partition type and ID. + pub fn get_partition_information(&self) -> crate::Result { + unsafe { self.com_impl.GetPartitionInfo()?.as_ref() } + .ok_or(FabricErrorCode::E_POINTER.into()) + .map(ServicePartitionInformation::from) + } + + /// Used to check the readiness of the replica in regard to read operations. + /// The ReadStatus should be checked before the replica is servicing a customer request that is a read operation. + pub fn get_read_status(&self) -> crate::Result { + unsafe { self.com_impl.GetReadStatus() }.map(ServicePartitionAccessStatus::from) + } + + /// Used to check the readiness of the partition in regard to write operations. + /// The WriteStatus should be checked before the replica services a customer request that is a write operation. + pub fn get_write_status(&self) -> crate::Result { + unsafe { self.com_impl.GetWriteStatus() }.map(ServicePartitionAccessStatus::from) + } + + /// TODO: not implemented + /// Creates a FabricReplicator with the specified settings and returns it to the replica. + pub fn create_replicator(&self) -> crate::Result<()> { + Err(FabricErrorCode::E_NOTIMPL.into()) + } + + /// Reports load for the current replica in the partition. + /// Remarks: + /// The reported metrics should correspond to those that are provided in the ServiceLoadMetricDescription + /// as a part of the ServiceDescription that is used to create the service. Load metrics that are not + /// present in the description are ignored. Reporting custom metrics allows Service Fabric to balance + /// services that are based on additional custom information. + pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> { + let metrics_ref = LoadMetricListRef::from_slice(metrics); + let raw = metrics_ref.as_raw_slice(); + unsafe { self.com_impl.ReportLoad(raw) } + } + + /// Enables the replica to report a fault to the runtime and indicates that it has encountered + /// an error from which it cannot recover and must either be restarted or removed. + pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> { + unsafe { self.com_impl.ReportFault(fault_type.into()) } + } + + /// Reports the move cost for a replica. + /// Remarks: + /// Services can report move cost of a replica using this method. + /// While the Service Fabric Resource Balances searches for the best balance in the cluster, + /// it examines both load information and move cost of each replica. + /// Resource balances will prefer to move replicas with lower cost in order to achieve balance. + pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> { + unsafe { self.com_impl.ReportMoveCost(move_cost.into()) } + } + + /// Remarks: + /// The health information describes the report details, like the source ID, the property, + /// the health state and other relevant details. The partition uses an internal health client + /// to send the reports to the health store. The client optimizes messages to Health Manager + /// by batching reports per a configured duration (Default: 30 seconds). If the report has high priority, + /// you can specify send options to send it immediately. + + /// TODO: not yet implemented + /// Reports current partition health. + pub fn report_partition_health(&self) -> crate::Result<()> { + Err(FabricErrorCode::E_NOTIMPL.into()) + } + + /// TODO: not yet implemented + /// Reports health on the current stateful service replica of the partition. + pub fn report_replica_health(&self) -> crate::Result<()> { + Err(FabricErrorCode::E_NOTIMPL.into()) + } +} + +impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition { + fn from(e: &IFabricStatefulServicePartition3) -> Self { + StatefulServicePartition { + com_impl: e.clone(), + } + } +} diff --git a/crates/libs/core/src/types/common/metrics.rs b/crates/libs/core/src/types/common/metrics.rs index d2208c01..7b7d9704 100644 --- a/crates/libs/core/src/types/common/metrics.rs +++ b/crates/libs/core/src/types/common/metrics.rs @@ -6,7 +6,10 @@ //! Module for handling fabric metrics use crate::{HSTRING, PCWSTR}; -use mssf_com::FabricTypes::FABRIC_LOAD_METRIC; +use mssf_com::FabricTypes::{ + FABRIC_LOAD_METRIC, FABRIC_MOVE_COST, FABRIC_MOVE_COST_HIGH, FABRIC_MOVE_COST_LOW, + FABRIC_MOVE_COST_MEDIUM, FABRIC_MOVE_COST_ZERO, +}; use std::marker::PhantomData; /// FABRIC_LOAD_METRIC @@ -53,3 +56,37 @@ impl<'a> LoadMetricListRef<'a> { self.metrics.as_slice() } } + +#[derive(Debug, Clone, PartialEq)] +pub enum MoveCost { + Zero, + Low, + Medium, + High, + // VeryHigh, +} + +impl From for MoveCost { + fn from(value: FABRIC_MOVE_COST) -> Self { + match value { + FABRIC_MOVE_COST_ZERO => Self::Zero, + FABRIC_MOVE_COST_LOW => Self::Low, + FABRIC_MOVE_COST_MEDIUM => Self::Medium, + FABRIC_MOVE_COST_HIGH => Self::High, + // Not supported in rust yet + // FABRIC_MOVE_COST_VERYHIGH =>Self::VeryHigh, + _ => Self::Zero, + } + } +} + +impl From for FABRIC_MOVE_COST { + fn from(value: MoveCost) -> Self { + match value { + MoveCost::Zero => FABRIC_MOVE_COST_ZERO, + MoveCost::Low => FABRIC_MOVE_COST_LOW, + MoveCost::Medium => FABRIC_MOVE_COST_MEDIUM, + MoveCost::High => FABRIC_MOVE_COST_HIGH, + } + } +} diff --git a/crates/libs/core/src/types/common/mod.rs b/crates/libs/core/src/types/common/mod.rs index 6d3dda30..6711fccd 100644 --- a/crates/libs/core/src/types/common/mod.rs +++ b/crates/libs/core/src/types/common/mod.rs @@ -12,8 +12,10 @@ mod metrics; pub use metrics::*; use mssf_com::FabricTypes::{ - FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR, FABRIC_HEALTH_STATE_INVALID, - FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN, FABRIC_HEALTH_STATE_WARNING, + FABRIC_FAULT_TYPE, FABRIC_FAULT_TYPE_INVALID, FABRIC_FAULT_TYPE_PERMANENT, + FABRIC_FAULT_TYPE_TRANSIENT, FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR, + FABRIC_HEALTH_STATE_INVALID, FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN, + FABRIC_HEALTH_STATE_WARNING, }; // FABRIC_HEALTH_STATE @@ -38,3 +40,32 @@ impl From<&FABRIC_HEALTH_STATE> for HealthState { } } } + +// FABRIC_FAULT_TYPE +#[derive(Debug, Clone, PartialEq)] +pub enum FaultType { + Invalid, + Permanent, + Transient, +} + +impl From for FaultType { + fn from(value: FABRIC_FAULT_TYPE) -> Self { + match value { + FABRIC_FAULT_TYPE_INVALID => Self::Invalid, + FABRIC_FAULT_TYPE_PERMANENT => Self::Permanent, + FABRIC_FAULT_TYPE_TRANSIENT => Self::Transient, + _ => Self::Invalid, + } + } +} + +impl From for FABRIC_FAULT_TYPE { + fn from(value: FaultType) -> Self { + match value { + FaultType::Invalid => FABRIC_FAULT_TYPE_INVALID, + FaultType::Permanent => FABRIC_FAULT_TYPE_PERMANENT, + FaultType::Transient => FABRIC_FAULT_TYPE_TRANSIENT, + } + } +} diff --git a/crates/libs/core/src/types/common/partition.rs b/crates/libs/core/src/types/common/partition.rs index 5d9dd554..db7fd2f1 100644 --- a/crates/libs/core/src/types/common/partition.rs +++ b/crates/libs/core/src/types/common/partition.rs @@ -6,6 +6,11 @@ use crate::{GUID, HSTRING}; use mssf_com::FabricTypes::{ FABRIC_INT64_RANGE_PARTITION_INFORMATION, FABRIC_NAMED_PARTITION_INFORMATION, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS, FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING, FABRIC_SERVICE_PARTITION_INFORMATION, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SINGLETON_PARTITION_INFORMATION, @@ -97,3 +102,63 @@ impl From<&FABRIC_SERVICE_PARTITION_INFORMATION> for ServicePartitionInformation } } } + +/// FABRIC_SERVICE_PARTITION_ACCESS_STATUS +/// Remarks: +/// PartitionAccessStatus is used to check that a read or write operation is allowed. +/// When service replicas handle a client request, they should verify that the system is +/// in a state that allows processing. By checking the ReadStatus or WriteStatus as appropriate, +/// the replica can be notified of conditions that prevent correct operation. +/// Note that write operations might still see an exception from the replicator for one of these +/// conditions, because the condition might change between the WriteStatus check and the call +/// to StateReplicator.Replicate() (Not yet supported in mssf). +#[derive(Debug, Clone, PartialEq)] +pub enum ServicePartitionAccessStatus { + Invalid, + /// Indicates that the read or write operation access is granted and the operation is allowed. + Granted, + /// Indicates that the client should try again later, because a reconfiguration is in progress. + /// After the reconfiguration is completed, a new status is returned that gives further instructions. + /// The client should retry the operation at this replica + ReconfigurationPending, + /// Indicates that this client request was received by a replica that is not a Primary replica. + /// The read or write operation cannot be performed at this replica. + /// The client should attempt to use the naming service to identify the correct primary replica. + NotPrimary, + /// Indicates that no write quorum is available and, therefore, no write operation can be accepted. + /// The client should retry the operation at this replica. + NoWriteQuorum, +} + +impl From for ServicePartitionAccessStatus { + fn from(value: FABRIC_SERVICE_PARTITION_ACCESS_STATUS) -> Self { + match value { + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID => Self::Invalid, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED => Self::Granted, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY => Self::NotPrimary, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM => Self::NoWriteQuorum, + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING => { + Self::ReconfigurationPending + } + _ => Self::Invalid, + } + } +} + +impl From for FABRIC_SERVICE_PARTITION_ACCESS_STATUS { + fn from(value: ServicePartitionAccessStatus) -> Self { + match value { + ServicePartitionAccessStatus::Invalid => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID, + ServicePartitionAccessStatus::Granted => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED, + ServicePartitionAccessStatus::ReconfigurationPending => { + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING + } + ServicePartitionAccessStatus::NotPrimary => { + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY + } + ServicePartitionAccessStatus::NoWriteQuorum => { + FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM + } + } + } +} diff --git a/crates/samples/echomain-stateful2/src/echo.rs b/crates/samples/echomain-stateful2/src/echo.rs index 74a5fe85..47e9c2e1 100644 --- a/crates/samples/echomain-stateful2/src/echo.rs +++ b/crates/samples/echomain-stateful2/src/echo.rs @@ -7,7 +7,7 @@ use std::io::Error; -use mssf_core::runtime::stateful::StatefulServicePartition; +use mssf_core::runtime::stateful_proxy::StatefulServicePartition; use mssf_core::types::LoadMetric; use mssf_core::HSTRING; use tokio::io::{AsyncReadExt, AsyncWriteExt}; diff --git a/crates/samples/echomain-stateful2/src/statefulstore.rs b/crates/samples/echomain-stateful2/src/statefulstore.rs index 6eb98950..b89cacc1 100644 --- a/crates/samples/echomain-stateful2/src/statefulstore.rs +++ b/crates/samples/echomain-stateful2/src/statefulstore.rs @@ -6,10 +6,8 @@ use mssf_core::{ runtime::{ executor::{DefaultExecutor, Executor}, - stateful::{ - PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServicePartition, - StatefulServiceReplica, - }, + stateful::{PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica}, + stateful_proxy::StatefulServicePartition, }, types::{ Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig, ReplicaSetQuorumMode, diff --git a/crates/samples/kvstore/src/kvstore.rs b/crates/samples/kvstore/src/kvstore.rs index 64eb65e3..bb045307 100644 --- a/crates/samples/kvstore/src/kvstore.rs +++ b/crates/samples/kvstore/src/kvstore.rs @@ -9,11 +9,8 @@ use mssf_com::{ use mssf_core::{ runtime::{ executor::{DefaultExecutor, Executor}, - stateful::{ - PrimaryReplicator, StatefulServiceFactory, StatefulServicePartition, - StatefulServiceReplica, - }, - stateful_proxy::StatefulServiceReplicaProxy, + stateful::{PrimaryReplicator, StatefulServiceFactory, StatefulServiceReplica}, + stateful_proxy::{StatefulServicePartition, StatefulServiceReplicaProxy}, store::{create_com_key_value_store_replica, DummyStoreEventHandler}, store_proxy::KVStoreProxy, },