penumbra_sdk_app/app/
mod.rs

1use std::process;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use cnidarium::{ArcStateDeltaExt, Snapshot, StateDelta, StateRead, StateWrite, Storage};
8use cnidarium_component::Component;
9use ibc_types::core::connection::ChainId;
10use jmt::RootHash;
11use penumbra_sdk_auction::component::{Auction, StateReadExt as _, StateWriteExt as _};
12use penumbra_sdk_community_pool::component::{CommunityPool, StateWriteExt as _};
13use penumbra_sdk_community_pool::StateReadExt as _;
14use penumbra_sdk_compact_block::component::CompactBlockManager;
15use penumbra_sdk_dex::component::StateReadExt as _;
16use penumbra_sdk_dex::component::{Dex, StateWriteExt as _};
17use penumbra_sdk_distributions::component::{Distributions, StateReadExt as _, StateWriteExt as _};
18use penumbra_sdk_fee::component::{FeeComponent, StateReadExt as _, StateWriteExt as _};
19use penumbra_sdk_funding::component::Funding;
20use penumbra_sdk_funding::component::{StateReadExt as _, StateWriteExt as _};
21use penumbra_sdk_governance::component::{Governance, StateReadExt as _, StateWriteExt as _};
22use penumbra_sdk_ibc::component::{Ibc, StateWriteExt as _};
23use penumbra_sdk_ibc::StateReadExt as _;
24use penumbra_sdk_proto::core::app::v1::TransactionsByHeightResponse;
25use penumbra_sdk_proto::{DomainType, StateWriteProto as _};
26use penumbra_sdk_sct::component::clock::EpochRead;
27use penumbra_sdk_sct::component::sct::Sct;
28use penumbra_sdk_sct::component::{StateReadExt as _, StateWriteExt as _};
29use penumbra_sdk_sct::epoch::Epoch;
30use penumbra_sdk_shielded_pool::component::{ShieldedPool, StateReadExt as _, StateWriteExt as _};
31use penumbra_sdk_stake::component::{
32    stake::ConsensusUpdateRead, Staking, StateReadExt as _, StateWriteExt as _,
33};
34use penumbra_sdk_transaction::Transaction;
35use prost::Message as _;
36use tendermint::abci::{self, Event};
37
38use tendermint::v0_37::abci::{request, response};
39use tendermint::validator::Update;
40use tokio::time::sleep;
41use tracing::{instrument, Instrument};
42
43use crate::action_handler::AppActionHandler;
44use crate::event::EventAppParametersChange;
45use crate::genesis::AppState;
46use crate::params::change::ParameterChangeExt as _;
47use crate::params::AppParameters;
48use crate::{CommunityPoolStateReadExt, PenumbraHost};
49
50pub mod state_key;
51
/// The inter-block state being written to by the application.
///
/// This is a reference-counted [`StateDelta`] layered on top of a storage
/// [`Snapshot`].
type InterBlockState = Arc<StateDelta<Snapshot>>;
54
/// Upper bound, in bytes, on the transaction payload of a CometBFT block (1 MiB).
pub const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1 << 20;

/// Upper bound, in bytes, on the size of one individual transaction (96 KiB).
pub const MAX_TRANSACTION_SIZE_BYTES: usize = 96 << 10;

/// Upper bound, in bytes, on the evidence portion of a block (30 KiB).
pub const MAX_EVIDENCE_SIZE_BYTES: usize = 30 << 10;
63
/// The Penumbra application, written as a bundle of [`Component`]s.
///
/// The [`App`] is not a [`Component`], but
/// it constructs the components and exposes a [`commit`](App::commit) that
/// commits the changes to the persistent storage and resets its subcomponents.
pub struct App {
    /// Writes accumulated since construction or the last [`commit`](App::commit),
    /// layered over the snapshot the state was built from.
    state: InterBlockState,
}
72
73impl App {
74    /// Constructs a new application, using the provided [`Snapshot`].
75    /// Callers should ensure that [`App::is_ready`]) returns `true`, but this is not enforced.
76    #[instrument(skip_all)]
77    pub fn new(snapshot: Snapshot) -> Self {
78        tracing::debug!("initializing App instance");
79
80        // We perform the `Arc` wrapping of `State` here to ensure
81        // there should be no unexpected copies elsewhere.
82        let state = Arc::new(StateDelta::new(snapshot));
83
84        Self { state }
85    }
86
87    /// Returns whether the application is ready to start.
88    #[instrument(skip_all, ret)]
89    pub async fn is_ready(state: Snapshot) -> bool {
90        // If the chain is halted, we are not ready to start the application.
91        // This is a safety mechanism to prevent the chain from starting if it
92        // is in a halted state.
93        !state.is_chain_halted().await
94    }
95
96    // StateDelta::apply only works when the StateDelta wraps an underlying
97    // StateWrite.  But if we want to share the StateDelta with spawned tasks,
98    // we usually can't wrap a StateWrite instance, which requires exclusive
99    // access. This method "externally" applies the state delta to the
100    // inter-block state.
101    //
102    // Invariant: `state_tx` and `self.state` are the only two references to the
103    // inter-block state.
104    fn apply(&mut self, state_tx: StateDelta<InterBlockState>) -> Vec<Event> {
105        let (state2, mut cache) = state_tx.flatten();
106        std::mem::drop(state2);
107        // Now there is only one reference to the inter-block state: self.state
108
109        let events = cache.take_events();
110        cache.apply_to(
111            Arc::get_mut(&mut self.state).expect("no other references to inter-block state"),
112        );
113
114        events
115    }
116
117    pub async fn init_chain(&mut self, app_state: &AppState) {
118        let mut state_tx = self
119            .state
120            .try_begin_transaction()
121            .expect("state Arc should not be referenced elsewhere");
122        match app_state {
123            AppState::Content(genesis) => {
124                state_tx.put_chain_id(genesis.chain_id.clone());
125                Sct::init_chain(&mut state_tx, Some(&genesis.sct_content)).await;
126                ShieldedPool::init_chain(&mut state_tx, Some(&genesis.shielded_pool_content)).await;
127                Distributions::init_chain(&mut state_tx, Some(&genesis.distributions_content))
128                    .await;
129                Staking::init_chain(
130                    &mut state_tx,
131                    Some(&(
132                        genesis.stake_content.clone(),
133                        genesis.shielded_pool_content.clone(),
134                    )),
135                )
136                .await;
137                Ibc::init_chain(&mut state_tx, Some(&genesis.ibc_content)).await;
138                Auction::init_chain(&mut state_tx, Some(&genesis.auction_content)).await;
139                Dex::init_chain(&mut state_tx, Some(&genesis.dex_content)).await;
140                CommunityPool::init_chain(&mut state_tx, Some(&genesis.community_pool_content))
141                    .await;
142                Governance::init_chain(&mut state_tx, Some(&genesis.governance_content)).await;
143                FeeComponent::init_chain(&mut state_tx, Some(&genesis.fee_content)).await;
144                Funding::init_chain(&mut state_tx, Some(&genesis.funding_content)).await;
145
146                state_tx
147                    .finish_block()
148                    .await
149                    .expect("must be able to finish compact block");
150            }
151            AppState::Checkpoint(_) => {
152                ShieldedPool::init_chain(&mut state_tx, None).await;
153                Distributions::init_chain(&mut state_tx, None).await;
154                Staking::init_chain(&mut state_tx, None).await;
155                Ibc::init_chain(&mut state_tx, None).await;
156                Dex::init_chain(&mut state_tx, None).await;
157                Governance::init_chain(&mut state_tx, None).await;
158                CommunityPool::init_chain(&mut state_tx, None).await;
159                FeeComponent::init_chain(&mut state_tx, None).await;
160                Funding::init_chain(&mut state_tx, None).await;
161            }
162        };
163
164        // Note that `init_chain` can not emit any events, and we do not want to
165        // work around this as it violates the design principle that events are changes
166        // to initial data.
167        //
168        // This means that indexers are responsible for parsing genesis data and bootstrapping
169        // their initial state before processing chronological events.
170        //
171        // See: https://github.com/penumbra-zone/penumbra/pull/4449#discussion_r1636868800
172
173        state_tx.apply();
174    }
175
176    pub async fn prepare_proposal(
177        &mut self,
178        proposal: request::PrepareProposal,
179    ) -> response::PrepareProposal {
180        if self.state.is_chain_halted().await {
181            // If we find ourselves preparing a proposal for a halted chain
182            // we stop abruptly to prevent any progress.
183            // The persistent halt mechanism will prevent restarts until we are ready.
184            process::exit(0);
185        }
186
187        let mut included_txs = Vec::new();
188        let num_candidate_txs = proposal.txs.len();
189        tracing::debug!(
190            "processing PrepareProposal, found {} candidate transactions",
191            num_candidate_txs
192        );
193
194        // This is a node controlled parameter that is different from the homonymous
195        // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed this
196        // limit, presuming that a subset of those transactions will be shed.
197        // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md
198        let max_proposal_size_bytes = proposal.max_tx_bytes as u64;
199        // Tracking the size of the proposal
200        let mut proposal_size_bytes = 0u64;
201
202        for tx in proposal.txs {
203            let transaction_size = tx.len() as u64;
204
205            // We compute the total proposal size if we were to include this transaction.
206            let total_with_tx = proposal_size_bytes.saturating_add(transaction_size);
207
208            // This should never happen, unless Comet is misbehaving because of a bug
209            // or a misconfiguration. We handle it gracefully, to prioritize forward progress.
210            if transaction_size > MAX_TRANSACTION_SIZE_BYTES as u64 {
211                continue;
212            }
213
214            // First, we filter proposals to fit within the block limit.
215            if total_with_tx >= max_proposal_size_bytes {
216                break;
217            }
218
219            // Then, we make sure to only include successful transactions.
220            match self.deliver_tx_bytes(&tx).await {
221                Ok(_) => {
222                    proposal_size_bytes = total_with_tx;
223                    included_txs.push(tx)
224                }
225                Err(_) => continue,
226            }
227        }
228
229        // The evidence payload is validated by Comet, we can lean on three guarantees:
230        // 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES`
231        // 2. Expired evidence is filtered
232        // 3. Evidence is valid.
233        tracing::debug!(
234            "finished processing PrepareProposal, including {}/{} candidate transactions",
235            included_txs.len(),
236            num_candidate_txs
237        );
238
239        response::PrepareProposal { txs: included_txs }
240    }
241
242    #[instrument(skip_all, ret, level = "debug")]
243    pub async fn process_proposal(
244        &mut self,
245        proposal: request::ProcessProposal,
246    ) -> response::ProcessProposal {
247        tracing::debug!(
248            height = proposal.height.value(),
249            proposer = ?proposal.proposer_address,
250            proposal_hash = ?proposal.hash,
251            "processing proposal"
252        );
253
254        // Proposal validation:
255        // 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`]
256        // 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`]
257        // 3. The total transaction payload is below [`MAX_BLOCK_PAYLOAD_SIZE_BYTES`]
258        // 4. Each transaction applies successfully.
259        let mut evidence_buffer: Vec<u8> = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES);
260        let mut bytes_tracker = 0usize;
261
262        for evidence in proposal.misbehavior {
263            // This should be pretty cheap, we allow for `MAX_EVIDENCE_SIZE_BYTES` in total
264            // but a single evidence datum should be an order of magnitude smaller than that.
265            evidence_buffer.clear();
266            let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into();
267            let evidence_size = match proto_evidence.encode(&mut evidence_buffer) {
268                Ok(_) => evidence_buffer.len(),
269                Err(_) => return response::ProcessProposal::Reject,
270            };
271            bytes_tracker = bytes_tracker.saturating_add(evidence_size);
272            if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES {
273                return response::ProcessProposal::Reject;
274            }
275        }
276
277        // The evidence payload is valid, now we validate the block txs
278        // payload: they MUST be below the tx size limit, and apply cleanly on
279        // state fork.
280        let mut total_txs_payload_size = 0usize;
281        for tx in proposal.txs {
282            let tx_size = tx.len();
283            if tx_size > MAX_TRANSACTION_SIZE_BYTES {
284                return response::ProcessProposal::Reject;
285            }
286
287            total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size);
288            if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
289                return response::ProcessProposal::Reject;
290            }
291
292            match self.deliver_tx_bytes(&tx).await {
293                Ok(_) => continue,
294                Err(_) => return response::ProcessProposal::Reject,
295            }
296        }
297
298        response::ProcessProposal::Accept
299    }
300
301    pub async fn begin_block(&mut self, begin_block: &request::BeginBlock) -> Vec<abci::Event> {
302        let mut state_tx = StateDelta::new(self.state.clone());
303
304        // If a app parameter change is scheduled for this block, apply it here,
305        // before any other component has executed. This ensures that app
306        // parameter changes are consistently applied precisely at the boundary
307        // between blocks.
308        //
309        // Note that because _nothing_ has executed yet, we need to get the
310        // current height from the begin_block request, rather than from the
311        // state (it will be set by the SCT component, which executes first).
312        if let Some(change) = state_tx
313            .param_changes_for_height(begin_block.header.height.into())
314            .await
315            .expect("param changes should always be readable, even if unset")
316        {
317            let old_params = state_tx
318                .get_app_params()
319                .await
320                .expect("must be able to read app params");
321            match change.apply_changes(old_params) {
322                Ok(new_params) => {
323                    tracing::info!(?change, "applied app parameter change");
324                    state_tx.put_app_params(new_params.clone());
325                    state_tx.record_proto(
326                        EventAppParametersChange {
327                            new_parameters: new_params,
328                        }
329                        .to_proto(),
330                    )
331                }
332                Err(e) => {
333                    // N.B. this is an "info" rather than "warn" because it does not report
334                    // a problem with _this instance of the application_, but rather is an expected
335                    // behavior.
336                    tracing::info!(?change, ?e, "failed to apply approved app parameter change");
337                }
338            }
339        }
340
341        // Run each of the begin block handlers for each component, in sequence:
342        let mut arc_state_tx = Arc::new(state_tx);
343        Sct::begin_block(&mut arc_state_tx, begin_block).await;
344        ShieldedPool::begin_block(&mut arc_state_tx, begin_block).await;
345        Distributions::begin_block(&mut arc_state_tx, begin_block).await;
346        Ibc::begin_block::<PenumbraHost, StateDelta<Arc<StateDelta<cnidarium::Snapshot>>>>(
347            &mut arc_state_tx,
348            begin_block,
349        )
350        .await;
351        Auction::begin_block(&mut arc_state_tx, begin_block).await;
352        Dex::begin_block(&mut arc_state_tx, begin_block).await;
353        CommunityPool::begin_block(&mut arc_state_tx, begin_block).await;
354        Governance::begin_block(&mut arc_state_tx, begin_block).await;
355        Staking::begin_block(&mut arc_state_tx, begin_block).await;
356        FeeComponent::begin_block(&mut arc_state_tx, begin_block).await;
357        Funding::begin_block(&mut arc_state_tx, begin_block).await;
358
359        let state_tx = Arc::try_unwrap(arc_state_tx)
360            .expect("components did not retain copies of shared state");
361
362        // Apply the state from `begin_block` and return the events (we'll append to them if
363        // necessary based on the results of applying the Community Pool transactions queued)
364        let mut events = self.apply(state_tx);
365
366        // Deliver Community Pool transactions here, before any other block processing (effectively adding
367        // synthetic transactions slotted in after the start of the block but before any user
368        // transactions)
369        let pending_transactions = self
370            .state
371            .pending_community_pool_transactions()
372            .await
373            .expect("Community Pool transactions should always be readable");
374        for transaction in pending_transactions {
375            // NOTE: We are *intentionally* using `deliver_tx_allowing_community_pool_spends` here, rather than
376            // `deliver_tx`, because here is the **ONLY** place we want to permit Community Pool spends, when
377            // delivering transactions that have been scheduled by the chain itself for delivery.
378            tracing::info!(?transaction, "delivering Community Pool transaction");
379            match self
380                .deliver_tx_allowing_community_pool_spends(Arc::new(transaction))
381                .await
382            {
383                Err(error) => {
384                    tracing::warn!(?error, "failed to deliver Community Pool transaction");
385                }
386                Ok(community_pool_tx_events) => events.extend(community_pool_tx_events),
387            }
388        }
389
390        events
391    }
392
393    /// Wrapper function for [`Self::deliver_tx`]  that decodes from bytes.
394    pub async fn deliver_tx_bytes(&mut self, tx_bytes: &[u8]) -> Result<Vec<abci::Event>> {
395        let tx = Arc::new(Transaction::decode(tx_bytes).context("decoding transaction")?);
396        self.deliver_tx(tx)
397            .await
398            .context("failed to deliver transaction")
399    }
400
401    pub async fn deliver_tx(&mut self, tx: Arc<Transaction>) -> Result<Vec<abci::Event>> {
402        // Ensure that any normally-delivered transaction (originating from a user) does not contain
403        // any Community Pool spends or outputs; the only place those are permitted is transactions originating
404        // from the chain itself:
405        anyhow::ensure!(
406            tx.community_pool_spends().peekable().peek().is_none(),
407            "Community Pool spends are not permitted in user-submitted transactions"
408        );
409        anyhow::ensure!(
410            tx.community_pool_outputs().peekable().peek().is_none(),
411            "Community Pool outputs are not permitted in user-submitted transactions"
412        );
413
414        // Now that we've ensured that there are not any Community Pool spends or outputs, we can deliver the transaction:
415        self.deliver_tx_allowing_community_pool_spends(tx).await
416    }
417
418    async fn deliver_tx_allowing_community_pool_spends(
419        &mut self,
420        tx: Arc<Transaction>,
421    ) -> Result<Vec<abci::Event>> {
422        // Both stateful and stateless checks take the transaction as
423        // verification context.  The separate clone of the Arc<Transaction>
424        // means it can be passed through the whole tree of checks.
425        //
426        // We spawn tasks for each set of checks, to do CPU-bound stateless checks
427        // and I/O-bound stateful checks at the same time.
428        let tx2 = tx.clone();
429        let stateless = tokio::spawn(
430            async move { tx2.check_stateless(()).await }.instrument(tracing::Span::current()),
431        );
432        let tx2 = tx.clone();
433        let state2 = self.state.clone();
434        let stateful = tokio::spawn(
435            async move { tx2.check_historical(state2).await }.instrument(tracing::Span::current()),
436        );
437
438        stateless
439            .await
440            .context("waiting for check_stateless check tasks")?
441            .context("check_stateless failed")?;
442        stateful
443            .await
444            .context("waiting for check_stateful tasks")?
445            .context("check_stateful failed")?;
446
447        // At this point, the stateful checks should have completed,
448        // leaving us with exclusive access to the Arc<State>.
449        let mut state_tx = self
450            .state
451            .try_begin_transaction()
452            .expect("state Arc should be present and unique");
453
454        // Index the transaction:
455        let height = state_tx.get_block_height().await?;
456        let transaction = Arc::as_ref(&tx).clone();
457        state_tx
458            .put_block_transaction(height, transaction.into())
459            .await
460            .context("storing transactions")?;
461
462        tx.check_and_execute(&mut state_tx)
463            .await
464            .context("executing transaction")?;
465
466        // At this point, we've completed execution successfully with no errors,
467        // so we can apply the transaction to the State. Otherwise, we'd have
468        // bubbled up an error and dropped the StateTransaction.
469        Ok(state_tx.apply().1)
470    }
471
472    #[tracing::instrument(skip_all, fields(height = %end_block.height))]
473    pub async fn end_block(&mut self, end_block: &request::EndBlock) -> Vec<abci::Event> {
474        let state_tx = StateDelta::new(self.state.clone());
475
476        tracing::debug!("running app components' `end_block` hooks");
477        let mut arc_state_tx = Arc::new(state_tx);
478        Sct::end_block(&mut arc_state_tx, end_block).await;
479        ShieldedPool::end_block(&mut arc_state_tx, end_block).await;
480        Distributions::end_block(&mut arc_state_tx, end_block).await;
481        Ibc::end_block(&mut arc_state_tx, end_block).await;
482        Auction::end_block(&mut arc_state_tx, end_block).await;
483        Dex::end_block(&mut arc_state_tx, end_block).await;
484        CommunityPool::end_block(&mut arc_state_tx, end_block).await;
485        Governance::end_block(&mut arc_state_tx, end_block).await;
486        Staking::end_block(&mut arc_state_tx, end_block).await;
487        FeeComponent::end_block(&mut arc_state_tx, end_block).await;
488        Funding::end_block(&mut arc_state_tx, end_block).await;
489        let mut state_tx = Arc::try_unwrap(arc_state_tx)
490            .expect("components did not retain copies of shared state");
491        tracing::debug!("finished app components' `end_block` hooks");
492
493        let current_height = state_tx
494            .get_block_height()
495            .await
496            .expect("able to get block height in end_block");
497        let current_epoch = state_tx
498            .get_current_epoch()
499            .await
500            .expect("able to get current epoch in end_block");
501
502        let is_end_epoch = current_epoch.is_scheduled_epoch_end(
503            current_height,
504            state_tx
505                .get_epoch_duration_parameter()
506                .await
507                .expect("able to get epoch duration in end_block"),
508        ) || state_tx.is_epoch_ending_early().await;
509
510        // If a chain upgrade is scheduled for the next block, we trigger an early epoch change
511        // so that the upgraded chain starts at a clean epoch boundary.
512        let is_chain_upgrade = state_tx
513            .is_pre_upgrade_height()
514            .await
515            .expect("able to detect upgrade heights");
516
517        if is_end_epoch || is_chain_upgrade {
518            tracing::info!(%is_end_epoch, %is_chain_upgrade, ?current_height, "ending epoch");
519
520            let mut arc_state_tx = Arc::new(state_tx);
521
522            Sct::end_epoch(&mut arc_state_tx)
523                .await
524                .expect("able to call end_epoch on Sct component");
525            Distributions::end_epoch(&mut arc_state_tx)
526                .await
527                .expect("able to call end_epoch on Distributions component");
528            Ibc::end_epoch(&mut arc_state_tx)
529                .await
530                .expect("able to call end_epoch on IBC component");
531            Auction::end_epoch(&mut arc_state_tx)
532                .await
533                .expect("able to call end_epoch on auction component");
534            Dex::end_epoch(&mut arc_state_tx)
535                .await
536                .expect("able to call end_epoch on dex component");
537            CommunityPool::end_epoch(&mut arc_state_tx)
538                .await
539                .expect("able to call end_epoch on Community Pool component");
540            Governance::end_epoch(&mut arc_state_tx)
541                .await
542                .expect("able to call end_epoch on Governance component");
543            ShieldedPool::end_epoch(&mut arc_state_tx)
544                .await
545                .expect("able to call end_epoch on shielded pool component");
546            Staking::end_epoch(&mut arc_state_tx)
547                .await
548                .expect("able to call end_epoch on Staking component");
549            FeeComponent::end_epoch(&mut arc_state_tx)
550                .await
551                .expect("able to call end_epoch on Fee component");
552            Funding::end_epoch(&mut arc_state_tx)
553                .await
554                .expect("able to call end_epoch on Funding component");
555
556            let mut state_tx = Arc::try_unwrap(arc_state_tx)
557                .expect("components did not retain copies of shared state");
558
559            state_tx
560                .finish_epoch()
561                .await
562                .expect("must be able to finish compact block");
563
564            // set the epoch for the next block
565            penumbra_sdk_sct::component::clock::EpochManager::put_epoch_by_height(
566                &mut state_tx,
567                current_height + 1,
568                Epoch {
569                    index: current_epoch.index + 1,
570                    start_height: current_height + 1,
571                },
572            );
573
574            self.apply(state_tx)
575        } else {
576            // set the epoch for the next block
577            penumbra_sdk_sct::component::clock::EpochManager::put_epoch_by_height(
578                &mut state_tx,
579                current_height + 1,
580                current_epoch,
581            );
582
583            state_tx
584                .finish_block()
585                .await
586                .expect("must be able to finish compact block");
587
588            self.apply(state_tx)
589        }
590    }
591
592    /// Commits the application state to persistent storage,
593    /// returning the new root hash and storage version.
594    ///
595    /// This method also resets `self` as if it were constructed
596    /// as an empty state over top of the newly written storage.
597    pub async fn commit(&mut self, storage: Storage) -> RootHash {
598        // We need to extract the State we've built up to commit it.  Fill in a dummy state.
599        let dummy_state = StateDelta::new(storage.latest_snapshot());
600        let mut state = Arc::try_unwrap(std::mem::replace(&mut self.state, Arc::new(dummy_state)))
601            .expect("we have exclusive ownership of the State at commit()");
602
603        // Check if an emergency halt has been signaled.
604        let should_halt = state.is_chain_halted().await;
605
606        let is_pre_upgrade_height = state
607            .is_pre_upgrade_height()
608            .await
609            .expect("must be able to read upgrade height");
610
611        // If the next height is an upgrade height, we signal a halt and turn
612        // a `halt_bit` on which will prevent the chain from restarting without
613        // running a migration.
614        if is_pre_upgrade_height {
615            tracing::info!("pre-upgrade height reached, signaling halt");
616            state.signal_halt();
617        }
618
619        // Commit the pending writes, clearing the state.
620        let jmt_root = storage
621            .commit(state)
622            .await
623            .expect("must be able to successfully commit to storage");
624
625        // We want to halt the node, but not before we submit an ABCI `Commit`
626        // response to `CometBFT`. To do this, we schedule a process exit in `2s`,
627        // assuming a `5s` timeout.
628        // See #4443 for more context.
629        if should_halt || is_pre_upgrade_height {
630            tokio::spawn(async move {
631                sleep(Duration::from_secs(2)).await;
632                tracing::info!("halt signal recorded, exiting process");
633                std::process::exit(0);
634            });
635        }
636
637        tracing::debug!(?jmt_root, "finished committing state");
638
639        // Get the latest version of the state, now that we've committed it.
640        self.state = Arc::new(StateDelta::new(storage.latest_snapshot()));
641
642        jmt_root
643    }
644
645    pub fn cometbft_validator_updates(&self) -> Vec<Update> {
646        self.state
647            .cometbft_validator_updates()
648            // If the cometbft validator updates are not set, we return an empty
649            // update set, signaling no change to Tendermint.
650            .unwrap_or_default()
651    }
652}
653
654#[async_trait]
655pub trait StateReadExt: StateRead {
656    async fn get_chain_id(&self) -> Result<String> {
657        let raw_chain_id = self
658            .get_raw(state_key::data::chain_id())
659            .await?
660            .expect("chain id is always set");
661
662        Ok(String::from_utf8_lossy(&raw_chain_id).to_string())
663    }
664
665    /// Checks a provided chain_id against the chain state.
666    ///
667    /// Passes through if the provided chain_id is empty or matches, and
668    /// otherwise errors.
669    async fn check_chain_id(&self, provided: &str) -> Result<()> {
670        let chain_id = self
671            .get_chain_id()
672            .await
673            .context(format!("error getting chain id: '{provided}'"))?;
674        if provided.is_empty() || provided == chain_id {
675            Ok(())
676        } else {
677            Err(anyhow::anyhow!(
678                "provided chain_id {} does not match chain_id {}",
679                provided,
680                chain_id
681            ))
682        }
683    }
684
685    /// Gets the chain revision number, from the chain ID
686    async fn get_revision_number(&self) -> Result<u64> {
687        let cid_str = self.get_chain_id().await?;
688
689        Ok(ChainId::from_string(&cid_str).version())
690    }
691
692    /// Returns the set of app parameters
693    async fn get_app_params(&self) -> Result<AppParameters> {
694        let chain_id = self.get_chain_id().await?;
695        let community_pool_params: penumbra_sdk_community_pool::params::CommunityPoolParameters =
696            self.get_community_pool_params().await?;
697        let distributions_params = self.get_distributions_params().await?;
698        let ibc_params = self.get_ibc_params().await?;
699        let fee_params = self.get_fee_params().await?;
700        let funding_params = self.get_funding_params().await?;
701        let governance_params = self.get_governance_params().await?;
702        let sct_params = self.get_sct_params().await?;
703        let shielded_pool_params = self.get_shielded_pool_params().await?;
704        let stake_params = self.get_stake_params().await?;
705        let dex_params = self.get_dex_params().await?;
706        let auction_params = self.get_auction_params().await?;
707
708        Ok(AppParameters {
709            chain_id,
710            auction_params,
711            community_pool_params,
712            distributions_params,
713            fee_params,
714            funding_params,
715            governance_params,
716            ibc_params,
717            sct_params,
718            shielded_pool_params,
719            stake_params,
720            dex_params,
721        })
722    }
723
724    async fn transactions_by_height(
725        &self,
726        block_height: u64,
727    ) -> Result<TransactionsByHeightResponse> {
728        let transactions = match self
729            .nonverifiable_get_raw(
730                state_key::cometbft_data::transactions_by_height(block_height).as_bytes(),
731            )
732            .await?
733        {
734            Some(transactions) => transactions,
735            None => TransactionsByHeightResponse {
736                transactions: vec![],
737                block_height,
738            }
739            .encode_to_vec(),
740        };
741
742        Ok(TransactionsByHeightResponse::decode(&transactions[..])?)
743    }
744}
745
746impl<
747        T: StateRead
748            + penumbra_sdk_stake::StateReadExt
749            + penumbra_sdk_governance::component::StateReadExt
750            + penumbra_sdk_fee::component::StateReadExt
751            + penumbra_sdk_community_pool::component::StateReadExt
752            + penumbra_sdk_sct::component::clock::EpochRead
753            + penumbra_sdk_ibc::component::StateReadExt
754            + penumbra_sdk_distributions::component::StateReadExt
755            + ?Sized,
756    > StateReadExt for T
757{
758}
759
760#[async_trait]
761pub trait StateWriteExt: StateWrite {
762    /// Sets the chain ID.
763    fn put_chain_id(&mut self, chain_id: String) {
764        self.put_raw(state_key::data::chain_id().into(), chain_id.into_bytes());
765    }
766
767    /// Stores the transactions that occurred during a CometBFT block.
768    /// This is used to create a durable transaction log for clients to retrieve;
769    /// the CometBFT `get_block_by_height` RPC call will only return data for blocks
770    /// since the last checkpoint, so we need to store the transactions separately.
771    async fn put_block_transaction(
772        &mut self,
773        height: u64,
774        transaction: penumbra_sdk_proto::core::transaction::v1::Transaction,
775    ) -> Result<()> {
776        // Extend the existing transactions with the new one.
777        let mut transactions_response = self.transactions_by_height(height).await?;
778        transactions_response.transactions = transactions_response
779            .transactions
780            .into_iter()
781            .chain(std::iter::once(transaction))
782            .collect();
783
784        self.nonverifiable_put_raw(
785            state_key::cometbft_data::transactions_by_height(height).into(),
786            transactions_response.encode_to_vec(),
787        );
788        Ok(())
789    }
790
791    /// Writes the app parameters to the state.
792    ///
793    /// Each component stores its own parameters separately, so this method
794    /// splits up the provided parameters structure and writes it out to each component.
795    fn put_app_params(&mut self, params: AppParameters) {
796        // To make sure we don't forget to write any parts, destructure the entire params
797        let AppParameters {
798            chain_id,
799            auction_params,
800            community_pool_params,
801            distributions_params,
802            fee_params,
803            funding_params,
804            governance_params,
805            ibc_params,
806            sct_params,
807            shielded_pool_params,
808            stake_params,
809            dex_params,
810        } = params;
811
812        // Ignore writes to the chain_id
813        // TODO(erwan): we are momentarily not supporting chain_id changes
814        // until the IBC host chain changes land.
815        // See: https://github.com/penumbra-zone/penumbra/issues/3617#issuecomment-1917708221
816        std::mem::drop(chain_id);
817
818        self.put_auction_params(auction_params);
819        self.put_community_pool_params(community_pool_params);
820        self.put_distributions_params(distributions_params);
821        self.put_fee_params(fee_params);
822        self.put_funding_params(funding_params);
823        self.put_governance_params(governance_params);
824        self.put_ibc_params(ibc_params);
825        self.put_sct_params(sct_params);
826        self.put_shielded_pool_params(shielded_pool_params);
827        self.put_stake_params(stake_params);
828        self.put_dex_params(dex_params);
829    }
830}
831
832impl<T: StateWrite + ?Sized> StateWriteExt for T {}