penumbra_sdk_ibc/component/rpc/
consensus_query.rs

1use crate::prefix::MerklePrefixExt;
2use crate::IBC_COMMITMENT_PREFIX;
3use async_trait::async_trait;
4use ibc_proto::ibc::core::channel::v1::query_server::Query as ConsensusQuery;
5use ibc_proto::ibc::core::channel::v1::{
6    Channel, PacketState, QueryChannelClientStateRequest, QueryChannelClientStateResponse,
7    QueryChannelConsensusStateRequest, QueryChannelConsensusStateResponse,
8    QueryChannelParamsRequest, QueryChannelParamsResponse, QueryChannelRequest,
9    QueryChannelResponse, QueryChannelsRequest, QueryChannelsResponse,
10    QueryConnectionChannelsRequest, QueryConnectionChannelsResponse,
11    QueryNextSequenceReceiveRequest, QueryNextSequenceReceiveResponse,
12    QueryNextSequenceSendRequest, QueryNextSequenceSendResponse, QueryPacketAcknowledgementRequest,
13    QueryPacketAcknowledgementResponse, QueryPacketAcknowledgementsRequest,
14    QueryPacketAcknowledgementsResponse, QueryPacketCommitmentRequest,
15    QueryPacketCommitmentResponse, QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse,
16    QueryPacketReceiptRequest, QueryPacketReceiptResponse, QueryUnreceivedAcksRequest,
17    QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
18    QueryUpgradeErrorRequest, QueryUpgradeErrorResponse, QueryUpgradeRequest, QueryUpgradeResponse,
19};
20use ibc_proto::ibc::core::client::v1::{Height, IdentifiedClientState};
21use ibc_types::path::{
22    AckPath, ChannelEndPath, ClientConsensusStatePath, ClientStatePath, CommitmentPath,
23    ReceiptPath, SeqRecvPath, SeqSendPath,
24};
25use ibc_types::DomainType;
26
27use ibc_types::core::channel::{ChannelId, IdentifiedChannelEnd, PortId};
28
29use ibc_types::core::connection::ConnectionId;
30use prost::Message;
31
32use std::str::FromStr;
33
34use crate::component::{ChannelStateReadExt, ConnectionStateReadExt, HostInterface};
35
36use super::utils::determine_snapshot_from_metadata;
37use super::IbcQuery;
38
39#[async_trait]
40impl<HI: HostInterface + Send + Sync + 'static> ConsensusQuery for IbcQuery<HI> {
41    /// Channel queries an IBC Channel.
42    #[tracing::instrument(skip(self), err, level = "debug")]
43    async fn channel(
44        &self,
45        request: tonic::Request<QueryChannelRequest>,
46    ) -> std::result::Result<tonic::Response<QueryChannelResponse>, tonic::Status> {
47        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
48            Err(err) => return Err(tonic::Status::aborted(
49                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
50            )),
51            Ok(snapshot) => snapshot,
52        };
53        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
54            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
55        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
56            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
57
58        let (channel, proof) = snapshot
59            .get_with_proof(
60                IBC_COMMITMENT_PREFIX
61                    .apply_string(ChannelEndPath::new(&port_id, &channel_id).to_string())
62                    .as_bytes()
63                    .to_vec(),
64            )
65            .await
66            .map(|res| {
67                let channel = res
68                    .0
69                    .map(|chan_bytes| Channel::decode(chan_bytes.as_ref()))
70                    .transpose();
71
72                (channel, res.1)
73            })
74            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
75
76        let channel =
77            channel.map_err(|e| tonic::Status::aborted(format!("couldn't decode channel: {e}")))?;
78
79        let res = QueryChannelResponse {
80            channel,
81            proof: proof.encode_to_vec(),
82            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
83                revision_height: HI::get_block_height(&snapshot)
84                    .await
85                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
86                    + 1,
87                revision_number: HI::get_revision_number(&snapshot)
88                    .await
89                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
90            }),
91        };
92
93        Ok(tonic::Response::new(res))
94    }
95    /// Channels queries all the IBC channels of a chain.
96    #[tracing::instrument(skip(self), err, level = "debug")]
97    async fn channels(
98        &self,
99        _request: tonic::Request<QueryChannelsRequest>,
100    ) -> std::result::Result<tonic::Response<QueryChannelsResponse>, tonic::Status> {
101        let snapshot = self.storage.latest_snapshot();
102
103        let height = Height {
104            revision_number: HI::get_revision_number(&snapshot)
105                .await
106                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
107            revision_height: HI::get_block_height(&snapshot)
108                .await
109                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
110        };
111
112        let channel_counter = snapshot
113            .get_channel_counter()
114            .await
115            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
116
117        let mut channels = vec![];
118        for chan_idx in 0..channel_counter {
119            let chan_id = ChannelId(format!("channel-{}", chan_idx));
120            let channel = snapshot
121                .get_channel(&chan_id, &PortId::transfer())
122                .await
123                .map_err(|e| {
124                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
125                })?
126                .ok_or("unable to get channel")
127                .map_err(|e| {
128                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
129                })?;
130
131            let id_chan = IdentifiedChannelEnd {
132                channel_id: chan_id,
133                port_id: PortId::transfer(),
134                channel_end: channel,
135                upgrade_sequence: 0,
136            };
137            channels.push(id_chan.into());
138        }
139
140        let res = QueryChannelsResponse {
141            channels,
142            pagination: None,
143            height: Some(height),
144        };
145
146        Ok(tonic::Response::new(res))
147    }
148
149    /// ConnectionChannels queries all the channels associated with a connection end.
150    #[tracing::instrument(skip(self), err, level = "debug")]
151    async fn connection_channels(
152        &self,
153        request: tonic::Request<QueryConnectionChannelsRequest>,
154    ) -> std::result::Result<tonic::Response<QueryConnectionChannelsResponse>, tonic::Status> {
155        let snapshot = self.storage.latest_snapshot();
156        let height = Height {
157            revision_number: HI::get_revision_number(&snapshot)
158                .await
159                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
160            revision_height: HI::get_block_height(&snapshot)
161                .await
162                .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
163        };
164
165        let request = request.get_ref();
166        let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
167            .map_err(|e| tonic::Status::aborted(format!("invalid connection id: {e}")))?;
168
169        // look up all of the channels for this connection
170        let channel_counter = snapshot
171            .get_channel_counter()
172            .await
173            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
174
175        let mut channels = vec![];
176        for chan_idx in 0..channel_counter {
177            let chan_id = ChannelId(format!("channel-{}", chan_idx));
178            let channel = snapshot
179                .get_channel(&chan_id, &PortId::transfer())
180                .await
181                .map_err(|e| {
182                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
183                })?
184                .ok_or("unable to get channel")
185                .map_err(|e| {
186                    tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
187                })?;
188            if channel.connection_hops.contains(&connection_id) {
189                let id_chan = IdentifiedChannelEnd {
190                    channel_id: chan_id,
191                    port_id: PortId::transfer(),
192                    channel_end: channel,
193                    upgrade_sequence: 0,
194                };
195                channels.push(id_chan.into());
196            }
197        }
198
199        let res = QueryConnectionChannelsResponse {
200            channels,
201            pagination: None,
202            height: Some(height),
203        };
204
205        Ok(tonic::Response::new(res))
206    }
207
208    /// ChannelClientState queries for the client state for the channel associated
209    /// with the provided channel identifiers.
210    #[tracing::instrument(skip(self), err, level = "debug")]
211    async fn channel_client_state(
212        &self,
213        request: tonic::Request<QueryChannelClientStateRequest>,
214    ) -> std::result::Result<tonic::Response<QueryChannelClientStateResponse>, tonic::Status> {
215        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
216            Err(err) => return Err(tonic::Status::aborted(
217                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
218            )),
219            Ok(snapshot) => snapshot,
220        };
221
222        // 1. get the channel
223        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
224            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
225        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
226            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
227
228        let channel = snapshot
229            .get_channel(&channel_id, &port_id)
230            .await
231            .map_err(|e| {
232                tonic::Status::aborted(format!(
233                    "couldn't get channel {channel_id} for port {port_id}: {e}"
234                ))
235            })?
236            .ok_or("unable to get channel")
237            .map_err(|e| {
238                tonic::Status::aborted(format!(
239                    "couldn't get channel {channel_id} for port {port_id}: {e}"
240                ))
241            })?;
242
243        // 2. get the connection for the channel
244        let connection_id = channel
245            .connection_hops
246            .first()
247            .ok_or("channel has no connection hops")
248            .map_err(|e| {
249                tonic::Status::aborted(format!(
250                    "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
251                ))
252            })?;
253
254        let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
255            tonic::Status::aborted(format!(
256                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
257            ))
258        })?.ok_or("unable to get connection").map_err(|e| {
259            tonic::Status::aborted(format!(
260                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
261            ))
262        })?;
263
264        // 3. get the client state for the connection
265        let (client_state, proof) = snapshot
266            .get_with_proof(
267                IBC_COMMITMENT_PREFIX
268                    .apply_string(ClientStatePath::new(&connection.client_id).to_string())
269                    .as_bytes()
270                    .to_vec(),
271            )
272            .await
273            .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
274
275        let client_state_any = client_state
276            .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
277            .transpose()
278            .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
279
280        let identified_client_state = IdentifiedClientState {
281            client_id: connection.client_id.clone().to_string(),
282            client_state: client_state_any,
283        };
284
285        Ok(tonic::Response::new(QueryChannelClientStateResponse {
286            identified_client_state: Some(identified_client_state),
287            proof: proof.encode_to_vec(),
288            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
289                revision_height: HI::get_block_height(&snapshot)
290                    .await
291                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
292                    + 1,
293                revision_number: HI::get_revision_number(&snapshot)
294                    .await
295                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
296            }),
297        }))
298    }
299    /// ChannelConsensusState queries for the consensus state for the channel
300    /// associated with the provided channel identifiers.
301    #[tracing::instrument(skip(self), err, level = "debug")]
302    async fn channel_consensus_state(
303        &self,
304        request: tonic::Request<QueryChannelConsensusStateRequest>,
305    ) -> std::result::Result<tonic::Response<QueryChannelConsensusStateResponse>, tonic::Status>
306    {
307        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
308            Err(err) => return Err(tonic::Status::aborted(
309                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
310            )),
311            Ok(snapshot) => snapshot,
312        };
313        let consensus_state_height = ibc_types::core::client::Height {
314            revision_number: request.get_ref().revision_number,
315            revision_height: request.get_ref().revision_height,
316        };
317
318        // 1. get the channel
319        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
320            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
321        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
322            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
323
324        let channel = snapshot
325            .get_channel(&channel_id, &port_id)
326            .await
327            .map_err(|e| {
328                tonic::Status::aborted(format!(
329                    "couldn't get channel {channel_id} for port {port_id}: {e}"
330                ))
331            })?
332            .ok_or("unable to get channel")
333            .map_err(|e| {
334                tonic::Status::aborted(format!(
335                    "couldn't get channel {channel_id} for port {port_id}: {e}"
336                ))
337            })?;
338
339        // 2. get the connection for the channel
340        let connection_id = channel
341            .connection_hops
342            .first()
343            .ok_or("channel has no connection hops")
344            .map_err(|e| {
345                tonic::Status::aborted(format!(
346                    "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
347                ))
348            })?;
349
350        let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
351            tonic::Status::aborted(format!(
352                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
353            ))
354        })?.ok_or("unable to get connection").map_err(|e| {
355            tonic::Status::aborted(format!(
356                "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
357            ))
358        })?;
359
360        // 3. get the consensus state for the connection
361        let (consensus_state, proof) = snapshot
362            .get_with_proof(
363                IBC_COMMITMENT_PREFIX
364                    .apply_string(
365                        ClientConsensusStatePath::new(
366                            &connection.client_id,
367                            &consensus_state_height,
368                        )
369                        .to_string(),
370                    )
371                    .as_bytes()
372                    .to_vec(),
373            )
374            .await
375            .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
376
377        let consensus_state_any = consensus_state
378            .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
379            .transpose()
380            .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
381
382        Ok(tonic::Response::new(QueryChannelConsensusStateResponse {
383            consensus_state: consensus_state_any,
384            client_id: connection.client_id.clone().to_string(),
385            proof: proof.encode_to_vec(),
386            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
387                revision_height: HI::get_block_height(&snapshot)
388                    .await
389                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
390                    + 1,
391                revision_number: HI::get_revision_number(&snapshot)
392                    .await
393                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
394            }),
395        }))
396    }
397    /// PacketCommitment queries a stored packet commitment hash.
398    #[tracing::instrument(skip(self), err, level = "debug")]
399    async fn packet_commitment(
400        &self,
401        request: tonic::Request<QueryPacketCommitmentRequest>,
402    ) -> std::result::Result<tonic::Response<QueryPacketCommitmentResponse>, tonic::Status> {
403        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
404            Err(err) => return Err(tonic::Status::aborted(
405                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
406            )),
407            Ok(snapshot) => snapshot,
408        };
409
410        let port_id = PortId::from_str(&request.get_ref().port_id)
411            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
412        let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
413            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
414
415        let (commitment, proof) = snapshot
416            .get_with_proof(
417                IBC_COMMITMENT_PREFIX
418                    .apply_string(
419                        CommitmentPath::new(
420                            &port_id,
421                            &channel_id,
422                            request.get_ref().sequence.into(),
423                        )
424                        .to_string(),
425                    )
426                    .as_bytes()
427                    .to_vec(),
428            )
429            .await
430            .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
431
432        let commitment =
433            commitment.ok_or_else(|| tonic::Status::aborted("commitment not found"))?;
434
435        Ok(tonic::Response::new(QueryPacketCommitmentResponse {
436            commitment,
437            proof: proof.encode_to_vec(),
438            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
439                revision_height: HI::get_block_height(&snapshot)
440                    .await
441                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
442                    + 1,
443                revision_number: HI::get_revision_number(&snapshot)
444                    .await
445                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
446            }),
447        }))
448    }
449    /// PacketCommitments returns all the packet commitments hashes associated
450    /// with a channel.
451    #[tracing::instrument(skip(self), err, level = "debug")]
452    async fn packet_commitments(
453        &self,
454        request: tonic::Request<QueryPacketCommitmentsRequest>,
455    ) -> std::result::Result<tonic::Response<QueryPacketCommitmentsResponse>, tonic::Status> {
456        let snapshot = self.storage.latest_snapshot();
457        let height = snapshot.version();
458        let request = request.get_ref();
459
460        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
461            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
462        let port_id: PortId = PortId::from_str(&request.port_id)
463            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
464
465        let mut commitment_states = vec![];
466        let commitment_counter = snapshot
467            .get_send_sequence(&chan_id, &port_id)
468            .await
469            .map_err(|e| {
470                tonic::Status::aborted(format!(
471                    "couldn't get send sequence for channel {chan_id} and port {port_id}: {e}"
472                ))
473            })?;
474
475        // this starts at 1; the first commitment index is 1 (from ibc spec)
476        for commitment_idx in 1..commitment_counter {
477            let commitment = snapshot
478                .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
479                .await.map_err(|e| {
480                    tonic::Status::aborted(format!(
481                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {commitment_idx}: {e}"
482                    ))
483                })?;
484            if commitment.is_none() {
485                continue;
486            }
487            let commitment = commitment.expect("commitment existence was checked earlier");
488
489            let commitment_state = PacketState {
490                port_id: request.port_id.clone(),
491                channel_id: request.channel_id.clone(),
492                sequence: commitment_idx,
493                data: commitment.clone(),
494            };
495
496            commitment_states.push(commitment_state);
497        }
498
499        let height = Height {
500            revision_number: 0,
501            revision_height: height,
502        };
503
504        let res = QueryPacketCommitmentsResponse {
505            commitments: commitment_states,
506            pagination: None,
507            height: Some(height),
508        };
509
510        Ok(tonic::Response::new(res))
511    }
512    /// PacketReceipt queries if a given packet sequence has been received on the
513    /// queried chain
514    #[tracing::instrument(skip(self), err, level = "debug")]
515    async fn packet_receipt(
516        &self,
517        request: tonic::Request<QueryPacketReceiptRequest>,
518    ) -> std::result::Result<tonic::Response<QueryPacketReceiptResponse>, tonic::Status> {
519        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
520            Err(err) => return Err(tonic::Status::aborted(
521                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
522            )),
523            Ok(snapshot) => snapshot,
524        };
525
526        let port_id = PortId::from_str(&request.get_ref().port_id)
527            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
528        let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
529            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
530
531        let (receipt, proof) = snapshot
532            .get_with_proof(
533                IBC_COMMITMENT_PREFIX
534                    .apply_string(
535                        ReceiptPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
536                            .to_string(),
537                    )
538                    .as_bytes()
539                    .to_vec(),
540            )
541            .await
542            .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
543
544        Ok(tonic::Response::new(QueryPacketReceiptResponse {
545            received: receipt.is_some(),
546            proof: proof.encode_to_vec(),
547            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
548                revision_height: HI::get_block_height(&snapshot)
549                    .await
550                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
551                    + 1,
552                revision_number: HI::get_revision_number(&snapshot)
553                    .await
554                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
555            }),
556        }))
557    }
558    /// PacketAcknowledgement queries a stored packet acknowledgement hash.
559    #[tracing::instrument(skip(self), err, level = "debug")]
560    async fn packet_acknowledgement(
561        &self,
562        request: tonic::Request<QueryPacketAcknowledgementRequest>,
563    ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementResponse>, tonic::Status>
564    {
565        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
566            Err(err) => return Err(tonic::Status::aborted(
567                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
568            )),
569            Ok(snapshot) => snapshot,
570        };
571
572        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
573            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
574        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
575            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
576
577        let (acknowledgement, proof) = snapshot
578            .get_with_proof(
579                IBC_COMMITMENT_PREFIX
580                    .apply_string(
581                        AckPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
582                            .to_string(),
583                    )
584                    .as_bytes()
585                    .to_vec(),
586            )
587            .await
588            .map_err(|e| {
589                tonic::Status::aborted(format!("couldn't get packet acknowledgement: {e}"))
590            })?;
591
592        let acknowledgement =
593            acknowledgement.ok_or_else(|| tonic::Status::aborted("acknowledgement not found"))?;
594
595        Ok(tonic::Response::new(QueryPacketAcknowledgementResponse {
596            acknowledgement,
597            proof: proof.encode_to_vec(),
598            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
599                revision_height: HI::get_block_height(&snapshot)
600                    .await
601                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
602                    + 1,
603                revision_number: HI::get_revision_number(&snapshot)
604                    .await
605                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
606            }),
607        }))
608    }
609    /// PacketAcknowledgements returns all the packet acknowledgements associated
610    /// with a channel.
611    #[tracing::instrument(skip(self), err, level = "debug")]
612    async fn packet_acknowledgements(
613        &self,
614        request: tonic::Request<QueryPacketAcknowledgementsRequest>,
615    ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementsResponse>, tonic::Status>
616    {
617        let snapshot = self.storage.latest_snapshot();
618        let height = Height {
619            revision_number: 0,
620            revision_height: snapshot.version(),
621        };
622        let request = request.get_ref();
623
624        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
625            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
626        let port_id: PortId = PortId::from_str(&request.port_id)
627            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
628
629        let mut acks = vec![];
630        for ack_idx in &request.packet_commitment_sequences {
631            let maybe_ack = snapshot
632                .get_packet_acknowledgement(&port_id, &chan_id, *ack_idx)
633                .await.map_err(|e| {
634                    tonic::Status::aborted(format!(
635                        "couldn't get packet acknowledgement for channel {chan_id} and port {port_id} at index {ack_idx}: {e}"
636                    ))
637                })?;
638
639            // Only include the ack if it was found; otherwise, signal lack of
640            // by omitting it from the response.
641            if let Some(ack) = maybe_ack {
642                let ack_state = PacketState {
643                    port_id: request.port_id.clone(),
644                    channel_id: request.channel_id.clone(),
645                    sequence: *ack_idx,
646                    data: ack.clone(),
647                };
648
649                acks.push(ack_state);
650            }
651        }
652
653        let res = QueryPacketAcknowledgementsResponse {
654            acknowledgements: acks,
655            pagination: None,
656            height: Some(height),
657        };
658
659        Ok(tonic::Response::new(res))
660    }
661    /// UnreceivedPackets returns all the unreceived IBC packets associated with a
662    /// channel and sequences.
663    #[tracing::instrument(skip(self), err, level = "debug")]
664    async fn unreceived_packets(
665        &self,
666        request: tonic::Request<QueryUnreceivedPacketsRequest>,
667    ) -> std::result::Result<tonic::Response<QueryUnreceivedPacketsResponse>, tonic::Status> {
668        let snapshot = self.storage.latest_snapshot();
669        let height = snapshot.version();
670        let request = request.get_ref();
671
672        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
673            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
674        let port_id: PortId = PortId::from_str(&request.port_id)
675            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
676
677        let mut unreceived_seqs = vec![];
678
679        for seq in request.packet_commitment_sequences.clone() {
680            if seq == 0 {
681                return Err(tonic::Status::aborted(format!(
682                    "packet sequence {} cannot be 0",
683                    seq
684                )));
685            }
686
687            if !snapshot
688                .seen_packet_by_channel(&chan_id, &port_id, seq)
689                .await.map_err(|e| {
690                    tonic::Status::aborted(format!(
691                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
692                    ))
693                })?
694            {
695                unreceived_seqs.push(seq);
696            }
697        }
698
699        let height = Height {
700            revision_number: 0,
701            revision_height: height,
702        };
703
704        let res = QueryUnreceivedPacketsResponse {
705            sequences: unreceived_seqs,
706            height: Some(height),
707        };
708
709        Ok(tonic::Response::new(res))
710    }
711    /// UnreceivedAcks returns all the unreceived IBC acknowledgements associated
712    /// with a channel and sequences.
713    #[tracing::instrument(skip(self), err, level = "debug")]
714    async fn unreceived_acks(
715        &self,
716        request: tonic::Request<QueryUnreceivedAcksRequest>,
717    ) -> std::result::Result<tonic::Response<QueryUnreceivedAcksResponse>, tonic::Status> {
718        let snapshot = self.storage.latest_snapshot();
719        let height = Height {
720            revision_number: 0,
721            revision_height: snapshot.version(),
722        };
723        let request = request.get_ref();
724
725        let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
726            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
727        let port_id: PortId = PortId::from_str(&request.port_id)
728            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
729
730        let mut unreceived_seqs = vec![];
731
732        for seq in request.packet_ack_sequences.clone() {
733            if seq == 0 {
734                return Err(tonic::Status::aborted(format!(
735                    "packet sequence {} cannot be 0",
736                    seq
737                )));
738            }
739
740            if snapshot
741                .get_packet_commitment_by_id(&chan_id, &port_id, seq)
742                .await.map_err(|e| {
743                    tonic::Status::aborted(format!(
744                        "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
745                    ))
746                })?
747                .is_some()
748            {
749                unreceived_seqs.push(seq);
750            }
751        }
752
753        let res = QueryUnreceivedAcksResponse {
754            sequences: unreceived_seqs,
755            height: Some(height),
756        };
757
758        Ok(tonic::Response::new(res))
759    }
760
761    /// NextSequenceReceive returns the next receive sequence for a given channel.
762    #[tracing::instrument(skip(self), err, level = "debug")]
763    async fn next_sequence_receive(
764        &self,
765        request: tonic::Request<QueryNextSequenceReceiveRequest>,
766    ) -> std::result::Result<tonic::Response<QueryNextSequenceReceiveResponse>, tonic::Status> {
767        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
768            Err(err) => return Err(tonic::Status::aborted(
769                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
770            )),
771            Ok(snapshot) => snapshot,
772        };
773
774        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
775            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
776        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
777            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
778
779        let (next_recv_sequence, proof) = snapshot
780            .get_with_proof(
781                IBC_COMMITMENT_PREFIX
782                    .apply_string(SeqRecvPath::new(&port_id, &channel_id).to_string())
783                    .as_bytes()
784                    .to_vec(),
785            )
786            .await
787            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
788
789        let next_recv_sequence = next_recv_sequence
790            .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
791            .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
792
793        Ok(tonic::Response::new(QueryNextSequenceReceiveResponse {
794            next_sequence_receive: next_recv_sequence,
795            proof: proof.encode_to_vec(),
796            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
797                revision_height: HI::get_block_height(&snapshot)
798                    .await
799                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
800                    + 1,
801                revision_number: HI::get_revision_number(&snapshot)
802                    .await
803                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
804            }),
805        }))
806    }
807
808    /// Returns the next send sequence for a given channel.
809    #[tracing::instrument(skip(self), err, level = "debug")]
810    async fn next_sequence_send(
811        &self,
812        request: tonic::Request<QueryNextSequenceSendRequest>,
813    ) -> std::result::Result<tonic::Response<QueryNextSequenceSendResponse>, tonic::Status> {
814        let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
815            Err(err) => return Err(tonic::Status::aborted(
816                format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
817            )),
818            Ok(snapshot) => snapshot,
819        };
820
821        let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
822            .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
823        let port_id = PortId::from_str(request.get_ref().port_id.as_str())
824            .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
825
826        let (next_send_sequence, proof) = snapshot
827            .get_with_proof(
828                IBC_COMMITMENT_PREFIX
829                    .apply_string(SeqSendPath::new(&port_id, &channel_id).to_string())
830                    .as_bytes()
831                    .to_vec(),
832            )
833            .await
834            .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
835
836        let next_send_sequence = next_send_sequence
837            .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
838            .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
839
840        Ok(tonic::Response::new(QueryNextSequenceSendResponse {
841            next_sequence_send: next_send_sequence,
842            proof: proof.encode_to_vec(),
843            proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
844                revision_height: HI::get_block_height(&snapshot)
845                    .await
846                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
847                    + 1,
848                revision_number: HI::get_revision_number(&snapshot)
849                    .await
850                    .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
851            }),
852        }))
853    }
854
855    #[tracing::instrument(skip(self), err, level = "debug")]
856    async fn channel_params(
857        &self,
858        _request: tonic::Request<QueryChannelParamsRequest>,
859    ) -> std::result::Result<tonic::Response<QueryChannelParamsResponse>, tonic::Status> {
860        Err(tonic::Status::unimplemented("not implemented"))
861    }
862
863    #[tracing::instrument(skip(self), err, level = "debug")]
864    async fn upgrade(
865        &self,
866        _request: tonic::Request<QueryUpgradeRequest>,
867    ) -> std::result::Result<tonic::Response<QueryUpgradeResponse>, tonic::Status> {
868        Err(tonic::Status::unimplemented("not implemented"))
869    }
870
871    #[tracing::instrument(skip(self), err, level = "debug")]
872    async fn upgrade_error(
873        &self,
874        _request: tonic::Request<QueryUpgradeErrorRequest>,
875    ) -> std::result::Result<tonic::Response<QueryUpgradeErrorResponse>, tonic::Status> {
876        Err(tonic::Status::unimplemented("not implemented"))
877    }
878}