penumbra_mock_tendermint_proxy/
proxy.rsuse {
penumbra_proto::{
tendermint::p2p::DefaultNodeInfo,
util::tendermint_proxy::v1::{
tendermint_proxy_service_server::TendermintProxyService, AbciQueryRequest,
AbciQueryResponse, BroadcastTxAsyncRequest, BroadcastTxAsyncResponse,
BroadcastTxSyncRequest, BroadcastTxSyncResponse, GetBlockByHeightRequest,
GetBlockByHeightResponse, GetStatusRequest, GetStatusResponse, GetTxRequest,
GetTxResponse, SyncInfo,
},
},
std::{
collections::BTreeMap,
sync::{Arc, RwLock},
},
tap::{Tap, TapFallible, TapOptional},
tendermint::{
block::{Block, Height},
Time,
},
tonic::Status,
tracing::instrument,
};
/// A proxy handle that serves Tendermint proxy gRPC requests from blocks it has been given.
///
/// The handle only holds a reference-counted pointer to its state, so the
/// callback returned by `on_block_callback` can share that same state.
#[derive(Default)]
pub struct TestNodeProxy {
/// Shared state: the blocks received so far.
inner: Arc<Inner>,
}
/// The state shared between a [`TestNodeProxy`] and its block callback.
#[derive(Default)]
struct Inner {
/// Blocks received so far, keyed (and therefore ordered) by height.
///
/// Guarded by a read/write lock; use `blocks()` / `blocks_mut()` to access it.
blocks: RwLock<BTreeMap<Height, Block>>,
}
impl TestNodeProxy {
    /// Creates a new proxy that has not yet received any blocks.
    ///
    /// The type parameter `C` is not used by the body; it is kept because it
    /// is part of the public signature.
    pub fn new<C>() -> Self {
        Self::default()
    }

    /// Returns a boxed closure that records each block it is called with.
    ///
    /// The closure owns its own clone of the shared state pointer, so it
    /// feeds the same block map this proxy reads from.
    pub fn on_block_callback(&self) -> penumbra_mock_consensus::OnBlockFn {
        let shared = Arc::clone(&self.inner);
        Box::new(move |block| shared.on_block(block))
    }

    /// Returns the greatest height among the blocks received so far.
    ///
    /// # Panics
    ///
    /// Panics if no block has been received yet.
    fn last_block_height(&self) -> tendermint::block::Height {
        let blocks = self.inner.blocks();
        let (height, _) = blocks
            .last_key_value()
            .expect("blocks should not be empty");
        *height
    }

    /// Returns the header timestamp of the block with the greatest height.
    ///
    /// # Panics
    ///
    /// Panics if no block has been received yet.
    fn timestamp(&self) -> Time {
        let blocks = self.inner.blocks();
        let (_, latest) = blocks
            .last_key_value()
            .expect("blocks should not be empty");
        latest.header.time
    }
}
impl Inner {
    /// Records `block` under its header height.
    ///
    /// # Panics
    ///
    /// Panics if a block with the same height was already recorded. The write
    /// guard is still held at that point.
    #[instrument(level = "debug", skip_all)]
    fn on_block(&self, block: tendermint::Block) {
        let height = block.header.height;
        let mut blocks = self.blocks_mut();
        blocks
            .insert(height, block)
            .tap_none(|| tracing::debug!(?height, "received block"))
            .tap_some(|_overwritten| panic!("proxy received two blocks with height {height}"));
    }

