penumbra_sdk_mock_tendermint_proxy/
proxy.rs

1use {
2    penumbra_sdk_proto::{
3        tendermint::p2p::DefaultNodeInfo,
4        util::tendermint_proxy::v1::{
5            tendermint_proxy_service_server::TendermintProxyService, AbciQueryRequest,
6            AbciQueryResponse, BroadcastTxAsyncRequest, BroadcastTxAsyncResponse,
7            BroadcastTxSyncRequest, BroadcastTxSyncResponse, GetBlockByHeightRequest,
8            GetBlockByHeightResponse, GetStatusRequest, GetStatusResponse, GetTxRequest,
9            GetTxResponse, SyncInfo,
10        },
11    },
12    std::{
13        collections::BTreeMap,
14        sync::{Arc, RwLock},
15    },
16    tap::{Tap, TapFallible, TapOptional},
17    tendermint::{
18        block::{Block, Height},
19        Time,
20    },
21    tonic::Status,
22    tracing::instrument,
23};
24
/// A tendermint proxy service for use in tests.
///
/// This type implements [`TendermintProxyService`], but can be configured to report the blocks
/// generated by a [`penumbra_sdk_mock_consensus::TestNode`].
///
/// Blocks reach the proxy through the callback returned by
/// [`TestNodeProxy::on_block_callback()`], which holds a reference to the same shared state.
#[derive(Default)]
pub struct TestNodeProxy {
    /// Shared state; the on-block callback holds another reference to the same `Inner`.
    inner: Arc<Inner>,
}
33
/// The state shared between a [`TestNodeProxy`] and the on-block callbacks it hands out.
#[derive(Default)]
struct Inner {
    /// A map of the [`Block`]s that have been seen so far, keyed by [`Height`].
    ///
    /// Wrapped in a [`RwLock`] so it can be read and written through a shared reference.
    blocks: RwLock<BTreeMap<Height, Block>>,
}
39
40impl TestNodeProxy {
41    /// Creates a new [`TestNodeProxy`].
42    pub fn new<C>() -> Self {
43        Default::default()
44    }
45
46    /// Returns a boxed function that will add [`Blocks`] to this proxy.
47    pub fn on_block_callback(&self) -> penumbra_sdk_mock_consensus::OnBlockFn {
48        // Create a new reference to the shared map of blocks we've seen.
49        let Self { inner } = self;
50        let inner = Arc::clone(inner);
51
52        Box::new(move |block| inner.on_block(block))
53    }
54
55    /// Returns the last committed block height.
56    fn last_block_height(&self) -> tendermint::block::Height {
57        self.inner
58            .blocks()
59            .last_key_value()
60            .map(|(height, _)| *height)
61            .expect("blocks should not be empty")
62    }
63
64    /// Returns the latest block timestamp.
65    fn timestamp(&self) -> Time {
66        self.inner
67            .blocks()
68            .last_key_value()
69            .map(|(_, block)| block)
70            .expect("blocks should not be empty")
71            .header
72            .time
73    }
74}
75
76impl Inner {
77    #[instrument(level = "debug", skip_all)]
78    fn on_block(&self, block: tendermint::Block) {
79        // Add this block to the proxy's book-keeping.
80        let height = block.header.height;
81        self.blocks_mut()
82            .insert(height, block)
83            .map(|_overwritten| {
84                // ...or panic if we have been given block with duplicate heights.
85                panic!("proxy received two blocks with height {height}");
86            })
87            .tap_none(|| {
88                tracing::debug!(?height, "received block");
89            });
90    }
91
92    /// Acquires a write-lock on the map of blocks we have seen before.
93    fn blocks(&self) -> std::sync::RwLockReadGuard<'_, BTreeMap<Height, Block>> {
94        let Self { blocks } = self;
95        blocks
96            .tap(|_| tracing::trace!("acquiring read lock"))
97            .read()
98            .tap(|_| tracing::trace!("acquired read lock"))
99            .tap_err(|_| tracing::error!("failed to acquire read lock"))
100            .expect("block lock should never be poisoned")
101    }
102
103    /// Acquires a write-lock on the map of blocks we have seen before.
104    fn blocks_mut(&self) -> std::sync::RwLockWriteGuard<'_, BTreeMap<Height, Block>> {
105        let Self { blocks } = self;
106        blocks
107            .tap(|_| tracing::trace!("acquiring write lock"))
108            .write()
109            .tap(|_| tracing::trace!("acquired write lock"))
110            .tap_err(|_| tracing::error!("failed to acquire write lock"))
111            .expect("block lock should never be poisoned")
112    }
113}
114
115#[tonic::async_trait]
116impl TendermintProxyService for TestNodeProxy {
117    async fn get_tx(
118        &self,
119        _req: tonic::Request<GetTxRequest>,
120    ) -> Result<tonic::Response<GetTxResponse>, Status> {
121        Err(Status::unimplemented("get_tx"))
122    }
123
124    /// Broadcasts a transaction asynchronously.
125    #[instrument(
126        level = "info",
127        skip_all,
128        fields(req_id = tracing::field::Empty),
129    )]
130    async fn broadcast_tx_async(
131        &self,
132        _req: tonic::Request<BroadcastTxAsyncRequest>,
133    ) -> Result<tonic::Response<BroadcastTxAsyncResponse>, Status> {
134        Ok(tonic::Response::new(BroadcastTxAsyncResponse {
135            code: 0,
136            data: Vec::default(),
137            log: String::default(),
138            hash: Vec::default(),
139        }))
140    }
141
142    // Broadcasts a transaction synchronously.
143    #[instrument(
144        level = "info",
145        skip_all,
146        fields(req_id = tracing::field::Empty),
147    )]
148    async fn broadcast_tx_sync(
149        &self,
150        _req: tonic::Request<BroadcastTxSyncRequest>,
151    ) -> Result<tonic::Response<BroadcastTxSyncResponse>, Status> {
152        Ok(tonic::Response::new(BroadcastTxSyncResponse {
153            code: 0,
154            data: Vec::default(),
155            log: String::default(),
156            hash: Vec::default(),
157        }))
158    }
159
160    // Queries the current status.
161    #[instrument(level = "info", skip_all)]
162    async fn get_status(
163        &self,
164        req: tonic::Request<GetStatusRequest>,
165    ) -> Result<tonic::Response<GetStatusResponse>, Status> {
166        let GetStatusRequest { .. } = req.into_inner();
167        let latest_block_height = self.last_block_height().into();
168        let block_ts: tendermint_proto::google::protobuf::Timestamp = self.timestamp().into();
169        let sync_info = SyncInfo {
170            latest_block_hash: self
171                .inner
172                .blocks()
173                .last_key_value()
174                .map(|(_, b)| b.header.hash().into())
175                .unwrap_or_default(),
176            latest_app_hash: self
177                .inner
178                .blocks()
179                .last_key_value()
180                .map(|(_, b)| b.header.app_hash.clone().into())
181                .unwrap_or_default(),
182            latest_block_height,
183            latest_block_time: Some(pbjson_types::Timestamp {
184                seconds: block_ts.seconds,
185                nanos: block_ts.nanos,
186            }),
187            // Tests run with a single node, so it is never catching up.
188            catching_up: false,
189        };
190
191        Ok(GetStatusResponse {
192            node_info: Some(DefaultNodeInfo::default()),
193            sync_info: Some(sync_info),
194            validator_info: Some(Default::default()),
195        })
196        .map(tonic::Response::new)
197    }
198
199    #[instrument(level = "info", skip_all)]
200    async fn abci_query(
201        &self,
202        _req: tonic::Request<AbciQueryRequest>,
203    ) -> Result<tonic::Response<AbciQueryResponse>, Status> {
204        Err(Status::unimplemented("abci_query"))
205    }
206
207    #[instrument(level = "info", skip_all)]
208    async fn get_block_by_height(
209        &self,
210        req: tonic::Request<GetBlockByHeightRequest>,
211    ) -> Result<tonic::Response<GetBlockByHeightResponse>, Status> {
212        // Parse the height from the inbound client request.
213        let GetBlockByHeightRequest { height } = req.into_inner();
214        let height =
215            tendermint::block::Height::try_from(height).expect("height should be less than 2^63");
216
217        let block = self.inner.blocks().get(&height).cloned();
218        // the response uses the penumbra type but internally we use the tendermint type
219        let proto_block = block
220            .clone()
221            .map(penumbra_sdk_proto::tendermint::types::Block::try_from)
222            .transpose()
223            .or_else(|e| {
224                tracing::warn!(?height, error = ?e, "proxy: error fetching blocks");
225                Err(tonic::Status::internal("error fetching blocks"))
226            })?;
227
228        Ok(GetBlockByHeightResponse {
229            block_id: block.map(|b| penumbra_sdk_proto::tendermint::types::BlockId {
230                hash: b.header.hash().into(),
231                part_set_header: Some(penumbra_sdk_proto::tendermint::types::PartSetHeader {
232                    total: 0,
233                    hash: vec![],
234                }),
235            }),
236            block: proto_block,
237        })
238        .map(tonic::Response::new)
239    }
240}