Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
stratum: diff manager
  • Loading branch information
sanlee42 committed Mar 21, 2024
1 parent 0868631 commit 4a8971c
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 20 deletions.
62 changes: 62 additions & 0 deletions stratum/src/diff_manager.rs
@@ -0,0 +1,62 @@
use starcoin_types::U256;

use crate::difficulty_to_target_hex;
pub const SHARE_SUBMIT_PERIOD: u64 = 5;
pub const INIT_HASH_RATE: u64 = 5;
pub const MINI_UPDATE_PERIOD: u64 = 5;
pub struct DifficultyManager {
pub timestamp_since_last_update: u64,
pub submits_since_last_update: u32,
pub hash_rate: u64,
pub difficulty: U256,
}

impl DifficultyManager {
pub fn get_target(&self) -> String {
difficulty_to_target_hex(self.difficulty)
}

fn get_difficulty_from_hashrate(hash_rate: u64, share_submit_period: u64) -> U256 {
U256::from(hash_rate * share_submit_period)
}

fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time went backwards")
.as_secs()
}

pub fn new() -> Self {
Self {
timestamp_since_last_update: Self::current_timestamp(),
submits_since_last_update: 0,
hash_rate: INIT_HASH_RATE,
difficulty: Self::get_difficulty_from_hashrate(INIT_HASH_RATE, SHARE_SUBMIT_PERIOD),
}
}

pub fn find_seal(&mut self) {
self.submits_since_last_update += 1;
}

pub fn try_update(&mut self) {
let current_timestamp = Self::current_timestamp();

let pass_time = current_timestamp - self.timestamp_since_last_update;
if pass_time < MINI_UPDATE_PERIOD {
return;
}

if self.submits_since_last_update == 0 {
self.hash_rate = self.hash_rate / 2
} else {
// hash_rate = difficulty / avg_time = difficulty / (pass_time / submits_of_share)
self.hash_rate =
(self.difficulty / pass_time * self.submits_since_last_update).as_u64();
}
self.timestamp_since_last_update = current_timestamp;
self.difficulty = Self::get_difficulty_from_hashrate(self.hash_rate, SHARE_SUBMIT_PERIOD);
self.submits_since_last_update = 0;
}
}
1 change: 1 addition & 0 deletions stratum/src/lib.rs
@@ -1,5 +1,6 @@
use starcoin_types::U256;

pub mod diff_manager;
pub mod rpc;
pub mod service;
pub mod stratum;
Expand Down
37 changes: 33 additions & 4 deletions stratum/src/rpc.rs
@@ -1,4 +1,3 @@
use crate::difficulty_to_target_hex;
use crate::stratum::Stratum;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use futures::FutureExt;
Expand All @@ -16,6 +15,7 @@ use starcoin_types::block::BlockHeaderExtra;
use starcoin_types::system_events::MintBlockEvent;
use std::borrow::BorrowMut;
use std::convert::TryInto;

