use std::process;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use async_trait::async_trait;
use cnidarium::{ArcStateDeltaExt, Snapshot, StateDelta, StateRead, StateWrite, Storage};
use cnidarium_component::Component;
use ibc_types::core::connection::ChainId;
use jmt::RootHash;
use penumbra_auction::component::{Auction, StateReadExt as _, StateWriteExt as _};
use penumbra_community_pool::component::{CommunityPool, StateWriteExt as _};
use penumbra_community_pool::StateReadExt as _;
use penumbra_compact_block::component::CompactBlockManager;
use penumbra_dex::component::StateReadExt as _;
use penumbra_dex::component::{Dex, StateWriteExt as _};
use penumbra_distributions::component::{Distributions, StateReadExt as _, StateWriteExt as _};
use penumbra_fee::component::{FeeComponent, StateReadExt as _, StateWriteExt as _};
use penumbra_funding::component::Funding;
use penumbra_funding::component::{StateReadExt as _, StateWriteExt as _};
use penumbra_governance::component::{Governance, StateReadExt as _, StateWriteExt as _};
use penumbra_ibc::component::{Ibc, StateWriteExt as _};
use penumbra_ibc::StateReadExt as _;
use penumbra_proto::core::app::v1::TransactionsByHeightResponse;
use penumbra_proto::DomainType;
use penumbra_sct::component::clock::EpochRead;
use penumbra_sct::component::sct::Sct;
use penumbra_sct::component::{StateReadExt as _, StateWriteExt as _};
use penumbra_sct::epoch::Epoch;
use penumbra_shielded_pool::component::{ShieldedPool, StateReadExt as _, StateWriteExt as _};
use penumbra_stake::component::{
stake::ConsensusUpdateRead, Staking, StateReadExt as _, StateWriteExt as _,
};
use penumbra_transaction::Transaction;
use prost::Message as _;
use tendermint::abci::{self, Event};
use tendermint::v0_37::abci::{request, response};
use tendermint::validator::Update;
use tokio::time::sleep;
use tracing::{instrument, Instrument};
use crate::action_handler::AppActionHandler;
use crate::genesis::AppState;
use crate::params::change::ParameterChangeExt as _;
use crate::params::AppParameters;
use crate::{CommunityPoolStateReadExt, PenumbraHost};
pub mod state_key;
/// Shared handle to the state the application accumulates between commits:
/// a [`StateDelta`] layered over a storage [`Snapshot`].
type InterBlockState = Arc<StateDelta<Snapshot>>;
/// Limit, in bytes, on the combined size of all transactions in a proposal (1 MiB).
///
/// `App::process_proposal` rejects a proposal as soon as the running total
/// reaches this value.
pub const MAX_BLOCK_TXS_PAYLOAD_BYTES: usize = 1024 * 1024;
/// Limit, in bytes, on the size of a single raw transaction (96 KiB).
///
/// Larger transactions are skipped by `App::prepare_proposal` and cause
/// `App::process_proposal` to reject the whole proposal.
pub const MAX_TRANSACTION_SIZE_BYTES: usize = 96 * 1024;
/// Limit, in bytes, on the combined encoded size of a proposal's misbehavior
/// evidence (30 KiB).
pub const MAX_EVIDENCE_SIZE_BYTES: usize = 30 * 1024;
/// The application state machine driven by the ABCI-style entry points defined
/// in its `impl` block (`init_chain`, `prepare_proposal`, `process_proposal`,
/// `begin_block`, `deliver_tx`, `end_block`, `commit`).
pub struct App {
/// Pending, uncommitted state. Methods that write to it require this `Arc` to
/// be uniquely held and panic otherwise.
state: InterBlockState,
}
impl App {
/// Builds an [`App`] whose pending state is a fresh [`StateDelta`] over `snapshot`.
#[instrument(skip_all)]
pub fn new(snapshot: Snapshot) -> Self {
    tracing::debug!("initializing App instance");
    Self {
        state: Arc::new(StateDelta::new(snapshot)),
    }
}
/// Reports whether the application may start: `true` unless `state` records
/// that the chain has been halted.
#[instrument(skip_all, ret)]
pub async fn is_ready(state: Snapshot) -> bool {
    let halted = state.is_chain_halted().await;
    !halted
}
/// Merges the changes staged in `state_tx` into the inter-block state and
/// returns the events that were recorded in it.
///
/// # Panics
///
/// Panics if any handle to the inter-block state other than `self.state` and
/// the one inside `state_tx` is still alive.
fn apply(&mut self, state_tx: StateDelta<InterBlockState>) -> Vec<Event> {
    // Separate the delta's handle on the underlying state from its staged changes.
    let (underlying, mut staged) = state_tx.flatten();
    // Release that handle so `self.state` can be borrowed mutably below.
    drop(underlying);
    let emitted = staged.take_events();
    let inter_block =
        Arc::get_mut(&mut self.state).expect("no other references to inter-block state");
    staged.apply_to(inter_block);
    emitted
}
/// Populates the initial chain state from `app_state`.
///
/// * [`AppState::Content`]: stores the chain id, runs every component's
///   `init_chain` hook with its genesis content, then finishes the block.
/// * [`AppState::Checkpoint`]: runs the hooks listed in that arm with `None`;
///   no block is finished.
///
/// # Panics
///
/// Panics if the state `Arc` is shared, or if finishing the genesis block fails.
pub async fn init_chain(&mut self, app_state: &AppState) {
    let mut tx = self
        .state
        .try_begin_transaction()
        .expect("state Arc should not be referenced elsewhere");
    match app_state {
        AppState::Checkpoint(_) => {
            // NOTE(review): unlike the `Content` arm, the `Sct` and `Auction`
            // hooks are not invoked here; confirm this is intentional.
            ShieldedPool::init_chain(&mut tx, None).await;
            Distributions::init_chain(&mut tx, None).await;
            Staking::init_chain(&mut tx, None).await;
            Ibc::init_chain(&mut tx, None).await;
            Dex::init_chain(&mut tx, None).await;
            Governance::init_chain(&mut tx, None).await;
            CommunityPool::init_chain(&mut tx, None).await;
            FeeComponent::init_chain(&mut tx, None).await;
            Funding::init_chain(&mut tx, None).await;
        }
        AppState::Content(content) => {
            tx.put_chain_id(content.chain_id.clone());
            Sct::init_chain(&mut tx, Some(&content.sct_content)).await;
            ShieldedPool::init_chain(&mut tx, Some(&content.shielded_pool_content)).await;
            Distributions::init_chain(&mut tx, Some(&content.distributions_content)).await;
            // Staking is initialized from both the stake and the shielded pool content.
            let staking_genesis = (
                content.stake_content.clone(),
                content.shielded_pool_content.clone(),
            );
            Staking::init_chain(&mut tx, Some(&staking_genesis)).await;
            Ibc::init_chain(&mut tx, Some(&content.ibc_content)).await;
            Auction::init_chain(&mut tx, Some(&content.auction_content)).await;
            Dex::init_chain(&mut tx, Some(&content.dex_content)).await;
            CommunityPool::init_chain(&mut tx, Some(&content.community_pool_content)).await;
            Governance::init_chain(&mut tx, Some(&content.governance_content)).await;
            FeeComponent::init_chain(&mut tx, Some(&content.fee_content)).await;
            Funding::init_chain(&mut tx, Some(&content.funding_content)).await;
            tx.finish_block()
                .await
                .expect("must be able to finish compact block");
        }
    }
    tx.apply();
}
/// Selects which of the candidate transactions in `proposal` go into the block.
///
/// Candidates are visited in order. A candidate larger than
/// [`MAX_TRANSACTION_SIZE_BYTES`] is skipped; selection stops at the first
/// candidate that would bring the running total to `proposal.max_tx_bytes` or
/// beyond; a candidate is kept only if it executes successfully against the
/// pending state.
///
/// Exits the process with status 0 if the chain is halted.
pub async fn prepare_proposal(
    &mut self,
    proposal: request::PrepareProposal,
) -> response::PrepareProposal {
    if self.state.is_chain_halted().await {
        process::exit(0);
    }
    let candidate_count = proposal.txs.len();
    tracing::debug!(
        "processing PrepareProposal, found {} candidate transactions",
        candidate_count
    );
    let size_budget = proposal.max_tx_bytes as u64;
    let per_tx_limit = MAX_TRANSACTION_SIZE_BYTES as u64;
    let mut used_bytes = 0u64;
    let mut accepted = Vec::new();
    for candidate in proposal.txs {
        let candidate_len = candidate.len() as u64;
        // Oversized transactions are passed over without ending the scan.
        if candidate_len > per_tx_limit {
            continue;
        }
        // Stop once adding this transaction would reach the proposal budget.
        let projected = used_bytes.saturating_add(candidate_len);
        if projected >= size_budget {
            break;
        }
        // Only transactions that execute successfully count against the budget.
        if self.deliver_tx_bytes(&candidate).await.is_ok() {
            used_bytes = projected;
            accepted.push(candidate);
        }
    }
    tracing::debug!(
        "finished processing PrepareProposal, including {}/{} candidate transactions",
        accepted.len(),
        candidate_count
    );
    response::PrepareProposal { txs: accepted }
}
/// Validates a proposed block and answers `Accept` or `Reject`.
///
/// The proposal is rejected when:
/// 1. any piece of evidence fails to encode, or the combined encoded evidence
///    exceeds [`MAX_EVIDENCE_SIZE_BYTES`];
/// 2. any transaction exceeds [`MAX_TRANSACTION_SIZE_BYTES`];
/// 3. the running transaction total reaches [`MAX_BLOCK_TXS_PAYLOAD_BYTES`];
/// 4. any transaction fails to execute against the pending state.
#[instrument(skip_all, ret, level = "debug")]
pub async fn process_proposal(
    &mut self,
    proposal: request::ProcessProposal,
) -> response::ProcessProposal {
    tracing::debug!(
        height = proposal.height.value(),
        proposer = ?proposal.proposer_address,
        proposal_hash = ?proposal.hash,
        "processing proposal"
    );
    // One scratch buffer is reused to measure each piece of evidence.
    let mut scratch: Vec<u8> = Vec::with_capacity(MAX_EVIDENCE_SIZE_BYTES);
    let mut evidence_total = 0usize;
    for item in proposal.misbehavior {
        scratch.clear();
        let encoded: tendermint_proto::v0_37::abci::Misbehavior = item.into();
        if encoded.encode(&mut scratch).is_err() {
            return response::ProcessProposal::Reject;
        }
        evidence_total = evidence_total.saturating_add(scratch.len());
        if evidence_total > MAX_EVIDENCE_SIZE_BYTES {
            return response::ProcessProposal::Reject;
        }
    }
    let mut payload_total = 0usize;
    for raw_tx in proposal.txs {
        let raw_len = raw_tx.len();
        if raw_len > MAX_TRANSACTION_SIZE_BYTES {
            return response::ProcessProposal::Reject;
        }
        payload_total = payload_total.saturating_add(raw_len);
        if payload_total >= MAX_BLOCK_TXS_PAYLOAD_BYTES {
            return response::ProcessProposal::Reject;
        }
        if self.deliver_tx_bytes(&raw_tx).await.is_err() {
            return response::ProcessProposal::Reject;
        }
    }
    response::ProcessProposal::Accept
}
/// Runs start-of-block processing and returns the events it produced.
///
/// Steps, in order:
/// 1. apply any app parameter change scheduled for the height in the request
///    header (a failed change is logged and otherwise ignored);
/// 2. run each component's `begin_block` hook;
/// 3. merge the resulting writes into the inter-block state;
/// 4. deliver the pending Community Pool transactions, logging failures and
///    appending the events of the successful ones.
///
/// # Panics
///
/// Panics if scheduled changes, app parameters or pending Community Pool
/// transactions cannot be read, or if a component keeps a copy of the shared
/// state handle.
pub async fn begin_block(&mut self, begin_block: &request::BeginBlock) -> Vec<abci::Event> {
    let mut state_tx = StateDelta::new(self.state.clone());
    // The height is taken from the request header rather than from state.
    let scheduled_change = state_tx
        .param_changes_for_height(begin_block.header.height.into())
        .await
        .expect("param changes should always be readable, even if unset");
    if let Some(change) = scheduled_change {
        let old_params = state_tx
            .get_app_params()
            .await
            .expect("must be able to read app params");
        match change.apply_changes(old_params) {
            Err(e) => {
                tracing::info!(?change, ?e, "failed to apply approved app parameter change");
            }
            Ok(new_params) => {
                tracing::info!(?change, "applied app parameter change");
                state_tx.put_app_params(new_params);
            }
        }
    }
    // The component hooks take the state behind an `Arc`.
    let mut shared_tx = Arc::new(state_tx);
    Sct::begin_block(&mut shared_tx, begin_block).await;
    ShieldedPool::begin_block(&mut shared_tx, begin_block).await;
    Distributions::begin_block(&mut shared_tx, begin_block).await;
    Ibc::begin_block::<PenumbraHost, StateDelta<Arc<StateDelta<cnidarium::Snapshot>>>>(
        &mut shared_tx,
        begin_block,
    )
    .await;
    Auction::begin_block(&mut shared_tx, begin_block).await;
    Dex::begin_block(&mut shared_tx, begin_block).await;
    CommunityPool::begin_block(&mut shared_tx, begin_block).await;
    Governance::begin_block(&mut shared_tx, begin_block).await;
    Staking::begin_block(&mut shared_tx, begin_block).await;
    FeeComponent::begin_block(&mut shared_tx, begin_block).await;
    Funding::begin_block(&mut shared_tx, begin_block).await;
    let state_tx = Arc::try_unwrap(shared_tx)
        .expect("components did not retain copies of shared state");
    let mut events = self.apply(state_tx);
    // Community Pool transactions are delivered after the hooks' writes are applied.
    let queued = self
        .state
        .pending_community_pool_transactions()
        .await
        .expect("Community Pool transactions should always be readable");
    for transaction in queued {
        tracing::info!(?transaction, "delivering Community Pool transaction");
        let delivery = self
            .deliver_tx_allowing_community_pool_spends(Arc::new(transaction))
            .await;
        match delivery {
            Ok(community_pool_tx_events) => events.extend(community_pool_tx_events),
            Err(error) => {
                tracing::warn!(?error, "failed to deliver Community Pool transaction");
            }
        }
    }
    events
}
/// Decodes `tx_bytes` into a [`Transaction`] and delivers it via [`App::deliver_tx`].
///
/// # Errors
///
/// Fails if the bytes do not decode or if delivery fails.
pub async fn deliver_tx_bytes(&mut self, tx_bytes: &[u8]) -> Result<Vec<abci::Event>> {
    let decoded = Transaction::decode(tx_bytes).context("decoding transaction")?;
    let delivered = self.deliver_tx(Arc::new(decoded)).await;
    delivered.context("failed to deliver transaction")
}
/// Delivers a user-submitted transaction.
///
/// Transactions containing any Community Pool spend or output are refused
/// here; everything else is forwarded to the delivery path that allows them.
///
/// # Errors
///
/// Fails if the transaction contains a Community Pool spend or output, or if
/// the underlying delivery fails.
pub async fn deliver_tx(&mut self, tx: Arc<Transaction>) -> Result<Vec<abci::Event>> {
    let has_spends = tx.community_pool_spends().next().is_some();
    anyhow::ensure!(
        !has_spends,
        "Community Pool spends are not permitted in user-submitted transactions"
    );
    let has_outputs = tx.community_pool_outputs().next().is_some();
    anyhow::ensure!(
        !has_outputs,
        "Community Pool outputs are not permitted in user-submitted transactions"
    );
    self.deliver_tx_allowing_community_pool_spends(tx).await
}
/// Checks and executes `tx` against the pending state, without the
/// restriction on Community Pool spends and outputs that [`App::deliver_tx`]
/// enforces.
///
/// The stateless and historical checks run concurrently as spawned tasks.
/// Once both pass, the transaction is recorded under the current block
/// height and executed inside a state transaction that is applied only if
/// every step succeeds. On success the events from that transaction are
/// returned.
///
/// # Errors
///
/// Fails if either check task fails or cannot be joined, if the block height
/// cannot be read, if the transaction cannot be recorded, or if execution fails.
///
/// # Panics
///
/// Panics if the state `Arc` is still shared when the state transaction begins.
async fn deliver_tx_allowing_community_pool_spends(
    &mut self,
    tx: Arc<Transaction>,
) -> Result<Vec<abci::Event>> {
    let tx2 = tx.clone();
    let stateless = tokio::spawn(
        async move { tx2.check_stateless(()).await }.instrument(tracing::Span::current()),
    );
    let tx2 = tx.clone();
    let state2 = self.state.clone();
    let stateful = tokio::spawn(
        async move { tx2.check_historical(state2).await }.instrument(tracing::Span::current()),
    );
    // Join BOTH tasks before propagating any failure. Returning as soon as the
    // stateless check fails would leave the historical-check task running in
    // the background with its clone of `self.state`, and a later call that
    // needs exclusive access to that `Arc` could then panic.
    let stateless_outcome = stateless.await;
    let stateful_outcome = stateful.await;
    // Errors are still reported in the original order: stateless first.
    stateless_outcome
        .context("waiting for check_stateless check tasks")?
        .context("check_stateless failed")?;
    stateful_outcome
        .context("waiting for check_stateful tasks")?
        .context("check_stateful failed")?;
    // Both tasks have finished, so `self.state` is expected to be uniquely held again.
    let mut state_tx = self
        .state
        .try_begin_transaction()
        .expect("state Arc should be present and unique");
    // Record the transaction under the current height before executing it.
    let height = state_tx.get_block_height().await?;
    let transaction = Arc::as_ref(&tx).clone();
    state_tx
        .put_block_transaction(height, transaction.into())
        .await
        .context("storing transactions")?;
    tx.check_and_execute(&mut state_tx)
        .await
        .context("executing transaction")?;
    // Any error above returns before this point, dropping `state_tx` unapplied.
    Ok(state_tx.apply().1)
}
/// Runs end-of-block processing and returns the events it produced.
///
/// Every component's `end_block` hook runs first. The epoch then ends if the
/// current epoch's scheduled end has been reached, an early epoch end was
/// requested, or this is a pre-upgrade height. In that case every component's
/// `end_epoch` hook runs, the epoch is finished, and the next height is
/// assigned to a new epoch with the following index. Otherwise the next
/// height is assigned to the current epoch and the block is finished.
///
/// # Panics
///
/// Panics if required state cannot be read, if any `end_epoch` hook fails,
/// if the block or epoch cannot be finished, or if a component keeps a copy
/// of the shared state handle.
#[tracing::instrument(skip_all, fields(height = %end_block.height))]
pub async fn end_block(&mut self, end_block: &request::EndBlock) -> Vec<abci::Event> {
    let mut shared_tx = Arc::new(StateDelta::new(self.state.clone()));
    tracing::debug!("running app components' `end_block` hooks");
    Sct::end_block(&mut shared_tx, end_block).await;
    ShieldedPool::end_block(&mut shared_tx, end_block).await;
    Distributions::end_block(&mut shared_tx, end_block).await;
    Ibc::end_block(&mut shared_tx, end_block).await;
    Auction::end_block(&mut shared_tx, end_block).await;
    Dex::end_block(&mut shared_tx, end_block).await;
    CommunityPool::end_block(&mut shared_tx, end_block).await;
    Governance::end_block(&mut shared_tx, end_block).await;
    Staking::end_block(&mut shared_tx, end_block).await;
    FeeComponent::end_block(&mut shared_tx, end_block).await;
    Funding::end_block(&mut shared_tx, end_block).await;
    let mut state_tx = Arc::try_unwrap(shared_tx)
        .expect("components did not retain copies of shared state");
    tracing::debug!("finished app components' `end_block` hooks");

    let current_height = state_tx
        .get_block_height()
        .await
        .expect("able to get block height in end_block");
    let current_epoch = state_tx
        .get_current_epoch()
        .await
        .expect("able to get current epoch in end_block");
    let epoch_duration = state_tx
        .get_epoch_duration_parameter()
        .await
        .expect("able to get epoch duration in end_block");
    // The early-end flag is only consulted when the scheduled end has not been reached.
    let is_end_epoch = current_epoch.is_scheduled_epoch_end(current_height, epoch_duration)
        || state_tx.is_epoch_ending_early().await;
    let is_chain_upgrade = state_tx
        .is_pre_upgrade_height()
        .await
        .expect("able to detect upgrade heights");

    if !(is_end_epoch || is_chain_upgrade) {
        // Ordinary block: the next height stays in the current epoch.
        penumbra_sct::component::clock::EpochManager::put_epoch_by_height(
            &mut state_tx,
            current_height + 1,
            current_epoch,
        );
        state_tx
            .finish_block()
            .await
            .expect("must be able to finish compact block");
        return self.apply(state_tx);
    }

    tracing::info!(%is_end_epoch, %is_chain_upgrade, ?current_height, "ending epoch");
    let mut shared_tx = Arc::new(state_tx);
    Sct::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Sct component");
    Distributions::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Distributions component");
    Ibc::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on IBC component");
    Auction::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on auction component");
    Dex::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on dex component");
    CommunityPool::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Community Pool component");
    Governance::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Governance component");
    ShieldedPool::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on shielded pool component");
    Staking::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Staking component");
    FeeComponent::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Fee component");
    Funding::end_epoch(&mut shared_tx)
        .await
        .expect("able to call end_epoch on Funding component");
    let mut state_tx = Arc::try_unwrap(shared_tx)
        .expect("components did not retain copies of shared state");
    state_tx
        .finish_epoch()
        .await
        .expect("must be able to finish compact block");
    // The next height opens a new epoch.
    let next_height = current_height + 1;
    let next_epoch = Epoch {
        index: current_epoch.index + 1,
        start_height: next_height,
    };
    penumbra_sct::component::clock::EpochManager::put_epoch_by_height(
        &mut state_tx,
        next_height,
        next_epoch,
    );
    self.apply(state_tx)
}
/// Commits the pending state to `storage` and returns the resulting root hash.
///
/// If the pending state marks a pre-upgrade height, a halt is signaled before
/// committing. If the chain was already halted or a halt was just signaled, a
/// background task exits the process two seconds after the commit. In all
/// cases `self.state` is afterwards reset to a fresh delta over the latest
/// snapshot.
///
/// # Panics
///
/// Panics if the pending state is still shared, if the upgrade height cannot
/// be read, or if the storage commit fails.
pub async fn commit(&mut self, storage: Storage) -> RootHash {
    // Swap in a placeholder so the accumulated state can be taken by value.
    let placeholder = Arc::new(StateDelta::new(storage.latest_snapshot()));
    let accumulated = std::mem::replace(&mut self.state, placeholder);
    let mut state = Arc::try_unwrap(accumulated)
        .expect("we have exclusive ownership of the State at commit()");

    // Read the halt flag before a halt is possibly signaled below.
    let should_halt = state.is_chain_halted().await;
    let is_pre_upgrade_height = state
        .is_pre_upgrade_height()
        .await
        .expect("must be able to read upgrade height");
    if is_pre_upgrade_height {
        tracing::info!("pre-upgrade height reached, signaling halt");
        state.signal_halt();
    }

    let jmt_root = storage
        .commit(state)
        .await
        .expect("must be able to successfully commit to storage");

    let exit_after_commit = should_halt || is_pre_upgrade_height;
    if exit_after_commit {
        // The exit is deferred to a background task so this call can return first.
        tokio::spawn(async move {
            sleep(Duration::from_secs(2)).await;
            tracing::info!("halt signal recorded, exiting process");
            std::process::exit(0);
        });
    }
    tracing::debug!(?jmt_root, "finished committing state");

    // Start the next round of writes from the snapshot that was just committed.
    self.state = Arc::new(StateDelta::new(storage.latest_snapshot()));
    jmt_root
}
/// Returns the validator updates recorded in the pending state, or an empty
/// list when none are set.
pub fn cometbft_validator_updates(&self) -> Vec<Update> {
    let recorded = self.state.cometbft_validator_updates();
    recorded.unwrap_or_default()
}
}
/// Read access to application-level state: the chain id, the aggregated
/// [`AppParameters`], and the per-height transaction index.
#[async_trait]
pub trait StateReadExt: StateRead {
    /// Returns the chain id stored in state, decoded lossily as UTF-8.
    ///
    /// # Panics
    ///
    /// Panics if no chain id has been written.
    async fn get_chain_id(&self) -> Result<String> {
        let raw_chain_id = self
            .get_raw(state_key::data::chain_id())
            .await?
            .expect("chain id is always set");
        Ok(String::from_utf8_lossy(&raw_chain_id).into_owned())
    }

