Skip to content

Commit

Permalink
Impl and expose IFabricStatefulServicePartition3 apis (#95)
Browse files Browse the repository at this point in the history
Expose IFabricStatefulServicePartition3 apis in Rust.
Some complicated apis are left out in this PR and will be added in the
future.
#92
  • Loading branch information
youyuanwu authored Nov 19, 2024
1 parent 2c8933b commit 4e9c34b
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 52 deletions.
31 changes: 3 additions & 28 deletions crates/libs/core/src/runtime/stateful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
// ------------------------------------------------------------

// stateful contains rs definition of stateful traits that user needs to implement
use mssf_com::FabricRuntime::IFabricStatefulServicePartition;

use crate::sync::CancellationToken;
use crate::types::{LoadMetric, LoadMetricListRef, ReplicaRole};
use crate::types::ReplicaRole;

use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};

use super::stateful_proxy::StatefulServicePartition;

/// Represents a stateful service factory that is responsible for creating replicas
/// of a specific type of stateful service. Stateful service factories are registered with
/// the FabricRuntime by service hosts via register_stateful_service_factory().
Expand Down Expand Up @@ -64,32 +65,6 @@ pub trait LocalStatefulServiceReplica: Send + Sync + 'static {
fn abort(&self);
}

#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
com_impl: IFabricStatefulServicePartition,
}

impl StatefulServicePartition {
pub fn get_com(&self) -> &IFabricStatefulServicePartition {
&self.com_impl
}

/// Reports load for the current replica in the partition.
pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
let metrics_ref = LoadMetricListRef::from_slice(metrics);
let raw = metrics_ref.as_raw_slice();
unsafe { self.com_impl.ReportLoad(raw) }
}
}

impl From<&IFabricStatefulServicePartition> for StatefulServicePartition {
fn from(e: &IFabricStatefulServicePartition) -> Self {
StatefulServicePartition {
com_impl: e.clone(),
}
}
}

/// TODO: replicator has no public documentation
#[trait_variant::make(Replicator: Send)]
pub trait LocalReplicator: Send + Sync + 'static {
Expand Down
15 changes: 9 additions & 6 deletions crates/libs/core/src/runtime/stateful_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::sync::Arc;

use crate::{Interface, HSTRING};
use crate::{runtime::stateful_proxy::StatefulServicePartition, Interface, HSTRING};
use tracing::info;
use windows_core::implement;

Expand All @@ -20,8 +20,8 @@ use mssf_com::{
IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
IFabricReplicatorCatchupSpecificQuorum, IFabricReplicatorCatchupSpecificQuorum_Impl,
IFabricReplicator_Impl, IFabricStatefulServiceFactory, IFabricStatefulServiceFactory_Impl,
IFabricStatefulServicePartition, IFabricStatefulServiceReplica,
IFabricStatefulServiceReplica_Impl,
IFabricStatefulServicePartition, IFabricStatefulServicePartition3,
IFabricStatefulServiceReplica, IFabricStatefulServiceReplica_Impl,
},
FabricTypes::{
FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE,
Expand All @@ -30,7 +30,6 @@ use mssf_com::{
};

use crate::{
runtime::stateful::StatefulServicePartition,
strings::HSTRINGWrap,
sync::BridgeContext3,
types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig},
Expand Down Expand Up @@ -542,14 +541,18 @@ where
let inner = self.inner.clone();
let rt_cp = self.rt.clone();
let openmode2: OpenMode = openmode.into();
let partition2: StatefulServicePartition = partition.unwrap().into();
let com_partition = partition
.unwrap()
.cast::<IFabricStatefulServicePartition3>()
.expect("cannot query interface");
let partition = StatefulServicePartition::from(&com_partition);
info!(
"IFabricStatefulReplicaBridge::BeginOpen: mode {:?}",
openmode2
);
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(&self.rt, async move {
inner.open(openmode2, &partition2, token).await.map(|s| {
inner.open(openmode2, &partition, token).await.map(|s| {
let bridge: IFabricPrimaryReplicator =
IFabricPrimaryReplicatorBridge::create(s, rt_cp).into();
bridge.clone().cast::<IFabricReplicator>().unwrap()
Expand Down
104 changes: 99 additions & 5 deletions crates/libs/core/src/runtime/stateful_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ use std::ffi::c_void;

use mssf_com::FabricRuntime::{
IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
IFabricStatefulServiceReplica,
IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
};
use tracing::info;
use windows_core::{Interface, HSTRING};

use crate::{
error::FabricErrorCode,
strings::HSTRINGWrap,
sync::{fabric_begin_end_proxy2, CancellationToken},
types::ReplicaRole,
types::{
FaultType, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
ServicePartitionAccessStatus, ServicePartitionInformation,
},
};

use super::stateful::{
PrimaryReplicator, Replicator, StatefulServicePartition, StatefulServiceReplica,
};
use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};

pub struct StatefulServiceReplicaProxy {
Expand Down Expand Up @@ -306,3 +308,95 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
unsafe { self.com_impl.RemoveReplica(replicaid) }
}
}

/// Proxy COM object IFabricStatefulServicePartition3
#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
com_impl: IFabricStatefulServicePartition3,
}

impl StatefulServicePartition {
pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
&self.com_impl
}

/// Provides access to the ServicePartitionInformation of the service, which contains the partition type and ID.
pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
.ok_or(FabricErrorCode::E_POINTER.into())
.map(ServicePartitionInformation::from)
}

/// Used to check the readiness of the replica in regard to read operations.
/// The ReadStatus should be checked before the replica is servicing a customer request that is a read operation.
pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
unsafe { self.com_impl.GetReadStatus() }.map(ServicePartitionAccessStatus::from)
}

/// Used to check the readiness of the partition in regard to write operations.
/// The WriteStatus should be checked before the replica services a customer request that is a write operation.
pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
unsafe { self.com_impl.GetWriteStatus() }.map(ServicePartitionAccessStatus::from)
}

/// TODO: not implemented
/// Creates a FabricReplicator with the specified settings and returns it to the replica.
pub fn create_replicator(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}

/// Reports load for the current replica in the partition.
/// Remarks:
/// The reported metrics should correspond to those that are provided in the ServiceLoadMetricDescription
/// as a part of the ServiceDescription that is used to create the service. Load metrics that are not
/// present in the description are ignored. Reporting custom metrics allows Service Fabric to balance
/// services that are based on additional custom information.
pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
let metrics_ref = LoadMetricListRef::from_slice(metrics);
let raw = metrics_ref.as_raw_slice();
unsafe { self.com_impl.ReportLoad(raw) }
}

