penumbra_sdk_app/server/
consensus.rs

1use anyhow::Result;
2
3use cnidarium::Storage;
4use tendermint::abci::Event;
5use tendermint::v0_37::abci::{
6    request, response, ConsensusRequest as Request, ConsensusResponse as Response,
7};
8use tokio::sync::mpsc;
9use tower::BoxError;
10use tower_actor::Message;
11use tracing::Instrument;
12
13use crate::app::App;
14
15pub struct Consensus {
16    queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
17    storage: Storage,
18    app: App,
19}
20
21pub type ConsensusService = tower_actor::Actor<Request, Response, BoxError>;
22
23fn trace_events(events: &[Event]) {
24    for event in events {
25        let span = tracing::debug_span!("event", kind = ?event.kind);
26        span.in_scope(|| {
27            for attr in &event.attributes {
28                tracing::debug!(
29                    k = %String::from_utf8_lossy(attr.key_bytes()),
30                    v = %String::from_utf8_lossy(attr.value_bytes()),
31                );
32            }
33        })
34    }
35}
36
37impl Consensus {
38    const QUEUE_SIZE: usize = 10;
39
40    pub fn new(storage: Storage) -> ConsensusService {
41        tower_actor::Actor::new(Self::QUEUE_SIZE, |queue: _| {
42            Consensus::new_inner(storage, queue).run()
43        })
44    }
45
46    fn new_inner(
47        storage: Storage,
48        queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
49    ) -> Self {
50        let app = App::new(storage.latest_snapshot());
51
52        Self {
53            queue,
54            storage,
55            app,
56        }
57    }
58
59    async fn run(mut self) -> Result<(), tower::BoxError> {
60        while let Some(Message {
61            req,
62            rsp_sender,
63            span,
64        }) = self.queue.recv().await
65        {
66            // The send only fails if the receiver was dropped, which happens
67            // if the caller didn't propagate the message back to tendermint
68            // for some reason -- but that's not our problem.
69            let _ = rsp_sender.send(Ok(match req {
70                Request::InitChain(init_chain) => Response::InitChain(
71                    self.init_chain(init_chain)
72                        .instrument(span)
73                        .await
74                        .expect("init_chain must succeed"),
75                ),
76                Request::PrepareProposal(proposal) => Response::PrepareProposal(
77                    self.prepare_proposal(proposal)
78                        .instrument(span)
79                        .await
80                        .expect("prepare proposal must succeed"),
81                ),
82                Request::ProcessProposal(proposal) => Response::ProcessProposal(
83                    self.process_proposal(proposal)
84                        .instrument(span)
85                        .await
86                        .expect("process proposal must succeed"),
87                ),
88                Request::BeginBlock(begin_block) => Response::BeginBlock(
89                    self.begin_block(begin_block)
90                        .instrument(span)
91                        .await
92                        .expect("begin_block must succeed"),
93                ),
94                Request::DeliverTx(deliver_tx) => {
95                    Response::DeliverTx(self.deliver_tx(deliver_tx).instrument(span.clone()).await)
96                }
97                Request::EndBlock(end_block) => {
98                    Response::EndBlock(self.end_block(end_block).instrument(span).await)
99                }
100                Request::Commit => Response::Commit(
101                    self.commit()
102                        .instrument(span)
103                        .await
104                        .expect("commit must succeed"),
105                ),
106            }));
107        }
108        Ok(())
109    }
110
111    /// Initializes the chain based on the genesis data.
112    ///
113    /// The genesis data is provided by tendermint, and is used to initialize
114    /// the database.
115    async fn init_chain(&mut self, init_chain: request::InitChain) -> Result<response::InitChain> {
116        // Note that errors cannot be handled in InitChain, the application must crash.
117        let app_state: crate::genesis::AppState =
118            serde_json::from_slice(&init_chain.app_state_bytes)
119                .expect("can parse app_state in genesis file");
120
121        self.app.init_chain(&app_state).await;
122
123        // Extract the Tendermint validators from the app state
124        //
125        // NOTE: we ignore the validators passed to InitChain.validators, and instead expect them
126        // to be provided inside the initial app genesis state (`GenesisAppState`). Returning those
127        // validators in InitChain::Response tells Tendermint that they are the initial validator
128        // set. See https://docs.tendermint.com/master/spec/abci/abci.html#initchain
129        let validators = self.app.cometbft_validator_updates();
130
131        let app_hash = match &app_state {
132            crate::genesis::AppState::Checkpoint(h) => {
133                tracing::info!(?h, "genesis state is a checkpoint");
134                // If we're starting from a checkpoint, we just need to forward the app hash
135                // back to CometBFT.
136                self.storage.latest_snapshot().root_hash().await?
137            }
138            crate::genesis::AppState::Content(_) => {
139                tracing::info!("genesis state is a full configuration");
140                // Check that we haven't got a duplicated InitChain message for some reason:
141                if self.storage.latest_version() != u64::MAX {
142                    anyhow::bail!("database already initialized");
143                }
144                // Note: App::commit resets internal components, so we don't need to do that ourselves.
145                self.app.commit(self.storage.clone()).await
146            }
147        };
148
149        tracing::info!(
150            consensus_params = ?init_chain.consensus_params,
151            ?validators,
152            app_hash = ?app_hash,
153            "finished init_chain"
154        );
155
156        Ok(response::InitChain {
157            consensus_params: Some(init_chain.consensus_params),
158            validators,
159            app_hash: app_hash.0.to_vec().try_into()?,
160        })
161    }
162
163    async fn prepare_proposal(
164        &mut self,
165        proposal: request::PrepareProposal,
166    ) -> Result<response::PrepareProposal> {
167        tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal");
168        // We prepare a proposal against an isolated fork of the application state.
169        let mut tmp_app = App::new(self.storage.latest_snapshot());
170        // Once we are done, we discard it so that the application state doesn't get corrupted
171        // if another round of consensus is required because the proposal fails to finalize.
172        Ok(tmp_app.prepare_proposal(proposal).await)
173    }
174
175    async fn process_proposal(
176        &mut self,
177        proposal: request::ProcessProposal,
178    ) -> Result<response::ProcessProposal> {
179        tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, proposal_hash = %proposal.hash, "processing proposal");
180        // We process the proposal in an isolated state fork. Eventually, we should cache this work and
181        // re-use it when processing a `FinalizeBlock` message (starting in `0.38.x`).
182        let mut tmp_app = App::new(self.storage.latest_snapshot());
183        Ok(tmp_app.process_proposal(proposal).await)
184    }
185
186    async fn begin_block(
187        &mut self,
188        begin_block: request::BeginBlock,
189    ) -> Result<response::BeginBlock> {
190        // We don't need to print the block height, because it will already be
191        // included in the span modeling the abci request handling.
192        tracing::info!(time = ?begin_block.header.time, "beginning block");
193
194        let events = self.app.begin_block(&begin_block).await;
195
196        Ok(response::BeginBlock { events })
197    }
198
199    async fn deliver_tx(&mut self, deliver_tx: request::DeliverTx) -> response::DeliverTx {
200        // Unlike the other messages, DeliverTx is fallible, so
201        // inspect the response to report errors.
202        let rsp = self.app.deliver_tx_bytes(deliver_tx.tx.as_ref()).await;
203
204        match rsp {
205            Ok(events) => {
206                trace_events(&events);
207                response::DeliverTx {
208                    events,
209                    ..Default::default()
210                }
211            }
212            Err(e) => {
213                tracing::info!(?e, "deliver_tx failed");
214                response::DeliverTx {
215                    code: 1.into(),
216                    // Use the alternate format specifier to include the chain of error causes.
217                    log: format!("{e:#}"),
218                    ..Default::default()
219                }
220            }
221        }
222    }
223
224    async fn end_block(&mut self, end_block: request::EndBlock) -> response::EndBlock {
225        let latest_state_version = self.storage.latest_version();
226        tracing::info!(height = ?end_block.height, ?latest_state_version, "ending block");
227        if latest_state_version >= end_block.height as u64 {
228            tracing::warn!(
229                %latest_state_version,
230                %end_block.height,
231                "chain state version is ahead of the block height, this is an unexpected corruption of chain state"
232            );
233        }
234        let events = self.app.end_block(&end_block).await;
235        trace_events(&events);
236
237        // Set `tm_validator_updates` to the complete set of
238        // validators and voting power. This must be the last step performed,
239        // after all voting power calculations and validator state transitions have
240        // been completed.
241        let validator_updates = self.app.cometbft_validator_updates();
242
243        tracing::debug!(
244            ?validator_updates,
245            "sending validator updates to tendermint"
246        );
247
248        response::EndBlock {
249            validator_updates,
250            consensus_param_updates: None,
251            events,
252        }
253    }
254
255    async fn commit(&mut self) -> Result<response::Commit> {
256        let app_hash = self.app.commit(self.storage.clone()).await;
257        tracing::info!(?app_hash, "committed block");
258
259        Ok(response::Commit {
260            data: app_hash.0.to_vec().into(),
261            retain_height: 0u32.into(),
262        })
263    }
264}