penumbra_sdk_app/server/
mempool.rs

1use anyhow::Result;
2
3use cnidarium::Storage;
4
5use tendermint::v0_37::abci::{
6    request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp,
7    MempoolRequest as Request, MempoolResponse as Response,
8};
9use tokio::sync::mpsc;
10use tower_actor::Message;
11use tracing::Instrument;
12
13use crate::{app::App, metrics};
14
15/// A mempool service that applies transaction checks against an isolated application fork.
16pub struct Mempool {
17    queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
18    storage: Storage,
19}
20
21impl Mempool {
22    pub fn new(
23        storage: Storage,
24        queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
25    ) -> Self {
26        Self { queue, storage }
27    }
28
29    pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
30        let Request::CheckTx(CheckTxReq {
31            tx: tx_bytes, kind, ..
32        }) = req;
33
34        let start = tokio::time::Instant::now();
35        let kind_str = match kind {
36            CheckTxKind::New => "new",
37            CheckTxKind::Recheck => "recheck",
38        };
39
40        let mut app = App::new(self.storage.latest_snapshot());
41
42        match app.deliver_tx_bytes(tx_bytes.as_ref()).await {
43            Ok(events) => {
44                let elapsed = start.elapsed();
45                tracing::info!(?elapsed, "tx accepted");
46                metrics::counter!(metrics::MEMPOOL_CHECKTX_TOTAL, "kind" => kind_str, "code" => "0").increment(1);
47                Ok(Response::CheckTx(CheckTxRsp {
48                    events,
49                    ..Default::default()
50                }))
51            }
52            Err(e) => {
53                let elapsed = start.elapsed();
54                tracing::info!(?e, ?elapsed, "tx rejected");
55                metrics::counter!(metrics::MEMPOOL_CHECKTX_TOTAL, "kind" => kind_str, "code" => "1").increment(1);
56                Ok(Response::CheckTx(CheckTxRsp {
57                    code: 1.into(),
58                    // Use the alternate format specifier to include the chain of error causes.
59                    log: format!("{e:#}"),
60                    ..Default::default()
61                }))
62            }
63        }
64    }
65
66    pub async fn run(mut self) -> Result<(), tower::BoxError> {
67        tracing::info!("mempool service started");
68        while let Some(Message {
69            req,
70            rsp_sender,
71            span,
72            // We could perform `CheckTx` asynchronously, and poll many
73            // entries from the queue:
74            // See https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
75        }) = self.queue.recv().await
76        {
77            let result = self.check_tx(req).instrument(span).await;
78            let _ = rsp_sender.send(result);
79        }
80        tracing::info!("mempool service stopped");
81        Ok(())
82    }
83}