penumbra_sdk_app/server/
mempool.rs1use anyhow::Result;
2
3use cnidarium::Storage;
4
5use tendermint::v0_37::abci::{
6 request::CheckTx as CheckTxReq, request::CheckTxKind, response::CheckTx as CheckTxRsp,
7 MempoolRequest as Request, MempoolResponse as Response,
8};
9use tokio::sync::mpsc;
10use tower_actor::Message;
11use tracing::Instrument;
12
13use crate::{app::App, metrics};
14
15pub struct Mempool {
17 queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
18 storage: Storage,
19}
20
21impl Mempool {
22 pub fn new(
23 storage: Storage,
24 queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
25 ) -> Self {
26 Self { queue, storage }
27 }
28
29 pub async fn check_tx(&mut self, req: Request) -> Result<Response, tower::BoxError> {
30 let Request::CheckTx(CheckTxReq {
31 tx: tx_bytes, kind, ..
32 }) = req;
33
34 let start = tokio::time::Instant::now();
35 let kind_str = match kind {
36 CheckTxKind::New => "new",
37 CheckTxKind::Recheck => "recheck",
38 };
39
40 let mut app = App::new(self.storage.latest_snapshot());
41
42 match app.deliver_tx_bytes(tx_bytes.as_ref()).await {
43 Ok(events) => {
44 let elapsed = start.elapsed();
45 tracing::info!(?elapsed, "tx accepted");
46 metrics::counter!(metrics::MEMPOOL_CHECKTX_TOTAL, "kind" => kind_str, "code" => "0").increment(1);
47 Ok(Response::CheckTx(CheckTxRsp {
48 events,
49 ..Default::default()
50 }))
51 }
52 Err(e) => {
53 let elapsed = start.elapsed();
54 tracing::info!(?e, ?elapsed, "tx rejected");
55 metrics::counter!(metrics::MEMPOOL_CHECKTX_TOTAL, "kind" => kind_str, "code" => "1").increment(1);
56 Ok(Response::CheckTx(CheckTxRsp {
57 code: 1.into(),
58 log: format!("{e:#}"),
60 ..Default::default()
61 }))
62 }
63 }
64 }
65
66 pub async fn run(mut self) -> Result<(), tower::BoxError> {
67 tracing::info!("mempool service started");
68 while let Some(Message {
69 req,
70 rsp_sender,
71 span,
72 }) = self.queue.recv().await
76 {
77 let result = self.check_tx(req).instrument(span).await;
78 let _ = rsp_sender.send(result);
79 }
80 tracing::info!("mempool service stopped");
81 Ok(())
82 }
83}