    /// Checks `provided` against the chain id stored in state.
    ///
    /// Succeeds when `provided` is empty or equal to the stored chain id.
    ///
    /// # Errors
    ///
    /// Fails if the stored chain id cannot be read or does not match.
    async fn check_chain_id(&self, provided: &str) -> Result<()> {
        let chain_id = self
            .get_chain_id()
            .await
            // Build the context message only if the lookup actually failed.
            .with_context(|| format!("error getting chain id: '{provided}'"))?;
        if provided.is_empty() || provided == chain_id {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "provided chain_id {} does not match chain_id {}",
                provided,
                chain_id
            ))
        }
    }

    /// Returns the version that [`ChainId`] reports for the stored chain id.
    async fn get_revision_number(&self) -> Result<u64> {
        let cid_str = self.get_chain_id().await?;
        Ok(ChainId::from_string(&cid_str).version())
    }

    /// Collects the chain id and each component's parameters into one
    /// [`AppParameters`] value.
    ///
    /// # Errors
    ///
    /// Fails on the first value that cannot be read.
    async fn get_app_params(&self) -> Result<AppParameters> {
        let chain_id = self.get_chain_id().await?;
        let community_pool_params: penumbra_community_pool::params::CommunityPoolParameters =
            self.get_community_pool_params().await?;
        let distributions_params = self.get_distributions_params().await?;
        let ibc_params = self.get_ibc_params().await?;
        let fee_params = self.get_fee_params().await?;
        let funding_params = self.get_funding_params().await?;
        let governance_params = self.get_governance_params().await?;
        let sct_params = self.get_sct_params().await?;
        let shielded_pool_params = self.get_shielded_pool_params().await?;
        let stake_params = self.get_stake_params().await?;
        let dex_params = self.get_dex_params().await?;
        let auction_params = self.get_auction_params().await?;
        Ok(AppParameters {
            chain_id,
            auction_params,
            community_pool_params,
            distributions_params,
            fee_params,
            funding_params,
            governance_params,
            ibc_params,
            sct_params,
            shielded_pool_params,
            stake_params,
            dex_params,
        })
    }

    /// Returns the transactions recorded for `block_height`.
    ///
    /// When nothing is stored for that height, the response carries an empty
    /// transaction list and the requested height.
    ///
    /// # Errors
    ///
    /// Fails if the lookup fails or the stored bytes do not decode.
    async fn transactions_by_height(
        &self,
        block_height: u64,
    ) -> Result<TransactionsByHeightResponse> {
        let key = state_key::cometbft_data::transactions_by_height(block_height);
        match self.nonverifiable_get_raw(key.as_bytes()).await? {
            Some(stored) => Ok(TransactionsByHeightResponse::decode(&stored[..])?),
            // Nothing stored: return the empty response directly; there is no
            // need to encode it just to decode it again.
            None => Ok(TransactionsByHeightResponse {
                transactions: vec![],
                block_height,
            }),
        }
    }
}
/// Blanket implementation: every type that provides all of the component
/// read traits listed below gets [`StateReadExt`].
impl<T> StateReadExt for T
where
    T: StateRead
        + penumbra_stake::StateReadExt
        + penumbra_governance::component::StateReadExt
        + penumbra_fee::component::StateReadExt
        + penumbra_community_pool::component::StateReadExt
        + penumbra_sct::component::clock::EpochRead
        + penumbra_ibc::component::StateReadExt
        + penumbra_distributions::component::StateReadExt
        + ?Sized,
{
}
/// Write access to application-level state: the chain id, the per-height
/// transaction index, and the aggregated [`AppParameters`].
#[async_trait]
pub trait StateWriteExt: StateWrite {
    /// Stores `chain_id` under the chain id key.
    fn put_chain_id(&mut self, chain_id: String) {
        let value = chain_id.into_bytes();
        self.put_raw(state_key::data::chain_id().into(), value);
    }

