penumbra_ibc/component/msg_handler/
recv_packet.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use anyhow::{Context, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
use ibc_types::core::{
    channel::{
        channel::{Order as ChannelOrder, State as ChannelState},
        events,
        msgs::MsgRecvPacket,
        PortId,
    },
    client::Height as IBCHeight,
    connection::State as ConnectionState,
};

use crate::component::{
    app_handler::{AppHandlerCheck, AppHandlerExecute},
    channel::{StateReadExt as _, StateWriteExt},
    connection::StateReadExt as _,
    proof_verification::PacketProofVerifier,
    HostInterface, MsgHandler,
};

#[async_trait]
impl MsgHandler for MsgRecvPacket {
    async fn check_stateless<AH>(&self) -> Result<()> {
        // NOTE: no additional stateless validation is possible

        Ok(())
    }

    async fn try_execute<
        S: StateWrite,
        AH: AppHandlerCheck + AppHandlerExecute,
        HI: HostInterface,
    >(
        &self,
        mut state: S,
    ) -> Result<()> {
        tracing::debug!(msg = ?self);
        tracing::debug!(data = ?String::from_utf8_lossy(&self.packet.data));
        let channel = state
            .get_channel(&self.packet.chan_on_b, &self.packet.port_on_b)
            .await?
            .ok_or_else(|| anyhow::anyhow!("channel not found"))?;
        if !channel.state_matches(&ChannelState::Open) {
            anyhow::bail!("channel is not open");
        }

        // TODO: capability authentication?

        if self.packet.port_on_a != channel.counterparty().port_id {
            anyhow::bail!("packet source port does not match channel");
        }
        let counterparty_channel = channel
            .counterparty()
            .channel_id()
            .ok_or_else(|| anyhow::anyhow!("missing channel id"))?;

        if self.packet.chan_on_a.ne(counterparty_channel) {
            anyhow::bail!("packet source channel does not match channel");
        }

        let connection = state
            .get_connection(&channel.connection_hops[0])
            .await?
            .ok_or_else(|| anyhow::anyhow!("connection not found for channel"))?;

        if !connection.state_matches(&ConnectionState::Open) {
            anyhow::bail!("connection for channel is not open");
        }

        let block_height = HI::get_block_height(&state).await?;
        let height = IBCHeight::new(HI::get_revision_number(&state).await?, block_height)?;

        if self.packet.timeout_height_on_b.has_expired(height) {
            anyhow::bail!("packet has timed out");
        }

        let packet_timeout = self.packet.timeout_timestamp_on_b.into_tm_time();

        // TODO: is this correct logic?
        // If the packet has no timeout timestamp, what do we do?
        if let Some(packet_timeout) = packet_timeout {
            let block_time = HI::get_block_timestamp(&state).await?;
            if block_time >= packet_timeout {
                anyhow::bail!(
                    "packet has timed out: block time {:?} >= packet timeout {:?}",
                    block_time,
                    packet_timeout
                );
            }
        }

        state
            .verify_packet_recv_proof::<HI>(&connection, self)
            .await
            .with_context(|| format!("packet {:?} failed to verify", self.packet))?;

        if channel.ordering == ChannelOrder::Ordered {
            let next_sequence_recv = state
                .get_recv_sequence(&self.packet.chan_on_b, &self.packet.port_on_b)
                .await?;

            if self.packet.sequence != next_sequence_recv.into() {
                anyhow::bail!("packet sequence number does not match");
            }
        } else if state.seen_packet(&self.packet).await? {
            anyhow::bail!("packet has already been processed");
        }

        let transfer = PortId::transfer();
        if self.packet.port_on_b == transfer {
            AH::recv_packet_check(&mut state, self).await?;
        } else {
            anyhow::bail!("invalid port id");
        }

        if channel.ordering == ChannelOrder::Ordered {
            let mut next_sequence_recv = state
                .get_recv_sequence(&self.packet.chan_on_b, &self.packet.port_on_b)
                .await?;

            next_sequence_recv += 1;
            state.put_recv_sequence(
                &self.packet.chan_on_b,
                &self.packet.port_on_b,
                next_sequence_recv,
            );
        } else {
            // for unordered channels we must set the receipt so it can be verified on the other side
            // this receipt does not contain any data, since the packet has not yet been processed
            // it's just a single store key set to an empty string to indicate that the packet has been received
            state.put_packet_receipt(&self.packet);
        }

        state.record(
            events::packet::ReceivePacket {
                packet_data: self.packet.data.clone(),
                timeout_height: self.packet.timeout_height_on_b,
                timeout_timestamp: self.packet.timeout_timestamp_on_b,
                sequence: self.packet.sequence,
                src_port_id: self.packet.port_on_a.clone(),
                src_channel_id: self.packet.chan_on_a.clone(),
                dst_port_id: self.packet.port_on_b.clone(),
                dst_channel_id: self.packet.chan_on_b.clone(),
                channel_ordering: channel.ordering,
                dst_connection_id: channel.connection_hops[0].clone(),
            }
            .into(),
        );

        let transfer = PortId::transfer();
        // todo: should this be part of the app handler logic?
        if self.packet.port_on_b == transfer {
            AH::recv_packet_execute(state, self).await?;
        } else {
            anyhow::bail!("invalid port id");
        }

        Ok(())
    }
}