From c8ff5b0536ffbac55035c951148526cfe7b93b81 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 11 Aug 2024 16:47:11 +0300 Subject: [PATCH 01/10] Add response tag to getpage request in V3 protocol version --- libs/pageserver_api/src/models.rs | 286 +++++++++++-- pageserver/client/src/page_service.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 1 + pageserver/src/metrics.rs | 1 + pageserver/src/page_service.rs | 197 +++++++-- pgxn/neon/libpagestore.c | 5 +- pgxn/neon/pagestore_client.h | 88 ++-- pgxn/neon/pagestore_smgr.c | 382 +++++++++++++----- 8 files changed, 741 insertions(+), 221 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index f3fc9fad760a..c4b6a206f046 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1460,13 +1460,17 @@ impl TryFrom for PagestreamBeMessageTag { // interface allows sending both LSNs, and let the pageserver do the right thing. There was no // difference in the responses between V1 and V2. // -#[derive(Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PagestreamProtocolVersion { V2, + V3, } +pub type RequestId = u64; + #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { + pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1474,6 +1478,7 @@ pub struct PagestreamExistsRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { + pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1481,6 +1486,7 @@ pub struct PagestreamNblocksRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { + pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1489,6 +1495,7 @@ pub struct PagestreamGetPageRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamDbSizeRequest { + pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub dbnode: u32, @@ -1496,6 +1503,7 @@ pub struct PagestreamDbSizeRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetSlruSegmentRequest { + pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub kind: u8, @@ -1504,31 +1512,56 @@ pub struct PagestreamGetSlruSegmentRequest { #[derive(Debug)] pub struct PagestreamExistsResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, + pub rel: RelTag, pub exists: bool, } #[derive(Debug)] pub struct PagestreamNblocksResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, + pub rel: RelTag, pub n_blocks: u32, } #[derive(Debug)] pub struct PagestreamGetPageResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, + pub rel: RelTag, + pub blkno: u32, pub page: Bytes, } #[derive(Debug)] pub struct PagestreamGetSlruSegmentResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, + pub kind: u8, + pub segno: u32, pub segment: Bytes, } #[derive(Debug)] pub struct PagestreamErrorResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, pub message: String, } #[derive(Debug)] pub struct PagestreamDbSizeResponse { + pub reqid: RequestId, + pub request_lsn: Lsn, + pub not_modified_since: Lsn, + pub db_node: u32, pub db_size: i64, } @@ -1552,6 +1585,7 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { bytes.put_u8(0); + bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1562,6 +1596,7 @@ impl PagestreamFeMessage { Self::Nblocks(req) => { bytes.put_u8(1); + bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1572,6 +1607,7 @@ impl PagestreamFeMessage { Self::GetPage(req) => { bytes.put_u8(2); + bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1583,6 +1619,7 @@ impl PagestreamFeMessage { Self::DbSize(req) => { bytes.put_u8(3); + bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.dbnode); @@ -1590,6 +1627,7 @@ impl PagestreamFeMessage { Self::GetSlruSegment(req) => { bytes.put_u8(4); + bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u8(req.kind); @@ -1600,19 +1638,31 @@ impl PagestreamFeMessage { bytes.into() } - pub fn parse(body: &mut R) -> anyhow::Result { + pub fn parse( + body: &mut R, + protocol_version: PagestreamProtocolVersion, + ) -> anyhow::Result { // these correspond to the NeonMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. let msg_tag = body.read_u8()?; - - // these two fields are the same for every request type - let request_lsn = Lsn::from(body.read_u64::()?); - let not_modified_since = Lsn::from(body.read_u64::()?); + let (reqid, request_lsn, not_modified_since) = match protocol_version { + PagestreamProtocolVersion::V2 => ( + 0, + Lsn::from(body.read_u64::()?), + Lsn::from(body.read_u64::()?), + ), + PagestreamProtocolVersion::V3 => ( + body.read_u64::()?, + Lsn::from(body.read_u64::()?), + Lsn::from(body.read_u64::()?), + ), + }; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { + reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1623,6 +1673,7 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1633,6 +1684,7 @@ impl PagestreamFeMessage { }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1644,12 +1696,14 @@ impl PagestreamFeMessage { blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + reqid, request_lsn, not_modified_since, dbnode: body.read_u32::()?, })), 4 => Ok(PagestreamFeMessage::GetSlruSegment( PagestreamGetSlruSegmentRequest { + reqid, request_lsn, not_modified_since, kind: body.read_u8()?, @@ -1662,43 +1716,114 @@ impl PagestreamFeMessage { } impl PagestreamBeMessage { - pub fn serialize(&self) -> Bytes { + pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes { let mut bytes = BytesMut::new(); use PagestreamBeMessageTag as Tag; - match self { - Self::Exists(resp) => { - bytes.put_u8(Tag::Exists as u8); - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(Tag::Nblocks as u8); - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(Tag::GetPage as u8); - bytes.put(&resp.page[..]); - } - - Self::Error(resp) => { - bytes.put_u8(Tag::Error as u8); - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null terminator - } - Self::DbSize(resp) => { - bytes.put_u8(Tag::DbSize as u8); - bytes.put_i64(resp.db_size); + match protocol_version { + PagestreamProtocolVersion::V2 => { + match self { + Self::Exists(resp) => { + bytes.put_u8(Tag::Exists as u8); + bytes.put_u8(resp.exists as u8); + } + + Self::Nblocks(resp) => { + bytes.put_u8(Tag::Nblocks as u8); + bytes.put_u32(resp.n_blocks); + } + + Self::GetPage(resp) => { + bytes.put_u8(Tag::GetPage as u8); + bytes.put(&resp.page[..]) + } + + Self::Error(resp) => { + bytes.put_u8(Tag::Error as u8); + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } + Self::DbSize(resp) => { + bytes.put_u8(Tag::DbSize as u8); + bytes.put_i64(resp.db_size); + } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(Tag::GetSlruSegment as u8); + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); + } + } } - - Self::GetSlruSegment(resp) => { - bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); - bytes.put(&resp.segment[..]); + _ => { + match self { + Self::Exists(resp) => { + bytes.put_u8(Tag::Exists as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put_u32(resp.rel.spcnode); + bytes.put_u32(resp.rel.dbnode); + bytes.put_u32(resp.rel.relnode); + bytes.put_u8(resp.rel.forknum); + bytes.put_u8(resp.exists as u8); + } + + Self::Nblocks(resp) => { + bytes.put_u8(Tag::Nblocks as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put_u32(resp.rel.spcnode); + bytes.put_u32(resp.rel.dbnode); + bytes.put_u32(resp.rel.relnode); + bytes.put_u8(resp.rel.forknum); + bytes.put_u32(resp.n_blocks); + } + + Self::GetPage(resp) => { + bytes.put_u8(Tag::GetPage as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put_u32(resp.rel.spcnode); + bytes.put_u32(resp.rel.dbnode); + bytes.put_u32(resp.rel.relnode); + bytes.put_u8(resp.rel.forknum); + bytes.put_u32(resp.blkno); + bytes.put(&resp.page[..]) + } + + Self::Error(resp) => { + bytes.put_u8(Tag::Error as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } + Self::DbSize(resp) => { + bytes.put_u8(Tag::DbSize as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put_u32(resp.db_node); + bytes.put_i64(resp.db_size); + } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(Tag::GetSlruSegment as u8); + bytes.put_u64(resp.reqid); + bytes.put_u64(resp.request_lsn.0); + bytes.put_u64(resp.not_modified_since.0); + bytes.put_u8(resp.kind); + bytes.put_u32(resp.segno); + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); + } + } } } - bytes.into() } @@ -1710,38 +1835,109 @@ impl PagestreamBeMessage { let ok = match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? { Tag::Exists => { - let exists = buf.read_u8()?; + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); + let rel = RelTag { + spcnode: buf.read_u32::()?, + dbnode: buf.read_u32::()?, + relnode: buf.read_u32::()?, + forknum: buf.read_u8()?, + }; + let exists = buf.read_u8()? != 0; Self::Exists(PagestreamExistsResponse { - exists: exists != 0, + reqid, + request_lsn, + not_modified_since, + rel, + exists, }) } Tag::Nblocks => { + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); + let rel = RelTag { + spcnode: buf.read_u32::()?, + dbnode: buf.read_u32::()?, + relnode: buf.read_u32::()?, + forknum: buf.read_u8()?, + }; let n_blocks = buf.read_u32::()?; - Self::Nblocks(PagestreamNblocksResponse { n_blocks }) + Self::Nblocks(PagestreamNblocksResponse { + reqid, + request_lsn, + not_modified_since, + rel, + n_blocks, + }) } Tag::GetPage => { + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); + let rel = RelTag { + spcnode: buf.read_u32::()?, + dbnode: buf.read_u32::()?, + relnode: buf.read_u32::()?, + forknum: buf.read_u8()?, + }; + let blkno = buf.read_u32::()?; let mut page = vec![0; 8192]; // TODO: use MaybeUninit buf.read_exact(&mut page)?; - PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) + Self::GetPage(PagestreamGetPageResponse { + reqid, + request_lsn, + not_modified_since, + rel, + blkno, + page: page.into(), + }) } Tag::Error => { + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); let mut msg = Vec::new(); buf.read_until(0, &mut msg)?; let cstring = std::ffi::CString::from_vec_with_nul(msg)?; let rust_str = cstring.to_str()?; - PagestreamBeMessage::Error(PagestreamErrorResponse { + Self::Error(PagestreamErrorResponse { + reqid, + request_lsn, + not_modified_since, message: rust_str.to_owned(), }) } Tag::DbSize => { + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); + let db_node = buf.read_u32::()?; let db_size = buf.read_i64::()?; - Self::DbSize(PagestreamDbSizeResponse { db_size }) + Self::DbSize(PagestreamDbSizeResponse { + reqid, + request_lsn, + not_modified_since, + db_node, + db_size, + }) } Tag::GetSlruSegment => { + let reqid = buf.read_u64::()?; + let request_lsn = Lsn(buf.read_u64::()?); + let not_modified_since = Lsn(buf.read_u64::()?); + let kind = buf.read_u8()?; + let segno = buf.read_u32::()?; let n_blocks = buf.read_u32::()?; let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; buf.read_exact(&mut segment)?; Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { + reqid, + request_lsn, + not_modified_since, + kind, + segno, segment: segment.into(), }) } @@ -1780,6 +1976,7 @@ mod tests { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { + reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -1790,6 +1987,7 @@ mod tests { }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(4), rel: RelTag { @@ -1800,6 +1998,7 @@ mod tests { }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -1811,6 +2010,7 @@ mod tests { blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), dbnode: 7, diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index f9507fc47a3a..207ec4166cb8 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -60,7 +60,7 @@ impl Client { ) -> anyhow::Result { let copy_both: tokio_postgres::CopyBothDuplex = self .client - .copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}")) + .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}")) .await?; let Client { cancel_on_client_drop, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index b2df01714d31..910708c04a80 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -322,6 +322,7 @@ async fn main_impl( .to_rel_block() .expect("we filter non-rel-block keys out above"); PagestreamGetPageRequest { + reqid: 0, request_lsn: if rng.gen_bool(args.req_latest_probability) { Lsn::MAX } else { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index bdbabf3f7511..9a3bd7f931e0 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1845,6 +1845,7 @@ pub(crate) static LIVE_CONNECTIONS: Lazy = Lazy::new(|| { #[derive(Clone, Copy, enum_map::Enum, IntoStaticStr)] pub(crate) enum ComputeCommandKind { + PageStreamV3, PageStreamV2, Basebackup, Fullbackup, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d00ec11a7611..105d4357e77e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -17,7 +17,7 @@ use pageserver_api::models::{ PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, - PagestreamProtocolVersion, + PagestreamProtocolVersion, RequestId, }; use pageserver_api::shard::TenantShardId; use postgres_backend::{ @@ -537,6 +537,29 @@ impl From for QueryError { } } +#[derive(thiserror::Error, Debug)] +struct BatchedPageStreamError { + err: PageStreamError, + reqid: RequestId, + request_lsn: Lsn, + not_modified_since: Lsn, +} + +impl std::fmt::Display for BatchedPageStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.err.fmt(f) + } +} + +struct BatchedGetPageRequest { + rel: RelTag, + blkno: BlockNumber, + reqid: RequestId, + request_lsn: Lsn, + not_modified_since: Lsn, + timer: SmgrOpTimer, +} + enum BatchedFeMessage { Exists { span: Span, @@ -554,7 +577,7 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, + pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, }, DbSize { span: Span, @@ -570,7 +593,7 @@ enum BatchedFeMessage { }, RespondError { span: Span, - error: PageStreamError, + error: BatchedPageStreamError, }, } @@ -595,7 +618,7 @@ impl BatchedFeMessage { BatchedFeMessage::GetPage { shard, pages, .. } => ( shard, pages.len(), - itertools::Either::Right(pages.iter_mut().map(|(_, _, timer)| timer)), + itertools::Either::Right(pages.iter_mut().map(|p| &mut p.timer)), ), BatchedFeMessage::RespondError { .. } => return Ok(()), }; @@ -661,6 +684,7 @@ impl PageServerHandler { timeline_handles: &mut TimelineHandles, cancel: &CancellationToken, ctx: &RequestContext, + protocol_version: PagestreamProtocolVersion, parent_span: Span, ) -> Result, QueryError> where @@ -695,7 +719,8 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message"); // parse request - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + let neon_fe_msg = + PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; let batched_msg = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { @@ -763,6 +788,7 @@ impl PageServerHandler { } } PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + reqid, request_lsn, not_modified_since, rel, @@ -774,7 +800,12 @@ impl PageServerHandler { ($error:expr) => {{ let error = BatchedFeMessage::RespondError { span, - error: $error, + error: BatchedPageStreamError { + err: $error, + reqid, + request_lsn, + not_modified_since, + }, }; Ok(Some(error)) }}; @@ -831,7 +862,14 @@ impl PageServerHandler { span, shard, effective_request_lsn, - pages: smallvec::smallvec![(rel, blkno, timer)], + pages: smallvec::smallvec![BatchedGetPageRequest { + rel, + blkno, + reqid, + request_lsn, + not_modified_since, + timer + }], } } }; @@ -910,6 +948,7 @@ impl PageServerHandler { pgb_writer: &mut PostgresBackend, batch: BatchedFeMessage, cancel: &CancellationToken, + protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> Result<(), QueryError> where @@ -917,7 +956,7 @@ impl PageServerHandler { { // invoke handler function let (handler_results, span): ( - Vec>, + Vec>, _, ) = match batch { BatchedFeMessage::Exists { @@ -932,7 +971,13 @@ impl PageServerHandler { .handle_get_rel_exists_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer))], + .map(|msg| (msg, timer)) + .map_err(|err| BatchedPageStreamError { + err, + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + })], span, ) } @@ -948,7 +993,13 @@ impl PageServerHandler { .handle_get_nblocks_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer))], + .map(|msg| (msg, timer)) + .map_err(|err| BatchedPageStreamError { + err, + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + })], span, ) } @@ -990,7 +1041,13 @@ impl PageServerHandler { .handle_db_size_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer))], + .map(|msg| (msg, timer)) + .map_err(|err| BatchedPageStreamError { + err, + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + })], span, ) } @@ -1006,7 +1063,13 @@ impl PageServerHandler { .handle_get_slru_segment_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer))], + .map(|msg| (msg, timer)) + .map_err(|err| BatchedPageStreamError { + err, + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + })], span, ) } @@ -1022,7 +1085,7 @@ impl PageServerHandler { // Other handler errors are sent back as an error message and we stay in pagestream protocol. for handler_result in handler_results { let (response_msg, timer) = match handler_result { - Err(e) => match &e { + Err(e) => match &e.err { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of // shutdown, then do not send the error to the client. Instead just drop the @@ -1047,7 +1110,10 @@ impl PageServerHandler { }); ( PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), + reqid: e.reqid, + request_lsn: e.request_lsn, + not_modified_since: e.not_modified_since, + message: e.err.to_string(), }), None, // TODO: measure errors ) @@ -1060,7 +1126,9 @@ impl PageServerHandler { // marshal & transmit response message // - pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + pgb_writer.write_message_noflush(&BeMessage::CopyData( + &response_msg.serialize(protocol_version), + ))?; // We purposefully don't count flush time into the timer. // @@ -1123,7 +1191,7 @@ impl PageServerHandler { pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, - _protocol_version: PagestreamProtocolVersion, + protocol_version: PagestreamProtocolVersion, ctx: RequestContext, ) -> Result<(), QueryError> where @@ -1163,6 +1231,7 @@ impl PageServerHandler { timeline_handles, request_span, pipelining_config, + protocol_version, &ctx, ) .await @@ -1175,6 +1244,7 @@ impl PageServerHandler { timeline_id, timeline_handles, request_span, + protocol_version, &ctx, ) .await @@ -1201,6 +1271,7 @@ impl PageServerHandler { timeline_id: TimelineId, mut timeline_handles: TimelineHandles, request_span: Span, + protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1218,6 +1289,7 @@ impl PageServerHandler { &mut timeline_handles, &cancel, ctx, + protocol_version, request_span.clone(), ) .await; @@ -1238,7 +1310,7 @@ impl PageServerHandler { } let err = self - .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx) + .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx) .await; match err { Ok(()) => {} @@ -1261,6 +1333,7 @@ impl PageServerHandler { mut timeline_handles: TimelineHandles, request_span: Span, pipelining_config: PageServicePipeliningConfigPipelined, + protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1358,6 +1431,7 @@ impl PageServerHandler { &mut timeline_handles, &cancel_batcher, &ctx, + protocol_version, request_span.clone(), ) .await; @@ -1403,8 +1477,14 @@ impl PageServerHandler { batch .throttle_and_record_start_processing(&self.cancel) .await?; - self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) - .await?; + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + &cancel, + protocol_version, + &ctx, + ) + .await?; } } }); @@ -1590,6 +1670,10 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + rel: req.rel, exists, })) } @@ -1616,6 +1700,10 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + rel: req.rel, n_blocks, })) } @@ -1643,6 +1731,10 @@ impl PageServerHandler { let db_size = total_blocks as i64 * BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + db_node: req.dbnode, db_size, })) } @@ -1652,9 +1744,9 @@ impl PageServerHandler { &mut self, timeline: &Timeline, effective_lsn: Lsn, - requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, + requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); timeline @@ -1663,7 +1755,7 @@ impl PageServerHandler { let results = timeline .get_rel_page_at_lsn_batched( - requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), + requests.iter().map(|p| (&p.rel, &p.blkno)), effective_lsn, ctx, ) @@ -1675,16 +1767,26 @@ impl PageServerHandler { requests .into_iter() .zip(results.into_iter()) - .map(|((_, _, timer), res)| { + .map(|(req, res)| { res.map(|page| { ( PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + rel: req.rel, + blkno: req.blkno, page, }), - timer, + req.timer, ) }) - .map_err(PageStreamError::from) + .map_err(|e| BatchedPageStreamError { + err: PageStreamError::from(e), + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + }) }), ) } @@ -1711,7 +1813,14 @@ impl PageServerHandler { let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; Ok(PagestreamBeMessage::GetSlruSegment( - PagestreamGetSlruSegmentResponse { segment }, + PagestreamGetSlruSegmentResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + kind: req.kind, + segno: req.segno, + segment, + }, )) } @@ -1906,6 +2015,7 @@ struct FullBackupCmd { struct PageStreamCmd { tenant_id: TenantId, timeline_id: TimelineId, + protocol_version: PagestreamProtocolVersion, } /// `lease lsn tenant timeline lsn` @@ -1926,7 +2036,7 @@ enum PageServiceCmd { } impl PageStreamCmd { - fn parse(query: &str) -> anyhow::Result { + fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result { let parameters = query.split_whitespace().collect_vec(); if parameters.len() != 2 { bail!( @@ -1941,6 +2051,7 @@ impl PageStreamCmd { Ok(Self { tenant_id, timeline_id, + protocol_version, }) } } @@ -2078,7 +2189,14 @@ impl PageServiceCmd { bail!("cannot parse query: {query}") }; match cmd.to_ascii_lowercase().as_str() { - "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)), + "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse( + other, + PagestreamProtocolVersion::V2, + )?)), + "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse( + other, + PagestreamProtocolVersion::V3, + )?)), "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)), "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)), "lease" => { @@ -2160,25 +2278,21 @@ where PageServiceCmd::PageStream(PageStreamCmd { tenant_id, timeline_id, + protocol_version, }) => { tracing::Span::current() .record("tenant_id", field::display(tenant_id)) .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; + let command_kind = match protocol_version { + PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2, + PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3, + }; + COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc(); - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::PageStreamV2) - .inc(); - - self.handle_pagerequests( - pgb, - tenant_id, - timeline_id, - PagestreamProtocolVersion::V2, - ctx, - ) - .await?; + self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx) + .await?; } PageServiceCmd::BaseBackup(BaseBackupCmd { tenant_id, @@ -2357,7 +2471,8 @@ mod tests { cmd, PageServiceCmd::PageStream(PageStreamCmd { tenant_id, - timeline_id + timeline_id, + PagestreamProtocolVersion::V2, }) ); let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap(); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 88d0a5292bf7..c21f07b1d7d7 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -556,6 +556,9 @@ pageserver_connect(shardno_t shard_no, int elevel) switch (neon_protocol_version) { + case 3: + pagestream_query = psprintf("pagestream_v3 %s %s", neon_tenant, neon_timeline); + break; case 2: pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline); break; @@ -1137,7 +1140,7 @@ pg_init_libpagestore(void) &neon_protocol_version, 2, /* use protocol version 2 */ 2, /* min */ - 2, /* max */ + 3, /* max */ PGC_SU_BACKEND, 0, /* no flags required */ NULL, NULL, NULL); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index f905e3b0faa3..de1a22b690d3 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -44,10 +44,15 @@ typedef enum T_NeonGetSlruSegmentResponse, } NeonMessageTag; +typedef uint64 NeonRequestId; + /* base struct for c-style inheritance */ typedef struct { - NeonMessageTag tag; + NeonMessageTag tag; + NeonRequestId reqid; + XLogRecPtr lsn; + XLogRecPtr not_modified_since; } NeonMessage; #define messageTag(m) (((const NeonMessage *)(m))->tag) @@ -67,6 +72,7 @@ typedef enum { SLRU_MULTIXACT_OFFSETS } SlruKind; + /*-- * supertype of all the Neon*Request structs below. * @@ -88,92 +94,94 @@ typedef enum { * These structs describe the V2 of these requests. (The old now-defunct V1 * protocol contained just one LSN and a boolean 'latest' flag.) */ -typedef struct -{ - NeonMessageTag tag; - XLogRecPtr lsn; - XLogRecPtr not_modified_since; -} NeonRequest; +typedef NeonMessage NeonRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonExistsRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonNblocksRequest; typedef struct { - NeonRequest req; - Oid dbNode; + NeonRequest req; + Oid dbNode; } NeonDbSizeRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; - BlockNumber blkno; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; + BlockNumber blkno; } NeonGetPageRequest; typedef struct { - NeonRequest req; - SlruKind kind; - int segno; + NeonRequest req; + SlruKind kind; + int segno; } NeonGetSlruSegmentRequest; /* supertype of all the Neon*Response structs below */ -typedef struct -{ - NeonMessageTag tag; -} NeonResponse; +typedef NeonMessage NeonResponse; typedef struct { - NeonMessageTag tag; - bool exists; + NeonResponse resp; + NRelFileInfo rinfo; + ForkNumber forknum; + bool exists; } NeonExistsResponse; typedef struct { - NeonMessageTag tag; - uint32 n_blocks; + NeonResponse resp; + NRelFileInfo rinfo; + ForkNumber forknum; + uint32 n_blocks; } NeonNblocksResponse; typedef struct { - NeonMessageTag tag; - char page[FLEXIBLE_ARRAY_MEMBER]; + NeonResponse resp; + NRelFileInfo rinfo; + ForkNumber forknum; + BlockNumber blkno; + char page[FLEXIBLE_ARRAY_MEMBER]; } NeonGetPageResponse; #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ)) typedef struct { - NeonMessageTag tag; - int64 db_size; + NeonResponse resp; + Oid dbNode; + int64 db_size; } NeonDbSizeResponse; typedef struct { - NeonMessageTag tag; - char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error - * message */ + NeonResponse resp; + char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error + * message */ } NeonErrorResponse; typedef struct { - NeonMessageTag tag; - int n_blocks; - char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; + NeonResponse resp; + SlruKind kind; + int segno; + int n_blocks; + char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; } NeonGetSlruSegmentResponse; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 385905d9cee9..9ff8154861a2 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -120,6 +120,9 @@ static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum); +static uint32 local_request_counter; +#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) + /* * Prefetch implementation: * @@ -188,15 +191,11 @@ typedef struct PrefetchRequest uint8 status; /* see PrefetchStatus for valid values */ uint8 flags; /* see PrefetchRequestFlags */ neon_request_lsns request_lsns; + NeonRequestId reqid; NeonResponse *response; /* may be null */ uint64 my_ring_index; } PrefetchRequest; -StaticAssertDecl(sizeof(PrefetchRequest) == 64, - "We prefer to have a power-of-2 size for this struct. Please" - " try to find an alternative solution before reaching to" - " increase the expected size here"); - /* prefetch buffer lookup hash table */ typedef struct PrfHashEntry @@ -797,6 +796,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, + .req.reqid = GENERATE_REQUEST_ID(), /* lsn and not_modified_since are filled in below */ .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -805,6 +805,8 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(mySlotNo == MyPState->ring_unused); + slot->reqid = request.req.reqid; + if (force_request_lsns) slot->request_lsns = *force_request_lsns; else @@ -1177,6 +1179,10 @@ nm_pack_request(NeonRequest *msg) initStringInfo(&s); pq_sendbyte(&s, msg->tag); + if (neon_protocol_version >= 3) + { + pq_sendint64(&s, msg->reqid); + } pq_sendint64(&s, msg->lsn); pq_sendint64(&s, msg->not_modified_since); @@ -1254,8 +1260,16 @@ NeonResponse * nm_unpack_response(StringInfo s) { NeonMessageTag tag = pq_getmsgbyte(s); + NeonResponse resp_hdr = {0}; /* make valgrind happy */ NeonResponse *resp = NULL; + resp_hdr.tag = tag; + if (neon_protocol_version >= 3) + { + resp_hdr.reqid = pq_getmsgint64(s); + resp_hdr.lsn = pq_getmsgint64(s); + resp_hdr.not_modified_since = pq_getmsgint64(s); + } switch (tag) { /* pagestore -> pagestore_client */ @@ -1263,7 +1277,14 @@ nm_unpack_response(StringInfo s) { NeonExistsResponse *msg_resp = palloc0(sizeof(NeonExistsResponse)); - msg_resp->tag = tag; + if (neon_protocol_version >= 3) + { + NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); + msg_resp->forknum = pq_getmsgbyte(s); + } + msg_resp->resp = resp_hdr; msg_resp->exists = pq_getmsgbyte(s); pq_getmsgend(s); @@ -1275,7 +1296,14 @@ nm_unpack_response(StringInfo s) { NeonNblocksResponse *msg_resp = palloc0(sizeof(NeonNblocksResponse)); - msg_resp->tag = tag; + if (neon_protocol_version >= 3) + { + NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); + msg_resp->forknum = pq_getmsgbyte(s); + } + msg_resp->resp = resp_hdr; msg_resp->n_blocks = pq_getmsgint(s, 4); pq_getmsgend(s); @@ -1288,12 +1316,20 @@ nm_unpack_response(StringInfo s) NeonGetPageResponse *msg_resp; msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE); - msg_resp->tag = tag; + if (neon_protocol_version >= 3) + { + NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); + NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); + msg_resp->forknum = pq_getmsgbyte(s); + msg_resp->blkno = pq_getmsgint(s, 4); + } + msg_resp->resp = resp_hdr; /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); pq_getmsgend(s); - Assert(msg_resp->tag == T_NeonGetPageResponse); + Assert(msg_resp->resp.tag == T_NeonGetPageResponse); resp = (NeonResponse *) msg_resp; break; @@ -1303,7 +1339,11 @@ nm_unpack_response(StringInfo s) { NeonDbSizeResponse *msg_resp = palloc0(sizeof(NeonDbSizeResponse)); - msg_resp->tag = tag; + if (neon_protocol_version >= 3) + { + msg_resp->dbNode = pq_getmsgint(s, 4); + } + msg_resp->resp = resp_hdr; msg_resp->db_size = pq_getmsgint64(s); pq_getmsgend(s); @@ -1321,7 +1361,7 @@ nm_unpack_response(StringInfo s) msglen = strlen(msgtext); msg_resp = palloc0(sizeof(NeonErrorResponse) + msglen + 1); - msg_resp->tag = tag; + msg_resp->resp = resp_hdr; memcpy(msg_resp->message, msgtext, msglen + 1); pq_getmsgend(s); @@ -1332,9 +1372,17 @@ nm_unpack_response(StringInfo s) case T_NeonGetSlruSegmentResponse: { NeonGetSlruSegmentResponse *msg_resp; - int n_blocks = pq_getmsgint(s, 4); + int n_blocks; msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse)); - msg_resp->tag = tag; + + if (neon_protocol_version >= 3) + { + msg_resp->kind = pq_getmsgbyte(s); + msg_resp->segno = pq_getmsgint(s, 4); + } + msg_resp->resp = resp_hdr; + + n_blocks = pq_getmsgint(s, 4); msg_resp->n_blocks = n_blocks; memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ); pq_getmsgend(s); @@ -2306,6 +2354,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) { NeonExistsRequest request = { .req.tag = T_NeonExistsRequest, + .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -2313,31 +2362,59 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) }; resp = page_server_request(&request); - } - switch (resp->tag) - { - case T_NeonExistsResponse: - exists = ((NeonExistsResponse *) resp)->exists; - break; - - case T_NeonErrorResponse: - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forkNum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; + switch (resp->tag) + { + case T_NeonExistsResponse: + { + NeonExistsResponse* exists_resp = (NeonExistsResponse *) resp; + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since || + RelFileInfoEquals(exists_resp->rinfo, request.rinfo) || + exists_resp->forknum != forkNum) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->rinfo), exists_resp->forknum, + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forkNum); + } + } + exists = exists_resp->exists; + break; + } + case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); + } + } + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", + resp->reqid, + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forkNum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", - T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", + T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); + } + pfree(resp); } - pfree(resp); return exists; } @@ -2945,15 +3022,43 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block switch (resp->tag) { case T_NeonGetPageResponse: - memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ); + { + NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; + if (neon_protocol_version >= 3) + { + if (resp->reqid != slot->reqid || + resp->lsn != slot->request_lsns.request_lsn || + resp->not_modified_since != slot->request_lsns.not_modified_since || + RelFileInfoEquals(getpage_resp->rinfo, rinfo) || + getpage_resp->forknum != forkNum || + getpage_resp->blkno != base_blockno + i) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->rinfo), getpage_resp->forknum, getpage_resp->blkno, + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); + } + } + memcpy(buffer, getpage_resp->page, BLCKSZ); lfc_write(rinfo, forkNum, blockno, buffer); break; - + } case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != slot->reqid || + resp->lsn != slot->request_lsns.request_lsn || + resp->not_modified_since != slot->request_lsns.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); + } + } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - slot->shard_no, blockno, RelFileInfoFmt(rinfo), + errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)), errdetail("page server returned error: %s", ((NeonErrorResponse *) resp)->message))); @@ -3437,6 +3542,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) { NeonNblocksRequest request = { .req.tag = T_NeonNblocksRequest, + .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -3444,39 +3550,67 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) }; resp = page_server_request(&request); - } - - switch (resp->tag) - { - case T_NeonNblocksResponse: - n_blocks = ((NeonNblocksResponse *) resp)->n_blocks; - break; - case T_NeonErrorResponse: - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; + switch (resp->tag) + { + case T_NeonNblocksResponse: + { + NeonNblocksResponse * relsize_resp = (NeonNblocksResponse *) resp; + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since || + RelFileInfoEquals(relsize_resp->rinfo, request.rinfo) || + relsize_resp->forknum != forknum) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->rinfo), relsize_resp->forknum, + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); + } + } + n_blocks = relsize_resp->n_blocks; + break; + } + case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); + } + } + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", + resp->reqid, + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", - T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); - } - update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", + T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); + } + update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); - neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), - n_blocks); + neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), + n_blocks); - pfree(resp); + pfree(resp); + } return n_blocks; } @@ -3497,39 +3631,67 @@ neon_dbsize(Oid dbNode) { NeonDbSizeRequest request = { .req.tag = T_NeonDbSizeRequest, + .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .dbNode = dbNode, }; resp = page_server_request(&request); - } - - switch (resp->tag) - { - case T_NeonDbSizeResponse: - db_size = ((NeonDbSizeResponse *) resp)->db_size; - break; - case T_NeonErrorResponse: - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X", - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; + switch (resp->tag) + { + case T_NeonDbSizeResponse: + { + NeonDbSizeResponse* dbsize_resp = (NeonDbSizeResponse *) resp; + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since || + dbsize_resp->dbNode != dbNode) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->dbNode, + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), dbNode); + } + } + db_size = dbsize_resp->db_size; + break; + } + case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); + } + } + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X", + resp->reqid, + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", - T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); - } + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", + T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); + } - neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); + neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); - pfree(resp); + pfree(resp); + } return db_size; } @@ -3862,6 +4024,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request = (NeonGetSlruSegmentRequest) { .req.tag = T_NeonGetSlruSegmentRequest, + .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsn, .req.not_modified_since = not_modified_since, .kind = kind, @@ -3880,14 +4043,42 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf switch (resp->tag) { case T_NeonGetSlruSegmentResponse: - n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks; - memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ); + { + NeonGetSlruSegmentResponse* slru_resp = (NeonGetSlruSegmentResponse *) resp; + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since || + slru_resp->kind != kind || + slru_resp->segno != segno) + { + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->kind, slru_resp->segno, + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), kind, segno); + } + } + n_blocks = slru_resp->n_blocks; + memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ); break; - + } case T_NeonErrorResponse: + if (neon_protocol_version >= 3) + { + if (resp->reqid != request.req.reqid || + resp->lsn != request.req.lsn || + resp->not_modified_since != request.req.not_modified_since) + { + elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", + resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), + request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); + } + } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X", + errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %d at lsn %X/%08X", + resp->reqid, kind, segno, LSN_FORMAT_ARGS(request_lsn)), @@ -4028,6 +4219,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, NeonNblocksRequest request = { .req = (NeonRequest) { .tag = T_NeonNblocksRequest, + .reqid = GENERATE_REQUEST_ID(), .lsn = end_recptr, .not_modified_since = end_recptr, }, From e63ea8a75414c122d11e701405dc16b82637ee7f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 13 Aug 2024 15:55:03 +0300 Subject: [PATCH 02/10] Add tag to all response messages --- pageserver/src/page_service.rs | 68 +++++++++++++++------------------- 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 105d4357e77e..fb6354d1e358 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -552,12 +552,12 @@ impl std::fmt::Display for BatchedPageStreamError { } struct BatchedGetPageRequest { - rel: RelTag, - blkno: BlockNumber, + rel: RelTag, + blkno: BlockNumber, reqid: RequestId, request_lsn: Lsn, not_modified_since: Lsn, - timer: SmgrOpTimer, + timer: SmgrOpTimer, } enum BatchedFeMessage { @@ -863,13 +863,8 @@ impl PageServerHandler { shard, effective_request_lsn, pages: smallvec::smallvec![BatchedGetPageRequest { - rel, - blkno, - reqid, - request_lsn, - not_modified_since, - timer - }], + rel, blkno, reqid, request_lsn, not_modified_since, timer + }], } } }; @@ -1232,7 +1227,7 @@ impl PageServerHandler { request_span, pipelining_config, protocol_version, - &ctx, + &ctx, ) .await } @@ -1245,7 +1240,7 @@ impl PageServerHandler { timeline_handles, request_span, protocol_version, - &ctx, + &ctx, ) .await } @@ -1763,32 +1758,29 @@ impl PageServerHandler { assert_eq!(results.len(), requests.len()); // TODO: avoid creating the new Vec here - Vec::from_iter( - requests - .into_iter() - .zip(results.into_iter()) - .map(|(req, res)| { - res.map(|page| { - ( - PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, - blkno: req.blkno, - page, - }), - req.timer, - ) - }) - .map_err(|e| BatchedPageStreamError { - err: PageStreamError::from(e), - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - }) - }), - ) + Vec::from_iter(requests.into_iter().zip(results.into_iter()).map( + |(req, res)| { + res.map(|page| { + ( + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + rel: req.rel, + blkno: req.blkno, + page, + }), + req.timer, + ) + }) + .map_err(|e| BatchedPageStreamError { + err: PageStreamError::from(e), + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + }) + }, + )) } #[instrument(skip_all, fields(shard_id))] From 2ea6f7fee1f82166eacdfd2aaec85a301a83abf5 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 17 Dec 2024 17:24:27 +0200 Subject: [PATCH 03/10] Fix formatting --- pageserver/src/page_service.rs | 68 +++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index fb6354d1e358..105d4357e77e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -552,12 +552,12 @@ impl std::fmt::Display for BatchedPageStreamError { } struct BatchedGetPageRequest { - rel: RelTag, - blkno: BlockNumber, + rel: RelTag, + blkno: BlockNumber, reqid: RequestId, request_lsn: Lsn, not_modified_since: Lsn, - timer: SmgrOpTimer, + timer: SmgrOpTimer, } enum BatchedFeMessage { @@ -863,8 +863,13 @@ impl PageServerHandler { shard, effective_request_lsn, pages: smallvec::smallvec![BatchedGetPageRequest { - rel, blkno, reqid, request_lsn, not_modified_since, timer - }], + rel, + blkno, + reqid, + request_lsn, + not_modified_since, + timer + }], } } }; @@ -1227,7 +1232,7 @@ impl PageServerHandler { request_span, pipelining_config, protocol_version, - &ctx, + &ctx, ) .await } @@ -1240,7 +1245,7 @@ impl PageServerHandler { timeline_handles, request_span, protocol_version, - &ctx, + &ctx, ) .await } @@ -1758,29 +1763,32 @@ impl PageServerHandler { assert_eq!(results.len(), requests.len()); // TODO: avoid creating the new Vec here - Vec::from_iter(requests.into_iter().zip(results.into_iter()).map( - |(req, res)| { - res.map(|page| { - ( - PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, - blkno: req.blkno, - page, - }), - req.timer, - ) - }) - .map_err(|e| BatchedPageStreamError { - err: PageStreamError::from(e), - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - }) - }, - )) + Vec::from_iter( + requests + .into_iter() + .zip(results.into_iter()) + .map(|(req, res)| { + res.map(|page| { + ( + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + rel: req.rel, + blkno: req.blkno, + page, + }), + req.timer, + ) + }) + .map_err(|e| BatchedPageStreamError { + err: PageStreamError::from(e), + reqid: req.reqid, + request_lsn: req.request_lsn, + not_modified_since: req.not_modified_since, + }) + }), + ) } #[instrument(skip_all, fields(shard_id))] From a6417d675865e18efe3fb71ed4b93a7a01747710 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 18 Dec 2024 12:07:34 +0200 Subject: [PATCH 04/10] Fix test build --- libs/pageserver_api/src/models.rs | 4 +++- pageserver/src/page_service.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index c4b6a206f046..3d9f8f2159c4 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2018,7 +2018,9 @@ mod tests { ]; for msg in messages { let bytes = msg.serialize(); - let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap(); + let reconstructed = + PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3) + .unwrap(); assert!(msg == reconstructed); } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 105d4357e77e..070e85cf8ecf 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2472,7 +2472,7 @@ mod tests { PageServiceCmd::PageStream(PageStreamCmd { tenant_id, timeline_id, - PagestreamProtocolVersion::V2, + protocol_version: PagestreamProtocolVersion::V2, }) ); let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap(); From 657681dc4f55da1caa8e7311a7b360d42542150a Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 18 Dec 2024 14:48:18 +0200 Subject: [PATCH 05/10] Fix bugs in V3 checks --- pgxn/neon/pagestore_smgr.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 9ff8154861a2..ec13dd72e0b8 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -364,6 +364,7 @@ compact_prefetch_buffers(void) target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; + target_slot->reqid = source_slot->reqid; target_slot->request_lsns = source_slot->request_lsns; target_slot->my_ring_index = empty_ring_index; @@ -2373,7 +2374,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) if (resp->reqid != request.req.reqid || resp->lsn != request.req.lsn || resp->not_modified_since != request.req.not_modified_since || - RelFileInfoEquals(exists_resp->rinfo, request.rinfo) || + !RelFileInfoEquals(exists_resp->rinfo, request.rinfo) || exists_resp->forknum != forkNum) { NEON_PANIC_CONNECTION_STATE(-1, PANIC, @@ -3029,7 +3030,7 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block if (resp->reqid != slot->reqid || resp->lsn != slot->request_lsns.request_lsn || resp->not_modified_since != slot->request_lsns.not_modified_since || - RelFileInfoEquals(getpage_resp->rinfo, rinfo) || + !RelFileInfoEquals(getpage_resp->rinfo, rinfo) || getpage_resp->forknum != forkNum || getpage_resp->blkno != base_blockno + i) { @@ -3561,7 +3562,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) if (resp->reqid != request.req.reqid || resp->lsn != request.req.lsn || resp->not_modified_since != request.req.not_modified_since || - RelFileInfoEquals(relsize_resp->rinfo, request.rinfo) || + !RelFileInfoEquals(relsize_resp->rinfo, request.rinfo) || relsize_resp->forknum != forknum) { NEON_PANIC_CONNECTION_STATE(-1, PANIC, From 7951ad3462eb2b195797bf8b22dd49ddf73ab4ac Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 20 Dec 2024 13:40:25 +0200 Subject: [PATCH 06/10] Make clippyhappy --- pageserver/src/page_service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 070e85cf8ecf..90c53b4c3394 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -677,6 +677,7 @@ impl PageServerHandler { ) } + #[allow(clippy::too_many_arguments)] async fn pagestream_read_message( pgb: &mut PostgresBackendReader, tenant_id: TenantId, From e416533e74e3087578a38dccdc2cbfc256351b2c Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 20 Dec 2024 14:18:19 +0200 Subject: [PATCH 07/10] Fix error reporting --- pageserver/src/page_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 90c53b4c3394..2a393c335c0f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1105,7 +1105,7 @@ impl PageServerHandler { // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e); + let full = utils::error::report_compact_sources(&e.err); span.in_scope(|| { error!("error reading relation or page version: {full:#}") }); From 1bc9c3bf7085f9b36a65f4552c2fbf89732eab90 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 21 Dec 2024 08:42:36 +0200 Subject: [PATCH 08/10] Use shorter heartbeat interval for test_timeline_archival_chaos --- test_runner/regress/test_timeline_archive.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index 87579f9e9280..eaae7df3dc27 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -398,6 +398,9 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): # Offloading is off by default at time of writing: remove this line when it's on by default neon_env_builder.pageserver_config_override = "timeline_offloading = true" + neon_env_builder.storage_controller_config = { + "heartbeat_interval" : "100msec" + } neon_env_builder.enable_pageserver_remote_storage(s3_storage()) # We will exercise migrations, so need multiple pageservers From 2a00b1a9b3bd3d0df0ff97d991c16702066a980d Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 21 Dec 2024 21:28:57 +0200 Subject: [PATCH 09/10] Make ruff happy --- test_runner/regress/test_timeline_archive.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py index eaae7df3dc27..9b3a48add90b 100644 --- a/test_runner/regress/test_timeline_archive.py +++ b/test_runner/regress/test_timeline_archive.py @@ -398,9 +398,7 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder): # Offloading is off by default at time of writing: remove this line when it's on by default neon_env_builder.pageserver_config_override = "timeline_offloading = true" - neon_env_builder.storage_controller_config = { - "heartbeat_interval" : "100msec" - } + neon_env_builder.storage_controller_config = {"heartbeat_interval": "100msec"} neon_env_builder.enable_pageserver_remote_storage(s3_storage()) # We will exercise migrations, so need multiple pageservers From 58b6120fe2ac5b75d9a93e01e73051d739862309 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 24 Dec 2024 09:55:14 +0200 Subject: [PATCH 10/10] Unmdo unrelated changes --- libs/pageserver_api/src/models.rs | 290 ++----------- pageserver/client/src/page_service.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 1 - pageserver/src/metrics.rs | 1 - pageserver/src/page_service.rs | 200 ++------- pgxn/neon/libpagestore.c | 5 +- pgxn/neon/pagestore_client.h | 88 ++-- pgxn/neon/pagestore_smgr.c | 383 +++++------------- 8 files changed, 223 insertions(+), 747 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3d9f8f2159c4..f3fc9fad760a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1460,17 +1460,13 @@ impl TryFrom for PagestreamBeMessageTag { // interface allows sending both LSNs, and let the pageserver do the right thing. There was no // difference in the responses between V1 and V2. // -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Clone, Copy)] pub enum PagestreamProtocolVersion { V2, - V3, } -pub type RequestId = u64; - #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1478,7 +1474,6 @@ pub struct PagestreamExistsRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1486,7 +1481,6 @@ pub struct PagestreamNblocksRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1495,7 +1489,6 @@ pub struct PagestreamGetPageRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamDbSizeRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub dbnode: u32, @@ -1503,7 +1496,6 @@ pub struct PagestreamDbSizeRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetSlruSegmentRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub kind: u8, @@ -1512,56 +1504,31 @@ pub struct PagestreamGetSlruSegmentRequest { #[derive(Debug)] pub struct PagestreamExistsResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, pub exists: bool, } #[derive(Debug)] pub struct PagestreamNblocksResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, pub n_blocks: u32, } #[derive(Debug)] pub struct PagestreamGetPageResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, - pub blkno: u32, pub page: Bytes, } #[derive(Debug)] pub struct PagestreamGetSlruSegmentResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub kind: u8, - pub segno: u32, pub segment: Bytes, } #[derive(Debug)] pub struct PagestreamErrorResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, pub message: String, } #[derive(Debug)] pub struct PagestreamDbSizeResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub db_node: u32, pub db_size: i64, } @@ -1585,7 +1552,6 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { bytes.put_u8(0); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1596,7 +1562,6 @@ impl PagestreamFeMessage { Self::Nblocks(req) => { bytes.put_u8(1); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1607,7 +1572,6 @@ impl PagestreamFeMessage { Self::GetPage(req) => { bytes.put_u8(2); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1619,7 +1583,6 @@ impl PagestreamFeMessage { Self::DbSize(req) => { bytes.put_u8(3); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.dbnode); @@ -1627,7 +1590,6 @@ impl PagestreamFeMessage { Self::GetSlruSegment(req) => { bytes.put_u8(4); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u8(req.kind); @@ -1638,31 +1600,19 @@ impl PagestreamFeMessage { bytes.into() } - pub fn parse( - body: &mut R, - protocol_version: PagestreamProtocolVersion, - ) -> anyhow::Result { + pub fn parse(body: &mut R) -> anyhow::Result { // these correspond to the NeonMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. let msg_tag = body.read_u8()?; - let (reqid, request_lsn, not_modified_since) = match protocol_version { - PagestreamProtocolVersion::V2 => ( - 0, - Lsn::from(body.read_u64::()?), - Lsn::from(body.read_u64::()?), - ), - PagestreamProtocolVersion::V3 => ( - body.read_u64::()?, - Lsn::from(body.read_u64::()?), - Lsn::from(body.read_u64::()?), - ), - }; + + // these two fields are the same for every request type + let request_lsn = Lsn::from(body.read_u64::()?); + let not_modified_since = Lsn::from(body.read_u64::()?); match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1673,7 +1623,6 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1684,7 +1633,6 @@ impl PagestreamFeMessage { }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1696,14 +1644,12 @@ impl PagestreamFeMessage { blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - reqid, request_lsn, not_modified_since, dbnode: body.read_u32::()?, })), 4 => Ok(PagestreamFeMessage::GetSlruSegment( PagestreamGetSlruSegmentRequest { - reqid, request_lsn, not_modified_since, kind: body.read_u8()?, @@ -1716,114 +1662,43 @@ impl PagestreamFeMessage { } impl PagestreamBeMessage { - pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes { + pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); use PagestreamBeMessageTag as Tag; - match protocol_version { - PagestreamProtocolVersion::V2 => { - match self { - Self::Exists(resp) => { - bytes.put_u8(Tag::Exists as u8); - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(Tag::Nblocks as u8); - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(Tag::GetPage as u8); - bytes.put(&resp.page[..]) - } - - Self::Error(resp) => { - bytes.put_u8(Tag::Error as u8); - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null terminator - } - Self::DbSize(resp) => { - bytes.put_u8(Tag::DbSize as u8); - bytes.put_i64(resp.db_size); - } - - Self::GetSlruSegment(resp) => { - bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); - bytes.put(&resp.segment[..]); - } - } + match self { + Self::Exists(resp) => { + bytes.put_u8(Tag::Exists as u8); + bytes.put_u8(resp.exists as u8); } - _ => { - match self { - Self::Exists(resp) => { - bytes.put_u8(Tag::Exists as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(Tag::Nblocks as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(Tag::GetPage as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u32(resp.blkno); - bytes.put(&resp.page[..]) - } - - Self::Error(resp) => { - bytes.put_u8(Tag::Error as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null terminator - } - Self::DbSize(resp) => { - bytes.put_u8(Tag::DbSize as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.db_node); - bytes.put_i64(resp.db_size); - } - - Self::GetSlruSegment(resp) => { - bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u8(resp.kind); - bytes.put_u32(resp.segno); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); - bytes.put(&resp.segment[..]); - } - } + + Self::Nblocks(resp) => { + bytes.put_u8(Tag::Nblocks as u8); + bytes.put_u32(resp.n_blocks); + } + + Self::GetPage(resp) => { + bytes.put_u8(Tag::GetPage as u8); + bytes.put(&resp.page[..]); + } + + Self::Error(resp) => { + bytes.put_u8(Tag::Error as u8); + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } + Self::DbSize(resp) => { + bytes.put_u8(Tag::DbSize as u8); + bytes.put_i64(resp.db_size); + } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(Tag::GetSlruSegment as u8); + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); } } + bytes.into() } @@ -1835,109 +1710,38 @@ impl PagestreamBeMessage { let ok = match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? { Tag::Exists => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; - let exists = buf.read_u8()? != 0; + let exists = buf.read_u8()?; Self::Exists(PagestreamExistsResponse { - reqid, - request_lsn, - not_modified_since, - rel, - exists, + exists: exists != 0, }) } Tag::Nblocks => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; let n_blocks = buf.read_u32::()?; - Self::Nblocks(PagestreamNblocksResponse { - reqid, - request_lsn, - not_modified_since, - rel, - n_blocks, - }) + Self::Nblocks(PagestreamNblocksResponse { n_blocks }) } Tag::GetPage => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; - let blkno = buf.read_u32::()?; let mut page = vec![0; 8192]; // TODO: use MaybeUninit buf.read_exact(&mut page)?; - Self::GetPage(PagestreamGetPageResponse { - reqid, - request_lsn, - not_modified_since, - rel, - blkno, - page: page.into(), - }) + PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) } Tag::Error => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); let mut msg = Vec::new(); buf.read_until(0, &mut msg)?; let cstring = std::ffi::CString::from_vec_with_nul(msg)?; let rust_str = cstring.to_str()?; - Self::Error(PagestreamErrorResponse { - reqid, - request_lsn, - not_modified_since, + PagestreamBeMessage::Error(PagestreamErrorResponse { message: rust_str.to_owned(), }) } Tag::DbSize => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let db_node = buf.read_u32::()?; let db_size = buf.read_i64::()?; - Self::DbSize(PagestreamDbSizeResponse { - reqid, - request_lsn, - not_modified_since, - db_node, - db_size, - }) + Self::DbSize(PagestreamDbSizeResponse { db_size }) } Tag::GetSlruSegment => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let kind = buf.read_u8()?; - let segno = buf.read_u32::()?; let n_blocks = buf.read_u32::()?; let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; buf.read_exact(&mut segment)?; Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { - reqid, - request_lsn, - not_modified_since, - kind, - segno, segment: segment.into(), }) } @@ -1976,7 +1780,6 @@ mod tests { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -1987,7 +1790,6 @@ mod tests { }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(4), rel: RelTag { @@ -1998,7 +1800,6 @@ mod tests { }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -2010,7 +1811,6 @@ mod tests { blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), dbnode: 7, @@ -2018,9 +1818,7 @@ mod tests { ]; for msg in messages { let bytes = msg.serialize(); - let reconstructed = - PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3) - .unwrap(); + let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap(); assert!(msg == reconstructed); } } diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 207ec4166cb8..f9507fc47a3a 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -60,7 +60,7 @@ impl Client { ) -> anyhow::Result { let copy_both: tokio_postgres::CopyBothDuplex = self .client - .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}")) + .copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}")) .await?; let Client { cancel_on_client_drop, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 910708c04a80..b2df01714d31 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -322,7 +322,6 @@ async fn main_impl( .to_rel_block() .expect("we filter non-rel-block keys out above"); PagestreamGetPageRequest { - reqid: 0, request_lsn: if rng.gen_bool(args.req_latest_probability) { Lsn::MAX } else { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9a3bd7f931e0..bdbabf3f7511 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1845,7 +1845,6 @@ pub(crate) static LIVE_CONNECTIONS: Lazy = Lazy::new(|| { #[derive(Clone, Copy, enum_map::Enum, IntoStaticStr)] pub(crate) enum ComputeCommandKind { - PageStreamV3, PageStreamV2, Basebackup, Fullbackup, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2a393c335c0f..d00ec11a7611 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -17,7 +17,7 @@ use pageserver_api::models::{ PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, - PagestreamProtocolVersion, RequestId, + PagestreamProtocolVersion, }; use pageserver_api::shard::TenantShardId; use postgres_backend::{ @@ -537,29 +537,6 @@ impl From for QueryError { } } -#[derive(thiserror::Error, Debug)] -struct BatchedPageStreamError { - err: PageStreamError, - reqid: RequestId, - request_lsn: Lsn, - not_modified_since: Lsn, -} - -impl std::fmt::Display for BatchedPageStreamError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.err.fmt(f) - } -} - -struct BatchedGetPageRequest { - rel: RelTag, - blkno: BlockNumber, - reqid: RequestId, - request_lsn: Lsn, - not_modified_since: Lsn, - timer: SmgrOpTimer, -} - enum BatchedFeMessage { Exists { span: Span, @@ -577,7 +554,7 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + pages: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, }, DbSize { span: Span, @@ -593,7 +570,7 @@ enum BatchedFeMessage { }, RespondError { span: Span, - error: BatchedPageStreamError, + error: PageStreamError, }, } @@ -618,7 +595,7 @@ impl BatchedFeMessage { BatchedFeMessage::GetPage { shard, pages, .. } => ( shard, pages.len(), - itertools::Either::Right(pages.iter_mut().map(|p| &mut p.timer)), + itertools::Either::Right(pages.iter_mut().map(|(_, _, timer)| timer)), ), BatchedFeMessage::RespondError { .. } => return Ok(()), }; @@ -677,7 +654,6 @@ impl PageServerHandler { ) } - #[allow(clippy::too_many_arguments)] async fn pagestream_read_message( pgb: &mut PostgresBackendReader, tenant_id: TenantId, @@ -685,7 +661,6 @@ impl PageServerHandler { timeline_handles: &mut TimelineHandles, cancel: &CancellationToken, ctx: &RequestContext, - protocol_version: PagestreamProtocolVersion, parent_span: Span, ) -> Result, QueryError> where @@ -720,8 +695,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message"); // parse request - let neon_fe_msg = - PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let batched_msg = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { @@ -789,7 +763,6 @@ impl PageServerHandler { } } PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid, request_lsn, not_modified_since, rel, @@ -801,12 +774,7 @@ impl PageServerHandler { ($error:expr) => {{ let error = BatchedFeMessage::RespondError { span, - error: BatchedPageStreamError { - err: $error, - reqid, - request_lsn, - not_modified_since, - }, + error: $error, }; Ok(Some(error)) }}; @@ -863,14 +831,7 @@ impl PageServerHandler { span, shard, effective_request_lsn, - pages: smallvec::smallvec![BatchedGetPageRequest { - rel, - blkno, - reqid, - request_lsn, - not_modified_since, - timer - }], + pages: smallvec::smallvec![(rel, blkno, timer)], } } }; @@ -949,7 +910,6 @@ impl PageServerHandler { pgb_writer: &mut PostgresBackend, batch: BatchedFeMessage, cancel: &CancellationToken, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> Result<(), QueryError> where @@ -957,7 +917,7 @@ impl PageServerHandler { { // invoke handler function let (handler_results, span): ( - Vec>, + Vec>, _, ) = match batch { BatchedFeMessage::Exists { @@ -972,13 +932,7 @@ impl PageServerHandler { .handle_get_rel_exists_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -994,13 +948,7 @@ impl PageServerHandler { .handle_get_nblocks_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1042,13 +990,7 @@ impl PageServerHandler { .handle_db_size_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1064,13 +1006,7 @@ impl PageServerHandler { .handle_get_slru_segment_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1086,7 +1022,7 @@ impl PageServerHandler { // Other handler errors are sent back as an error message and we stay in pagestream protocol. for handler_result in handler_results { let (response_msg, timer) = match handler_result { - Err(e) => match &e.err { + Err(e) => match &e { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of // shutdown, then do not send the error to the client. Instead just drop the @@ -1105,16 +1041,13 @@ impl PageServerHandler { // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e.err); + let full = utils::error::report_compact_sources(&e); span.in_scope(|| { error!("error reading relation or page version: {full:#}") }); ( PagestreamBeMessage::Error(PagestreamErrorResponse { - reqid: e.reqid, - request_lsn: e.request_lsn, - not_modified_since: e.not_modified_since, - message: e.err.to_string(), + message: e.to_string(), }), None, // TODO: measure errors ) @@ -1127,9 +1060,7 @@ impl PageServerHandler { // marshal & transmit response message // - pgb_writer.write_message_noflush(&BeMessage::CopyData( - &response_msg.serialize(protocol_version), - ))?; + pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; // We purposefully don't count flush time into the timer. // @@ -1192,7 +1123,7 @@ impl PageServerHandler { pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, - protocol_version: PagestreamProtocolVersion, + _protocol_version: PagestreamProtocolVersion, ctx: RequestContext, ) -> Result<(), QueryError> where @@ -1232,7 +1163,6 @@ impl PageServerHandler { timeline_handles, request_span, pipelining_config, - protocol_version, &ctx, ) .await @@ -1245,7 +1175,6 @@ impl PageServerHandler { timeline_id, timeline_handles, request_span, - protocol_version, &ctx, ) .await @@ -1272,7 +1201,6 @@ impl PageServerHandler { timeline_id: TimelineId, mut timeline_handles: TimelineHandles, request_span: Span, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1290,7 +1218,6 @@ impl PageServerHandler { &mut timeline_handles, &cancel, ctx, - protocol_version, request_span.clone(), ) .await; @@ -1311,7 +1238,7 @@ impl PageServerHandler { } let err = self - .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx) + .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx) .await; match err { Ok(()) => {} @@ -1334,7 +1261,6 @@ impl PageServerHandler { mut timeline_handles: TimelineHandles, request_span: Span, pipelining_config: PageServicePipeliningConfigPipelined, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1432,7 +1358,6 @@ impl PageServerHandler { &mut timeline_handles, &cancel_batcher, &ctx, - protocol_version, request_span.clone(), ) .await; @@ -1478,14 +1403,8 @@ impl PageServerHandler { batch .throttle_and_record_start_processing(&self.cancel) .await?; - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - &cancel, - protocol_version, - &ctx, - ) - .await?; + self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) + .await?; } } }); @@ -1671,10 +1590,6 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, exists, })) } @@ -1701,10 +1616,6 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, n_blocks, })) } @@ -1732,10 +1643,6 @@ impl PageServerHandler { let db_size = total_blocks as i64 * BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - db_node: req.dbnode, db_size, })) } @@ -1745,9 +1652,9 @@ impl PageServerHandler { &mut self, timeline: &Timeline, effective_lsn: Lsn, - requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); timeline @@ -1756,7 +1663,7 @@ impl PageServerHandler { let results = timeline .get_rel_page_at_lsn_batched( - requests.iter().map(|p| (&p.rel, &p.blkno)), + requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), effective_lsn, ctx, ) @@ -1768,26 +1675,16 @@ impl PageServerHandler { requests .into_iter() .zip(results.into_iter()) - .map(|(req, res)| { + .map(|((_, _, timer), res)| { res.map(|page| { ( PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, - blkno: req.blkno, page, }), - req.timer, + timer, ) }) - .map_err(|e| BatchedPageStreamError { - err: PageStreamError::from(e), - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - }) + .map_err(PageStreamError::from) }), ) } @@ -1814,14 +1711,7 @@ impl PageServerHandler { let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; Ok(PagestreamBeMessage::GetSlruSegment( - PagestreamGetSlruSegmentResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - kind: req.kind, - segno: req.segno, - segment, - }, + PagestreamGetSlruSegmentResponse { segment }, )) } @@ -2016,7 +1906,6 @@ struct FullBackupCmd { struct PageStreamCmd { tenant_id: TenantId, timeline_id: TimelineId, - protocol_version: PagestreamProtocolVersion, } /// `lease lsn tenant timeline lsn` @@ -2037,7 +1926,7 @@ enum PageServiceCmd { } impl PageStreamCmd { - fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result { + fn parse(query: &str) -> anyhow::Result { let parameters = query.split_whitespace().collect_vec(); if parameters.len() != 2 { bail!( @@ -2052,7 +1941,6 @@ impl PageStreamCmd { Ok(Self { tenant_id, timeline_id, - protocol_version, }) } } @@ -2190,14 +2078,7 @@ impl PageServiceCmd { bail!("cannot parse query: {query}") }; match cmd.to_ascii_lowercase().as_str() { - "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse( - other, - PagestreamProtocolVersion::V2, - )?)), - "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse( - other, - PagestreamProtocolVersion::V3, - )?)), + "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)), "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)), "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)), "lease" => { @@ -2279,21 +2160,25 @@ where PageServiceCmd::PageStream(PageStreamCmd { tenant_id, timeline_id, - protocol_version, }) => { tracing::Span::current() .record("tenant_id", field::display(tenant_id)) .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; - let command_kind = match protocol_version { - PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2, - PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3, - }; - COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc(); - self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx) - .await?; + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::PageStreamV2) + .inc(); + + self.handle_pagerequests( + pgb, + tenant_id, + timeline_id, + PagestreamProtocolVersion::V2, + ctx, + ) + .await?; } PageServiceCmd::BaseBackup(BaseBackupCmd { tenant_id, @@ -2472,8 +2357,7 @@ mod tests { cmd, PageServiceCmd::PageStream(PageStreamCmd { tenant_id, - timeline_id, - protocol_version: PagestreamProtocolVersion::V2, + timeline_id }) ); let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap(); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c21f07b1d7d7..88d0a5292bf7 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -556,9 +556,6 @@ pageserver_connect(shardno_t shard_no, int elevel) switch (neon_protocol_version) { - case 3: - pagestream_query = psprintf("pagestream_v3 %s %s", neon_tenant, neon_timeline); - break; case 2: pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline); break; @@ -1140,7 +1137,7 @@ pg_init_libpagestore(void) &neon_protocol_version, 2, /* use protocol version 2 */ 2, /* min */ - 3, /* max */ + 2, /* max */ PGC_SU_BACKEND, 0, /* no flags required */ NULL, NULL, NULL); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index de1a22b690d3..f905e3b0faa3 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -44,15 +44,10 @@ typedef enum T_NeonGetSlruSegmentResponse, } NeonMessageTag; -typedef uint64 NeonRequestId; - /* base struct for c-style inheritance */ typedef struct { - NeonMessageTag tag; - NeonRequestId reqid; - XLogRecPtr lsn; - XLogRecPtr not_modified_since; + NeonMessageTag tag; } NeonMessage; #define messageTag(m) (((const NeonMessage *)(m))->tag) @@ -72,7 +67,6 @@ typedef enum { SLRU_MULTIXACT_OFFSETS } SlruKind; - /*-- * supertype of all the Neon*Request structs below. * @@ -94,94 +88,92 @@ typedef enum { * These structs describe the V2 of these requests. (The old now-defunct V1 * protocol contained just one LSN and a boolean 'latest' flag.) */ -typedef NeonMessage NeonRequest; +typedef struct +{ + NeonMessageTag tag; + XLogRecPtr lsn; + XLogRecPtr not_modified_since; +} NeonRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonExistsRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonNblocksRequest; typedef struct { - NeonRequest req; - Oid dbNode; + NeonRequest req; + Oid dbNode; } NeonDbSizeRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; - BlockNumber blkno; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; + BlockNumber blkno; } NeonGetPageRequest; typedef struct { - NeonRequest req; - SlruKind kind; - int segno; + NeonRequest req; + SlruKind kind; + int segno; } NeonGetSlruSegmentRequest; /* supertype of all the Neon*Response structs below */ -typedef NeonMessage NeonResponse; +typedef struct +{ + NeonMessageTag tag; +} NeonResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - bool exists; + NeonMessageTag tag; + bool exists; } NeonExistsResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - uint32 n_blocks; + NeonMessageTag tag; + uint32 n_blocks; } NeonNblocksResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - BlockNumber blkno; - char page[FLEXIBLE_ARRAY_MEMBER]; + NeonMessageTag tag; + char page[FLEXIBLE_ARRAY_MEMBER]; } NeonGetPageResponse; #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ)) typedef struct { - NeonResponse resp; - Oid dbNode; - int64 db_size; + NeonMessageTag tag; + int64 db_size; } NeonDbSizeResponse; typedef struct { - NeonResponse resp; - char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error - * message */ + NeonMessageTag tag; + char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error + * message */ } NeonErrorResponse; typedef struct { - NeonResponse resp; - SlruKind kind; - int segno; - int n_blocks; - char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; + NeonMessageTag tag; + int n_blocks; + char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; } NeonGetSlruSegmentResponse; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index ec13dd72e0b8..385905d9cee9 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -120,9 +120,6 @@ static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum); -static uint32 local_request_counter; -#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) - /* * Prefetch implementation: * @@ -191,11 +188,15 @@ typedef struct PrefetchRequest uint8 status; /* see PrefetchStatus for valid values */ uint8 flags; /* see PrefetchRequestFlags */ neon_request_lsns request_lsns; - NeonRequestId reqid; NeonResponse *response; /* may be null */ uint64 my_ring_index; } PrefetchRequest; +StaticAssertDecl(sizeof(PrefetchRequest) == 64, + "We prefer to have a power-of-2 size for this struct. Please" + " try to find an alternative solution before reaching to" + " increase the expected size here"); + /* prefetch buffer lookup hash table */ typedef struct PrfHashEntry @@ -364,7 +365,6 @@ compact_prefetch_buffers(void) target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; - target_slot->reqid = source_slot->reqid; target_slot->request_lsns = source_slot->request_lsns; target_slot->my_ring_index = empty_ring_index; @@ -797,7 +797,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, - .req.reqid = GENERATE_REQUEST_ID(), /* lsn and not_modified_since are filled in below */ .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -806,8 +805,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(mySlotNo == MyPState->ring_unused); - slot->reqid = request.req.reqid; - if (force_request_lsns) slot->request_lsns = *force_request_lsns; else @@ -1180,10 +1177,6 @@ nm_pack_request(NeonRequest *msg) initStringInfo(&s); pq_sendbyte(&s, msg->tag); - if (neon_protocol_version >= 3) - { - pq_sendint64(&s, msg->reqid); - } pq_sendint64(&s, msg->lsn); pq_sendint64(&s, msg->not_modified_since); @@ -1261,16 +1254,8 @@ NeonResponse * nm_unpack_response(StringInfo s) { NeonMessageTag tag = pq_getmsgbyte(s); - NeonResponse resp_hdr = {0}; /* make valgrind happy */ NeonResponse *resp = NULL; - resp_hdr.tag = tag; - if (neon_protocol_version >= 3) - { - resp_hdr.reqid = pq_getmsgint64(s); - resp_hdr.lsn = pq_getmsgint64(s); - resp_hdr.not_modified_since = pq_getmsgint64(s); - } switch (tag) { /* pagestore -> pagestore_client */ @@ -1278,14 +1263,7 @@ nm_unpack_response(StringInfo s) { NeonExistsResponse *msg_resp = palloc0(sizeof(NeonExistsResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->exists = pq_getmsgbyte(s); pq_getmsgend(s); @@ -1297,14 +1275,7 @@ nm_unpack_response(StringInfo s) { NeonNblocksResponse *msg_resp = palloc0(sizeof(NeonNblocksResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->n_blocks = pq_getmsgint(s, 4); pq_getmsgend(s); @@ -1317,20 +1288,12 @@ nm_unpack_response(StringInfo s) NeonGetPageResponse *msg_resp; msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - msg_resp->blkno = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); pq_getmsgend(s); - Assert(msg_resp->resp.tag == T_NeonGetPageResponse); + Assert(msg_resp->tag == T_NeonGetPageResponse); resp = (NeonResponse *) msg_resp; break; @@ -1340,11 +1303,7 @@ nm_unpack_response(StringInfo s) { NeonDbSizeResponse *msg_resp = palloc0(sizeof(NeonDbSizeResponse)); - if (neon_protocol_version >= 3) - { - msg_resp->dbNode = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->db_size = pq_getmsgint64(s); pq_getmsgend(s); @@ -1362,7 +1321,7 @@ nm_unpack_response(StringInfo s) msglen = strlen(msgtext); msg_resp = palloc0(sizeof(NeonErrorResponse) + msglen + 1); - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; memcpy(msg_resp->message, msgtext, msglen + 1); pq_getmsgend(s); @@ -1373,17 +1332,9 @@ nm_unpack_response(StringInfo s) case T_NeonGetSlruSegmentResponse: { NeonGetSlruSegmentResponse *msg_resp; - int n_blocks; + int n_blocks = pq_getmsgint(s, 4); msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse)); - - if (neon_protocol_version >= 3) - { - msg_resp->kind = pq_getmsgbyte(s); - msg_resp->segno = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; - - n_blocks = pq_getmsgint(s, 4); + msg_resp->tag = tag; msg_resp->n_blocks = n_blocks; memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ); pq_getmsgend(s); @@ -2355,7 +2306,6 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) { NeonExistsRequest request = { .req.tag = T_NeonExistsRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -2363,59 +2313,31 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonExistsResponse: - { - NeonExistsResponse* exists_resp = (NeonExistsResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - !RelFileInfoEquals(exists_resp->rinfo, request.rinfo) || - exists_resp->forknum != forkNum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->rinfo), exists_resp->forknum, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forkNum); - } - } - exists = exists_resp->exists; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forkNum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; + switch (resp->tag) + { + case T_NeonExistsResponse: + exists = ((NeonExistsResponse *) resp)->exists; + break; - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", - T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); - } - pfree(resp); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forkNum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; + + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", + T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); } + pfree(resp); return exists; } @@ -3023,43 +2945,15 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block switch (resp->tag) { case T_NeonGetPageResponse: - { - NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since || - !RelFileInfoEquals(getpage_resp->rinfo, rinfo) || - getpage_resp->forknum != forkNum || - getpage_resp->blkno != base_blockno + i) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->rinfo), getpage_resp->forknum, getpage_resp->blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); - } - } - memcpy(buffer, getpage_resp->page, BLCKSZ); + memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ); lfc_write(rinfo, forkNum, blockno, buffer); break; - } + case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); - } - } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo), + errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)), errdetail("page server returned error: %s", ((NeonErrorResponse *) resp)->message))); @@ -3543,7 +3437,6 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) { NeonNblocksRequest request = { .req.tag = T_NeonNblocksRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -3551,67 +3444,39 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonNblocksResponse: - { - NeonNblocksResponse * relsize_resp = (NeonNblocksResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - !RelFileInfoEquals(relsize_resp->rinfo, request.rinfo) || - relsize_resp->forknum != forknum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->rinfo), relsize_resp->forknum, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); - } - } - n_blocks = relsize_resp->n_blocks; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", - T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); - } - update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); + switch (resp->tag) + { + case T_NeonNblocksResponse: + n_blocks = ((NeonNblocksResponse *) resp)->n_blocks; + break; - neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), - n_blocks); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - pfree(resp); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", + T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); } + update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); + + neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), + n_blocks); + + pfree(resp); return n_blocks; } @@ -3632,67 +3497,39 @@ neon_dbsize(Oid dbNode) { NeonDbSizeRequest request = { .req.tag = T_NeonDbSizeRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .dbNode = dbNode, }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonDbSizeResponse: - { - NeonDbSizeResponse* dbsize_resp = (NeonDbSizeResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - dbsize_resp->dbNode != dbNode) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->dbNode, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), dbNode); - } - } - db_size = dbsize_resp->db_size; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X", - resp->reqid, - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", - T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); - } + switch (resp->tag) + { + case T_NeonDbSizeResponse: + db_size = ((NeonDbSizeResponse *) resp)->db_size; + break; - neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X", + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - pfree(resp); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", + T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); } + + neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); + + pfree(resp); return db_size; } @@ -4025,7 +3862,6 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request = (NeonGetSlruSegmentRequest) { .req.tag = T_NeonGetSlruSegmentRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsn, .req.not_modified_since = not_modified_since, .kind = kind, @@ -4044,42 +3880,14 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf switch (resp->tag) { case T_NeonGetSlruSegmentResponse: - { - NeonGetSlruSegmentResponse* slru_resp = (NeonGetSlruSegmentResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - slru_resp->kind != kind || - slru_resp->segno != segno) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->kind, slru_resp->segno, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), kind, segno); - } - } - n_blocks = slru_resp->n_blocks; - memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ); + n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks; + memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ); break; - } + case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %d at lsn %X/%08X", - resp->reqid, + errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X", kind, segno, LSN_FORMAT_ARGS(request_lsn)), @@ -4220,7 +4028,6 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, NeonNblocksRequest request = { .req = (NeonRequest) { .tag = T_NeonNblocksRequest, - .reqid = GENERATE_REQUEST_ID(), .lsn = end_recptr, .not_modified_since = end_recptr, },