    /// Appends `transaction` to the list of transactions recorded for `height`.
    ///
    /// # Errors
    ///
    /// Fails if the existing list for `height` cannot be read.
    async fn put_block_transaction(
        &mut self,
        height: u64,
        transaction: penumbra_proto::core::transaction::v1::Transaction,
    ) -> Result<()> {
        let mut recorded = self.transactions_by_height(height).await?;
        recorded.transactions.push(transaction);
        self.nonverifiable_put_raw(
            state_key::cometbft_data::transactions_by_height(height).into(),
            recorded.encode_to_vec(),
        );
        Ok(())
    }

    /// Writes each component's parameters from `params` through that
    /// component's own setter. The `chain_id` field is not written.
    fn put_app_params(&mut self, params: AppParameters) {
        // Destructure exhaustively so that a newly added field fails to
        // compile here until it is handled.
        let AppParameters {
            chain_id: _,
            auction_params,
            community_pool_params,
            distributions_params,
            fee_params,
            funding_params,
            governance_params,
            ibc_params,
            sct_params,
            shielded_pool_params,
            stake_params,
            dex_params,
        } = params;
        self.put_auction_params(auction_params);
        self.put_community_pool_params(community_pool_params);
        self.put_distributions_params(distributions_params);
        self.put_fee_params(fee_params);
        self.put_funding_params(funding_params);
        self.put_governance_params(governance_params);
        self.put_ibc_params(ibc_params);
        self.put_sct_params(sct_params);
        self.put_shielded_pool_params(shielded_pool_params);
        self.put_stake_params(stake_params);
        self.put_dex_params(dex_params);
    }
}
/// Blanket implementation: every [`StateWrite`] type gets [`StateWriteExt`].
impl<T> StateWriteExt for T where T: StateWrite + ?Sized {}