Skip to content

Commit

Permalink
more features and ci
Browse files Browse the repository at this point in the history
  • Loading branch information
nullchinchilla committed Mar 9, 2024
1 parent d680c65 commit 29ec7a9
Show file tree
Hide file tree
Showing 28 changed files with 597 additions and 113 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/build_and_upload.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# .github/workflows/build_and_upload.yml
name: Build and Upload

on: [push]

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Install musl-tools
run: sudo apt-get install -y musl-tools

- name: Install Rust
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
target: x86_64-unknown-linux-musl

- name: Build geph5-bridge
run: cargo build --locked --release --target x86_64-unknown-linux-musl --manifest-path binaries/geph5-bridge/Cargo.toml

- name: Build geph5-exit
run: cargo build --locked --release --target x86_64-unknown-linux-musl --manifest-path binaries/geph5-exit/Cargo.toml

# - name: Upload to S3
# uses: jakejarvis/s3-sync-action@master
# with:
# args: --acl public-read --follow-symlinks --delete
# env:
# AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
# AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
# AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# AWS_REGION: "us-west-1"
# SOURCE_DIR: "target/x86_64-unknown-linux-musl/release"
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 27 additions & 9 deletions binaries/geph5-bridge/src/listen_forward.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
};

use anyhow::Context;
use async_trait::async_trait;
use deadpool::managed::Pool;
use futures_util::AsyncReadExt as _;
Expand All @@ -15,9 +20,11 @@ use sillad::{
use smol::future::FutureExt as _;

use stdcode::StdcodeSerializeExt;
use tap::Tap;

pub async fn listen_forward_loop(listener: impl Listener) {
pub async fn listen_forward_loop(my_ip: IpAddr, listener: impl Listener) {
let state = State {
my_ip,
mapping: Cache::builder()
.time_to_idle(Duration::from_secs(86400))
.build(),
Expand All @@ -29,18 +36,22 @@ pub async fn listen_forward_loop(listener: impl Listener) {

struct State {
// b2e_dest => (metadata, task)
mapping: Cache<SocketAddr, (SocketAddr, Arc<smol::Task<anyhow::Result<()>>>)>,
my_ip: IpAddr,
mapping: Cache<(SocketAddr, B2eMetadata), (SocketAddr, Arc<smol::Task<anyhow::Result<()>>>)>,
}

#[async_trait]
impl BridgeControlProtocol for State {
async fn tcp_forward(&self, b2e_dest: SocketAddr, metadata: B2eMetadata) -> SocketAddr {
self.mapping
.get_with(b2e_dest, async {
.get_with((b2e_dest, metadata.clone()), async {
let listener = TcpListener::bind("0.0.0.0:0".parse().unwrap())
.await
.unwrap();
let addr = listener.local_addr().await;
let addr = listener
.local_addr()
.await
.tap_mut(|s| s.set_ip(self.my_ip));
let task = smolscale::spawn(handle_one_listener(listener, b2e_dest, metadata));
(addr, Arc::new(task))
})
Expand All @@ -58,7 +69,9 @@ async fn handle_one_listener(
let client_conn = listener.accept().await?;
let metadata = metadata.clone();
smolscale::spawn(async move {
let exit_conn = dial_pooled(b2e_dest, &metadata.stdcode()).await?;
let exit_conn = dial_pooled(b2e_dest, &metadata.stdcode())
.await
.inspect_err(|e| tracing::warn!("cannot dial pooled: {:?}", e))?;
let (client_read, client_write) = client_conn.split();
let (exit_read, exit_write) = exit_conn.split();
smol::io::copy(exit_read, client_write)
Expand Down Expand Up @@ -86,9 +99,13 @@ async fn dial_pooled(b2e_dest: SocketAddr, metadata: &[u8]) -> anyhow::Result<pi
.max_size(20)
.build()
})
.await?;
let mux = pool.get().await?;
let stream = mux.open(metadata).await?;
.await
.context("cannot get pool")?;
let mux = pool.get().await.context("cannot get from pool")?;
let stream = mux
.open(metadata)
.await
.context("cannot open through mux")?;
Ok(stream)
}

Expand All @@ -112,6 +129,7 @@ impl deadpool::managed::Manager for MuxManager {
conn: &mut Self::Type,
_: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<Self::Error> {
tracing::debug!(alive = conn.is_alive(), "trying to recycle");
if !conn.is_alive() {
let error = conn.open(b"").await.expect_err("dude");
return Err(error.into());
Expand Down
4 changes: 2 additions & 2 deletions binaries/geph5-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() {
let control_cookie = format!("bridge-cookie-{}", rand::random::<u128>());
let control_listener = SosistabListener::new(listener, Cookie::new(&control_cookie));
let upload_loop = broker_upload_loop(control_listen, control_cookie);
let listen_loop = listen_forward_loop(control_listener);
let listen_loop = listen_forward_loop(my_ip, control_listener);
upload_loop.race(listen_loop).await
})
}
Expand Down Expand Up @@ -88,6 +88,6 @@ async fn broker_upload_loop(control_listen: SocketAddr, control_cookie: String)
.await
.unwrap()
.unwrap();
async_io::Timer::after(Duration::from_secs(60)).await;
async_io::Timer::after(Duration::from_secs(10)).await;
}
}
4 changes: 3 additions & 1 deletion binaries/geph5-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde_yaml = "0.9.32"
smolscale = "0.4.4"
sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "postgres", "chrono" ] }
geph5-broker-protocol={path="../../libraries/geph5-broker-protocol"}
geph5-misc-rpc={path="../../libraries/geph5-misc-rpc"}
async-trait = "0.1.77"
nanorpc = "0.1.12"
thiserror = "1.0.57"
Expand All @@ -31,4 +32,5 @@ ed25519-dalek = "2.1.1"
tokio = { version = "1.0", features = ["full"] }
tracing-subscriber = "0.3.18"
nanorpc-sillad={path="../../libraries/nanorpc-sillad"}
sillad={path="../../libraries/sillad"}
sillad={path="../../libraries/sillad"}
sillad-sosistab3={path="../../libraries/sillad-sosistab3"}
21 changes: 21 additions & 0 deletions binaries/geph5-broker/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{ops::Deref, str::FromStr, time::Duration};

use async_io::Timer;
use geph5_broker_protocol::BridgeDescriptor;
use once_cell::sync::Lazy;
use rand::Rng;
use sqlx::{
Expand Down Expand Up @@ -40,6 +41,10 @@ pub async fn database_gc_loop() -> anyhow::Result<()> {
.execute(POSTGRES.deref())
.await?;
tracing::debug!(rows_affected = res.rows_affected(), "cleaned up exits");
let res = sqlx::query("delete from bridges_new where expiry > extract(epoch from now())")
.execute(POSTGRES.deref())
.await?;
tracing::debug!(rows_affected = res.rows_affected(), "cleaned up bridges");
}
}

Expand Down Expand Up @@ -78,3 +83,19 @@ pub async fn insert_exit(exit: &ExitRow) -> anyhow::Result<()> {
.await?;
Ok(())
}

pub async fn query_bridges(key: &str) -> anyhow::Result<Vec<BridgeDescriptor>> {
static RANDOM: Lazy<String> = Lazy::new(|| format!("rando-{}", rand::random::<u128>()));
let raw: Vec<(String, String, String, i64)> = sqlx::query_as(r"
select distinct on (pool) listen, cookie, pool, expiry from bridges_new order by pool, encode(digest(listen || $1 || $2, 'sha256'), 'hex');
").bind(key).bind(RANDOM.deref()).fetch_all(POSTGRES.deref()).await?;
Ok(raw
.into_iter()
.map(|row| BridgeDescriptor {
control_listen: row.0.parse().unwrap(),
control_cookie: row.1,
pool: row.2,
expiry: row.3 as _,
})
.collect())
}
1 change: 1 addition & 0 deletions binaries/geph5-broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use smolscale::immortal::{Immortal, RespawnStrategy};
use std::{fs, net::SocketAddr, path::PathBuf};

mod database;
mod routes;
mod rpc_impl;

/// The global config file.
Expand Down
52 changes: 52 additions & 0 deletions binaries/geph5-broker/src/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::{
net::SocketAddr,
time::{Duration, SystemTime},
};

use geph5_broker_protocol::{BridgeDescriptor, RouteDescriptor};
use geph5_misc_rpc::bridge::{B2eMetadata, BridgeControlClient, ObfsProtocol};
use moka::future::Cache;
use nanorpc_sillad::DialerTransport;
use once_cell::sync::Lazy;
use sillad::tcp::TcpDialer;
use sillad_sosistab3::{dialer::SosistabDialer, Cookie};

pub async fn bridge_to_leaf_route(
bridge: &BridgeDescriptor,
exit_b2e: SocketAddr,
) -> anyhow::Result<RouteDescriptor> {
static CACHE: Lazy<Cache<(SocketAddr, SocketAddr), RouteDescriptor>> = Lazy::new(|| {
Cache::builder()
.time_to_live(Duration::from_secs(60))
.build()
});

let cookie = Cookie::new(&bridge.control_cookie);

CACHE
.try_get_with((bridge.control_listen, exit_b2e), async {
let dialer = SosistabDialer {
inner: TcpDialer {
dest_addr: bridge.control_listen,
},
cookie,
};
let cookie = format!("exit-cookie-{}", rand::random::<u128>());
let control_client = BridgeControlClient(DialerTransport(dialer));
let forwarded_listen = control_client
.tcp_forward(
exit_b2e,
B2eMetadata {
protocol: ObfsProtocol::Sosistab3(cookie.clone()),
expiry: SystemTime::now() + Duration::from_secs(86400),
},
)
.await?;
anyhow::Ok(RouteDescriptor::Sosistab3 {
cookie,
lower: RouteDescriptor::Tcp(forwarded_listen).into(),
})
})
.await
.map_err(|err| anyhow::anyhow!("bridge comms failed: {:?}", err))
}
25 changes: 20 additions & 5 deletions binaries/geph5-broker/src/rpc_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ops::Deref, sync::Arc, time::Duration};
use std::{net::SocketAddr, ops::Deref, sync::Arc, time::Duration};

use async_trait::async_trait;
use ed25519_dalek::{SigningKey, VerifyingKey};
Expand All @@ -11,7 +11,8 @@ use moka::future::Cache;
use once_cell::sync::Lazy;

use crate::{
database::{insert_exit, ExitRow, POSTGRES},
database::{insert_exit, query_bridges, ExitRow, POSTGRES},
routes::bridge_to_leaf_route,
CONFIG_FILE,
};

Expand Down Expand Up @@ -72,9 +73,23 @@ impl BrokerProtocol for BrokerImpl {
.map_err(|e: Arc<GenericError>| e.deref().clone())
}

async fn get_routes(&self, _exit: String) -> Result<RouteDescriptor, GenericError> {
// Implement your logic here
unimplemented!();
async fn get_routes(&self, exit: SocketAddr) -> Result<RouteDescriptor, GenericError> {
// TODO auth
let raw_descriptors = query_bridges("").await?;
let mut routes = vec![];
for desc in raw_descriptors {
match bridge_to_leaf_route(&desc, exit).await {
Ok(route) => routes.push(route),
Err(err) => {
tracing::warn!(
err = debug(err),
bridge = debug(desc),
"could not communicate"
)
}
}
}
Ok(RouteDescriptor::Race(routes))
}

async fn put_exit(&self, descriptor: Mac<Signed<ExitDescriptor>>) -> Result<(), GenericError> {
Expand Down
1 change: 1 addition & 0 deletions binaries/geph5-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ geph5-broker-protocol={path="../../libraries/geph5-broker-protocol"}
geph5-misc-rpc={path="../../libraries/geph5-misc-rpc"}
picomux={path="../../libraries/picomux"}
sillad={path="../../libraries/sillad"}
sillad-sosistab3={path="../../libraries/sillad-sosistab3"}
nanorpc-sillad={path="../../libraries/nanorpc-sillad"}
reqwest = {version="0.11.24", default-features=false, features=["rustls-tls"]}

Expand Down
Loading

0 comments on commit 29ec7a9

Please sign in to comment.