Skip to content

Commit

Permalink
backend: add interpreted record replication message type (#32)
Browse files Browse the repository at this point in the history
This is to be used by the safekeeper -> pageserver protocol.
  • Loading branch information
VladLazar authored Nov 25, 2024
1 parent a130197 commit 2a2a7c5
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
// replication message tags
pub const XLOG_DATA_TAG: u8 = b'w';
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0';

// logical replication message tags
const BEGIN_TAG: u8 = b'B';
Expand Down Expand Up @@ -325,6 +326,7 @@ impl Message {
pub enum ReplicationMessage<D> {
XLogData(XLogDataBody<D>),
PrimaryKeepAlive(PrimaryKeepAliveBody),
RawInterpretedWalRecords(RawInterpretedWalRecordsBody<D>),
}

impl ReplicationMessage<Bytes> {
Expand Down Expand Up @@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
reply,
})
}
INTERPRETED_WAL_RECORD_TAG => {
let streaming_lsn = buf.read_u64::<BigEndian>()?;
let commit_lsn = buf.read_u64::<BigEndian>()?;
let next_record_lsn = match buf.read_u64::<BigEndian>()? {
0 => None,
lsn => Some(lsn),
};

ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody {
streaming_lsn,
commit_lsn,
next_record_lsn,
data: buf.read_all(),
})
}
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -950,6 +967,36 @@ impl<D> XLogDataBody<D> {
}
}

#[derive(Debug)]
pub struct RawInterpretedWalRecordsBody<D> {
streaming_lsn: u64,
commit_lsn: u64,
next_record_lsn: Option<u64>,
data: D,
}

impl<D> RawInterpretedWalRecordsBody<D> {
#[inline]
pub fn streaming_lsn(&self) -> u64 {
self.streaming_lsn
}

#[inline]
pub fn commit_lsn(&self) -> u64 {
self.commit_lsn
}

#[inline]
pub fn next_record_lsn(&self) -> Option<u64> {
self.next_record_lsn
}

#[inline]
pub fn data(&self) -> &D {
&self.data
}
}

#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
wal_end: u64,
Expand Down

0 comments on commit 2a2a7c5

Please sign in to comment.