/// Enables the replica to report a fault to the runtime and indicates that it has encountered
/// an error from which it cannot recover and must either be restarted or removed.
pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
unsafe { self.com_impl.ReportFault(fault_type.into()) }
}

/// Reports the move cost for a replica.
/// Remarks:
/// Services can report move cost of a replica using this method.
/// While the Service Fabric Resource Balances searches for the best balance in the cluster,
/// it examines both load information and move cost of each replica.
/// Resource balances will prefer to move replicas with lower cost in order to achieve balance.
pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }
}

/// Remarks:
/// The health information describes the report details, like the source ID, the property,
/// the health state and other relevant details. The partition uses an internal health client
/// to send the reports to the health store. The client optimizes messages to Health Manager
/// by batching reports per a configured duration (Default: 30 seconds). If the report has high priority,
/// you can specify send options to send it immediately.
/// TODO: not yet implemented
/// Reports current partition health.
pub fn report_partition_health(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}

/// TODO: not yet implemented
/// Reports health on the current stateful service replica of the partition.
pub fn report_replica_health(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}
}

impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
fn from(e: &IFabricStatefulServicePartition3) -> Self {
StatefulServicePartition {
com_impl: e.clone(),
}
}
}
39 changes: 38 additions & 1 deletion crates/libs/core/src/types/common/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
//! Module for handling fabric metrics
use crate::{HSTRING, PCWSTR};
use mssf_com::FabricTypes::FABRIC_LOAD_METRIC;
use mssf_com::FabricTypes::{
FABRIC_LOAD_METRIC, FABRIC_MOVE_COST, FABRIC_MOVE_COST_HIGH, FABRIC_MOVE_COST_LOW,
FABRIC_MOVE_COST_MEDIUM, FABRIC_MOVE_COST_ZERO,
};
use std::marker::PhantomData;

/// FABRIC_LOAD_METRIC
Expand Down Expand Up @@ -53,3 +56,37 @@ impl<'a> LoadMetricListRef<'a> {
self.metrics.as_slice()
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum MoveCost {
Zero,
Low,
Medium,
High,
// VeryHigh,
}

impl From<FABRIC_MOVE_COST> for MoveCost {
fn from(value: FABRIC_MOVE_COST) -> Self {
match value {
FABRIC_MOVE_COST_ZERO => Self::Zero,
FABRIC_MOVE_COST_LOW => Self::Low,
FABRIC_MOVE_COST_MEDIUM => Self::Medium,
FABRIC_MOVE_COST_HIGH => Self::High,
// Not supported in rust yet
// FABRIC_MOVE_COST_VERYHIGH =>Self::VeryHigh,
_ => Self::Zero,
}
}
}

impl From<MoveCost> for FABRIC_MOVE_COST {
fn from(value: MoveCost) -> Self {
match value {
MoveCost::Zero => FABRIC_MOVE_COST_ZERO,
MoveCost::Low => FABRIC_MOVE_COST_LOW,
MoveCost::Medium => FABRIC_MOVE_COST_MEDIUM,
MoveCost::High => FABRIC_MOVE_COST_HIGH,
}
}
}
35 changes: 33 additions & 2 deletions crates/libs/core/src/types/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ mod metrics;
pub use metrics::*;

use mssf_com::FabricTypes::{
FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR, FABRIC_HEALTH_STATE_INVALID,
FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN, FABRIC_HEALTH_STATE_WARNING,
FABRIC_FAULT_TYPE, FABRIC_FAULT_TYPE_INVALID, FABRIC_FAULT_TYPE_PERMANENT,
FABRIC_FAULT_TYPE_TRANSIENT, FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR,
FABRIC_HEALTH_STATE_INVALID, FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN,
FABRIC_HEALTH_STATE_WARNING,
};

// FABRIC_HEALTH_STATE
Expand All @@ -38,3 +40,32 @@ impl From<&FABRIC_HEALTH_STATE> for HealthState {
}
}
}

