Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] - FIX few bugs - TODO: split in proper commits #48

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stratum-v1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmt-03 = [
"heapless/defmt-03",
"serde-json-core/defmt",
]
log = ["dep:log"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should not be necessary, an optional dep, create implicilty a featrue with same name.

Does it break something for you ?


[dev-dependencies]
embedded-io = { workspace = true, features = ["std"] }
Expand Down
104 changes: 79 additions & 25 deletions stratum-v1/examples/tokio-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,89 @@ use tokio::{
net::TcpStream,
sync::{watch, Mutex},
};

/*
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| Pool URL | Port | Web URL | Status |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| public-pool.io | 21496 | https://web.public-pool.io | Open Source Solo Bitcoin Mining Pool supporting open source |
| | | | miners |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| pool.nerdminers.org | 3333 | https://nerdminers.org | The official Nerdminer pool site - Maintained by @golden-guy |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| pool.nerdminer.io | 3333 | https://nerdminer.io | Maintained by CHMEX |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| pool.pyblock.xyz | 3333 | https://pool.pyblock.xyz/ | Maintained by curly60e |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
| pool.sethforprivacy.com| 3333 | https://pool.sethforprivacy.com/ | Maintained by @sethforprivacy - public-pool fork |
+------------------------+-------+-----------------------------------+---------------------------------------------------------------+
*/
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();

let pool =
Select::new("Which Pool should be used?", vec!["Public-Pool", "Braiins"]).prompt()?;
let pool = Select::new(
"Which Pool should be used?",
vec![
"Public-Pool",
"NerdMiners.org",
"NerdMiner.io",
"PyBlock",
"SethForPrivacy",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more pool, why not, but can you keep braiins, I need it for my tests

],
)
.prompt()?;

let addr = match pool {
"Public-Pool" => SocketAddr::new(Ipv4Addr::new(68, 235, 52, 36).into(), 21496),
"Braiins" => SocketAddr::new(Ipv4Addr::new(64, 225, 5, 77).into(), 3333),
// public-pool.io = 172.234.17.37:21496
"Public-Pool" => SocketAddr::new(Ipv4Addr::new(172, 234, 17, 37).into(), 21496),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx to have fixed the public-pool IP address

// pool.nerdminers.org = 144.91.83.152:3333
"NerdMiners.org" => SocketAddr::new(Ipv4Addr::new(144, 91, 83, 152).into(), 3333),
// pool.nerdminer.io = 88.99.209.94:3333
"NerdMiner.io" => SocketAddr::new(Ipv4Addr::new(88, 99, 209, 94).into(), 3333),
// pool.pyblock.xyz = 172.81.181.23:3333
"PyBlock" => SocketAddr::new(Ipv4Addr::new(172, 81, 181, 23).into(), 3333),
// pool.sethforprivacy.com = 23.137.57.100:3333
"SethForPrivacy" => SocketAddr::new(Ipv4Addr::new(23, 137, 57, 100).into(), 3333),
_ => unreachable!(),
};

println!("Connecting to {}", addr);
let stream = TcpStream::connect(addr).await?;

println!("Connected");
let conn = adapter::FromTokio::<TcpStream>::new(stream);

let mut client = Client::<_, 1480, 512>::new(conn);
client.enable_software_rolling(true, false, false);

println!("Enabled software rolling");
let client_tx = Arc::new(Mutex::new(client));
let client_rx = Arc::clone(&client_tx);

let (authorized_tx, mut authorized_rx) = watch::channel(false);

tokio::spawn(async move {
loop {
println!("Waiting for message");
tokio::time::sleep(Duration::from_millis(100)).await;
let mut c = client_rx.lock().await;
match c.poll_message().await {
Ok(msg) => match msg {
Some(Message::Configured) => {
c.send_connect(Some(String::<32>::from_str("demo").unwrap()))
println!("Configured start connecting");
// c.send_connect(None).await.unwrap();
c.send_connect(Some(String::<32>::from_str("esp-miner-rs").unwrap()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please get this str from a cli parameter, esp-miner-rs has nothing to do here in this crate (proof is you are using it in your hash-hammer app)

.await
.unwrap();
}
Some(Message::Connected) => {
println!("Connected start authorizing");
c.send_authorize(
match pool {
"Public-Pool" => String::<64>::from_str(
"1HLQGxzAQWnLore3fWHc2W8UP1CgMv1GKQ.miner1",
"bc1qgaq3nk8yvd8294n6t27j8zwjcft9rm448f9tet",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replacing my address by your in the source code is not fair, but you can add a cli parameter for that, to allow you using your own.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I was still playing with this example while I am getting the hash-hammer up. As you wanned discus the code in the PR so I pushed it right away. I need to clean up code from the changes like that which I did for the testing purposes. I will clean up this code in some time together with applying rest of your comments :)

)
.unwrap(),
"pool.nerdminers.org" => String::<64>::from_str(
"bc1qgaq3nk8yvd8294n6t27j8zwjcft9rm448f9tet",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the other pool you added need their own stratum username/pass, if there are the same, please factorise in this match

)
.unwrap(),
"Braiins" => String::<64>::from_str("slush.miner1").unwrap(),
Expand All @@ -70,12 +112,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap();
}
Some(Message::Authorized) => {
println!("Authorized");
authorized_tx.send(true).unwrap();
}
Some(Message::Share {
accepted: _,
rejected: _,
}) => {
Some(
a @ Message::Share {
accepted: _,
rejected: _,
},
) => {
println!("Share: {:?}", a);
// TODO update the display if any
}
Some(Message::VersionMask(_mask)) => {
Expand All @@ -87,28 +133,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(Message::CleanJobs) => {
// TODO clean the job queue and immediately start hashing a new job
}
None => {}
None => {
println!("No message");
}
},
Err(e) => {
error!("Client receive_message error: {:?}", e);
panic!("Client receive_message error: {:?}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you want to panic!() here, better remove the error!() above

}
}
}
});

tokio::time::sleep(Duration::from_millis(1500)).await;
{
let mut c = client_tx.lock().await;
let exts = Extensions {
version_rolling: Some(VersionRolling {
mask: Some(0x1fffe000),
min_bit_count: Some(10),
min_bit_count: Some(16),
}),
minimum_difficulty: None,
minimum_difficulty: Some(256),
subscribe_extranonce: None,
info: None,
};
c.send_configure(exts).await.unwrap();
}
println!("Waiting for authorization");
authorized_rx.changed().await.unwrap();
println!("Authorized 1");
loop {
// TODO: use client.roll_job() to get a new job at the rate the hardware need it
tokio::time::sleep(Duration::from_millis(5000)).await;
Expand Down Expand Up @@ -210,15 +263,16 @@ mod adapter {

impl<T: super::Readable + Unpin + ?Sized> embedded_io_async::ReadReady for FromTokio<T> {
fn read_ready(&mut self) -> Result<bool, Self::Error> {
// TODO: This crash at runtime :
// Cannot start a runtime from within a runtime. This happens because a function (like `block_on`)
// attempted to block the current thread while the thread is being used to drive asynchronous tasks.
tokio::runtime::Handle::current().block_on(poll_fn(|cx| {
match Pin::new(&mut self.inner).poll_read_ready(cx) {
Poll::Ready(_) => Poll::Ready(Ok(true)),
Poll::Pending => Poll::Ready(Ok(false)),
}
}))
let h = tokio::runtime::Handle::current();

tokio::task::block_in_place(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for fixing this

h.block_on(poll_fn(|cx| {
match Pin::new(&mut self.inner).poll_read_ready(cx) {
Poll::Ready(_) => Poll::Ready(Ok(true)),
Poll::Pending => Poll::Ready(Ok(false)),
}
}))
})
}
}

Expand Down
43 changes: 38 additions & 5 deletions stratum-v1/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,23 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
pub async fn poll_message(&mut self) -> Result<Option<Message>> {
let mut msg = None;
let mut start = 0;
while let Some(stop) = self.rx_buf[start..self.rx_free_pos]

while let Some(mut stop) = self.rx_buf[start..self.rx_free_pos]
.iter()
.position(|&c| c == b'\n')
{
stop += start;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you 100% confident with your framing refactor (start/stop/...) ?
I did spend extensive time testing and debugging it.

trace!("Buffer start: {:?}", &self.rx_buf[..start]);
trace!("Current : {:?}", &self.rx_buf[start..stop]);
trace!("Buffer end: {:?}", &self.rx_buf[stop..]);
let line = &self.rx_buf[start..stop];
trace!("Start: {}, Stop: {}", start, stop);
debug!(
"Received Message [{}..{}], free pos: {}",
start, stop, self.rx_free_pos
);
trace!("{:?}", line);
trace!("Self.reqs: {:?}", self.reqs);
if let Some(id) = response::parse_id(line)? {
// it's a Response
match self.reqs.get(&id) {
Expand Down Expand Up @@ -128,6 +135,10 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
msg = Some(Message::Authorized);
}
}
Some(ReqKind::SuggestDifficulty) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for adding support for SuggestDifficulty

self.reqs.remove(&id);
info!("Suggested Difficulty Accepted");
}
Some(ReqKind::Submit) => {
match response::parse_submit(line) {
Ok(_) => {
Expand All @@ -138,14 +149,14 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
);
}
Err(Error::Pool {
code: _c, // TODO: use this code to differentiate why share has been rejected
code: c, // TODO: use this code to differentiate why share has been rejected
message: _,
detail: _,
}) => {
self.shares_rejected += 1;
info!(
"Share #{} Rejected, count: {}/{}",
id, self.shares_accepted, self.shares_rejected
"Share #{} Rejected, count: {}/{}, code: {}",
id, self.shares_accepted, self.shares_rejected, c
);
}
Err(e) => return Err(e),
Expand Down Expand Up @@ -183,11 +194,17 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
}
}
start = stop + 1;
if msg.is_some() {
break;
}
}
trace!("start: {}, free pos: {}", start, self.rx_free_pos);
if start > 0 && self.rx_free_pos > start {
debug!("copy {} bytes @0", self.rx_free_pos - start);
self.rx_buf.copy_within(start..self.rx_free_pos, 0);
self.rx_free_pos -= start;
} else if start == self.rx_free_pos {
self.rx_free_pos = 0;
}
if self.network_conn.read_ready().map_err(|_| Error::Network)? {
let n = self
Expand All @@ -196,7 +213,14 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
.await
.map_err(|_| Error::Network)?;
debug!("read {} bytes @{}", n, self.rx_free_pos);
trace!("{:?}", &self.rx_buf[self.rx_free_pos..self.rx_free_pos + n]);
trace!(
"read content {:?}",
&self.rx_buf[self.rx_free_pos..self.rx_free_pos + n]
);
trace!(
"read content as string {:?}",
core::str::from_utf8(&self.rx_buf[self.rx_free_pos..self.rx_free_pos + n])
);
self.rx_free_pos += n;
}
Ok(msg)
Expand Down Expand Up @@ -234,6 +258,15 @@ impl<C: Read + ReadReady + Write, const RX_BUF_SIZE: usize, const TX_BUF_SIZE: u
debug!("Send Configure: {} bytes, id = {}", n, self.req_id);
self.send_req(n).await
}
pub async fn send_suggest_difficulty(&mut self, difficulty: u32) -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add the proper rust doc above ?

if self.configuration.is_none() {
return Err(Error::NotConfigured);
}
self.prepare_req(ReqKind::SuggestDifficulty)?;
let n = request::suggest_difficulty(self.req_id, difficulty, self.tx_buf.as_mut_slice())?;
debug!("Send Suggest Difficulty: {} bytes, id = {}", n, self.req_id);
self.send_req(n).await
}

/// # Connect Client
///
Expand Down
21 changes: 21 additions & 0 deletions stratum-v1/src/client/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) enum ReqKind {
Connect,
Authorize,
Submit,
SuggestDifficulty,
}

///Request representation.
Expand Down Expand Up @@ -44,6 +45,13 @@ pub struct Request<P> {
pub params: Option<P>,
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
pub struct SuggestDifficulty {
/// Suggested minimum difficulty for the pool.
pub difficulty: Option<u32>,
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
pub struct VersionRolling {
Expand Down Expand Up @@ -218,6 +226,19 @@ pub(crate) fn authorize(
serde_json_core::to_slice(&req, buf).map_err(|_| Error::JsonBufferFull)
}

pub(crate) fn suggest_difficulty(id: u64, difficulty: u32, buf: &mut [u8]) -> Result<usize> {
let method = "mining.suggest_difficulty".try_into().unwrap();
let mut vec = Vec::<u32, 1>::new();
vec.push(difficulty).map_err(|_| Error::VecFull)?;
let params = Some(vec);
let req = Request::<Vec<u32, 1>> {
method,
params,
id: Some(id),
};
serde_json_core::to_slice(&req, buf).map_err(|_| Error::JsonBufferFull)
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
pub struct Share {
Expand Down
5 changes: 5 additions & 0 deletions stratum-v1/src/client/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ use heapless::{String, Vec};
use serde::{Deserialize, Deserializer};

pub(crate) fn parse_id(resp: &[u8]) -> Result<Option<u64>> {
trace!(
"Parsing id from response: {:#?}",
core::str::from_utf8(resp).unwrap()
);
#[derive(Debug, Deserialize)]
#[cfg_attr(feature = "defmt-03", derive(defmt::Format))]
struct IdOnly {
id: Option<u64>,
}
let id = serde_json_core::from_slice::<IdOnly>(resp)?.0.id;
trace!("Parsed id: {:?}", id);
match id {
None => Ok(None),
Some(id) => Ok(Some(id)),
Expand Down