use std::io::Write;
use std::sync::mpsc::TrySendError;
use std::sync::Arc;
Expand Down Expand Up @@ -203,21 +203,50 @@ impl StratumJob {
Ok(BlockHeaderExtra::new(extra))
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct JobId {
pub job_id: [u8; 8],
}
impl JobId {
pub fn from_bob(minting_bob: &[u8]) -> JobId {
let mut job_id = [0u8; 8];
job_id.copy_from_slice(&minting_bob[0..8]);
Self { job_id }
}
pub fn encode(&self) -> String {
hex::encode(&self.job_id)
}
pub fn equal_with(&self, minting_bob: &[u8]) -> bool {
&self.job_id[..] == &minting_bob[0..8]
}
pub fn new(job_id: &String) -> anyhow::Result<Self> {
let job_id: [u8; 8] = hex::decode(job_id)
.map_err(|_| anyhow::anyhow!("Decode job_id failed"))?
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid job id with bad length"))?;
Ok(Self { job_id })
}
}

impl StratumJobResponse {
pub fn from(e: &MintBlockEvent, login: Option<LoginRequest>, worker_id: [u8; 4]) -> Self {
pub fn from(
e: &MintBlockEvent,
login: Option<LoginRequest>,
worker_id: [u8; 4],
target: String,
) -> Self {
let mut minting_blob = e.minting_blob.clone();
let _ = minting_blob[35..39].borrow_mut().write_all(&worker_id);
let worker_id_hex = hex::encode(worker_id);
let job_id = hex::encode(&e.minting_blob[0..8]);
let job_id = JobId::from_bob(&e.minting_blob).encode();
Self {
login,
id: worker_id_hex.clone(),
status: "OK".into(),
job: StratumJob {
height: 0,
id: worker_id_hex,
target: difficulty_to_target_hex(e.difficulty),
target,
job_id,
blob: hex::encode(&minting_blob),
},
Expand Down
60 changes: 44 additions & 16 deletions stratum/src/stratum.rs
@@ -1,4 +1,5 @@
use crate::rpc::*;
use crate::diff_manager::DifficultyManager;
use crate::{difficulty_to_target_hex, rpc::*};
use anyhow::Result;
use futures::channel::mpsc;
use futures::StreamExt;
Expand Down Expand Up @@ -29,21 +30,48 @@ impl Stratum {
mint_block_subscribers: Default::default(),
}
}

fn next_id(&self) -> u32 {
self.uid.fetch_add(1, atomic::Ordering::SeqCst)
}
fn sync_current_job(&mut self) -> Result<Option<MintBlockEvent>> {

fn sync_upstream_job(&mut self) -> Result<Option<MintBlockEvent>> {
let service = self.miner_service.clone();
let subscribers_num = self.mint_block_subscribers.len() as u32;
futures::executor::block_on(service.send(UpdateSubscriberNumRequest {
number: Some(subscribers_num),
}))
}
fn send_to_all(&mut self, event: MintBlockEvent) {

fn get_downstream_job(
subscribe_id: u32,
login: LoginRequest,
set_login: bool,
upstreaum_event: &MintBlockEvent,
diff_manager: &DifficultyManager,
) -> StratumJobResponse {
let worker_id = login.generate_worker_id(subscribe_id);
let target = diff_manager.get_target();
let job = StratumJobResponse::from(
upstreaum_event,
if set_login { Some(login) } else { None },
worker_id,
difficulty_to_target_hex(upstreaum_event.difficulty), //target,
);

return job;
}

fn dispatch_job_to_clients(&mut self, event: MintBlockEvent) {
let mut remove_outdated = vec![];
for (id, (ch, login)) in self.mint_block_subscribers.iter() {
let worker_id = login.generate_worker_id(*id);
let job = StratumJobResponse::from(&event, None, worker_id);
let job = Self::get_downstream_job(
*id,
login.clone(),
false,
&event,
&DifficultyManager::new(),
);
info!(target: "stratum", "dispatch startum job:{:?}", job);
if let Err(err) = ch.unbounded_send(job) {
if err.is_disconnected() {
Expand Down Expand Up @@ -74,7 +102,7 @@ impl ActorService for Stratum {

impl EventHandler<Self, MintBlockEvent> for Stratum {
fn handle_event(&mut self, event: MintBlockEvent, _ctx: &mut ServiceContext<Stratum>) {
self.send_to_all(event);
self.dispatch_job_to_clients(event);
}
}

Expand Down Expand Up @@ -128,12 +156,12 @@ impl ServiceHandler<Self, SubscribeJobEvent> for Stratum {
error!(target: "stratum", "Subscriber assign is failed");
}
});
if let Ok(Some(event)) = self.sync_current_job() {
if let Ok(Some(event)) = self.sync_upstream_job() {
let downstream_job =
Self::get_downstream_job(sub_id, login, true, &event, &DifficultyManager::new());
ctx.spawn(async move {
let worker_id = login.generate_worker_id(sub_id);
let stratum_result = StratumJobResponse::from(&event, Some(login), worker_id);
info!(target:"stratum", "Respond to stratum subscribe:{:?}", stratum_result);
if let Err(err) = sender.unbounded_send(stratum_result) {
info!(target:"stratum", "Respond to stratum subscribe:{:?}", downstream_job);
if let Err(err) = sender.unbounded_send(downstream_job) {
error!(target: "stratum", "Failed to send MintBlockEvent: {}", err);
}
});
Expand All @@ -146,11 +174,11 @@ impl ServiceHandler<Self, SubscribeJobEvent> for Stratum {
impl ServiceHandler<Self, SubmitShareEvent> for Stratum {
fn handle(&mut self, msg: SubmitShareEvent, _ctx: &mut ServiceContext<Self>) -> Result<()> {
info!(target: "stratum", "received submit share event:{:?}", &msg.0);
if let Some(current_mint_event) = self.sync_current_job()? {
let job_id = hex::encode(&current_mint_event.minting_blob[0..8]);
let submit_job_id = msg.0.job_id.clone();
if submit_job_id != job_id {
warn!(target: "stratum", "received job mismatch with current job,{},{}", submit_job_id, job_id);
if let Some(current_mint_event) = self.sync_upstream_job()? {
let job_id = JobId::new(&msg.0.job_id)?;
let submit_job_id = JobId::from_bob(&current_mint_event.minting_blob);
if job_id != submit_job_id {
info!(target: "stratum", "received job mismatch with current job,{:?},{:?}",job_id,submit_job_id);
return Ok(());
};
let mut seal: MinerSubmitSealRequest = msg.0.try_into()?;
Expand Down

0 comments on commit 4a8971c

Please sign in to comment.