1use crate::prefix::MerklePrefixExt;
2use crate::IBC_COMMITMENT_PREFIX;
3use async_trait::async_trait;
4use ibc_proto::ibc::core::channel::v1::query_server::Query as ConsensusQuery;
5use ibc_proto::ibc::core::channel::v1::{
6 Channel, PacketState, QueryChannelClientStateRequest, QueryChannelClientStateResponse,
7 QueryChannelConsensusStateRequest, QueryChannelConsensusStateResponse,
8 QueryChannelParamsRequest, QueryChannelParamsResponse, QueryChannelRequest,
9 QueryChannelResponse, QueryChannelsRequest, QueryChannelsResponse,
10 QueryConnectionChannelsRequest, QueryConnectionChannelsResponse,
11 QueryNextSequenceReceiveRequest, QueryNextSequenceReceiveResponse,
12 QueryNextSequenceSendRequest, QueryNextSequenceSendResponse, QueryPacketAcknowledgementRequest,
13 QueryPacketAcknowledgementResponse, QueryPacketAcknowledgementsRequest,
14 QueryPacketAcknowledgementsResponse, QueryPacketCommitmentRequest,
15 QueryPacketCommitmentResponse, QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse,
16 QueryPacketReceiptRequest, QueryPacketReceiptResponse, QueryUnreceivedAcksRequest,
17 QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
18 QueryUpgradeErrorRequest, QueryUpgradeErrorResponse, QueryUpgradeRequest, QueryUpgradeResponse,
19};
20use ibc_proto::ibc::core::client::v1::{Height, IdentifiedClientState};
21use ibc_types::path::{
22 AckPath, ChannelEndPath, ClientConsensusStatePath, ClientStatePath, CommitmentPath,
23 ReceiptPath, SeqRecvPath, SeqSendPath,
24};
25use ibc_types::DomainType;
26
27use ibc_types::core::channel::{ChannelId, IdentifiedChannelEnd, PortId};
28
29use ibc_types::core::connection::ConnectionId;
30use prost::Message;
31
32use std::str::FromStr;
33
34use crate::component::{ChannelStateReadExt, ConnectionStateReadExt, HostInterface};
35
36use super::utils::determine_snapshot_from_metadata;
37use super::IbcQuery;
38
39#[async_trait]
40impl<HI: HostInterface + Send + Sync + 'static> ConsensusQuery for IbcQuery<HI> {
41 #[tracing::instrument(skip(self), err, level = "debug")]
43 async fn channel(
44 &self,
45 request: tonic::Request<QueryChannelRequest>,
46 ) -> std::result::Result<tonic::Response<QueryChannelResponse>, tonic::Status> {
47 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
48 Err(err) => return Err(tonic::Status::aborted(
49 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
50 )),
51 Ok(snapshot) => snapshot,
52 };
53 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
54 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
55 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
56 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
57
58 let (channel, proof) = snapshot
59 .get_with_proof(
60 IBC_COMMITMENT_PREFIX
61 .apply_string(ChannelEndPath::new(&port_id, &channel_id).to_string())
62 .as_bytes()
63 .to_vec(),
64 )
65 .await
66 .map(|res| {
67 let channel = res
68 .0
69 .map(|chan_bytes| Channel::decode(chan_bytes.as_ref()))
70 .transpose();
71
72 (channel, res.1)
73 })
74 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
75
76 let channel =
77 channel.map_err(|e| tonic::Status::aborted(format!("couldn't decode channel: {e}")))?;
78
79 let res = QueryChannelResponse {
80 channel,
81 proof: proof.encode_to_vec(),
82 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
83 revision_height: HI::get_block_height(&snapshot)
84 .await
85 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
86 + 1,
87 revision_number: HI::get_revision_number(&snapshot)
88 .await
89 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
90 }),
91 };
92
93 Ok(tonic::Response::new(res))
94 }
95 #[tracing::instrument(skip(self), err, level = "debug")]
97 async fn channels(
98 &self,
99 _request: tonic::Request<QueryChannelsRequest>,
100 ) -> std::result::Result<tonic::Response<QueryChannelsResponse>, tonic::Status> {
101 let snapshot = self.storage.latest_snapshot();
102
103 let height = Height {
104 revision_number: HI::get_revision_number(&snapshot)
105 .await
106 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
107 revision_height: HI::get_block_height(&snapshot)
108 .await
109 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
110 };
111
112 let channel_counter = snapshot
113 .get_channel_counter()
114 .await
115 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
116
117 let mut channels = vec![];
118 for chan_idx in 0..channel_counter {
119 let chan_id = ChannelId(format!("channel-{}", chan_idx));
120 let channel = snapshot
121 .get_channel(&chan_id, &PortId::transfer())
122 .await
123 .map_err(|e| {
124 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
125 })?
126 .ok_or("unable to get channel")
127 .map_err(|e| {
128 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
129 })?;
130
131 let id_chan = IdentifiedChannelEnd {
132 channel_id: chan_id,
133 port_id: PortId::transfer(),
134 channel_end: channel,
135 upgrade_sequence: 0,
136 };
137 channels.push(id_chan.into());
138 }
139
140 let res = QueryChannelsResponse {
141 channels,
142 pagination: None,
143 height: Some(height),
144 };
145
146 Ok(tonic::Response::new(res))
147 }
148
149 #[tracing::instrument(skip(self), err, level = "debug")]
151 async fn connection_channels(
152 &self,
153 request: tonic::Request<QueryConnectionChannelsRequest>,
154 ) -> std::result::Result<tonic::Response<QueryConnectionChannelsResponse>, tonic::Status> {
155 let snapshot = self.storage.latest_snapshot();
156 let height = Height {
157 revision_number: HI::get_revision_number(&snapshot)
158 .await
159 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
160 revision_height: HI::get_block_height(&snapshot)
161 .await
162 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
163 };
164
165 let request = request.get_ref();
166 let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
167 .map_err(|e| tonic::Status::aborted(format!("invalid connection id: {e}")))?;
168
169 let channel_counter = snapshot
171 .get_channel_counter()
172 .await
173 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
174
175 let mut channels = vec![];
176 for chan_idx in 0..channel_counter {
177 let chan_id = ChannelId(format!("channel-{}", chan_idx));
178 let channel = snapshot
179 .get_channel(&chan_id, &PortId::transfer())
180 .await
181 .map_err(|e| {
182 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
183 })?
184 .ok_or("unable to get channel")
185 .map_err(|e| {
186 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
187 })?;
188 if channel.connection_hops.contains(&connection_id) {
189 let id_chan = IdentifiedChannelEnd {
190 channel_id: chan_id,
191 port_id: PortId::transfer(),
192 channel_end: channel,
193 upgrade_sequence: 0,
194 };
195 channels.push(id_chan.into());
196 }
197 }
198
199 let res = QueryConnectionChannelsResponse {
200 channels,
201 pagination: None,
202 height: Some(height),
203 };
204
205 Ok(tonic::Response::new(res))
206 }
207
208 #[tracing::instrument(skip(self), err, level = "debug")]
211 async fn channel_client_state(
212 &self,
213 request: tonic::Request<QueryChannelClientStateRequest>,
214 ) -> std::result::Result<tonic::Response<QueryChannelClientStateResponse>, tonic::Status> {
215 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
216 Err(err) => return Err(tonic::Status::aborted(
217 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
218 )),
219 Ok(snapshot) => snapshot,
220 };
221
222 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
224 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
225 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
226 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
227
228 let channel = snapshot
229 .get_channel(&channel_id, &port_id)
230 .await
231 .map_err(|e| {
232 tonic::Status::aborted(format!(
233 "couldn't get channel {channel_id} for port {port_id}: {e}"
234 ))
235 })?
236 .ok_or("unable to get channel")
237 .map_err(|e| {
238 tonic::Status::aborted(format!(
239 "couldn't get channel {channel_id} for port {port_id}: {e}"
240 ))
241 })?;
242
243 let connection_id = channel
245 .connection_hops
246 .first()
247 .ok_or("channel has no connection hops")
248 .map_err(|e| {
249 tonic::Status::aborted(format!(
250 "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
251 ))
252 })?;
253
254 let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
255 tonic::Status::aborted(format!(
256 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
257 ))
258 })?.ok_or("unable to get connection").map_err(|e| {
259 tonic::Status::aborted(format!(
260 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
261 ))
262 })?;
263
264 let (client_state, proof) = snapshot
266 .get_with_proof(
267 IBC_COMMITMENT_PREFIX
268 .apply_string(ClientStatePath::new(&connection.client_id).to_string())
269 .as_bytes()
270 .to_vec(),
271 )
272 .await
273 .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
274
275 let client_state_any = client_state
276 .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
277 .transpose()
278 .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
279
280 let identified_client_state = IdentifiedClientState {
281 client_id: connection.client_id.clone().to_string(),
282 client_state: client_state_any,
283 };
284
285 Ok(tonic::Response::new(QueryChannelClientStateResponse {
286 identified_client_state: Some(identified_client_state),
287 proof: proof.encode_to_vec(),
288 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
289 revision_height: HI::get_block_height(&snapshot)
290 .await
291 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
292 + 1,
293 revision_number: HI::get_revision_number(&snapshot)
294 .await
295 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
296 }),
297 }))
298 }
299 #[tracing::instrument(skip(self), err, level = "debug")]
302 async fn channel_consensus_state(
303 &self,
304 request: tonic::Request<QueryChannelConsensusStateRequest>,
305 ) -> std::result::Result<tonic::Response<QueryChannelConsensusStateResponse>, tonic::Status>
306 {
307 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
308 Err(err) => return Err(tonic::Status::aborted(
309 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
310 )),
311 Ok(snapshot) => snapshot,
312 };
313 let consensus_state_height = ibc_types::core::client::Height {
314 revision_number: request.get_ref().revision_number,
315 revision_height: request.get_ref().revision_height,
316 };
317
318 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
320 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
321 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
322 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
323
324 let channel = snapshot
325 .get_channel(&channel_id, &port_id)
326 .await
327 .map_err(|e| {
328 tonic::Status::aborted(format!(
329 "couldn't get channel {channel_id} for port {port_id}: {e}"
330 ))
331 })?
332 .ok_or("unable to get channel")
333 .map_err(|e| {
334 tonic::Status::aborted(format!(
335 "couldn't get channel {channel_id} for port {port_id}: {e}"
336 ))
337 })?;
338
339 let connection_id = channel
341 .connection_hops
342 .first()
343 .ok_or("channel has no connection hops")
344 .map_err(|e| {
345 tonic::Status::aborted(format!(
346 "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
347 ))
348 })?;
349
350 let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
351 tonic::Status::aborted(format!(
352 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
353 ))
354 })?.ok_or("unable to get connection").map_err(|e| {
355 tonic::Status::aborted(format!(
356 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
357 ))
358 })?;
359
360 let (consensus_state, proof) = snapshot
362 .get_with_proof(
363 IBC_COMMITMENT_PREFIX
364 .apply_string(
365 ClientConsensusStatePath::new(
366 &connection.client_id,
367 &consensus_state_height,
368 )
369 .to_string(),
370 )
371 .as_bytes()
372 .to_vec(),
373 )
374 .await
375 .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
376
377 let consensus_state_any = consensus_state
378 .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
379 .transpose()
380 .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
381
382 Ok(tonic::Response::new(QueryChannelConsensusStateResponse {
383 consensus_state: consensus_state_any,
384 client_id: connection.client_id.clone().to_string(),
385 proof: proof.encode_to_vec(),
386 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
387 revision_height: HI::get_block_height(&snapshot)
388 .await
389 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
390 + 1,
391 revision_number: HI::get_revision_number(&snapshot)
392 .await
393 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
394 }),
395 }))
396 }
397 #[tracing::instrument(skip(self), err, level = "debug")]
399 async fn packet_commitment(
400 &self,
401 request: tonic::Request<QueryPacketCommitmentRequest>,
402 ) -> std::result::Result<tonic::Response<QueryPacketCommitmentResponse>, tonic::Status> {
403 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
404 Err(err) => return Err(tonic::Status::aborted(
405 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
406 )),
407 Ok(snapshot) => snapshot,
408 };
409
410 let port_id = PortId::from_str(&request.get_ref().port_id)
411 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
412 let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
413 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
414
415 let (commitment, proof) = snapshot
416 .get_with_proof(
417 IBC_COMMITMENT_PREFIX
418 .apply_string(
419 CommitmentPath::new(
420 &port_id,
421 &channel_id,
422 request.get_ref().sequence.into(),
423 )
424 .to_string(),
425 )
426 .as_bytes()
427 .to_vec(),
428 )
429 .await
430 .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
431
432 let commitment =
433 commitment.ok_or_else(|| tonic::Status::aborted("commitment not found"))?;
434
435 Ok(tonic::Response::new(QueryPacketCommitmentResponse {
436 commitment,
437 proof: proof.encode_to_vec(),
438 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
439 revision_height: HI::get_block_height(&snapshot)
440 .await
441 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
442 + 1,
443 revision_number: HI::get_revision_number(&snapshot)
444 .await
445 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
446 }),
447 }))
448 }
449 #[tracing::instrument(skip(self), err, level = "debug")]
452 async fn packet_commitments(
453 &self,
454 request: tonic::Request<QueryPacketCommitmentsRequest>,
455 ) -> std::result::Result<tonic::Response<QueryPacketCommitmentsResponse>, tonic::Status> {
456 let snapshot = self.storage.latest_snapshot();
457 let height = snapshot.version();
458 let request = request.get_ref();
459
460 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
461 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
462 let port_id: PortId = PortId::from_str(&request.port_id)
463 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
464
465 let mut commitment_states = vec![];
466 let commitment_counter = snapshot
467 .get_send_sequence(&chan_id, &port_id)
468 .await
469 .map_err(|e| {
470 tonic::Status::aborted(format!(
471 "couldn't get send sequence for channel {chan_id} and port {port_id}: {e}"
472 ))
473 })?;
474
475 for commitment_idx in 1..commitment_counter {
477 let commitment = snapshot
478 .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
479 .await.map_err(|e| {
480 tonic::Status::aborted(format!(
481 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {commitment_idx}: {e}"
482 ))
483 })?;
484 if commitment.is_none() {
485 continue;
486 }
487 let commitment = commitment.expect("commitment existence was checked earlier");
488
489 let commitment_state = PacketState {
490 port_id: request.port_id.clone(),
491 channel_id: request.channel_id.clone(),
492 sequence: commitment_idx,
493 data: commitment.clone(),
494 };
495
496 commitment_states.push(commitment_state);
497 }
498
499 let height = Height {
500 revision_number: 0,
501 revision_height: height,
502 };
503
504 let res = QueryPacketCommitmentsResponse {
505 commitments: commitment_states,
506 pagination: None,
507 height: Some(height),
508 };
509
510 Ok(tonic::Response::new(res))
511 }
512 #[tracing::instrument(skip(self), err, level = "debug")]
515 async fn packet_receipt(
516 &self,
517 request: tonic::Request<QueryPacketReceiptRequest>,
518 ) -> std::result::Result<tonic::Response<QueryPacketReceiptResponse>, tonic::Status> {
519 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
520 Err(err) => return Err(tonic::Status::aborted(
521 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
522 )),
523 Ok(snapshot) => snapshot,
524 };
525
526 let port_id = PortId::from_str(&request.get_ref().port_id)
527 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
528 let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
529 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
530
531 let (receipt, proof) = snapshot
532 .get_with_proof(
533 IBC_COMMITMENT_PREFIX
534 .apply_string(
535 ReceiptPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
536 .to_string(),
537 )
538 .as_bytes()
539 .to_vec(),
540 )
541 .await
542 .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
543
544 Ok(tonic::Response::new(QueryPacketReceiptResponse {
545 received: receipt.is_some(),
546 proof: proof.encode_to_vec(),
547 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
548 revision_height: HI::get_block_height(&snapshot)
549 .await
550 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
551 + 1,
552 revision_number: HI::get_revision_number(&snapshot)
553 .await
554 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
555 }),
556 }))
557 }
558 #[tracing::instrument(skip(self), err, level = "debug")]
560 async fn packet_acknowledgement(
561 &self,
562 request: tonic::Request<QueryPacketAcknowledgementRequest>,
563 ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementResponse>, tonic::Status>
564 {
565 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
566 Err(err) => return Err(tonic::Status::aborted(
567 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
568 )),
569 Ok(snapshot) => snapshot,
570 };
571
572 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
573 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
574 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
575 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
576
577 let (acknowledgement, proof) = snapshot
578 .get_with_proof(
579 IBC_COMMITMENT_PREFIX
580 .apply_string(
581 AckPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
582 .to_string(),
583 )
584 .as_bytes()
585 .to_vec(),
586 )
587 .await
588 .map_err(|e| {
589 tonic::Status::aborted(format!("couldn't get packet acknowledgement: {e}"))
590 })?;
591
592 let acknowledgement =
593 acknowledgement.ok_or_else(|| tonic::Status::aborted("acknowledgement not found"))?;
594
595 Ok(tonic::Response::new(QueryPacketAcknowledgementResponse {
596 acknowledgement,
597 proof: proof.encode_to_vec(),
598 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
599 revision_height: HI::get_block_height(&snapshot)
600 .await
601 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
602 + 1,
603 revision_number: HI::get_revision_number(&snapshot)
604 .await
605 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
606 }),
607 }))
608 }
609 #[tracing::instrument(skip(self), err, level = "debug")]
612 async fn packet_acknowledgements(
613 &self,
614 request: tonic::Request<QueryPacketAcknowledgementsRequest>,
615 ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementsResponse>, tonic::Status>
616 {
617 let snapshot = self.storage.latest_snapshot();
618 let height = Height {
619 revision_number: 0,
620 revision_height: snapshot.version(),
621 };
622 let request = request.get_ref();
623
624 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
625 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
626 let port_id: PortId = PortId::from_str(&request.port_id)
627 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
628
629 let mut acks = vec![];
630 for ack_idx in &request.packet_commitment_sequences {
631 let maybe_ack = snapshot
632 .get_packet_acknowledgement(&port_id, &chan_id, *ack_idx)
633 .await.map_err(|e| {
634 tonic::Status::aborted(format!(
635 "couldn't get packet acknowledgement for channel {chan_id} and port {port_id} at index {ack_idx}: {e}"
636 ))
637 })?;
638
639 if let Some(ack) = maybe_ack {
642 let ack_state = PacketState {
643 port_id: request.port_id.clone(),
644 channel_id: request.channel_id.clone(),
645 sequence: *ack_idx,
646 data: ack.clone(),
647 };
648
649 acks.push(ack_state);
650 }
651 }
652
653 let res = QueryPacketAcknowledgementsResponse {
654 acknowledgements: acks,
655 pagination: None,
656 height: Some(height),
657 };
658
659 Ok(tonic::Response::new(res))
660 }
661 #[tracing::instrument(skip(self), err, level = "debug")]
664 async fn unreceived_packets(
665 &self,
666 request: tonic::Request<QueryUnreceivedPacketsRequest>,
667 ) -> std::result::Result<tonic::Response<QueryUnreceivedPacketsResponse>, tonic::Status> {
668 let snapshot = self.storage.latest_snapshot();
669 let height = snapshot.version();
670 let request = request.get_ref();
671
672 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
673 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
674 let port_id: PortId = PortId::from_str(&request.port_id)
675 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
676
677 let mut unreceived_seqs = vec![];
678
679 for seq in request.packet_commitment_sequences.clone() {
680 if seq == 0 {
681 return Err(tonic::Status::aborted(format!(
682 "packet sequence {} cannot be 0",
683 seq
684 )));
685 }
686
687 if !snapshot
688 .seen_packet_by_channel(&chan_id, &port_id, seq)
689 .await.map_err(|e| {
690 tonic::Status::aborted(format!(
691 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
692 ))
693 })?
694 {
695 unreceived_seqs.push(seq);
696 }
697 }
698
699 let height = Height {
700 revision_number: 0,
701 revision_height: height,
702 };
703
704 let res = QueryUnreceivedPacketsResponse {
705 sequences: unreceived_seqs,
706 height: Some(height),
707 };
708
709 Ok(tonic::Response::new(res))
710 }
711 #[tracing::instrument(skip(self), err, level = "debug")]
714 async fn unreceived_acks(
715 &self,
716 request: tonic::Request<QueryUnreceivedAcksRequest>,
717 ) -> std::result::Result<tonic::Response<QueryUnreceivedAcksResponse>, tonic::Status> {
718 let snapshot = self.storage.latest_snapshot();
719 let height = Height {
720 revision_number: 0,
721 revision_height: snapshot.version(),
722 };
723 let request = request.get_ref();
724
725 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
726 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
727 let port_id: PortId = PortId::from_str(&request.port_id)
728 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
729
730 let mut unreceived_seqs = vec![];
731
732 for seq in request.packet_ack_sequences.clone() {
733 if seq == 0 {
734 return Err(tonic::Status::aborted(format!(
735 "packet sequence {} cannot be 0",
736 seq
737 )));
738 }
739
740 if snapshot
741 .get_packet_commitment_by_id(&chan_id, &port_id, seq)
742 .await.map_err(|e| {
743 tonic::Status::aborted(format!(
744 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
745 ))
746 })?
747 .is_some()
748 {
749 unreceived_seqs.push(seq);
750 }
751 }
752
753 let res = QueryUnreceivedAcksResponse {
754 sequences: unreceived_seqs,
755 height: Some(height),
756 };
757
758 Ok(tonic::Response::new(res))
759 }
760
761 #[tracing::instrument(skip(self), err, level = "debug")]
763 async fn next_sequence_receive(
764 &self,
765 request: tonic::Request<QueryNextSequenceReceiveRequest>,
766 ) -> std::result::Result<tonic::Response<QueryNextSequenceReceiveResponse>, tonic::Status> {
767 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
768 Err(err) => return Err(tonic::Status::aborted(
769 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
770 )),
771 Ok(snapshot) => snapshot,
772 };
773
774 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
775 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
776 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
777 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
778
779 let (next_recv_sequence, proof) = snapshot
780 .get_with_proof(
781 IBC_COMMITMENT_PREFIX
782 .apply_string(SeqRecvPath::new(&port_id, &channel_id).to_string())
783 .as_bytes()
784 .to_vec(),
785 )
786 .await
787 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
788
789 let next_recv_sequence = next_recv_sequence
790 .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
791 .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
792
793 Ok(tonic::Response::new(QueryNextSequenceReceiveResponse {
794 next_sequence_receive: next_recv_sequence,
795 proof: proof.encode_to_vec(),
796 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
797 revision_height: HI::get_block_height(&snapshot)
798 .await
799 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
800 + 1,
801 revision_number: HI::get_revision_number(&snapshot)
802 .await
803 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
804 }),
805 }))
806 }
807
808 #[tracing::instrument(skip(self), err, level = "debug")]
810 async fn next_sequence_send(
811 &self,
812 request: tonic::Request<QueryNextSequenceSendRequest>,
813 ) -> std::result::Result<tonic::Response<QueryNextSequenceSendResponse>, tonic::Status> {
814 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
815 Err(err) => return Err(tonic::Status::aborted(
816 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
817 )),
818 Ok(snapshot) => snapshot,
819 };
820
821 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
822 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
823 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
824 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
825
826 let (next_send_sequence, proof) = snapshot
827 .get_with_proof(
828 IBC_COMMITMENT_PREFIX
829 .apply_string(SeqSendPath::new(&port_id, &channel_id).to_string())
830 .as_bytes()
831 .to_vec(),
832 )
833 .await
834 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
835
836 let next_send_sequence = next_send_sequence
837 .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
838 .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
839
840 Ok(tonic::Response::new(QueryNextSequenceSendResponse {
841 next_sequence_send: next_send_sequence,
842 proof: proof.encode_to_vec(),
843 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
844 revision_height: HI::get_block_height(&snapshot)
845 .await
846 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
847 + 1,
848 revision_number: HI::get_revision_number(&snapshot)
849 .await
850 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
851 }),
852 }))
853 }
854
855 #[tracing::instrument(skip(self), err, level = "debug")]
856 async fn channel_params(
857 &self,
858 _request: tonic::Request<QueryChannelParamsRequest>,
859 ) -> std::result::Result<tonic::Response<QueryChannelParamsResponse>, tonic::Status> {
860 Err(tonic::Status::unimplemented("not implemented"))
861 }
862
863 #[tracing::instrument(skip(self), err, level = "debug")]
864 async fn upgrade(
865 &self,
866 _request: tonic::Request<QueryUpgradeRequest>,
867 ) -> std::result::Result<tonic::Response<QueryUpgradeResponse>, tonic::Status> {
868 Err(tonic::Status::unimplemented("not implemented"))
869 }
870
871 #[tracing::instrument(skip(self), err, level = "debug")]
872 async fn upgrade_error(
873 &self,
874 _request: tonic::Request<QueryUpgradeErrorRequest>,
875 ) -> std::result::Result<tonic::Response<QueryUpgradeErrorResponse>, tonic::Status> {
876 Err(tonic::Status::unimplemented("not implemented"))
877 }
878}