penumbra_sdk_ibc/component/rpc/
consensus_query.rs

1use crate::prefix::MerklePrefixExt;
2use crate::IBC_COMMITMENT_PREFIX;
3use async_trait::async_trait;
4use ibc_proto::ibc::core::channel::v1::query_server::Query as ConsensusQuery;
5use ibc_proto::ibc::core::channel::v1::{
6    Channel, PacketState, QueryChannelClientStateRequest, QueryChannelClientStateResponse,
7    QueryChannelConsensusStateRequest, QueryChannelConsensusStateResponse,
8    QueryChannelParamsRequest, QueryChannelParamsResponse, QueryChannelRequest,
9    QueryChannelResponse, QueryChannelsRequest, QueryChannelsResponse,
10    QueryConnectionChannelsRequest, QueryConnectionChannelsResponse,
11    QueryNextSequenceReceiveRequest, QueryNextSequenceReceiveResponse,
12    QueryNextSequenceSendRequest, QueryNextSequenceSendResponse, QueryPacketAcknowledgementRequest,
13    QueryPacketAcknowledgementResponse, QueryPacketAcknowledgementsRequest,
14    QueryPacketAcknowledgementsResponse, QueryPacketCommitmentRequest,
15    QueryPacketCommitmentResponse, QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse,
16    QueryPacketReceiptRequest, QueryPacketReceiptResponse, QueryUnreceivedAcksRequest,
17    QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
18    QueryUpgradeErrorRequest, QueryUpgradeErrorResponse, QueryUpgradeRequest, QueryUpgradeResponse,
19};
20use ibc_proto::ibc::core::client::v1::{Height, IdentifiedClientState};
21use ibc_types::path::{
22    AckPath, ChannelEndPath, ClientConsensusStatePath, ClientStatePath, CommitmentPath,
23    ReceiptPath, SeqRecvPath, SeqSendPath,
24};
25use ibc_types::DomainType;
26
27use ibc_types::core::channel::{ChannelId, IdentifiedChannelEnd, PortId};
28
29use ibc_types::core::connection::ConnectionId;
30use prost::Message;
31
32use std::str::FromStr;
33
34use crate::component::{ChannelStateReadExt, ConnectionStateReadExt, HostInterface};
35
36use super::utils::determine_snapshot_from_metadata;
37use super::IbcQuery;
38
39#[async_trait]
40impl<HI: HostInterface + Send + Sync + 'static> ConsensusQuery for IbcQuery<HI> {
41    /// Channel queries an IBC Channel.
42    #[tracing::instrument(skip(self), err, level = "debug")]
43    async fn channel(
44        &self,
45        request: tonic::Request<QueryChannelRequest>,
46    ) -> std::result::Result<tonic::Response<QueryChannelResponse>, tonic::Status> {
47        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
48            Err(err) => return Err(tonic::Status::aborted(
49                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
50            )),
51            Ok(snapshot) => snapshot,
52        };
53        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
54            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
55        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
56            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
57
58        let (channel, proof) = snapshot
59            .get_with_proof(
60                IBC_COMMITMENT_PREFIX
61                    .apply_string(ChannelEndPath::new(&port_id, &channel_id).to_string())
62                    .as_bytes()
63                    .to_vec(),
64            )
65            .await
66            .map(|res| {
67                let channel = res
68                    .0
69                    .map(|chan_bytes| Channel::decode(chan_bytes.as_ref()))
70                    .transpose();
71
72                (channel, res.1)
73            })
74            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
75
76        let channel =
77            channel.map_err(|e| tonic::Status::aborted(format!("couldn't decode channel: {e}")))?;
78
79        let res = QueryChannelResponse {
80            channel,
81            proof: proof.encode_to_vec(),
82            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
83                revision_height: HI::get_block_height(&snapshot)
84                    .await
85                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
86                    + 1,
87                revision_number: HI::get_revision_number(&snapshot)
88                    .await
89                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
90            }),
91        };
92
93        Ok(tonic::Response::new(res))
94    }
95    /// Channels queries all the IBC channels of a chain.
96    #[tracing::instrument(skip(self), err, level = "debug")]
97    async fn channels(
98        &self,
99        _request: tonic::Request<QueryChannelsRequest>,
100    ) -> std::result::Result<tonic::Response<QueryChannelsResponse>, tonic::Status> {
101        let snapshot = self.storage.latest_snapshot();
102
103        let height = Height {
104            revision_number: HI::get_revision_number(&snapshot)
105                .await
106                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
107            revision_height: HI::get_block_height(&snapshot)
108                .await
109                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
110        };
111
112        let channel_counter = snapshot
113            .get_channel_counter()
114            .await
115            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
116
117        let mut channels = vec![];
118        for chan_idx in 0..channel_counter {
119            let chan_id = ChannelId(format!("channel-{}", chan_idx));
120            let channel = snapshot
121                .get_channel(&chan_id, &PortId::transfer())
122                .await
123                .map_err(|e| {
124                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
125                })?
126                .ok_or("unable to get channel")
127                .map_err(|e| {
128                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
129                })?;
130
131            let id_chan = IdentifiedChannelEnd {
132                channel_id: chan_id,
133                port_id: PortId::transfer(),
134                channel_end: channel,
135                upgrade_sequence: 0,
136            };
137            channels.push(id_chan.into());
138        }
139
140        let res = QueryChannelsResponse {
141            channels,
142            pagination: None,
143            height: Some(height),
144        };
145
146        Ok(tonic::Response::new(res))
147    }
148
149    /// ConnectionChannels queries all the channels associated with a connection end.
150    #[tracing::instrument(skip(self), err, level = "debug")]
151    async fn connection_channels(
152        &self,
153        request: tonic::Request<QueryConnectionChannelsRequest>,
154    ) -> std::result::Result<tonic::Response<QueryConnectionChannelsResponse>, tonic::Status> {
155        let snapshot = self.storage.latest_snapshot();
156        let height = Height {
157            revision_number: HI::get_revision_number(&snapshot)
158                .await
159                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
160            revision_height: HI::get_block_height(&snapshot)
161                .await
162                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
163        };
164
165        let request = request.get_ref();
166        let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
167            .map_err(|e| tonic::Status::aborted(format!("invalid connection id: {e}")))?;
168
169        // look up all of the channels for this connection
170        let channel_counter = snapshot
171            .get_channel_counter()
172            .await
173            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
174
175        let mut channels = vec![];
176        for chan_idx in 0..channel_counter {
177            let chan_id = ChannelId(format!("channel-{}", chan_idx));
178            let channel = snapshot
179                .get_channel(&chan_id, &PortId::transfer())
180                .await
181                .map_err(|e| {
182                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
183                })?
184                .ok_or("unable to get channel")
185                .map_err(|e| {
186                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
187                })?;
188            if channel.connection_hops.contains(&connection_id) {
189                let id_chan = IdentifiedChannelEnd {
190                    channel_id: chan_id,
191                    port_id: PortId::transfer(),
192                    channel_end: channel,
193                    upgrade_sequence: 0,
194                };
195                channels.push(id_chan.into());
196            }
197        }
198
199        let res = QueryConnectionChannelsResponse {
200            channels,
201            pagination: None,
202            height: Some(height),
203        };
204
205        Ok(tonic::Response::new(res))
206    }
207
208    /// ChannelClientState queries for the client state for the channel associated
209    /// with the provided channel identifiers.
210    #[tracing::instrument(skip(self), err, level = "debug")]
211    async fn channel_client_state(
212        &self,
213        request: tonic::Request<QueryChannelClientStateRequest>,
214    ) -> std::result::Result<tonic::Response<QueryChannelClientStateResponse>, tonic::Status> {
215        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
216            Err(err) => return Err(tonic::Status::aborted(
217                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
218            )),
219            Ok(snapshot) => snapshot,
220        };
221
222        // 1. get the channel
223        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
224            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
225        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
226            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
227
228        let channel = snapshot
229            .get_channel(&channel_id, &port_id)
230            .await
231            .map_err(|e| {
232                tonic::Status::aborted(format!(
233                    "couldn't get channel {channel_id} for port {port_id}: {e}"
234                ))
235            })?
236            .ok_or("unable to get channel")
237            .map_err(|e| {
238                tonic::Status::aborted(format!(
239                    "couldn't get channel {channel_id} for port {port_id}: {e}"
240                ))
241            })?;
242
243        // 2. get the connection for the channel
244        let connection_id = channel
245            .connection_hops
246            .first()
247            .ok_or("channel has no connection hops")
248            .map_err(|e| {
249                tonic::Status::aborted(format!(
250                    "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
251                ))
252            })?;
253
254        let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
255            tonic::Status::aborted(format!(
256                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
257            ))
258        })?.ok_or("unable to get connection").map_err(|e| {
259            tonic::Status::aborted(format!(
260                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
261            ))
262        })?;
263
264        // 3. get the client state for the connection
265        let (client_state, proof) = snapshot
266            .get_with_proof(
267                IBC_COMMITMENT_PREFIX
268                    .apply_string(ClientStatePath::new(&connection.client_id).to_string())
269                    .as_bytes()
270                    .to_vec(),
271            )
272            .await
273            .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
274
275        let client_state_any = client_state
276            .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
277            .transpose()
278            .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
279
280        let identified_client_state = IdentifiedClientState {
281            client_id: connection.client_id.clone().to_string(),
282            client_state: client_state_any,
283        };
284
285        Ok(tonic::Response::new(QueryChannelClientStateResponse {
286            identified_client_state: Some(identified_client_state),
287            proof: proof.encode_to_vec(),
288            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
289                revision_height: HI::get_block_height(&snapshot)
290                    .await
291                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
292                    + 1,
293                revision_number: HI::get_revision_number(&snapshot)
294                    .await
295                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
296            }),
297        }))
298    }
299    /// ChannelConsensusState queries for the consensus state for the channel
300    /// associated with the provided channel identifiers.
301    #[tracing::instrument(skip(self), err, level = "debug")]
302    async fn channel_consensus_state(
303        &self,
304        request: tonic::Request<QueryChannelConsensusStateRequest>,
305    ) -> std::result::Result<tonic::Response<QueryChannelConsensusStateResponse>, tonic::Status>
306    {
307        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
308            Err(err) => return Err(tonic::Status::aborted(
309                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
310            )),
311            Ok(snapshot) => snapshot,
312        };
313        let consensus_state_height = ibc_types::core::client::Height {
314            revision_number: request.get_ref().revision_number,
315            revision_height: request.get_ref().revision_height,
316        };
317
318        // 1. get the channel
319        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
320            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
321        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
322            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
323
324        let channel = snapshot
325            .get_channel(&channel_id, &port_id)
326            .await
327            .map_err(|e| {
328                tonic::Status::aborted(format!(
329                    "couldn't get channel {channel_id} for port {port_id}: {e}"
330                ))
331            })?
332            .ok_or("unable to get channel")
333            .map_err(|e| {
334                tonic::Status::aborted(format!(
335                    "couldn't get channel {channel_id} for port {port_id}: {e}"
336                ))
337            })?;
338
339        // 2. get the connection for the channel
340        let connection_id = channel
341            .connection_hops
342            .first()
343            .ok_or("channel has no connection hops")
344            .map_err(|e| {
345                tonic::Status::aborted(format!(
346                    "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
347                ))
348            })?;
349
350        let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
351            tonic::Status::aborted(format!(
352                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
353            ))
354        })?.ok_or("unable to get connection").map_err(|e| {
355            tonic::Status::aborted(format!(
356                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
357            ))
358        })?;
359
360        // 3. get the consensus state for the connection
361        let (consensus_state, proof) = snapshot
362            .get_with_proof(
363                IBC_COMMITMENT_PREFIX
364                    .apply_string(
365                        ClientConsensusStatePath::new(
366                            &connection.client_id,
367                            &consensus_state_height,
368                        )
369                        .to_string(),
370                    )
371                    .as_bytes()
372                    .to_vec(),
373            )
374            .await
375            .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
376
377        let consensus_state_any = consensus_state
378            .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
379            .transpose()
380            .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
381
382        Ok(tonic::Response::new(QueryChannelConsensusStateResponse {
383            consensus_state: consensus_state_any,
384            client_id: connection.client_id.clone().to_string(),
385            proof: proof.encode_to_vec(),
386            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
387                revision_height: HI::get_block_height(&snapshot)
388                    .await
389                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
390                    + 1,
391                revision_number: HI::get_revision_number(&snapshot)
392                    .await
393                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
394            }),
395        }))
396    }
397    /// PacketCommitment queries a stored packet commitment hash.
398    #[tracing::instrument(skip(self), err, level = "debug")]
399    async fn packet_commitment(
400        &self,
401        request: tonic::Request<QueryPacketCommitmentRequest>,
402    ) -> std::result::Result<tonic::Response<QueryPacketCommitmentResponse>, tonic::Status> {
403        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
404            Err(err) => return Err(tonic::Status::aborted(
405                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
406            )),
407            Ok(snapshot) => snapshot,
408        };
409
410        let port_id = PortId::from_str(&request.get_ref().port_id)
411            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
412        let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
413            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
414
415        let (commitment, proof) = snapshot
416            .get_with_proof(
417                IBC_COMMITMENT_PREFIX
418                    .apply_string(
419                        CommitmentPath::new(
420                            &port_id,
421                            &channel_id,
422                            request.get_ref().sequence.into(),
423                        )
424                        .to_string(),
425                    )
426                    .as_bytes()
427                    .to_vec(),
428            )
429            .await
430            .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
431
432        let commitment =
433            commitment.ok_or_else(|| tonic::Status::aborted("commitment not found"))?;
434
435        Ok(tonic::Response::new(QueryPacketCommitmentResponse {
436            commitment,
437            proof: proof.encode_to_vec(),
438            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
439                revision_height: HI::get_block_height(&snapshot)
440                    .await
441                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
442                    + 1,
443                revision_number: HI::get_revision_number(&snapshot)
444                    .await
445                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
446            }),
447        }))
448    }
449    /// PacketCommitments returns all the packet commitments hashes associated
450    /// with a channel.
451    #[tracing::instrument(skip(self), err, level = "debug")]
452    async fn packet_commitments(
453        &self,
454        request: tonic::Request<QueryPacketCommitmentsRequest>,
455    ) -> std::result::Result<tonic::Response<QueryPacketCommitmentsResponse>, tonic::Status> {
456        let snapshot = self.storage.latest_snapshot();
457        let height = snapshot.version();
458        let request = request.get_ref();
459
460        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
461            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
462        let port_id: PortId = PortId::from_str(&request.port_id)
463            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
464
465        let mut commitment_states = vec![];
466        let commitment_counter = snapshot
467            .get_send_sequence(&chan_id, &port_id)
468            .await
469            .map_err(|e| {
470                tonic::Status::aborted(format!(
471                    "couldn't get send sequence for channel {chan_id} and port {port_id}: {e}"
472                ))
473            })?;
474
475        // this starts at 1; the first commitment index is 1 (from ibc spec)
476        for commitment_idx in 1..commitment_counter {
477            let commitment = snapshot
478                .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
479                .await.map_err(|e| {
480                    tonic::Status::aborted(format!(
481                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {commitment_idx}: {e}"
482                    ))
483                })?;
484            if commitment.is_none() {
485                continue;
486            }
487            let commitment = commitment.expect("commitment existence was checked earlier");
488
489            let commitment_state = PacketState {
490                port_id: request.port_id.clone(),
491                channel_id: request.channel_id.clone(),
492                sequence: commitment_idx,
493                data: commitment.clone(),
494            };
495
496            commitment_states.push(commitment_state);
497        }
498
499        let height = Height {
500            revision_number: 0,
501            revision_height: height,
502        };
503
504        let res = QueryPacketCommitmentsResponse {
505            commitments: commitment_states,
506            pagination: None,
507            height: Some(height),
508        };
509
510        Ok(tonic::Response::new(res))
511    }
512    /// PacketReceipt queries if a given packet sequence has been received on the
513    /// queried chain
514    #[tracing::instrument(skip(self), err, level = "debug")]
515    async fn packet_receipt(
516        &self,
517        request: tonic::Request<QueryPacketReceiptRequest>,
518    ) -> std::result::Result<tonic::Response<QueryPacketReceiptResponse>, tonic::Status> {
519        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
520            Err(err) => return Err(tonic::Status::aborted(
521                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
522            )),
523            Ok(snapshot) => snapshot,
524        };
525
526        let port_id = PortId::from_str(&request.get_ref().port_id)
527            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
528        let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
529            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
530
531        let (receipt, proof) = snapshot
532            .get_with_proof(
533                IBC_COMMITMENT_PREFIX
534                    .apply_string(
535                        ReceiptPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
536                            .to_string(),
537                    )
538                    .as_bytes()
539                    .to_vec(),
540            )
541            .await
542            .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
543
544        Ok(tonic::Response::new(QueryPacketReceiptResponse {
545            received: receipt.is_some(),
546            proof: proof.encode_to_vec(),
547            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
548                revision_height: HI::get_block_height(&snapshot)
549                    .await
550                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
551                    + 1,
552                revision_number: HI::get_revision_number(&snapshot)
553                    .await
554                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
555            }),
556        }))
557    }
558    /// PacketAcknowledgement queries a stored packet acknowledgement hash.
559    #[tracing::instrument(skip(self), err, level = "debug")]
560    async fn packet_acknowledgement(
561        &self,
562        request: tonic::Request<QueryPacketAcknowledgementRequest>,
563    ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementResponse>, tonic::Status>
564    {
565        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
566            Err(err) => return Err(tonic::Status::aborted(
567                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
568            )),
569            Ok(snapshot) => snapshot,
570        };
571
572        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
573            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
574        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
575            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
576
577        let (acknowledgement, proof) = snapshot
578            .get_with_proof(
579                IBC_COMMITMENT_PREFIX
580                    .apply_string(
581                        AckPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
582                            .to_string(),
583                    )
584                    .as_bytes()
585                    .to_vec(),
586            )
587            .await
588            .map_err(|e| {
589                tonic::Status::aborted(format!("couldn't get packet acknowledgement: {e}"))
590            })?;
591
592        let acknowledgement =
593            acknowledgement.ok_or_else(|| tonic::Status::aborted("acknowledgement not found"))?;
594
595        Ok(tonic::Response::new(QueryPacketAcknowledgementResponse {
596            acknowledgement,
597            proof: proof.encode_to_vec(),
598            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
599                revision_height: HI::get_block_height(&snapshot)
600                    .await
601                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
602                    + 1,
603                revision_number: HI::get_revision_number(&snapshot)
604                    .await
605                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
606            }),
607        }))
608    }
609    /// PacketAcknowledgements returns all the packet acknowledgements associated
610    /// with a channel.
611    #[tracing::instrument(skip(self), err, level = "debug")]
612    async fn packet_acknowledgements(
613        &self,
614        request: tonic::Request<QueryPacketAcknowledgementsRequest>,
615    ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementsResponse>, tonic::Status>
616    {
617        let snapshot = self.storage.latest_snapshot();
618        let height = Height {
619            revision_number: 0,
620            revision_height: snapshot.version(),
621        };
622        let request = request.get_ref();
623
624        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
625            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
626        let port_id: PortId = PortId::from_str(&request.port_id)
627            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
628
629        let ack_counter = snapshot
630            .get_ack_sequence(&chan_id, &port_id)
631            .await
632            .map_err(|e| {
633                tonic::Status::aborted(format!(
634                    "couldn't get ack sequence for channel {chan_id} and port {port_id}: {e}"
635                ))
636            })?;
637
638        let mut acks = vec![];
639        for ack_idx in 0..ack_counter {
640            let maybe_ack = snapshot
641                .get_packet_acknowledgement(&port_id, &chan_id, ack_idx)
642                .await.map_err(|e| {
643                    tonic::Status::aborted(format!(
644                        "couldn't get packet acknowledgement for channel {chan_id} and port {port_id} at index {ack_idx}: {e}"
645                    ))
646                })?;
647
648            // Only include the ack if it was found; otherwise, signal lack of
649            // by omitting it from the response.
650            if let Some(ack) = maybe_ack {
651                let ack_state = PacketState {
652                    port_id: request.port_id.clone(),
653                    channel_id: request.channel_id.clone(),
654                    sequence: ack_idx,
655                    data: ack.clone(),
656                };
657
658                acks.push(ack_state);
659            }
660        }
661
662        let res = QueryPacketAcknowledgementsResponse {
663            acknowledgements: acks,
664            pagination: None,
665            height: Some(height),
666        };
667
668        Ok(tonic::Response::new(res))
669    }
670    /// UnreceivedPackets returns all the unreceived IBC packets associated with a
671    /// channel and sequences.
672    #[tracing::instrument(skip(self), err, level = "debug")]
673    async fn unreceived_packets(
674        &self,
675        request: tonic::Request<QueryUnreceivedPacketsRequest>,
676    ) -> std::result::Result<tonic::Response<QueryUnreceivedPacketsResponse>, tonic::Status> {
677        let snapshot = self.storage.latest_snapshot();
678        let height = snapshot.version();
679        let request = request.get_ref();
680
681        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
682            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
683        let port_id: PortId = PortId::from_str(&request.port_id)
684            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
685
686        let mut unreceived_seqs = vec![];
687
688        for seq in request.packet_commitment_sequences.clone() {
689            if seq == 0 {
690                return Err(tonic::Status::aborted(format!(
691                    "packet sequence {} cannot be 0",
692                    seq
693                )));
694            }
695
696            if !snapshot
697                .seen_packet_by_channel(&chan_id, &port_id, seq)
698                .await.map_err(|e| {
699                    tonic::Status::aborted(format!(
700                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
701                    ))
702                })?
703            {
704                unreceived_seqs.push(seq);
705            }
706        }
707
708        let height = Height {
709            revision_number: 0,
710            revision_height: height,
711        };
712
713        let res = QueryUnreceivedPacketsResponse {
714            sequences: unreceived_seqs,
715            height: Some(height),
716        };
717
718        Ok(tonic::Response::new(res))
719    }
720    /// UnreceivedAcks returns all the unreceived IBC acknowledgements associated
721    /// with a channel and sequences.
722    #[tracing::instrument(skip(self), err, level = "debug")]
723    async fn unreceived_acks(
724        &self,
725        request: tonic::Request<QueryUnreceivedAcksRequest>,
726    ) -> std::result::Result<tonic::Response<QueryUnreceivedAcksResponse>, tonic::Status> {
727        let snapshot = self.storage.latest_snapshot();
728        let height = Height {
729            revision_number: 0,
730            revision_height: snapshot.version(),
731        };
732        let request = request.get_ref();
733
734        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
735            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
736        let port_id: PortId = PortId::from_str(&request.port_id)
737            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
738
739        let mut unreceived_seqs = vec![];
740
741        for seq in request.packet_ack_sequences.clone() {
742            if seq == 0 {
743                return Err(tonic::Status::aborted(format!(
744                    "packet sequence {} cannot be 0",
745                    seq
746                )));
747            }
748
749            if snapshot
750                .get_packet_commitment_by_id(&chan_id, &port_id, seq)
751                .await.map_err(|e| {
752                    tonic::Status::aborted(format!(
753                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
754                    ))
755                })?
756                .is_some()
757            {
758                unreceived_seqs.push(seq);
759            }
760        }
761
762        let res = QueryUnreceivedAcksResponse {
763            sequences: unreceived_seqs,
764            height: Some(height),
765        };
766
767        Ok(tonic::Response::new(res))
768    }
769
770    /// NextSequenceReceive returns the next receive sequence for a given channel.
771    #[tracing::instrument(skip(self), err, level = "debug")]
772    async fn next_sequence_receive(
773        &self,
774        request: tonic::Request<QueryNextSequenceReceiveRequest>,
775    ) -> std::result::Result<tonic::Response<QueryNextSequenceReceiveResponse>, tonic::Status> {
776        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
777            Err(err) => return Err(tonic::Status::aborted(
778                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
779            )),
780            Ok(snapshot) => snapshot,
781        };
782
783        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
784            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
785        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
786            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
787
788        let (next_recv_sequence, proof) = snapshot
789            .get_with_proof(
790                IBC_COMMITMENT_PREFIX
791                    .apply_string(SeqRecvPath::new(&port_id, &channel_id).to_string())
792                    .as_bytes()
793                    .to_vec(),
794            )
795            .await
796            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
797
798        let next_recv_sequence = next_recv_sequence
799            .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
800            .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
801
802        Ok(tonic::Response::new(QueryNextSequenceReceiveResponse {
803            next_sequence_receive: next_recv_sequence,
804            proof: proof.encode_to_vec(),
805            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
806                revision_height: HI::get_block_height(&snapshot)
807                    .await
808                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
809                    + 1,
810                revision_number: HI::get_revision_number(&snapshot)
811                    .await
812                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
813            }),
814        }))
815    }
816
817    /// Returns the next send sequence for a given channel.
818    #[tracing::instrument(skip(self), err, level = "debug")]
819    async fn next_sequence_send(
820        &self,
821        request: tonic::Request<QueryNextSequenceSendRequest>,
822    ) -> std::result::Result<tonic::Response<QueryNextSequenceSendResponse>, tonic::Status> {
823        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
824            Err(err) => return Err(tonic::Status::aborted(
825                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
826            )),
827            Ok(snapshot) => snapshot,
828        };
829
830        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
831            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
832        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
833            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
834
835        let (next_send_sequence, proof) = snapshot
836            .get_with_proof(
837                IBC_COMMITMENT_PREFIX
838                    .apply_string(SeqSendPath::new(&port_id, &channel_id).to_string())
839                    .as_bytes()
840                    .to_vec(),
841            )
842            .await
843            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
844
845        let next_send_sequence = next_send_sequence
846            .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
847            .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
848
849        Ok(tonic::Response::new(QueryNextSequenceSendResponse {
850            next_sequence_send: next_send_sequence,
851            proof: proof.encode_to_vec(),
852            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
853                revision_height: HI::get_block_height(&snapshot)
854                    .await
855                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
856                    + 1,
857                revision_number: HI::get_revision_number(&snapshot)
858                    .await
859                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
860            }),
861        }))
862    }
863
864    #[tracing::instrument(skip(self), err, level = "debug")]
865    async fn channel_params(
866        &self,
867        _request: tonic::Request<QueryChannelParamsRequest>,
868    ) -> std::result::Result<tonic::Response<QueryChannelParamsResponse>, tonic::Status> {
869        Err(tonic::Status::unimplemented("not implemented"))
870    }
871
872    #[tracing::instrument(skip(self), err, level = "debug")]
873    async fn upgrade(
874        &self,
875        _request: tonic::Request<QueryUpgradeRequest>,
876    ) -> std::result::Result<tonic::Response<QueryUpgradeResponse>, tonic::Status> {
877        Err(tonic::Status::unimplemented("not implemented"))
878    }
879
880    #[tracing::instrument(skip(self), err, level = "debug")]
881    async fn upgrade_error(
882        &self,
883        _request: tonic::Request<QueryUpgradeErrorRequest>,
884    ) -> std::result::Result<tonic::Response<QueryUpgradeErrorResponse>, tonic::Status> {
885        Err(tonic::Status::unimplemented("not implemented"))
886    }
887}