penumbra_sdk_app/server/
consensus.rs1use anyhow::Result;
2
3use cnidarium::Storage;
4use tendermint::abci::Event;
5use tendermint::v0_37::abci::{
6 request, response, ConsensusRequest as Request, ConsensusResponse as Response,
7};
8use tokio::sync::mpsc;
9use tower::BoxError;
10use tower_actor::Message;
11use tracing::Instrument;
12
13use crate::app::App;
14
15pub struct Consensus {
16 queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
17 storage: Storage,
18 app: App,
19}
20
21pub type ConsensusService = tower_actor::Actor<Request, Response, BoxError>;
22
23fn trace_events(events: &[Event]) {
24 for event in events {
25 let span = tracing::debug_span!("event", kind = ?event.kind);
26 span.in_scope(|| {
27 for attr in &event.attributes {
28 tracing::debug!(
29 k = %String::from_utf8_lossy(attr.key_bytes()),
30 v = %String::from_utf8_lossy(attr.value_bytes()),
31 );
32 }
33 })
34 }
35}
36
37impl Consensus {
38 const QUEUE_SIZE: usize = 10;
39
40 pub fn new(storage: Storage) -> ConsensusService {
41 tower_actor::Actor::new(Self::QUEUE_SIZE, |queue: _| {
42 Consensus::new_inner(storage, queue).run()
43 })
44 }
45
46 fn new_inner(
47 storage: Storage,
48 queue: mpsc::Receiver<Message<Request, Response, tower::BoxError>>,
49 ) -> Self {
50 let app = App::new(storage.latest_snapshot());
51
52 Self {
53 queue,
54 storage,
55 app,
56 }
57 }
58
59 async fn run(mut self) -> Result<(), tower::BoxError> {
60 while let Some(Message {
61 req,
62 rsp_sender,
63 span,
64 }) = self.queue.recv().await
65 {
66 let _ = rsp_sender.send(Ok(match req {
70 Request::InitChain(init_chain) => Response::InitChain(
71 self.init_chain(init_chain)
72 .instrument(span)
73 .await
74 .expect("init_chain must succeed"),
75 ),
76 Request::PrepareProposal(proposal) => Response::PrepareProposal(
77 self.prepare_proposal(proposal)
78 .instrument(span)
79 .await
80 .expect("prepare proposal must succeed"),
81 ),
82 Request::ProcessProposal(proposal) => Response::ProcessProposal(
83 self.process_proposal(proposal)
84 .instrument(span)
85 .await
86 .expect("process proposal must succeed"),
87 ),
88 Request::BeginBlock(begin_block) => Response::BeginBlock(
89 self.begin_block(begin_block)
90 .instrument(span)
91 .await
92 .expect("begin_block must succeed"),
93 ),
94 Request::DeliverTx(deliver_tx) => {
95 Response::DeliverTx(self.deliver_tx(deliver_tx).instrument(span.clone()).await)
96 }
97 Request::EndBlock(end_block) => {
98 Response::EndBlock(self.end_block(end_block).instrument(span).await)
99 }
100 Request::Commit => Response::Commit(
101 self.commit()
102 .instrument(span)
103 .await
104 .expect("commit must succeed"),
105 ),
106 }));
107 }
108 Ok(())
109 }
110
111 async fn init_chain(&mut self, init_chain: request::InitChain) -> Result<response::InitChain> {
116 let app_state: crate::genesis::AppState =
118 serde_json::from_slice(&init_chain.app_state_bytes)
119 .expect("can parse app_state in genesis file");
120
121 self.app.init_chain(&app_state).await;
122
123 let validators = self.app.cometbft_validator_updates();
130
131 let app_hash = match &app_state {
132 crate::genesis::AppState::Checkpoint(h) => {
133 tracing::info!(?h, "genesis state is a checkpoint");
134 self.storage.latest_snapshot().root_hash().await?
137 }
138 crate::genesis::AppState::Content(_) => {
139 tracing::info!("genesis state is a full configuration");
140 if self.storage.latest_version() != u64::MAX {
142 anyhow::bail!("database already initialized");
143 }
144 self.app.commit(self.storage.clone()).await
146 }
147 };
148
149 tracing::info!(
150 consensus_params = ?init_chain.consensus_params,
151 ?validators,
152 app_hash = ?app_hash,
153 "finished init_chain"
154 );
155
156 Ok(response::InitChain {
157 consensus_params: Some(init_chain.consensus_params),
158 validators,
159 app_hash: app_hash.0.to_vec().try_into()?,
160 })
161 }
162
163 async fn prepare_proposal(
164 &mut self,
165 proposal: request::PrepareProposal,
166 ) -> Result<response::PrepareProposal> {
167 tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, "preparing proposal");
168 let mut tmp_app = App::new(self.storage.latest_snapshot());
170 Ok(tmp_app.prepare_proposal(proposal).await)
173 }
174
175 async fn process_proposal(
176 &mut self,
177 proposal: request::ProcessProposal,
178 ) -> Result<response::ProcessProposal> {
179 tracing::info!(height = ?proposal.height, proposer = ?proposal.proposer_address, proposal_hash = %proposal.hash, "processing proposal");
180 let mut tmp_app = App::new(self.storage.latest_snapshot());
183 Ok(tmp_app.process_proposal(proposal).await)
184 }
185
186 async fn begin_block(
187 &mut self,
188 begin_block: request::BeginBlock,
189 ) -> Result<response::BeginBlock> {
190 tracing::info!(time = ?begin_block.header.time, "beginning block");
193
194 let events = self.app.begin_block(&begin_block).await;
195
196 Ok(response::BeginBlock { events })
197 }
198
199 async fn deliver_tx(&mut self, deliver_tx: request::DeliverTx) -> response::DeliverTx {
200 let rsp = self.app.deliver_tx_bytes(deliver_tx.tx.as_ref()).await;
203
204 match rsp {
205 Ok(events) => {
206 trace_events(&events);
207 response::DeliverTx {
208 events,
209 ..Default::default()
210 }
211 }
212 Err(e) => {
213 tracing::info!(?e, "deliver_tx failed");
214 response::DeliverTx {
215 code: 1.into(),
216 log: format!("{e:#}"),
218 ..Default::default()
219 }
220 }
221 }
222 }
223
224 async fn end_block(&mut self, end_block: request::EndBlock) -> response::EndBlock {
225 let latest_state_version = self.storage.latest_version();
226 tracing::info!(height = ?end_block.height, ?latest_state_version, "ending block");
227 if latest_state_version >= end_block.height as u64 {
228 tracing::warn!(
229 %latest_state_version,
230 %end_block.height,
231 "chain state version is ahead of the block height, this is an unexpected corruption of chain state"
232 );
233 }
234 let events = self.app.end_block(&end_block).await;
235 trace_events(&events);
236
237 let validator_updates = self.app.cometbft_validator_updates();
242
243 tracing::debug!(
244 ?validator_updates,
245 "sending validator updates to tendermint"
246 );
247
248 response::EndBlock {
249 validator_updates,
250 consensus_param_updates: None,
251 events,
252 }
253 }
254
255 async fn commit(&mut self) -> Result<response::Commit> {
256 let app_hash = self.app.commit(self.storage.clone()).await;
257 tracing::info!(?app_hash, "committed block");
258
259 Ok(response::Commit {
260 data: app_hash.0.to_vec().into(),
261 retain_height: 0u32.into(),
262 })
263 }
264}