penumbra_sdk_ibc/component/msg_handler/
recv_packet.rs1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use cnidarium::StateWrite;
4use ibc_types::core::{
5 channel::{
6 channel::{Order as ChannelOrder, State as ChannelState},
7 events,
8 msgs::MsgRecvPacket,
9 PortId,
10 },
11 client::Height as IBCHeight,
12 connection::State as ConnectionState,
13};
14
15use crate::component::{
16 app_handler::{AppHandlerCheck, AppHandlerExecute},
17 channel::{StateReadExt as _, StateWriteExt},
18 connection::StateReadExt as _,
19 proof_verification::PacketProofVerifier,
20 HostInterface, MsgHandler,
21};
22
23#[async_trait]
24impl MsgHandler for MsgRecvPacket {
25 async fn check_stateless<AH>(&self) -> Result<()> {
26 Ok(())
29 }
30
31 async fn try_execute<
32 S: StateWrite,
33 AH: AppHandlerCheck + AppHandlerExecute,
34 HI: HostInterface,
35 >(
36 &self,
37 mut state: S,
38 ) -> Result<()> {
39 tracing::debug!(msg = ?self);
40 tracing::debug!(data = ?String::from_utf8_lossy(&self.packet.data));
41 let channel = state
42 .get_channel(&self.packet.chan_on_b, &self.packet.port_on_b)
43 .await?
44 .ok_or_else(|| anyhow::anyhow!("channel not found"))?;
45 if !channel.state_matches(&ChannelState::Open) {
46 anyhow::bail!("channel is not open");
47 }
48
49 if self.packet.port_on_a != channel.counterparty().port_id {
52 anyhow::bail!("packet source port does not match channel");
53 }
54 let counterparty_channel = channel
55 .counterparty()
56 .channel_id()
57 .ok_or_else(|| anyhow::anyhow!("missing channel id"))?;
58
59 if self.packet.chan_on_a.ne(counterparty_channel) {
60 anyhow::bail!("packet source channel does not match channel");
61 }
62
63 let connection = state
64 .get_connection(&channel.connection_hops[0])
65 .await?
66 .ok_or_else(|| anyhow::anyhow!("connection not found for channel"))?;
67
68 if !connection.state_matches(&ConnectionState::Open) {
69 anyhow::bail!("connection for channel is not open");
70 }
71
72 let block_height = HI::get_block_height(&state).await?;
73 let height = IBCHeight::new(HI::get_revision_number(&state).await?, block_height)?;
74
75 if self.packet.timeout_height_on_b.has_expired(height) {
76 anyhow::bail!("packet has timed out");
77 }
78
79 let packet_timeout = self.packet.timeout_timestamp_on_b.into_tm_time();
80
81 if let Some(packet_timeout) = packet_timeout {
84 let block_time = HI::get_block_timestamp(&state).await?;
85 if block_time >= packet_timeout {
86 anyhow::bail!(
87 "packet has timed out: block time {:?} >= packet timeout {:?}",
88 block_time,
89 packet_timeout
90 );
91 }
92 }
93
94 state
95 .verify_packet_recv_proof::<HI>(&connection, self)
96 .await
97 .with_context(|| format!("packet {:?} failed to verify", self.packet))?;
98
99 if channel.ordering == ChannelOrder::Ordered {
100 let next_sequence_recv = state
101 .get_recv_sequence(&self.packet.chan_on_b, &self.packet.port_on_b)
102 .await?;
103
104 if self.packet.sequence != next_sequence_recv.into() {
105 anyhow::bail!("packet sequence number does not match");
106 }
107 } else if state.seen_packet(&self.packet).await? {
108 anyhow::bail!("packet has already been processed");
109 }
110
111 let transfer = PortId::transfer();
112 if self.packet.port_on_b == transfer {
113 AH::recv_packet_check(&mut state, self).await?;
114 } else {
115 anyhow::bail!("invalid port id");
116 }
117
118 if channel.ordering == ChannelOrder::Ordered {
119 let mut next_sequence_recv = state
120 .get_recv_sequence(&self.packet.chan_on_b, &self.packet.port_on_b)
121 .await?;
122
123 next_sequence_recv += 1;
124 state.put_recv_sequence(
125 &self.packet.chan_on_b,
126 &self.packet.port_on_b,
127 next_sequence_recv,
128 );
129 } else {
130 state.put_packet_receipt(&self.packet);
134 }
135
136 state.record(
137 events::packet::ReceivePacket {
138 packet_data: self.packet.data.clone(),
139 timeout_height: self.packet.timeout_height_on_b,
140 timeout_timestamp: self.packet.timeout_timestamp_on_b,
141 sequence: self.packet.sequence,
142 src_port_id: self.packet.port_on_a.clone(),
143 src_channel_id: self.packet.chan_on_a.clone(),
144 dst_port_id: self.packet.port_on_b.clone(),
145 dst_channel_id: self.packet.chan_on_b.clone(),
146 channel_ordering: channel.ordering,
147 dst_connection_id: channel.connection_hops[0].clone(),
148 }
149 .into(),
150 );
151
152 let transfer = PortId::transfer();
153 if self.packet.port_on_b == transfer {
155 AH::recv_packet_execute(state, self).await?;
156 } else {
157 anyhow::bail!("invalid port id");
158 }
159
160 Ok(())
161 }
162}