Skip to content

Commit

Permalink
Move filters to a separate RPC trait
Browse files Browse the repository at this point in the history
  • Loading branch information
sorpaas committed Oct 2, 2017
1 parent 5d01397 commit 5e68406
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
6 changes: 6 additions & 0 deletions src/rpc/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::str::FromStr;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use blockchain::chain::HeaderHash;
use rpc::RPCLogFilter;

use super::{RPCLog, Either};
use super::util::*;
Expand Down Expand Up @@ -104,6 +105,11 @@ impl FilterManager {
}
}

pub fn from_log_filter(&self, log: RPCLogFilter) -> Result<LogFilter, Error> {
let state = self.state.lock().unwrap();
from_log_filter(&state, log)
}

pub fn install_log_filter(&mut self, filter: LogFilter) -> usize {
let id = self.filters.len();
self.filters.insert(id, Filter::Log(filter.clone()));
Expand Down
11 changes: 9 additions & 2 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ build_rpc_trait! {
#[rpc(name = "eth_getCompilers")]
fn compilers(&self) -> Result<Vec<String>, Error>;

#[rpc(name = "eth_getLogs")]
fn logs(&self, RPCLogFilter) -> Result<Vec<RPCLog>, Error>;
}
}

build_rpc_trait! {
pub trait FilterRPC {
#[rpc(name = "eth_newFilter")]
fn new_filter(&self, RPCLogFilter) -> Result<String, Error>;
#[rpc(name = "eth_newBlockFilter")]
Expand All @@ -258,8 +265,6 @@ build_rpc_trait! {
fn filter_changes(&self, String) -> Result<Either<Vec<String>, Vec<RPCLog>>, Error>;
#[rpc(name = "eth_getFilterLogs")]
fn filter_logs(&self, String) -> Result<Vec<RPCLog>, Error>;
#[rpc(name = "eth_getLogs")]
fn logs(&self, RPCLogFilter) -> Result<Vec<RPCLog>, Error>;
}
}

Expand Down Expand Up @@ -291,11 +296,13 @@ pub fn rpc_loop<P: 'static + Patch + Send>(
state: Arc<Mutex<MinerState>>, addr: &SocketAddr, channel: Sender<bool>
) {
let rpc = serves::MinerEthereumRPC::<P>::new(state.clone(), channel);
let filter = serves::MinerFilterRPC::<P>::new(state.clone());
let debug = serves::MinerDebugRPC::<P>::new(state);

let mut io = IoHandler::default();

io.extend_with(rpc.to_delegate());
io.extend_with(filter.to_delegate());
io.extend_with(debug.to_delegate());

let server = ServerBuilder::new(io)
Expand Down
47 changes: 30 additions & 17 deletions src/rpc/serves.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{EthereumRPC, DebugRPC, Either, RPCTransaction, RPCTrace, RPCStep, RPCBlock, RPCLog, RPCReceipt, RPCLogFilter, RPCBlockTrace, RPCDump, RPCDumpAccount, RPCTraceConfig};
use super::{EthereumRPC, FilterRPC, DebugRPC, Either, RPCTransaction, RPCTrace, RPCStep, RPCBlock, RPCLog, RPCReceipt, RPCLogFilter, RPCBlockTrace, RPCDump, RPCDumpAccount, RPCTraceConfig};
use super::util::*;
use super::filter::*;
use super::serialize::*;
Expand All @@ -23,31 +23,44 @@ use std::marker::PhantomData;
use jsonrpc_macros::Trailing;

pub struct MinerEthereumRPC<P: Patch + Send> {
filter: Mutex<FilterManager>,
state: Arc<Mutex<MinerState>>,
channel: Sender<bool>,
_patch: PhantomData<P>,
}

pub struct MinerFilterRPC<P: Patch + Send> {
filter: Mutex<FilterManager>,
_patch: PhantomData<P>,
}

pub struct MinerDebugRPC<P: Patch + Send> {
state: Arc<Mutex<MinerState>>,
_patch: PhantomData<P>,
}

unsafe impl<P: Patch + Send> Sync for MinerEthereumRPC<P> { }
unsafe impl<P: Patch + Send> Sync for MinerFilterRPC<P> { }
unsafe impl<P: Patch + Send> Sync for MinerDebugRPC<P> { }

impl<P: Patch + Send> MinerEthereumRPC<P> {
pub fn new(state: Arc<Mutex<MinerState>>, channel: Sender<bool>) -> Self {
MinerEthereumRPC {
filter: Mutex::new(FilterManager::new(state.clone())),
channel,
state,
_patch: PhantomData,
}
}
}

impl<P: Patch + Send> MinerFilterRPC<P> {
pub fn new(state: Arc<Mutex<MinerState>>) -> Self {
MinerFilterRPC {
filter: Mutex::new(FilterManager::new(state)),
_patch: PhantomData,
}
}
}

impl<P: Patch + Send> MinerDebugRPC<P> {
pub fn new(state: Arc<Mutex<MinerState>>) -> Self {
MinerDebugRPC {
Expand Down Expand Up @@ -517,12 +530,21 @@ impl<P: 'static + Patch + Send> EthereumRPC for MinerEthereumRPC<P> {
Ok(Vec::new())
}

fn logs(&self, log: RPCLogFilter) -> Result<Vec<RPCLog>, Error> {
let state = self.state.lock().unwrap();

match from_log_filter(&state, log) {
Ok(filter) => Ok(get_logs(&state, filter)?),
Err(_) => Ok(Vec::new()),
}
}
}

impl<P: 'static + Patch + Send> FilterRPC for MinerFilterRPC<P> {
fn new_filter(&self, log: RPCLogFilter) -> Result<String, Error> {
let filter = {
let state = self.state.lock().unwrap();
from_log_filter(&state, log)?
};
let id = self.filter.lock().unwrap().install_log_filter(filter);
let mut filter = self.filter.lock().unwrap();
let log_filter = filter.from_log_filter(log)?;
let id = filter.install_log_filter(log_filter);
Ok(format!("0x{:x}", id))
}

Expand Down Expand Up @@ -551,15 +573,6 @@ impl<P: 'static + Patch + Send> EthereumRPC for MinerEthereumRPC<P> {
let id = U256::from_str(&id)?.as_usize();
Ok(self.filter.lock().unwrap().get_logs(id)?)
}

fn logs(&self, log: RPCLogFilter) -> Result<Vec<RPCLog>, Error> {
let state = self.state.lock().unwrap();

match from_log_filter(&state, log) {
Ok(filter) => Ok(get_logs(&state, filter)?),
Err(_) => Ok(Vec::new()),
}
}
}

impl<P: 'static + Patch + Send> DebugRPC for MinerDebugRPC<P> {
Expand Down

0 comments on commit 5e68406

Please sign in to comment.