// FABRIC_FAULT_TYPE
#[derive(Debug, Clone, PartialEq)]
pub enum FaultType {
Invalid,
Permanent,
Transient,
}

impl From<FABRIC_FAULT_TYPE> for FaultType {
fn from(value: FABRIC_FAULT_TYPE) -> Self {
match value {
FABRIC_FAULT_TYPE_INVALID => Self::Invalid,
FABRIC_FAULT_TYPE_PERMANENT => Self::Permanent,
FABRIC_FAULT_TYPE_TRANSIENT => Self::Transient,
_ => Self::Invalid,
}
}
}

impl From<FaultType> for FABRIC_FAULT_TYPE {
fn from(value: FaultType) -> Self {
match value {
FaultType::Invalid => FABRIC_FAULT_TYPE_INVALID,
FaultType::Permanent => FABRIC_FAULT_TYPE_PERMANENT,
FaultType::Transient => FABRIC_FAULT_TYPE_TRANSIENT,
}
}
}
65 changes: 65 additions & 0 deletions crates/libs/core/src/types/common/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
use crate::{GUID, HSTRING};
use mssf_com::FabricTypes::{
FABRIC_INT64_RANGE_PARTITION_INFORMATION, FABRIC_NAMED_PARTITION_INFORMATION,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS, FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING,
FABRIC_SERVICE_PARTITION_INFORMATION, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SINGLETON_PARTITION_INFORMATION,
Expand Down Expand Up @@ -97,3 +102,63 @@ impl From<&FABRIC_SERVICE_PARTITION_INFORMATION> for ServicePartitionInformation
}
}
}

/// FABRIC_SERVICE_PARTITION_ACCESS_STATUS
/// Remarks:
/// PartitionAccessStatus is used to check that a read or write operation is allowed.
/// When service replicas handle a client request, they should verify that the system is
/// in a state that allows processing. By checking the ReadStatus or WriteStatus as appropriate,
/// the replica can be notified of conditions that prevent correct operation.
/// Note that write operations might still see an exception from the replicator for one of these
/// conditions, because the condition might change between the WriteStatus check and the call
/// to StateReplicator.Replicate() (Not yet supported in mssf).
#[derive(Debug, Clone, PartialEq)]
pub enum ServicePartitionAccessStatus {
Invalid,
/// Indicates that the read or write operation access is granted and the operation is allowed.
Granted,
/// Indicates that the client should try again later, because a reconfiguration is in progress.
/// After the reconfiguration is completed, a new status is returned that gives further instructions.
/// The client should retry the operation at this replica
ReconfigurationPending,
/// Indicates that this client request was received by a replica that is not a Primary replica.
/// The read or write operation cannot be performed at this replica.
/// The client should attempt to use the naming service to identify the correct primary replica.
NotPrimary,
/// Indicates that no write quorum is available and, therefore, no write operation can be accepted.
/// The client should retry the operation at this replica.
NoWriteQuorum,
}

impl From<FABRIC_SERVICE_PARTITION_ACCESS_STATUS> for ServicePartitionAccessStatus {
fn from(value: FABRIC_SERVICE_PARTITION_ACCESS_STATUS) -> Self {
match value {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID => Self::Invalid,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED => Self::Granted,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY => Self::NotPrimary,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM => Self::NoWriteQuorum,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING => {
Self::ReconfigurationPending
}
_ => Self::Invalid,
}
}
}

impl From<ServicePartitionAccessStatus> for FABRIC_SERVICE_PARTITION_ACCESS_STATUS {
fn from(value: ServicePartitionAccessStatus) -> Self {
match value {
ServicePartitionAccessStatus::Invalid => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID,
ServicePartitionAccessStatus::Granted => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED,
ServicePartitionAccessStatus::ReconfigurationPending => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING
}
ServicePartitionAccessStatus::NotPrimary => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY
}
ServicePartitionAccessStatus::NoWriteQuorum => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM
}
}
}
}
Loading

0 comments on commit 4e9c34b

Please sign in to comment.