Skip to content

Commit

Permalink
Small changes to expose WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlad Krasnov committed Jun 2, 2021
1 parent 0c064a9 commit e899d18
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 7 deletions.
33 changes: 33 additions & 0 deletions postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub const PARAMETER_STATUS_TAG: u8 = b'S';
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';

#[derive(Debug, Copy, Clone)]
pub struct Header {
Expand Down Expand Up @@ -93,6 +94,7 @@ pub enum Message {
CopyDone,
CopyInResponse(CopyInResponseBody),
CopyOutResponse(CopyOutResponseBody),
CopyBothResponse(CopyBothResponseBody),
DataRow(DataRowBody),
EmptyQueryResponse,
ErrorResponse(ErrorResponseBody),
Expand Down Expand Up @@ -190,6 +192,16 @@ impl Message {
storage,
})
}
COPY_BOTH_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::CopyBothResponse(CopyBothResponseBody {
format,
len,
storage,
})
}
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
BACKEND_KEY_DATA_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
Expand Down Expand Up @@ -524,6 +536,27 @@ impl CopyOutResponseBody {
}
}

pub struct CopyBothResponseBody {
storage: Bytes,
len: u16,
format: u8,
}

impl CopyBothResponseBody {
#[inline]
pub fn format(&self) -> u8 {
self.format
}

#[inline]
pub fn column_formats(&self) -> ColumnFormats<'_> {
ColumnFormats {
remaining: self.len,
buf: &self.storage,
}
}
}

pub struct DataRowBody {
storage: Bytes,
len: u16,
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Client {
}
}

pub(crate) fn inner(&self) -> &Arc<InnerClient> {
pub fn inner(&self) -> &Arc<InnerClient> {
&self.inner
}

Expand Down
11 changes: 11 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub struct Config {
pub(crate) keepalives_idle: Duration,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) replication: Option<String>,
}

impl Default for Config {
Expand All @@ -184,6 +185,7 @@ impl Config {
keepalives_idle: Duration::from_secs(2 * 60 * 60),
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
replication: None,
}
}

Expand Down Expand Up @@ -224,6 +226,12 @@ impl Config {
self
}

/// Sets the kind of replication.
pub fn set_replication_database(&mut self) -> &mut Config {
self.replication = Some("database".to_string());
self
}

/// Gets the name of the database to connect to, if one has been configured
/// with the `dbname` method.
pub fn get_dbname(&self) -> Option<&str> {
Expand Down Expand Up @@ -476,6 +484,9 @@ impl Config {
};
self.channel_binding(channel_binding);
}
"replication" => {
self.replication = Some(value.to_string());
}
key => {
return Err(Error::config_parse(Box::new(UnknownOption(
key.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ where
if let Some(application_name) = &config.application_name {
params.push(("application_name", &**application_name));
}
if let Some(replication) = &config.replication {
params.push(("replication", &**replication));
}

let mut buf = BytesMut::new();
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;
Expand Down
11 changes: 6 additions & 5 deletions tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
//! | `with-uuid-0_8` | Enable support for the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 0.8 | no |
//! | `with-time-0_2` | Enable support for the `time` crate. | [time](https://crates.io/crates/time) 0.2 | no |
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.7")]
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
#![warn(rust_2018_idioms, clippy::all)]

pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
Expand All @@ -138,23 +138,24 @@ pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
pub use postgres_protocol::message::backend::Message;

pub mod binary_copy;
mod bind;
#[cfg(feature = "runtime")]
mod cancel_query;
mod cancel_query_raw;
mod cancel_token;
mod client;
mod codec;
pub mod client;
pub mod codec;
pub mod config;
#[cfg(feature = "runtime")]
mod connect;
mod connect_raw;
#[cfg(feature = "runtime")]
mod connect_socket;
mod connect_tls;
mod connection;
pub mod connection;
mod copy_in;
mod copy_out;
pub mod error;
Expand All @@ -164,7 +165,7 @@ mod portal;
mod prepare;
mod query;
pub mod row;
mod simple_query;
pub mod simple_query;
#[cfg(feature = "runtime")]
mod socket;
mod statement;
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Erro
}
}

fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
pub fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.split().freeze())
Expand Down

2 comments on commit e899d18

@glittershark
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkrasnov can we PR this upstream maybe?

@vkrasnov
Copy link

@vkrasnov vkrasnov commented on e899d18 Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkrasnov can we PR this upstream maybe?

There is a similar PR sfackler#778
which I actually took the necessary bits from with minor changes.

Please sign in to comment.