From ce8eb089f3d002e5057f860454adeb0993431a19 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Dec 2024 17:06:27 +0300 Subject: [PATCH] Extract public sk types to safekeeper_api (#10137) ## Problem We want to extract safekeeper http client to separate crate for use in storage controller and neon_local. However, many types used in the API are internal to safekeeper. ## Summary of changes Move them to safekeeper_api crate. No functional changes. ref https://github.com/neondatabase/neon/issues/9011 --- Cargo.lock | 3 + libs/safekeeper_api/Cargo.toml | 5 +- libs/safekeeper_api/src/lib.rs | 17 ++ libs/safekeeper_api/src/models.rs | 170 +++++++++++++++++- safekeeper/src/control_file_upgrade.rs | 3 +- safekeeper/src/debug_dump.rs | 2 +- safekeeper/src/handler.rs | 4 +- safekeeper/src/http/client.rs | 3 +- safekeeper/src/http/routes.rs | 72 ++------ safekeeper/src/json_ctrl.rs | 5 +- safekeeper/src/pull_timeline.rs | 7 +- safekeeper/src/receive_wal.rs | 21 +-- safekeeper/src/recovery.rs | 8 +- safekeeper/src/safekeeper.rs | 19 +- safekeeper/src/send_wal.rs | 89 +-------- safekeeper/src/state.rs | 7 +- safekeeper/src/timeline.rs | 55 ++---- safekeeper/src/timeline_manager.rs | 4 +- safekeeper/src/timelines_global_map.rs | 2 +- safekeeper/src/wal_backup.rs | 3 +- safekeeper/src/wal_backup_partial.rs | 2 +- safekeeper/src/wal_reader_stream.rs | 2 +- safekeeper/src/wal_service.rs | 3 +- .../tests/walproposer_sim/safekeeper.rs | 3 +- 24 files changed, 264 insertions(+), 245 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2d5e03613b1..c4f80f63c9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5565,7 +5565,10 @@ name = "safekeeper_api" version = "0.1.0" dependencies = [ "const_format", + "postgres_ffi", + "pq_proto", "serde", + "tokio", "utils", ] diff --git a/libs/safekeeper_api/Cargo.toml b/libs/safekeeper_api/Cargo.toml index 14811232d33b..4234ec6779a2 100644 --- a/libs/safekeeper_api/Cargo.toml +++ b/libs/safekeeper_api/Cargo.toml @@ -5,6 +5,9 @@ edition.workspace = true license.workspace = true [dependencies] -serde.workspace = true const_format.workspace = true +serde.workspace = true +postgres_ffi.workspace = true +pq_proto.workspace = true +tokio.workspace = true utils.workspace = true diff --git a/libs/safekeeper_api/src/lib.rs b/libs/safekeeper_api/src/lib.rs index 63c2c51188b8..be6923aca902 100644 --- a/libs/safekeeper_api/src/lib.rs +++ b/libs/safekeeper_api/src/lib.rs @@ -1,10 +1,27 @@ #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] use const_format::formatcp; +use pq_proto::SystemId; +use serde::{Deserialize, Serialize}; /// Public API types pub mod models; +/// Consensus logical timestamp. Note: it is a part of sk control file. +pub type Term = u64; +pub const INVALID_TERM: Term = 0; + +/// Information about Postgres. Safekeeper gets it once and then verifies all +/// further connections from computes match. Note: it is a part of sk control +/// file. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ServerInfo { + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + pub wal_seg_size: u32, +} + pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 28666d197afd..3e424a792c7f 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -1,10 +1,23 @@ +//! Types used in safekeeper http API. Many of them are also reused internally. + +use postgres_ffi::TimestampTz; use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use tokio::time::Instant; use utils::{ - id::{NodeId, TenantId, TimelineId}, + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, + pageserver_feedback::PageserverFeedback, }; +use crate::{ServerInfo, Term}; + +#[derive(Debug, Serialize)] +pub struct SafekeeperStatus { + pub id: NodeId, +} + #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { pub tenant_id: TenantId, @@ -18,6 +31,161 @@ pub struct TimelineCreateRequest { pub local_start_lsn: Option, } +/// Same as TermLsn, but serializes LSN using display serializer +/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TermSwitchApiEntry { + pub term: Term, + pub lsn: Lsn, +} + +/// Augment AcceptorState with last_log_term for convenience +#[derive(Debug, Serialize, Deserialize)] +pub struct AcceptorStateStatus { + pub term: Term, + pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility + pub term_history: Vec, +} + +/// Things safekeeper should know about timeline state on peers. +/// Used as both model and internally. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerInfo { + pub sk_id: NodeId, + pub term: Term, + /// Term of the last entry. + pub last_log_term: Term, + /// LSN of the last record. + pub flush_lsn: Lsn, + pub commit_lsn: Lsn, + /// Since which LSN safekeeper has WAL. + pub local_start_lsn: Lsn, + /// When info was received. Serde annotations are not very useful but make + /// the code compile -- we don't rely on this field externally. + #[serde(skip)] + #[serde(default = "Instant::now")] + pub ts: Instant, + pub pg_connstr: String, + pub http_connstr: String, +} + +pub type FullTransactionId = u64; + +/// Hot standby feedback received from replica +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct HotStandbyFeedback { + pub ts: TimestampTz, + pub xmin: FullTransactionId, + pub catalog_xmin: FullTransactionId, +} + +pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0; + +impl HotStandbyFeedback { + pub fn empty() -> HotStandbyFeedback { + HotStandbyFeedback { + ts: 0, + xmin: 0, + catalog_xmin: 0, + } + } +} + +/// Standby status update +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct StandbyReply { + pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. + pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. + pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. + pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + pub reply_requested: bool, +} + +impl StandbyReply { + pub fn empty() -> Self { + StandbyReply { + write_lsn: Lsn::INVALID, + flush_lsn: Lsn::INVALID, + apply_lsn: Lsn::INVALID, + reply_ts: 0, + reply_requested: false, + } + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct StandbyFeedback { + pub reply: StandbyReply, + pub hs_feedback: HotStandbyFeedback, +} + +impl StandbyFeedback { + pub fn empty() -> Self { + StandbyFeedback { + reply: StandbyReply::empty(), + hs_feedback: HotStandbyFeedback::empty(), + } + } +} + +/// Receiver is either pageserver or regular standby, which have different +/// feedbacks. +/// Used as both model and internally. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ReplicationFeedback { + Pageserver(PageserverFeedback), + Standby(StandbyFeedback), +} + +/// Uniquely identifies a WAL service connection. Logged in spans for +/// observability. +pub type ConnectionId = u32; + +/// Serialize is used only for json'ing in API response. Also used internally. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalSenderState { + pub ttid: TenantTimelineId, + pub addr: SocketAddr, + pub conn_id: ConnectionId, + // postgres application_name + pub appname: Option, + pub feedback: ReplicationFeedback, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalReceiverState { + /// None means it is recovery initiated by us (this safekeeper). + pub conn_id: Option, + pub status: WalReceiverStatus, +} + +/// Walreceiver status. Currently only whether it passed voting stage and +/// started receiving the stream, but it is easy to add more if needed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WalReceiverStatus { + Voting, + Streaming, +} + +/// Info about timeline on safekeeper ready for reporting. +#[derive(Debug, Serialize, Deserialize)] +pub struct TimelineStatus { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub acceptor_state: AcceptorStateStatus, + pub pg_info: ServerInfo, + pub flush_lsn: Lsn, + pub timeline_start_lsn: Lsn, + pub local_start_lsn: Lsn, + pub commit_lsn: Lsn, + pub backup_lsn: Lsn, + pub peer_horizon_lsn: Lsn, + pub remote_consistent_lsn: Lsn, + pub peers: Vec, + pub walsenders: Vec, + pub walreceivers: Vec, +} + fn lsn_invalid() -> Lsn { Lsn::INVALID } diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index a4b4670e423b..dd152fd4cce8 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,11 +1,12 @@ //! Code to deal with safekeeper control file upgrades use crate::{ - safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn}, + safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn}, state::{EvictionState, PersistedPeers, TimelinePersistentState}, wal_backup_partial, }; use anyhow::{bail, Result}; use pq_proto::SystemId; +use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tracing::*; use utils::{ diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index 93011eddec07..19362a0992d4 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -14,6 +14,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use postgres_ffi::XLogSegNo; use postgres_ffi::MAX_SEND_SIZE; +use safekeeper_api::models::WalSenderState; use serde::Deserialize; use serde::Serialize; @@ -25,7 +26,6 @@ use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use crate::safekeeper::TermHistory; -use crate::send_wal::WalSenderState; use crate::state::TimelineMemState; use crate::state::TimelinePersistentState; use crate::timeline::get_timeline_dir; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 2ca6333ba835..bb639bfb3221 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -4,6 +4,8 @@ use anyhow::Context; use pageserver_api::models::ShardParameters; use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; +use safekeeper_api::models::ConnectionId; +use safekeeper_api::Term; use std::future::Future; use std::str::{self, FromStr}; use std::sync::Arc; @@ -16,9 +18,7 @@ use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE}; -use crate::safekeeper::Term; use crate::timeline::TimelineError; -use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; use postgres_backend::PostgresBackend; use postgres_backend::QueryError; diff --git a/safekeeper/src/http/client.rs b/safekeeper/src/http/client.rs index a166fc1ab9b0..669a9c0ce94b 100644 --- a/safekeeper/src/http/client.rs +++ b/safekeeper/src/http/client.rs @@ -8,6 +8,7 @@ //! etc. use reqwest::{IntoUrl, Method, StatusCode}; +use safekeeper_api::models::TimelineStatus; use std::error::Error as _; use utils::{ http::error::HttpErrorBody, @@ -15,8 +16,6 @@ use utils::{ logging::SecretString, }; -use super::routes::TimelineStatus; - #[derive(Debug, Clone)] pub struct Client { mgmt_api_endpoint: String, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 71c36f1d4631..9bc1bf340919 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,5 +1,9 @@ use hyper::{Body, Request, Response, StatusCode}; -use serde::{Deserialize, Serialize}; +use safekeeper_api::models::AcceptorStateStatus; +use safekeeper_api::models::SafekeeperStatus; +use safekeeper_api::models::TermSwitchApiEntry; +use safekeeper_api::models::TimelineStatus; +use safekeeper_api::ServerInfo; use std::collections::HashMap; use std::fmt; use std::io::Write as _; @@ -31,26 +35,17 @@ use utils::{ request::{ensure_no_body, parse_request_param}, RequestExt, RouterBuilder, }, - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, }; use crate::debug_dump::TimelineDigestRequest; -use crate::receive_wal::WalReceiverState; -use crate::safekeeper::Term; -use crate::safekeeper::{ServerInfo, TermLsn}; -use crate::send_wal::WalSenderState; -use crate::timeline::PeerInfo; +use crate::safekeeper::TermLsn; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; use crate::SafeKeeperConf; use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline}; -#[derive(Debug, Serialize)] -struct SafekeeperStatus { - id: NodeId, -} - /// Healthcheck handler. async fn status_handler(request: Request) -> Result, ApiError> { check_permission(&request, None)?; @@ -73,50 +68,6 @@ fn get_global_timelines(request: &Request) -> Arc { .clone() } -/// Same as TermLsn, but serializes LSN using display serializer -/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct TermSwitchApiEntry { - pub term: Term, - pub lsn: Lsn, -} - -impl From for TermLsn { - fn from(api_val: TermSwitchApiEntry) -> Self { - TermLsn { - term: api_val.term, - lsn: api_val.lsn, - } - } -} - -/// Augment AcceptorState with last_log_term for convenience -#[derive(Debug, Serialize, Deserialize)] -pub struct AcceptorStateStatus { - pub term: Term, - pub epoch: Term, // aka last_log_term - pub term_history: Vec, -} - -/// Info about timeline on safekeeper ready for reporting. -#[derive(Debug, Serialize, Deserialize)] -pub struct TimelineStatus { - pub tenant_id: TenantId, - pub timeline_id: TimelineId, - pub acceptor_state: AcceptorStateStatus, - pub pg_info: ServerInfo, - pub flush_lsn: Lsn, - pub timeline_start_lsn: Lsn, - pub local_start_lsn: Lsn, - pub commit_lsn: Lsn, - pub backup_lsn: Lsn, - pub peer_horizon_lsn: Lsn, - pub remote_consistent_lsn: Lsn, - pub peers: Vec, - pub walsenders: Vec, - pub walreceivers: Vec, -} - fn check_permission(request: &Request, tenant_id: Option) -> Result<(), ApiError> { check_permission_with(request, |claims| { crate::auth::check_permission(claims, tenant_id) @@ -187,6 +138,15 @@ async fn timeline_list_handler(request: Request) -> Result, json_response(StatusCode::OK, res) } +impl From for TermLsn { + fn from(api_val: TermSwitchApiEntry) -> Self { + TermLsn { + term: api_val.term, + lsn: api_val.lsn, + } + } +} + /// Report info about timeline. async fn timeline_status_handler(request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index dc4ad3706e6c..256e350ceba5 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -8,16 +8,17 @@ use anyhow::Context; use postgres_backend::QueryError; +use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::*; use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; +use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{ AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, }; -use crate::safekeeper::{Term, TermHistory, TermLsn}; +use crate::safekeeper::{TermHistory, TermLsn}; use crate::state::TimelinePersistentState; use crate::timeline::WalResidentTimeline; use postgres_backend::PostgresBackend; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index f58a9dca1dbc..00777273cbf9 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -4,6 +4,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; +use safekeeper_api::{models::TimelineStatus, Term}; use serde::{Deserialize, Serialize}; use std::{ cmp::min, @@ -21,11 +22,7 @@ use tracing::{error, info, instrument}; use crate::{ control_file::CONTROL_FILE_NAME, debug_dump, - http::{ - client::{self, Client}, - routes::TimelineStatus, - }, - safekeeper::Term, + http::client::{self, Client}, state::{EvictionState, TimelinePersistentState}, timeline::{Timeline, WalResidentTimeline}, timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}, diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 2a49890d618f..08371177cd24 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -9,9 +9,7 @@ use crate::metrics::{ }; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; -use crate::safekeeper::ServerInfo; use crate::timeline::WalResidentTimeline; -use crate::wal_service::ConnectionId; use crate::GlobalTimelines; use anyhow::{anyhow, Context}; use bytes::BytesMut; @@ -23,8 +21,8 @@ use postgres_backend::PostgresBackend; use postgres_backend::PostgresBackendReader; use postgres_backend::QueryError; use pq_proto::BeMessage; -use serde::Deserialize; -use serde::Serialize; +use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus}; +use safekeeper_api::ServerInfo; use std::future; use std::net::SocketAddr; use std::sync::Arc; @@ -171,21 +169,6 @@ impl WalReceiversShared { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WalReceiverState { - /// None means it is recovery initiated by us (this safekeeper). - pub conn_id: Option, - pub status: WalReceiverStatus, -} - -/// Walreceiver status. Currently only whether it passed voting stage and -/// started receiving the stream, but it is easy to add more if needed. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum WalReceiverStatus { - Voting, - Streaming, -} - /// Scope guard to access slot in WalReceivers registry and unregister from /// it in Drop. pub struct WalReceiverGuard { diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 7b87166aa052..61647c16b00a 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -7,6 +7,8 @@ use std::{fmt, pin::pin}; use anyhow::{bail, Context}; use futures::StreamExt; use postgres_protocol::message::backend::ReplicationMessage; +use safekeeper_api::models::{PeerInfo, TimelineStatus}; +use safekeeper_api::Term; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::timeout; use tokio::{ @@ -24,13 +26,11 @@ use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; use crate::safekeeper::{AppendRequest, AppendRequestHeader}; use crate::timeline::WalResidentTimeline; use crate::{ - http::routes::TimelineStatus, receive_wal::MSG_QUEUE_SIZE, safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory, - TermLsn, VoteRequest, + AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn, + VoteRequest, }, - timeline::PeerInfo, SafeKeeperConf, }; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 6eb69f0b7ce2..ccd7940c7212 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,6 +5,9 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; +use safekeeper_api::models::HotStandbyFeedback; +use safekeeper_api::Term; +use safekeeper_api::INVALID_TERM; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; @@ -16,7 +19,6 @@ use tracing::*; use crate::control_file; use crate::metrics::MISC_OPERATION_SECONDS; -use crate::send_wal::HotStandbyFeedback; use crate::state::TimelineState; use crate::wal_storage; @@ -31,10 +33,6 @@ use utils::{ const SK_PROTOCOL_VERSION: u32 = 2; pub const UNKNOWN_SERVER_VERSION: u32 = 0; -/// Consensus logical timestamp. -pub type Term = u64; -pub const INVALID_TERM: Term = 0; - #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct TermLsn { pub term: Term, @@ -198,16 +196,6 @@ impl AcceptorState { } } -/// Information about Postgres. Safekeeper gets it once and then verifies -/// all further connections from computes match. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ServerInfo { - /// Postgres server version - pub pg_version: u32, - pub system_id: SystemId, - pub wal_seg_size: u32, -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. @@ -1041,6 +1029,7 @@ where mod tests { use futures::future::BoxFuture; use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE}; + use safekeeper_api::ServerInfo; use super::*; use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState}; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 0887cf726418..84632219984a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -4,11 +4,10 @@ use crate::handler::SafekeeperPostgresHandler; use crate::metrics::RECEIVED_PS_FEEDBACKS; use crate::receive_wal::WalReceivers; -use crate::safekeeper::{Term, TermLsn}; +use crate::safekeeper::TermLsn; use crate::send_interpreted_wal::InterpretedWalSender; use crate::timeline::WalResidentTimeline; use crate::wal_reader_stream::WalReaderStreamBuilder; -use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; @@ -19,7 +18,11 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; -use serde::{Deserialize, Serialize}; +use safekeeper_api::models::{ + ConnectionId, HotStandbyFeedback, ReplicationFeedback, StandbyFeedback, StandbyReply, + WalSenderState, INVALID_FULL_TRANSACTION_ID, +}; +use safekeeper_api::Term; use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; use utils::id::TenantTimelineId; @@ -28,7 +31,6 @@ use utils::postgres_client::PostgresClientProtocol; use std::cmp::{max, min}; use std::net::SocketAddr; -use std::str; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch::Receiver; @@ -42,65 +44,6 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; // neon extension of replication protocol const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; -type FullTransactionId = u64; - -/// Hot standby feedback received from replica -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct HotStandbyFeedback { - pub ts: TimestampTz, - pub xmin: FullTransactionId, - pub catalog_xmin: FullTransactionId, -} - -const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0; - -impl HotStandbyFeedback { - pub fn empty() -> HotStandbyFeedback { - HotStandbyFeedback { - ts: 0, - xmin: 0, - catalog_xmin: 0, - } - } -} - -/// Standby status update -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyReply { - pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. - pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. - pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. - pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. - pub reply_requested: bool, -} - -impl StandbyReply { - fn empty() -> Self { - StandbyReply { - write_lsn: Lsn::INVALID, - flush_lsn: Lsn::INVALID, - apply_lsn: Lsn::INVALID, - reply_ts: 0, - reply_requested: false, - } - } -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyFeedback { - pub reply: StandbyReply, - pub hs_feedback: HotStandbyFeedback, -} - -impl StandbyFeedback { - pub fn empty() -> Self { - StandbyFeedback { - reply: StandbyReply::empty(), - hs_feedback: HotStandbyFeedback::empty(), - } - } -} - /// WalSenders registry. Timeline holds it (wrapped in Arc). pub struct WalSenders { mutex: Mutex, @@ -341,25 +284,6 @@ impl WalSendersShared { } } -// Serialized is used only for pretty printing in json. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WalSenderState { - ttid: TenantTimelineId, - addr: SocketAddr, - conn_id: ConnectionId, - // postgres application_name - appname: Option, - feedback: ReplicationFeedback, -} - -// Receiver is either pageserver or regular standby, which have different -// feedbacks. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -enum ReplicationFeedback { - Pageserver(PageserverFeedback), - Standby(StandbyFeedback), -} - // id of the occupied slot in WalSenders to access it (and save in the // WalSenderGuard). We could give Arc directly to the slot, but there is not // much sense in that as values aggregation which is performed on each feedback @@ -888,6 +812,7 @@ impl ReplyReader { #[cfg(test)] mod tests { + use safekeeper_api::models::FullTransactionId; use utils::id::{TenantId, TimelineId}; use super::*; diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index 941b7e67d0a9..c6ae6c1d2b0e 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -5,7 +5,7 @@ use std::{cmp::max, ops::Deref}; use anyhow::{bail, Result}; use postgres_ffi::WAL_SEGMENT_SIZE; -use safekeeper_api::models::TimelineTermBumpResponse; +use safekeeper_api::{models::TimelineTermBumpResponse, ServerInfo, Term}; use serde::{Deserialize, Serialize}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -14,10 +14,7 @@ use utils::{ use crate::{ control_file, - safekeeper::{ - AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory, - UNKNOWN_SERVER_VERSION, - }, + safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION}, timeline::TimelineError, wal_backup_partial::{self}, }; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 94d6ef106160..36860a0da2b4 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,8 +4,8 @@ use anyhow::{anyhow, bail, Result}; use camino::{Utf8Path, Utf8PathBuf}; use remote_storage::RemotePath; -use safekeeper_api::models::TimelineTermBumpResponse; -use serde::{Deserialize, Serialize}; +use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse}; +use safekeeper_api::Term; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; use utils::id::TenantId; @@ -31,9 +31,7 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::control_file; use crate::rate_limit::RateLimiter; use crate::receive_wal::WalReceivers; -use crate::safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn, -}; +use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn}; use crate::send_wal::WalSenders; use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState}; use crate::timeline_guard::ResidenceGuard; @@ -47,40 +45,17 @@ use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; use crate::SafeKeeperConf; use crate::{debug_dump, timeline_manager, wal_storage}; -/// Things safekeeper should know about timeline state on peers. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerInfo { - pub sk_id: NodeId, - pub term: Term, - /// Term of the last entry. - pub last_log_term: Term, - /// LSN of the last record. - pub flush_lsn: Lsn, - pub commit_lsn: Lsn, - /// Since which LSN safekeeper has WAL. - pub local_start_lsn: Lsn, - /// When info was received. Serde annotations are not very useful but make - /// the code compile -- we don't rely on this field externally. - #[serde(skip)] - #[serde(default = "Instant::now")] - ts: Instant, - pub pg_connstr: String, - pub http_connstr: String, -} - -impl PeerInfo { - fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { - PeerInfo { - sk_id: NodeId(sk_info.safekeeper_id), - term: sk_info.term, - last_log_term: sk_info.last_log_term, - flush_lsn: Lsn(sk_info.flush_lsn), - commit_lsn: Lsn(sk_info.commit_lsn), - local_start_lsn: Lsn(sk_info.local_start_lsn), - pg_connstr: sk_info.safekeeper_connstr.clone(), - http_connstr: sk_info.http_connstr.clone(), - ts, - } +fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { + PeerInfo { + sk_id: NodeId(sk_info.safekeeper_id), + term: sk_info.term, + last_log_term: sk_info.last_log_term, + flush_lsn: Lsn(sk_info.flush_lsn), + commit_lsn: Lsn(sk_info.commit_lsn), + local_start_lsn: Lsn(sk_info.local_start_lsn), + pg_connstr: sk_info.safekeeper_connstr.clone(), + http_connstr: sk_info.http_connstr.clone(), + ts, } } @@ -697,7 +672,7 @@ impl Timeline { { let mut shared_state = self.write_shared_state().await; shared_state.sk.record_safekeeper_info(&sk_info).await?; - let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); + let peer_info = peer_info_from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); } Ok(()) diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index c02fb904cf63..a33994dcabaa 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -14,6 +14,7 @@ use std::{ use futures::channel::oneshot; use postgres_ffi::XLogSegNo; +use safekeeper_api::{models::PeerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::{ task::{JoinError, JoinHandle}, @@ -32,10 +33,9 @@ use crate::{ rate_limit::{rand_duration, RateLimiter}, recovery::recovery_main, remove_wal::calc_horizon_lsn, - safekeeper::Term, send_wal::WalSenders, state::TimelineState, - timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline}, + timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline}, timeline_guard::{AccessService, GuardId, ResidenceGuard}, timelines_set::{TimelineSetGuard, TimelinesSet}, wal_backup::{self, WalBackupTaskHandle}, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index e1241ceb9b84..ad29c9f66c2c 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -4,7 +4,6 @@ use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; use crate::rate_limit::RateLimiter; -use crate::safekeeper::ServerInfo; use crate::state::TimelinePersistentState; use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError}; use crate::timelines_set::TimelinesSet; @@ -13,6 +12,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; +use safekeeper_api::ServerInfo; use serde::Serialize; use std::collections::HashMap; use std::str::FromStr; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 34b5dbeaa1cf..8517fa03443c 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -3,6 +3,7 @@ use anyhow::{Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; use futures::stream::FuturesOrdered; use futures::StreamExt; +use safekeeper_api::models::PeerInfo; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use utils::backoff; @@ -30,7 +31,7 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS}; -use crate::timeline::{PeerInfo, WalResidentTimeline}; +use crate::timeline::WalResidentTimeline; use crate::timeline_manager::{Manager, StateSnapshot}; use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME}; diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index bddfca50e4fb..4e5b34a9bf65 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -22,6 +22,7 @@ use camino::Utf8PathBuf; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; @@ -31,7 +32,6 @@ use utils::{id::NodeId, lsn::Lsn}; use crate::{ metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS}, rate_limit::{rand_duration, RateLimiter}, - safekeeper::Term, timeline::WalResidentTimeline, timeline_manager::StateSnapshot, wal_backup::{self}, diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs index f8c0c502cdbc..aea628c20808 100644 --- a/safekeeper/src/wal_reader_stream.rs +++ b/safekeeper/src/wal_reader_stream.rs @@ -4,12 +4,12 @@ use async_stream::try_stream; use bytes::Bytes; use futures::Stream; use postgres_backend::CopyStreamHandlerEnd; +use safekeeper_api::Term; use std::time::Duration; use tokio::time::timeout; use utils::lsn::Lsn; use crate::{ - safekeeper::Term, send_wal::{EndWatch, WalSenderGuard}, timeline::WalResidentTimeline, }; diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 1ff83918a76c..1ebcb060e776 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -4,6 +4,7 @@ //! use anyhow::{Context, Result}; use postgres_backend::QueryError; +use safekeeper_api::models::ConnectionId; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; @@ -114,8 +115,6 @@ async fn handle_socket( .await } -/// Unique WAL service connection ids are logged in spans for observability. -pub type ConnectionId = u32; pub type ConnectionCount = u32; pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId { diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 12aa02577185..efcdd89e7da7 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -15,12 +15,13 @@ use desim::{ }; use http::Uri; use safekeeper::{ - safekeeper::{ProposerAcceptorMessage, SafeKeeper, ServerInfo, UNKNOWN_SERVER_VERSION}, + safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION}, state::{TimelinePersistentState, TimelineState}, timeline::TimelineError, wal_storage::Storage, SafeKeeperConf, }; +use safekeeper_api::ServerInfo; use tracing::{debug, info_span, warn}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId},