penumbra_sdk_app/app/
mod.rs

1use std::process;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use cnidarium::{ArcStateDeltaExt, Snapshot, StateDelta, StateRead, StateWrite, Storage};
8use cnidarium_component::Component;
9use ibc_types::core::connection::ChainId;
10use jmt::RootHash;
11use penumbra_sdk_auction::component::{Auction, StateReadExt as _, StateWriteExt as _};
12use penumbra_sdk_community_pool::component::{CommunityPool, StateWriteExt as _};
13use penumbra_sdk_community_pool::StateReadExt as _;
14use penumbra_sdk_compact_block::component::CompactBlockManager;
15use penumbra_sdk_dex::component::StateReadExt as _;
16use penumbra_sdk_dex::component::{Dex, StateWriteExt as _};
17use penumbra_sdk_distributions::component::{Distributions, StateReadExt as _, StateWriteExt as _};
18use penumbra_sdk_fee::component::{FeeComponent, StateReadExt as _, StateWriteExt as _};
19use penumbra_sdk_funding::component::Funding;
20use penumbra_sdk_funding::component::{StateReadExt as _, StateWriteExt as _};
21use penumbra_sdk_governance::component::{Governance, StateReadExt as _, StateWriteExt as _};
22use penumbra_sdk_ibc::component::{Ibc, StateWriteExt as _};
23use penumbra_sdk_ibc::StateReadExt as _;
24use penumbra_sdk_proto::core::app::v1::TransactionsByHeightResponse;
25use penumbra_sdk_proto::DomainType;
26use penumbra_sdk_sct::component::clock::EpochRead;
27use penumbra_sdk_sct::component::sct::Sct;
28use penumbra_sdk_sct::component::{StateReadExt as _, StateWriteExt as _};
29use penumbra_sdk_sct::epoch::Epoch;
30use penumbra_sdk_shielded_pool::component::{ShieldedPool, StateReadExt as _, StateWriteExt as _};
31use penumbra_sdk_stake::component::{
32    stake::ConsensusUpdateRead, Staking, StateReadExt as _, StateWriteExt as _,
33};
34use penumbra_sdk_transaction::Transaction;
35use prost::Message as _;
36use tendermint::abci::{self, Event};
37
38use tendermint::v0_37::abci::{request, response};
39use tendermint::validator::Update;
40use tokio::time::sleep;
41use tracing::{instrument, Instrument};
42
43use crate::action_handler::AppActionHandler;
44use crate::genesis::AppState;
45use crate::params::change::ParameterChangeExt as _;
46use crate::params::AppParameters;
47use crate::{CommunityPoolStateReadExt, PenumbraHost};
48
49pub mod state_key;
50
51/// The inter-block state being written to by the application.
52type InterBlockState = Arc<StateDelta<Snapshot>>;
53
54/// The maximum size of a CometBFT block payload (1MB)
55pub const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024;
56
57/// The maximum size of a single individual transaction (96KB).
58pub const MAX_TRANSACTION_SIZE_BYTES: usize = 96 * 1024;
59
60/// The maximum size of the evidence portion of a block (30KB).
61pub const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024;
62
63/// The Penumbra application, written as a bundle of [`Component`]s.
64///
65/// The [`App`] is not a [`Component`], but
66/// it constructs the components and exposes a [`commit`](App::commit) that
67/// commits the changes to the persistent storage and resets its subcomponents.
68pub struct App {
69    state: InterBlockState,
70}
71
72impl App {
73    /// Constructs a new application, using the provided [`Snapshot`].
74    /// Callers should ensure that [`App::is_ready`]) returns `true`, but this is not enforced.
75    #[instrument(skip_all)]
76    pub fn new(snapshot: Snapshot) -> Self {
77        tracing::debug!("initializing App instance");
78
79        // We perform the `Arc` wrapping of `State` here to ensure
80        // there should be no unexpected copies elsewhere.
81        let state = Arc::new(StateDelta::new(snapshot));
82
83        Self { state }
84    }
85
86    /// Returns whether the application is ready to start.
87    #[instrument(skip_all, ret)]
88    pub async fn is_ready(state: Snapshot) -> bool {
89        // If the chain is halted, we are not ready to start the application.
90        // This is a safety mechanism to prevent the chain from starting if it
91        // is in a halted state.
92        !state.is_chain_halted().await
93    }
94
95    // StateDelta::apply only works when the StateDelta wraps an underlying
96    // StateWrite.  But if we want to share the StateDelta with spawned tasks,
97    // we usually can't wrap a StateWrite instance, which requires exclusive
98    // access. This method "externally" applies the state delta to the
99    // inter-block state.
100    //
101    // Invariant: `state_tx` and `self.state` are the only two references to the
102    // inter-block state.
103    fn apply(&mut self, state_tx: StateDelta<InterBlockState>) -> Vec<Event> {
104        let (state2, mut cache) = state_tx.flatten();
105        std::mem::drop(state2);
106        // Now there is only one reference to the inter-block state: self.state
107
108        let events = cache.take_events();
109        cache.apply_to(
110            Arc::get_mut(&mut self.state).expect("no other references to inter-block state"),
111        );
112
113        events
114    }
115
116    pub async fn init_chain(&mut self, app_state: &AppState) {
117        let mut state_tx = self
118            .state
119            .try_begin_transaction()
120            .expect("state Arc should not be referenced elsewhere");
121        match app_state {
122            AppState::Content(genesis) => {
123                state_tx.put_chain_id(genesis.chain_id.clone());
124                Sct::init_chain(&mut state_tx, Some(&genesis.sct_content)).await;
125                ShieldedPool::init_chain(&mut state_tx, Some(&genesis.shielded_pool_content)).await;
126                Distributions::init_chain(&mut state_tx, Some(&genesis.distributions_content))
127                    .await;
128                Staking::init_chain(
129                    &mut state_tx,
130                    Some(&(
131                        genesis.stake_content.clone(),
132                        genesis.shielded_pool_content.clone(),
133                    )),
134                )
135                .await;
136                Ibc::init_chain(&mut state_tx, Some(&genesis.ibc_content)).await;
137                Auction::init_chain(&mut state_tx, Some(&genesis.auction_content)).await;
138                Dex::init_chain(&mut state_tx, Some(&genesis.dex_content)).await;
139                CommunityPool::init_chain(&mut state_tx, Some(&genesis.community_pool_content))
140                    .await;
141                Governance::init_chain(&mut state_tx, Some(&genesis.governance_content)).await;
142                FeeComponent::init_chain(&mut state_tx, Some(&genesis.fee_content)).await;
143                Funding::init_chain(&mut state_tx, Some(&genesis.funding_content)).await;
144
145                state_tx
146                    .finish_block()
147                    .await
148                    .expect("must be able to finish compact block");
149            }
150            AppState::Checkpoint(_) => {
151                ShieldedPool::init_chain(&mut state_tx, None).await;
152                Distributions::init_chain(&mut state_tx, None).await;
153                Staking::init_chain(&mut state_tx, None).await;
154                Ibc::init_chain(&mut state_tx, None).await;
155                Dex::init_chain(&mut state_tx, None).await;
156                Governance::init_chain(&mut state_tx, None).await;
157                CommunityPool::init_chain(&mut state_tx, None).await;
158                FeeComponent::init_chain(&mut state_tx, None).await;
159                Funding::init_chain(&mut state_tx, None).await;
160            }
161        };
162
163        // Note that `init_chain` can not emit any events, and we do not want to
164        // work around this as it violates the design principle that events are changes
165        // to initial data.
166        //
167        // This means that indexers are responsible for parsing genesis data and bootstrapping
168        // their initial state before processing chronological events.
169        //
170        // See: https://github.com/penumbra-zone/penumbra/pull/4449#discussion_r1636868800
171
172        state_tx.apply();
173    }
174
175    pub async fn prepare_proposal(
176        &mut self,
177        proposal: request::PrepareProposal,
178    ) -> response::PrepareProposal {
179        if self.state.is_chain_halted().await {
180            // If we find ourselves preparing a proposal for a halted chain
181            // we stop abruptly to prevent any progress.
182            // The persistent halt mechanism will prevent restarts until we are ready.
183            process::exit(0);
184        }
185
186        let mut included_txs = Vec::new();
187        let num_candidate_txs = proposal.txs.len();
188        tracing::debug!(
189            "processing PrepareProposal, found {} candidate transactions",
190            num_candidate_txs
191        );
192
193        // This is a node controlled parameter that is different from the homonymous
194        // mempool's `max_tx_bytes`. Comet will send us raw proposals that exceed this
195        // limit, presuming that a subset of those transactions will be shed.
196        // More context in https://github.com/cometbft/cometbft/blob/v0.37.5/spec/abci/abci%2B%2B_app_requirements.md
197        let max_proposal_size_bytes = proposal.max_tx_bytes as u64;
198        // Tracking the size of the proposal
199        let mut proposal_size_bytes = 0u64;
200
201        for tx in proposal.txs {
202            let transaction_size = tx.len() as u64;
203
204            // We compute the total proposal size if we were to include this transaction.
205            let total_with_tx = proposal_size_bytes.saturating_add(transaction_size);
206
207            // This should never happen, unless Comet is misbehaving because of a bug
208            // or a misconfiguration. We handle it gracefully, to prioritize forward progress.
209            if transaction_size > MAX_TRANSACTION_SIZE_BYTES as u64 {
210                continue;
211            }
212
213            // First, we filter proposals to fit within the block limit.
214            if total_with_tx >= max_proposal_size_bytes {
215                break;
216            }
217
218            // Then, we make sure to only include successful transactions.
219            match self.deliver_tx_bytes(&tx).await {
220                Ok(_) => {
221                    proposal_size_bytes = total_with_tx;
222                    included_txs.push(tx)
223                }
224                Err(_) => continue,
225            }
226        }
227
228        // The evidence payload is validated by Comet, we can lean on three guarantees:
229        // 1. The total payload is bound by `MAX_EVIDENCE_SIZE_BYTES`
230        // 2. Expired evidence is filtered
231        // 3. Evidence is valid.
232        tracing::debug!(
233            "finished processing PrepareProposal, including {}/{} candidate transactions",
234            included_txs.len(),
235            num_candidate_txs
236        );
237
238        response::PrepareProposal { txs: included_txs }
239    }
240
241    #[instrument(skip_all, ret, level = "debug")]
242    pub async fn process_proposal(
243        &mut self,
244        proposal: request::ProcessProposal,
245    ) -> response::ProcessProposal {
246        tracing::debug!(
247            height = proposal.height.value(),
248            proposer = ?proposal.proposer_address,
249            proposal_hash = ?proposal.hash,
250            "processing proposal"
251        );
252
253        // Proposal validation:
254        // 1. Total evidence payload committed is below [`MAX_EVIDENCE_SIZE_BYTES`]
255        // 2. Individual transactions are at most [`MAX_TRANSACTION_SIZE_BYTES`]
256        // 3. The total transaction payload is below [`MAX_BLOCK_PAYLOAD_SIZE_BYTES`]
257        // 4. Each transaction applies successfully.
258        let mut evidence_buffer: Vec<u8> = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES);
259        let mut bytes_tracker = 0usize;
260
261        for evidence in proposal.misbehavior {
262            // This should be pretty cheap, we allow for `MAX_EVIDENCE_SIZE_BYTES` in total
263            // but a single evidence datum should be an order of magnitude smaller than that.
264            evidence_buffer.clear();
265            let proto_evidence: tendermint_proto::v0_37::abci::Misbehavior = evidence.into();
266            let evidence_size = match proto_evidence.encode(&mut evidence_buffer) {
267                Ok(_) => evidence_buffer.len(),
268                Err(_) => return response::ProcessProposal::Reject,
269            };
270            bytes_tracker = bytes_tracker.saturating_add(evidence_size);
271            if bytes_tracker > MAX_EVIDENCE_SIZE_BYTES {
272                return response::ProcessProposal::Reject;
273            }
274        }
275
276        // The evidence payload is valid, now we validate the block txs
277        // payload: they MUST be below the tx size limit, and apply cleanly on
278        // state fork.
279        let mut total_txs_payload_size = 0usize;
280        for tx in proposal.txs {
281            let tx_size = tx.len();
282            if tx_size > MAX_TRANSACTION_SIZE_BYTES {
283                return response::ProcessProposal::Reject;
284            }
285
286            total_txs_payload_size = total_txs_payload_size.saturating_add(tx_size);
287            if total_txs_payload_size >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
288                return response::ProcessProposal::Reject;
289            }
290
291            match self.deliver_tx_bytes(&tx).await {
292                Ok(_) => continue,
293                Err(_) => return response::ProcessProposal::Reject,
294            }
295        }
296
297        response::ProcessProposal::Accept
298    }
299
300    pub async fn begin_block(&mut self, begin_block: &request::BeginBlock) -> Vec<abci::Event> {
301        let mut state_tx = StateDelta::new(self.state.clone());
302
303        // If a app parameter change is scheduled for this block, apply it here,
304        // before any other component has executed. This ensures that app
305        // parameter changes are consistently applied precisely at the boundary
306        // between blocks.
307        //
308        // Note that because _nothing_ has executed yet, we need to get the
309        // current height from the begin_block request, rather than from the
310        // state (it will be set by the SCT component, which executes first).
311        if let Some(change) = state_tx
312            .param_changes_for_height(begin_block.header.height.into())
313            .await
314            .expect("param changes should always be readable, even if unset")
315        {
316            let old_params = state_tx
317                .get_app_params()
318                .await
319                .expect("must be able to read app params");
320            match change.apply_changes(old_params) {
321                Ok(new_params) => {
322                    tracing::info!(?change, "applied app parameter change");
323                    state_tx.put_app_params(new_params);
324                }
325                Err(e) => {
326                    // N.B. this is an "info" rather than "warn" because it does not report
327                    // a problem with _this instance of the application_, but rather is an expected
328                    // behavior.
329                    tracing::info!(?change, ?e, "failed to apply approved app parameter change");
330                }
331            }
332        }
333
334        // Run each of the begin block handlers for each component, in sequence:
335        let mut arc_state_tx = Arc::new(state_tx);
336        Sct::begin_block(&mut arc_state_tx, begin_block).await;
337        ShieldedPool::begin_block(&mut arc_state_tx, begin_block).await;
338        Distributions::begin_block(&mut arc_state_tx, begin_block).await;
339        Ibc::begin_block::<PenumbraHost, StateDelta<Arc<StateDelta<cnidarium::Snapshot>>>>(
340            &mut arc_state_tx,
341            begin_block,
342        )
343        .await;
344        Auction::begin_block(&mut arc_state_tx, begin_block).await;
345        Dex::begin_block(&mut arc_state_tx, begin_block).await;
346        CommunityPool::begin_block(&mut arc_state_tx, begin_block).await;
347        Governance::begin_block(&mut arc_state_tx, begin_block).await;
348        Staking::begin_block(&mut arc_state_tx, begin_block).await;
349        FeeComponent::begin_block(&mut arc_state_tx, begin_block).await;
350        Funding::begin_block(&mut arc_state_tx, begin_block).await;
351
352        let state_tx = Arc::try_unwrap(arc_state_tx)
353            .expect("components did not retain copies of shared state");
354
355        // Apply the state from `begin_block` and return the events (we'll append to them if
356        // necessary based on the results of applying the Community Pool transactions queued)
357        let mut events = self.apply(state_tx);
358
359        // Deliver Community Pool transactions here, before any other block processing (effectively adding
360        // synthetic transactions slotted in after the start of the block but before any user
361        // transactions)
362        let pending_transactions = self
363            .state
364            .pending_community_pool_transactions()
365            .await
366            .expect("Community Pool transactions should always be readable");
367        for transaction in pending_transactions {
368            // NOTE: We are *intentionally* using `deliver_tx_allowing_community_pool_spends` here, rather than
369            // `deliver_tx`, because here is the **ONLY** place we want to permit Community Pool spends, when
370            // delivering transactions that have been scheduled by the chain itself for delivery.
371            tracing::info!(?transaction, "delivering Community Pool transaction");
372            match self
373                .deliver_tx_allowing_community_pool_spends(Arc::new(transaction))
374                .await
375            {
376                Err(error) => {
377                    tracing::warn!(?error, "failed to deliver Community Pool transaction");
378                }
379                Ok(community_pool_tx_events) => events.extend(community_pool_tx_events),
380            }
381        }
382
383        events
384    }
385
386    /// Wrapper function for [`Self::deliver_tx`]  that decodes from bytes.
387    pub async fn deliver_tx_bytes(&mut self, tx_bytes: &[u8]) -> Result<Vec<abci::Event>> {
388        let tx = Arc::new(Transaction::decode(tx_bytes).context("decoding transaction")?);
389        self.deliver_tx(tx)
390            .await
391            .context("failed to deliver transaction")
392    }
393
394    pub async fn deliver_tx(&mut self, tx: Arc<Transaction>) -> Result<Vec<abci::Event>> {
395        // Ensure that any normally-delivered transaction (originating from a user) does not contain
396        // any Community Pool spends or outputs; the only place those are permitted is transactions originating
397        // from the chain itself:
398        anyhow::ensure!(
399            tx.community_pool_spends().peekable().peek().is_none(),
400            "Community Pool spends are not permitted in user-submitted transactions"
401        );
402        anyhow::ensure!(
403            tx.community_pool_outputs().peekable().peek().is_none(),
404            "Community Pool outputs are not permitted in user-submitted transactions"
405        );
406
407        // Now that we've ensured that there are not any Community Pool spends or outputs, we can deliver the transaction:
408        self.deliver_tx_allowing_community_pool_spends(tx).await
409    }
410
411    async fn deliver_tx_allowing_community_pool_spends(
412        &mut self,
413        tx: Arc<Transaction>,
414    ) -> Result<Vec<abci::Event>> {
415        // Both stateful and stateless checks take the transaction as
416        // verification context.  The separate clone of the Arc<Transaction>
417        // means it can be passed through the whole tree of checks.
418        //
419        // We spawn tasks for each set of checks, to do CPU-bound stateless checks
420        // and I/O-bound stateful checks at the same time.
421        let tx2 = tx.clone();
422        let stateless = tokio::spawn(
423            async move { tx2.check_stateless(()).await }.instrument(tracing::Span::current()),
424        );
425        let tx2 = tx.clone();
426        let state2 = self.state.clone();
427        let stateful = tokio::spawn(
428            async move { tx2.check_historical(state2).await }.instrument(tracing::Span::current()),
429        );
430
431        stateless
432            .await
433            .context("waiting for check_stateless check tasks")?
434            .context("check_stateless failed")?;
435        stateful
436            .await
437            .context("waiting for check_stateful tasks")?
438            .context("check_stateful failed")?;
439
440        // At this point, the stateful checks should have completed,
441        // leaving us with exclusive access to the Arc<State>.
442        let mut state_tx = self
443            .state
444            .try_begin_transaction()
445            .expect("state Arc should be present and unique");
446
447        // Index the transaction:
448        let height = state_tx.get_block_height().await?;
449        let transaction = Arc::as_ref(&tx).clone();
450        state_tx
451            .put_block_transaction(height, transaction.into())
452            .await
453            .context("storing transactions")?;
454
455        tx.check_and_execute(&mut state_tx)
456            .await
457            .context("executing transaction")?;
458
459        // At this point, we've completed execution successfully with no errors,
460        // so we can apply the transaction to the State. Otherwise, we'd have
461        // bubbled up an error and dropped the StateTransaction.
462        Ok(state_tx.apply().1)
463    }
464
465    #[tracing::instrument(skip_all, fields(height = %end_block.height))]
466    pub async fn end_block(&mut self, end_block: &request::EndBlock) -> Vec<abci::Event> {
467        let state_tx = StateDelta::new(self.state.clone());
468
469        tracing::debug!("running app components' `end_block` hooks");
470        let mut arc_state_tx = Arc::new(state_tx);
471        Sct::end_block(&mut arc_state_tx, end_block).await;
472        ShieldedPool::end_block(&mut arc_state_tx, end_block).await;
473        Distributions::end_block(&mut arc_state_tx, end_block).await;
474        Ibc::end_block(&mut arc_state_tx, end_block).await;
475        Auction::end_block(&mut arc_state_tx, end_block).await;
476        Dex::end_block(&mut arc_state_tx, end_block).await;
477        CommunityPool::end_block(&mut arc_state_tx, end_block).await;
478        Governance::end_block(&mut arc_state_tx, end_block).await;
479        Staking::end_block(&mut arc_state_tx, end_block).await;
480        FeeComponent::end_block(&mut arc_state_tx, end_block).await;
481        Funding::end_block(&mut arc_state_tx, end_block).await;
482        let mut state_tx = Arc::try_unwrap(arc_state_tx)
483            .expect("components did not retain copies of shared state");
484        tracing::debug!("finished app components' `end_block` hooks");
485
486        let current_height = state_tx
487            .get_block_height()
488            .await
489            .expect("able to get block height in end_block");
490        let current_epoch = state_tx
491            .get_current_epoch()
492            .await
493            .expect("able to get current epoch in end_block");
494
495        let is_end_epoch = current_epoch.is_scheduled_epoch_end(
496            current_height,
497            state_tx
498                .get_epoch_duration_parameter()
499                .await
500                .expect("able to get epoch duration in end_block"),
501        ) || state_tx.is_epoch_ending_early().await;
502
503        // If a chain upgrade is scheduled for the next block, we trigger an early epoch change
504        // so that the upgraded chain starts at a clean epoch boundary.
505        let is_chain_upgrade = state_tx
506            .is_pre_upgrade_height()
507            .await
508            .expect("able to detect upgrade heights");
509
510        if is_end_epoch || is_chain_upgrade {
511            tracing::info!(%is_end_epoch, %is_chain_upgrade, ?current_height, "ending epoch");
512
513            let mut arc_state_tx = Arc::new(state_tx);
514
515            Sct::end_epoch(&mut arc_state_tx)
516                .await
517                .expect("able to call end_epoch on Sct component");
518            Distributions::end_epoch(&mut arc_state_tx)
519                .await
520                .expect("able to call end_epoch on Distributions component");
521            Ibc::end_epoch(&mut arc_state_tx)
522                .await
523                .expect("able to call end_epoch on IBC component");
524            Auction::end_epoch(&mut arc_state_tx)
525                .await
526                .expect("able to call end_epoch on auction component");
527            Dex::end_epoch(&mut arc_state_tx)
528                .await
529                .expect("able to call end_epoch on dex component");
530            CommunityPool::end_epoch(&mut arc_state_tx)
531                .await
532                .expect("able to call end_epoch on Community Pool component");
533            Governance::end_epoch(&mut arc_state_tx)
534                .await
535                .expect("able to call end_epoch on Governance component");
536            ShieldedPool::end_epoch(&mut arc_state_tx)
537                .await
538                .expect("able to call end_epoch on shielded pool component");
539            Staking::end_epoch(&mut arc_state_tx)
540                .await
541                .expect("able to call end_epoch on Staking component");
542            FeeComponent::end_epoch(&mut arc_state_tx)
543                .await
544                .expect("able to call end_epoch on Fee component");
545            Funding::end_epoch(&mut arc_state_tx)
546                .await
547                .expect("able to call end_epoch on Funding component");
548
549            let mut state_tx = Arc::try_unwrap(arc_state_tx)
550                .expect("components did not retain copies of shared state");
551
552            state_tx
553                .finish_epoch()
554                .await
555                .expect("must be able to finish compact block");
556
557            // set the epoch for the next block
558            penumbra_sdk_sct::component::clock::EpochManager::put_epoch_by_height(
559                &mut state_tx,
560                current_height + 1,
561                Epoch {
562                    index: current_epoch.index + 1,
563                    start_height: current_height + 1,
564                },
565            );
566
567            self.apply(state_tx)
568        } else {
569            // set the epoch for the next block
570            penumbra_sdk_sct::component::clock::EpochManager::put_epoch_by_height(
571                &mut state_tx,
572                current_height + 1,
573                current_epoch,
574            );
575
576            state_tx
577                .finish_block()
578                .await
579                .expect("must be able to finish compact block");
580
581            self.apply(state_tx)
582        }
583    }
584
585    /// Commits the application state to persistent storage,
586    /// returning the new root hash and storage version.
587    ///
588    /// This method also resets `self` as if it were constructed
589    /// as an empty state over top of the newly written storage.
590    pub async fn commit(&mut self, storage: Storage) -> RootHash {
591        // We need to extract the State we've built up to commit it.  Fill in a dummy state.
592        let dummy_state = StateDelta::new(storage.latest_snapshot());
593        let mut state = Arc::try_unwrap(std::mem::replace(&mut self.state, Arc::new(dummy_state)))
594            .expect("we have exclusive ownership of the State at commit()");
595
596        // Check if an emergency halt has been signaled.
597        let should_halt = state.is_chain_halted().await;
598
599        let is_pre_upgrade_height = state
600            .is_pre_upgrade_height()
601            .await
602            .expect("must be able to read upgrade height");
603
604        // If the next height is an upgrade height, we signal a halt and turn
605        // a `halt_bit` on which will prevent the chain from restarting without
606        // running a migration.
607        if is_pre_upgrade_height {
608            tracing::info!("pre-upgrade height reached, signaling halt");
609            state.signal_halt();
610        }
611
612        // Commit the pending writes, clearing the state.
613        let jmt_root = storage
614            .commit(state)
615            .await
616            .expect("must be able to successfully commit to storage");
617
618        // We want to halt the node, but not before we submit an ABCI `Commit`
619        // response to `CometBFT`. To do this, we schedule a process exit in `2s`,
620        // assuming a `5s` timeout.
621        // See #4443 for more context.
622        if should_halt || is_pre_upgrade_height {
623            tokio::spawn(async move {
624                sleep(Duration::from_secs(2)).await;
625                tracing::info!("halt signal recorded, exiting process");
626                std::process::exit(0);
627            });
628        }
629
630        tracing::debug!(?jmt_root, "finished committing state");
631
632        // Get the latest version of the state, now that we've committed it.
633        self.state = Arc::new(StateDelta::new(storage.latest_snapshot()));
634
635        jmt_root
636    }
637
638    pub fn cometbft_validator_updates(&self) -> Vec<Update> {
639        self.state
640            .cometbft_validator_updates()
641            // If the cometbft validator updates are not set, we return an empty
642            // update set, signaling no change to Tendermint.
643            .unwrap_or_default()
644    }
645}
646
647#[async_trait]
648pub trait StateReadExt: StateRead {
649    async fn get_chain_id(&self) -> Result<String> {
650        let raw_chain_id = self
651            .get_raw(state_key::data::chain_id())
652            .await?
653            .expect("chain id is always set");
654
655        Ok(String::from_utf8_lossy(&raw_chain_id).to_string())
656    }
657
658    /// Checks a provided chain_id against the chain state.
659    ///
660    /// Passes through if the provided chain_id is empty or matches, and
661    /// otherwise errors.
662    async fn check_chain_id(&self, provided: &str) -> Result<()> {
663        let chain_id = self
664            .get_chain_id()
665            .await
666            .context(format!("error getting chain id: '{provided}'"))?;
667        if provided.is_empty() || provided == chain_id {
668            Ok(())
669        } else {
670            Err(anyhow::anyhow!(
671                "provided chain_id {} does not match chain_id {}",
672                provided,
673                chain_id
674            ))
675        }
676    }
677
678    /// Gets the chain revision number, from the chain ID
679    async fn get_revision_number(&self) -> Result<u64> {
680        let cid_str = self.get_chain_id().await?;
681
682        Ok(ChainId::from_string(&cid_str).version())
683    }
684
685    /// Returns the set of app parameters
686    async fn get_app_params(&self) -> Result<AppParameters> {
687        let chain_id = self.get_chain_id().await?;
688        let community_pool_params: penumbra_sdk_community_pool::params::CommunityPoolParameters =
689            self.get_community_pool_params().await?;
690        let distributions_params = self.get_distributions_params().await?;
691        let ibc_params = self.get_ibc_params().await?;
692        let fee_params = self.get_fee_params().await?;
693        let funding_params = self.get_funding_params().await?;
694        let governance_params = self.get_governance_params().await?;
695        let sct_params = self.get_sct_params().await?;
696        let shielded_pool_params = self.get_shielded_pool_params().await?;
697        let stake_params = self.get_stake_params().await?;
698        let dex_params = self.get_dex_params().await?;
699        let auction_params = self.get_auction_params().await?;
700
701        Ok(AppParameters {
702            chain_id,
703            auction_params,
704            community_pool_params,
705            distributions_params,
706            fee_params,
707            funding_params,
708            governance_params,
709            ibc_params,
710            sct_params,
711            shielded_pool_params,
712            stake_params,
713            dex_params,
714        })
715    }
716
717    async fn transactions_by_height(
718        &self,
719        block_height: u64,
720    ) -> Result<TransactionsByHeightResponse> {
721        let transactions = match self
722            .nonverifiable_get_raw(
723                state_key::cometbft_data::transactions_by_height(block_height).as_bytes(),
724            )
725            .await?
726        {
727            Some(transactions) => transactions,
728            None => TransactionsByHeightResponse {
729                transactions: vec![],
730                block_height,
731            }
732            .encode_to_vec(),
733        };
734
735        Ok(TransactionsByHeightResponse::decode(&transactions[..])?)
736    }
737}
738
739impl<
740        T: StateRead
741            + penumbra_sdk_stake::StateReadExt
742            + penumbra_sdk_governance::component::StateReadExt
743            + penumbra_sdk_fee::component::StateReadExt
744            + penumbra_sdk_community_pool::component::StateReadExt
745            + penumbra_sdk_sct::component::clock::EpochRead
746            + penumbra_sdk_ibc::component::StateReadExt
747            + penumbra_sdk_distributions::component::StateReadExt
748            + ?Sized,
749    > StateReadExt for T
750{
751}
752
753#[async_trait]
754pub trait StateWriteExt: StateWrite {
755    /// Sets the chain ID.
756    fn put_chain_id(&mut self, chain_id: String) {
757        self.put_raw(state_key::data::chain_id().into(), chain_id.into_bytes());
758    }
759
760    /// Stores the transactions that occurred during a CometBFT block.
761    /// This is used to create a durable transaction log for clients to retrieve;
762    /// the CometBFT `get_block_by_height` RPC call will only return data for blocks
763    /// since the last checkpoint, so we need to store the transactions separately.
764    async fn put_block_transaction(
765        &mut self,
766        height: u64,
767        transaction: penumbra_sdk_proto::core::transaction::v1::Transaction,
768    ) -> Result<()> {
769        // Extend the existing transactions with the new one.
770        let mut transactions_response = self.transactions_by_height(height).await?;
771        transactions_response.transactions = transactions_response
772            .transactions
773            .into_iter()
774            .chain(std::iter::once(transaction))
775            .collect();
776
777        self.nonverifiable_put_raw(
778            state_key::cometbft_data::transactions_by_height(height).into(),
779            transactions_response.encode_to_vec(),
780        );
781        Ok(())
782    }
783
784    /// Writes the app parameters to the state.
785    ///
786    /// Each component stores its own parameters separately, so this method
787    /// splits up the provided parameters structure and writes it out to each component.
788    fn put_app_params(&mut self, params: AppParameters) {
789        // To make sure we don't forget to write any parts, destructure the entire params
790        let AppParameters {
791            chain_id,
792            auction_params,
793            community_pool_params,
794            distributions_params,
795            fee_params,
796            funding_params,
797            governance_params,
798            ibc_params,
799            sct_params,
800            shielded_pool_params,
801            stake_params,
802            dex_params,
803        } = params;
804
805        // Ignore writes to the chain_id
806        // TODO(erwan): we are momentarily not supporting chain_id changes
807        // until the IBC host chain changes land.
808        // See: https://github.com/penumbra-zone/penumbra/issues/3617#issuecomment-1917708221
809        std::mem::drop(chain_id);
810
811        self.put_auction_params(auction_params);
812        self.put_community_pool_params(community_pool_params);
813        self.put_distributions_params(distributions_params);
814        self.put_fee_params(fee_params);
815        self.put_funding_params(funding_params);
816        self.put_governance_params(governance_params);
817        self.put_ibc_params(ibc_params);
818        self.put_sct_params(sct_params);
819        self.put_shielded_pool_params(shielded_pool_params);
820        self.put_stake_params(stake_params);
821        self.put_dex_params(dex_params);
822    }
823}
824
825impl<T: StateWrite + ?Sized> StateWriteExt for T {}