    /// Acquires the read lock on the block map, emitting trace events around the attempt.
    ///
    /// NOTE(review): the "acquired read lock" event is emitted before the
    /// result is inspected, so it also appears when acquisition failed.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn blocks(&self) -> std::sync::RwLockReadGuard<'_, BTreeMap<Height, Block>> {
        tracing::trace!("acquiring read lock");
        self.blocks
            .read()
            .tap(|_| tracing::trace!("acquired read lock"))
            .tap_err(|_| tracing::error!("failed to acquire read lock"))
            .expect("block lock should never be poisoned")
    }

    /// Acquires the write lock on the block map, emitting trace events around the attempt.
    ///
    /// NOTE(review): the "acquired write lock" event is emitted before the
    /// result is inspected, so it also appears when acquisition failed.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn blocks_mut(&self) -> std::sync::RwLockWriteGuard<'_, BTreeMap<Height, Block>> {
        tracing::trace!("acquiring write lock");
        self.blocks
            .write()
            .tap(|_| tracing::trace!("acquired write lock"))
            .tap_err(|_| tracing::error!("failed to acquire write lock"))
            .expect("block lock should never be poisoned")
    }
}
#[tonic::async_trait]
impl TendermintProxyService for TestNodeProxy {
async fn get_tx(
&self,
_req: tonic::Request<GetTxRequest>,
) -> Result<tonic::Response<GetTxResponse>, Status> {
Err(Status::unimplemented("get_tx"))
}
#[instrument(
level = "info",
skip_all,
fields(req_id = tracing::field::Empty),
)]
async fn broadcast_tx_async(
&self,
_req: tonic::Request<BroadcastTxAsyncRequest>,
) -> Result<tonic::Response<BroadcastTxAsyncResponse>, Status> {
Ok(tonic::Response::new(BroadcastTxAsyncResponse {
code: 0,
data: Vec::default(),
log: String::default(),
hash: Vec::default(),
}))
}
#[instrument(
level = "info",
skip_all,
fields(req_id = tracing::field::Empty),
)]
async fn broadcast_tx_sync(
&self,
_req: tonic::Request<BroadcastTxSyncRequest>,
) -> Result<tonic::Response<BroadcastTxSyncResponse>, Status> {
Ok(tonic::Response::new(BroadcastTxSyncResponse {
code: 0,
data: Vec::default(),
log: String::default(),
hash: Vec::default(),
}))
}
#[instrument(level = "info", skip_all)]
async fn get_status(
&self,
req: tonic::Request<GetStatusRequest>,
) -> Result<tonic::Response<GetStatusResponse>, Status> {
let GetStatusRequest { .. } = req.into_inner();
let latest_block_height = self.last_block_height().into();
let block_ts: tendermint_proto::google::protobuf::Timestamp = self.timestamp().into();
let sync_info = SyncInfo {
latest_block_hash: self
.inner
.blocks()
.last_key_value()
.map(|(_, b)| b.header.hash().into())
.unwrap_or_default(),
latest_app_hash: self
.inner
.blocks()
.last_key_value()
.map(|(_, b)| b.header.app_hash.clone().into())
.unwrap_or_default(),
latest_block_height,
latest_block_time: Some(pbjson_types::Timestamp {
seconds: block_ts.seconds,
nanos: block_ts.nanos,
}),
catching_up: false,
};
Ok(GetStatusResponse {
node_info: Some(DefaultNodeInfo::default()),
sync_info: Some(sync_info),
validator_info: Some(Default::default()),
})
.map(tonic::Response::new)
}
#[instrument(level = "info", skip_all)]
async fn abci_query(
&self,
_req: tonic::Request<AbciQueryRequest>,
) -> Result<tonic::Response<AbciQueryResponse>, Status> {
Err(Status::unimplemented("abci_query"))
}
#[instrument(level = "info", skip_all)]
async fn get_block_by_height(
&self,
req: tonic::Request<GetBlockByHeightRequest>,
) -> Result<tonic::Response<GetBlockByHeightResponse>, Status> {
let GetBlockByHeightRequest { height } = req.into_inner();
let height =
tendermint::block::Height::try_from(height).expect("height should be less than 2^63");
let block = self.inner.blocks().get(&height).cloned();
let proto_block = block
.clone()
.map(penumbra_proto::tendermint::types::Block::try_from)
.transpose()
.or_else(|e| {
tracing::warn!(?height, error = ?e, "proxy: error fetching blocks");
Err(tonic::Status::internal("error fetching blocks"))
})?;
Ok(GetBlockByHeightResponse {
block_id: block.map(|b| penumbra_proto::tendermint::types::BlockId {
hash: b.header.hash().into(),
part_set_header: Some(penumbra_proto::tendermint::types::PartSetHeader {
total: 0,
hash: vec![],
}),
}),
block: proto_block,
})
.map(tonic::Response::new)
}
}