1use crate::prefix::MerklePrefixExt;
2use crate::IBC_COMMITMENT_PREFIX;
3use async_trait::async_trait;
4use ibc_proto::ibc::core::channel::v1::query_server::Query as ConsensusQuery;
5use ibc_proto::ibc::core::channel::v1::{
6 Channel, PacketState, QueryChannelClientStateRequest, QueryChannelClientStateResponse,
7 QueryChannelConsensusStateRequest, QueryChannelConsensusStateResponse,
8 QueryChannelParamsRequest, QueryChannelParamsResponse, QueryChannelRequest,
9 QueryChannelResponse, QueryChannelsRequest, QueryChannelsResponse,
10 QueryConnectionChannelsRequest, QueryConnectionChannelsResponse,
11 QueryNextSequenceReceiveRequest, QueryNextSequenceReceiveResponse,
12 QueryNextSequenceSendRequest, QueryNextSequenceSendResponse, QueryPacketAcknowledgementRequest,
13 QueryPacketAcknowledgementResponse, QueryPacketAcknowledgementsRequest,
14 QueryPacketAcknowledgementsResponse, QueryPacketCommitmentRequest,
15 QueryPacketCommitmentResponse, QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse,
16 QueryPacketReceiptRequest, QueryPacketReceiptResponse, QueryUnreceivedAcksRequest,
17 QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
18 QueryUpgradeErrorRequest, QueryUpgradeErrorResponse, QueryUpgradeRequest, QueryUpgradeResponse,
19};
20use ibc_proto::ibc::core::client::v1::{Height, IdentifiedClientState};
21use ibc_types::path::{
22 AckPath, ChannelEndPath, ClientConsensusStatePath, ClientStatePath, CommitmentPath,
23 ReceiptPath, SeqRecvPath, SeqSendPath,
24};
25use ibc_types::DomainType;
26
27use ibc_types::core::channel::{ChannelId, IdentifiedChannelEnd, PortId};
28
29use ibc_types::core::connection::ConnectionId;
30use prost::Message;
31
32use std::str::FromStr;
33
34use crate::component::{ChannelStateReadExt, ConnectionStateReadExt, HostInterface};
35
36use super::utils::determine_snapshot_from_metadata;
37use super::IbcQuery;
38
39#[async_trait]
40impl<HI: HostInterface + Send + Sync + 'static> ConsensusQuery for IbcQuery<HI> {
41 #[tracing::instrument(skip(self), err, level = "debug")]
43 async fn channel(
44 &self,
45 request: tonic::Request<QueryChannelRequest>,
46 ) -> std::result::Result<tonic::Response<QueryChannelResponse>, tonic::Status> {
47 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
48 Err(err) => return Err(tonic::Status::aborted(
49 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
50 )),
51 Ok(snapshot) => snapshot,
52 };
53 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
54 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
55 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
56 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
57
58 let (channel, proof) = snapshot
59 .get_with_proof(
60 IBC_COMMITMENT_PREFIX
61 .apply_string(ChannelEndPath::new(&port_id, &channel_id).to_string())
62 .as_bytes()
63 .to_vec(),
64 )
65 .await
66 .map(|res| {
67 let channel = res
68 .0
69 .map(|chan_bytes| Channel::decode(chan_bytes.as_ref()))
70 .transpose();
71
72 (channel, res.1)
73 })
74 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
75
76 let channel =
77 channel.map_err(|e| tonic::Status::aborted(format!("couldn't decode channel: {e}")))?;
78
79 let res = QueryChannelResponse {
80 channel,
81 proof: proof.encode_to_vec(),
82 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
83 revision_height: HI::get_block_height(&snapshot)
84 .await
85 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
86 + 1,
87 revision_number: HI::get_revision_number(&snapshot)
88 .await
89 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
90 }),
91 };
92
93 Ok(tonic::Response::new(res))
94 }
95 #[tracing::instrument(skip(self), err, level = "debug")]
97 async fn channels(
98 &self,
99 _request: tonic::Request<QueryChannelsRequest>,
100 ) -> std::result::Result<tonic::Response<QueryChannelsResponse>, tonic::Status> {
101 let snapshot = self.storage.latest_snapshot();
102
103 let height = Height {
104 revision_number: HI::get_revision_number(&snapshot)
105 .await
106 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
107 revision_height: HI::get_block_height(&snapshot)
108 .await
109 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
110 };
111
112 let channel_counter = snapshot
113 .get_channel_counter()
114 .await
115 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
116
117 let mut channels = vec![];
118 for chan_idx in 0..channel_counter {
119 let chan_id = ChannelId(format!("channel-{}", chan_idx));
120 let channel = snapshot
121 .get_channel(&chan_id, &PortId::transfer())
122 .await
123 .map_err(|e| {
124 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
125 })?
126 .ok_or("unable to get channel")
127 .map_err(|e| {
128 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
129 })?;
130
131 let id_chan = IdentifiedChannelEnd {
132 channel_id: chan_id,
133 port_id: PortId::transfer(),
134 channel_end: channel,
135 upgrade_sequence: 0,
136 };
137 channels.push(id_chan.into());
138 }
139
140 let res = QueryChannelsResponse {
141 channels,
142 pagination: None,
143 height: Some(height),
144 };
145
146 Ok(tonic::Response::new(res))
147 }
148
149 #[tracing::instrument(skip(self), err, level = "debug")]
151 async fn connection_channels(
152 &self,
153 request: tonic::Request<QueryConnectionChannelsRequest>,
154 ) -> std::result::Result<tonic::Response<QueryConnectionChannelsResponse>, tonic::Status> {
155 let snapshot = self.storage.latest_snapshot();
156 let height = Height {
157 revision_number: HI::get_revision_number(&snapshot)
158 .await
159 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
160 revision_height: HI::get_block_height(&snapshot)
161 .await
162 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
163 };
164
165 let request = request.get_ref();
166 let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
167 .map_err(|e| tonic::Status::aborted(format!("invalid connection id: {e}")))?;
168
169 let channel_counter = snapshot
171 .get_channel_counter()
172 .await
173 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel counter: {e}")))?;
174
175 let mut channels = vec![];
176 for chan_idx in 0..channel_counter {
177 let chan_id = ChannelId(format!("channel-{}", chan_idx));
178 let channel = snapshot
179 .get_channel(&chan_id, &PortId::transfer())
180 .await
181 .map_err(|e| {
182 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
183 })?
184 .ok_or("unable to get channel")
185 .map_err(|e| {
186 tonic::Status::aborted(format!("couldn't get channel {chan_id}: {e}"))
187 })?;
188 if channel.connection_hops.contains(&connection_id) {
189 let id_chan = IdentifiedChannelEnd {
190 channel_id: chan_id,
191 port_id: PortId::transfer(),
192 channel_end: channel,
193 upgrade_sequence: 0,
194 };
195 channels.push(id_chan.into());
196 }
197 }
198
199 let res = QueryConnectionChannelsResponse {
200 channels,
201 pagination: None,
202 height: Some(height),
203 };
204
205 Ok(tonic::Response::new(res))
206 }
207
208 #[tracing::instrument(skip(self), err, level = "debug")]
211 async fn channel_client_state(
212 &self,
213 request: tonic::Request<QueryChannelClientStateRequest>,
214 ) -> std::result::Result<tonic::Response<QueryChannelClientStateResponse>, tonic::Status> {
215 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
216 Err(err) => return Err(tonic::Status::aborted(
217 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
218 )),
219 Ok(snapshot) => snapshot,
220 };
221
222 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
224 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
225 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
226 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
227
228 let channel = snapshot
229 .get_channel(&channel_id, &port_id)
230 .await
231 .map_err(|e| {
232 tonic::Status::aborted(format!(
233 "couldn't get channel {channel_id} for port {port_id}: {e}"
234 ))
235 })?
236 .ok_or("unable to get channel")
237 .map_err(|e| {
238 tonic::Status::aborted(format!(
239 "couldn't get channel {channel_id} for port {port_id}: {e}"
240 ))
241 })?;
242
243 let connection_id = channel
245 .connection_hops
246 .first()
247 .ok_or("channel has no connection hops")
248 .map_err(|e| {
249 tonic::Status::aborted(format!(
250 "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
251 ))
252 })?;
253
254 let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
255 tonic::Status::aborted(format!(
256 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
257 ))
258 })?.ok_or("unable to get connection").map_err(|e| {
259 tonic::Status::aborted(format!(
260 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
261 ))
262 })?;
263
264 let (client_state, proof) = snapshot
266 .get_with_proof(
267 IBC_COMMITMENT_PREFIX
268 .apply_string(ClientStatePath::new(&connection.client_id).to_string())
269 .as_bytes()
270 .to_vec(),
271 )
272 .await
273 .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
274
275 let client_state_any = client_state
276 .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
277 .transpose()
278 .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
279
280 let identified_client_state = IdentifiedClientState {
281 client_id: connection.client_id.clone().to_string(),
282 client_state: client_state_any,
283 };
284
285 Ok(tonic::Response::new(QueryChannelClientStateResponse {
286 identified_client_state: Some(identified_client_state),
287 proof: proof.encode_to_vec(),
288 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
289 revision_height: HI::get_block_height(&snapshot)
290 .await
291 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
292 + 1,
293 revision_number: HI::get_revision_number(&snapshot)
294 .await
295 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
296 }),
297 }))
298 }
299 #[tracing::instrument(skip(self), err, level = "debug")]
302 async fn channel_consensus_state(
303 &self,
304 request: tonic::Request<QueryChannelConsensusStateRequest>,
305 ) -> std::result::Result<tonic::Response<QueryChannelConsensusStateResponse>, tonic::Status>
306 {
307 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
308 Err(err) => return Err(tonic::Status::aborted(
309 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
310 )),
311 Ok(snapshot) => snapshot,
312 };
313 let consensus_state_height = ibc_types::core::client::Height {
314 revision_number: request.get_ref().revision_number,
315 revision_height: request.get_ref().revision_height,
316 };
317
318 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
320 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
321 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
322 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
323
324 let channel = snapshot
325 .get_channel(&channel_id, &port_id)
326 .await
327 .map_err(|e| {
328 tonic::Status::aborted(format!(
329 "couldn't get channel {channel_id} for port {port_id}: {e}"
330 ))
331 })?
332 .ok_or("unable to get channel")
333 .map_err(|e| {
334 tonic::Status::aborted(format!(
335 "couldn't get channel {channel_id} for port {port_id}: {e}"
336 ))
337 })?;
338
339 let connection_id = channel
341 .connection_hops
342 .first()
343 .ok_or("channel has no connection hops")
344 .map_err(|e| {
345 tonic::Status::aborted(format!(
346 "couldn't get connection for channel {channel_id} for port {port_id}: {e}"
347 ))
348 })?;
349
350 let connection = snapshot.get_connection(&connection_id).await.map_err(|e| {
351 tonic::Status::aborted(format!(
352 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
353 ))
354 })?.ok_or("unable to get connection").map_err(|e| {
355 tonic::Status::aborted(format!(
356 "couldn't get connection {connection_id} for channel {channel_id} for port {port_id}: {e}"
357 ))
358 })?;
359
360 let (consensus_state, proof) = snapshot
362 .get_with_proof(
363 IBC_COMMITMENT_PREFIX
364 .apply_string(
365 ClientConsensusStatePath::new(
366 &connection.client_id,
367 &consensus_state_height,
368 )
369 .to_string(),
370 )
371 .as_bytes()
372 .to_vec(),
373 )
374 .await
375 .map_err(|e| tonic::Status::aborted(format!("couldn't get client state: {e}")))?;
376
377 let consensus_state_any = consensus_state
378 .map(|cs_bytes| ibc_proto::google::protobuf::Any::decode(cs_bytes.as_ref()))
379 .transpose()
380 .map_err(|e| tonic::Status::aborted(format!("couldn't decode client state: {e}")))?;
381
382 Ok(tonic::Response::new(QueryChannelConsensusStateResponse {
383 consensus_state: consensus_state_any,
384 client_id: connection.client_id.clone().to_string(),
385 proof: proof.encode_to_vec(),
386 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
387 revision_height: HI::get_block_height(&snapshot)
388 .await
389 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
390 + 1,
391 revision_number: HI::get_revision_number(&snapshot)
392 .await
393 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
394 }),
395 }))
396 }
397 #[tracing::instrument(skip(self), err, level = "debug")]
399 async fn packet_commitment(
400 &self,
401 request: tonic::Request<QueryPacketCommitmentRequest>,
402 ) -> std::result::Result<tonic::Response<QueryPacketCommitmentResponse>, tonic::Status> {
403 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
404 Err(err) => return Err(tonic::Status::aborted(
405 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
406 )),
407 Ok(snapshot) => snapshot,
408 };
409
410 let port_id = PortId::from_str(&request.get_ref().port_id)
411 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
412 let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
413 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
414
415 let (commitment, proof) = snapshot
416 .get_with_proof(
417 IBC_COMMITMENT_PREFIX
418 .apply_string(
419 CommitmentPath::new(
420 &port_id,
421 &channel_id,
422 request.get_ref().sequence.into(),
423 )
424 .to_string(),
425 )
426 .as_bytes()
427 .to_vec(),
428 )
429 .await
430 .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
431
432 let commitment =
433 commitment.ok_or_else(|| tonic::Status::aborted("commitment not found"))?;
434
435 Ok(tonic::Response::new(QueryPacketCommitmentResponse {
436 commitment,
437 proof: proof.encode_to_vec(),
438 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
439 revision_height: HI::get_block_height(&snapshot)
440 .await
441 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
442 + 1,
443 revision_number: HI::get_revision_number(&snapshot)
444 .await
445 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
446 }),
447 }))
448 }
449 #[tracing::instrument(skip(self), err, level = "debug")]
452 async fn packet_commitments(
453 &self,
454 request: tonic::Request<QueryPacketCommitmentsRequest>,
455 ) -> std::result::Result<tonic::Response<QueryPacketCommitmentsResponse>, tonic::Status> {
456 let snapshot = self.storage.latest_snapshot();
457 let height = snapshot.version();
458 let request = request.get_ref();
459
460 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
461 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
462 let port_id: PortId = PortId::from_str(&request.port_id)
463 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
464
465 let mut commitment_states = vec![];
466 let commitment_counter = snapshot
467 .get_send_sequence(&chan_id, &port_id)
468 .await
469 .map_err(|e| {
470 tonic::Status::aborted(format!(
471 "couldn't get send sequence for channel {chan_id} and port {port_id}: {e}"
472 ))
473 })?;
474
475 for commitment_idx in 1..commitment_counter {
477 let commitment = snapshot
478 .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
479 .await.map_err(|e| {
480 tonic::Status::aborted(format!(
481 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {commitment_idx}: {e}"
482 ))
483 })?;
484 if commitment.is_none() {
485 continue;
486 }
487 let commitment = commitment.expect("commitment existence was checked earlier");
488
489 let commitment_state = PacketState {
490 port_id: request.port_id.clone(),
491 channel_id: request.channel_id.clone(),
492 sequence: commitment_idx,
493 data: commitment.clone(),
494 };
495
496 commitment_states.push(commitment_state);
497 }
498
499 let height = Height {
500 revision_number: 0,
501 revision_height: height,
502 };
503
504 let res = QueryPacketCommitmentsResponse {
505 commitments: commitment_states,
506 pagination: None,
507 height: Some(height),
508 };
509
510 Ok(tonic::Response::new(res))
511 }
512 #[tracing::instrument(skip(self), err, level = "debug")]
515 async fn packet_receipt(
516 &self,
517 request: tonic::Request<QueryPacketReceiptRequest>,
518 ) -> std::result::Result<tonic::Response<QueryPacketReceiptResponse>, tonic::Status> {
519 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
520 Err(err) => return Err(tonic::Status::aborted(
521 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
522 )),
523 Ok(snapshot) => snapshot,
524 };
525
526 let port_id = PortId::from_str(&request.get_ref().port_id)
527 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
528 let channel_id = ChannelId::from_str(&request.get_ref().channel_id)
529 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
530
531 let (receipt, proof) = snapshot
532 .get_with_proof(
533 IBC_COMMITMENT_PREFIX
534 .apply_string(
535 ReceiptPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
536 .to_string(),
537 )
538 .as_bytes()
539 .to_vec(),
540 )
541 .await
542 .map_err(|e| tonic::Status::aborted(format!("couldn't get packet commitment: {e}")))?;
543
544 Ok(tonic::Response::new(QueryPacketReceiptResponse {
545 received: receipt.is_some(),
546 proof: proof.encode_to_vec(),
547 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
548 revision_height: HI::get_block_height(&snapshot)
549 .await
550 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
551 + 1,
552 revision_number: HI::get_revision_number(&snapshot)
553 .await
554 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
555 }),
556 }))
557 }
558 #[tracing::instrument(skip(self), err, level = "debug")]
560 async fn packet_acknowledgement(
561 &self,
562 request: tonic::Request<QueryPacketAcknowledgementRequest>,
563 ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementResponse>, tonic::Status>
564 {
565 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
566 Err(err) => return Err(tonic::Status::aborted(
567 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
568 )),
569 Ok(snapshot) => snapshot,
570 };
571
572 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
573 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
574 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
575 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
576
577 let (acknowledgement, proof) = snapshot
578 .get_with_proof(
579 IBC_COMMITMENT_PREFIX
580 .apply_string(
581 AckPath::new(&port_id, &channel_id, request.get_ref().sequence.into())
582 .to_string(),
583 )
584 .as_bytes()
585 .to_vec(),
586 )
587 .await
588 .map_err(|e| {
589 tonic::Status::aborted(format!("couldn't get packet acknowledgement: {e}"))
590 })?;
591
592 let acknowledgement =
593 acknowledgement.ok_or_else(|| tonic::Status::aborted("acknowledgement not found"))?;
594
595 Ok(tonic::Response::new(QueryPacketAcknowledgementResponse {
596 acknowledgement,
597 proof: proof.encode_to_vec(),
598 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
599 revision_height: HI::get_block_height(&snapshot)
600 .await
601 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
602 + 1,
603 revision_number: HI::get_revision_number(&snapshot)
604 .await
605 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
606 }),
607 }))
608 }
609 #[tracing::instrument(skip(self), err, level = "debug")]
612 async fn packet_acknowledgements(
613 &self,
614 request: tonic::Request<QueryPacketAcknowledgementsRequest>,
615 ) -> std::result::Result<tonic::Response<QueryPacketAcknowledgementsResponse>, tonic::Status>
616 {
617 let snapshot = self.storage.latest_snapshot();
618 let height = Height {
619 revision_number: 0,
620 revision_height: snapshot.version(),
621 };
622 let request = request.get_ref();
623
624 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
625 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
626 let port_id: PortId = PortId::from_str(&request.port_id)
627 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
628
629 let ack_counter = snapshot
630 .get_ack_sequence(&chan_id, &port_id)
631 .await
632 .map_err(|e| {
633 tonic::Status::aborted(format!(
634 "couldn't get ack sequence for channel {chan_id} and port {port_id}: {e}"
635 ))
636 })?;
637
638 let mut acks = vec![];
639 for ack_idx in 0..ack_counter {
640 let maybe_ack = snapshot
641 .get_packet_acknowledgement(&port_id, &chan_id, ack_idx)
642 .await.map_err(|e| {
643 tonic::Status::aborted(format!(
644 "couldn't get packet acknowledgement for channel {chan_id} and port {port_id} at index {ack_idx}: {e}"
645 ))
646 })?;
647
648 if let Some(ack) = maybe_ack {
651 let ack_state = PacketState {
652 port_id: request.port_id.clone(),
653 channel_id: request.channel_id.clone(),
654 sequence: ack_idx,
655 data: ack.clone(),
656 };
657
658 acks.push(ack_state);
659 }
660 }
661
662 let res = QueryPacketAcknowledgementsResponse {
663 acknowledgements: acks,
664 pagination: None,
665 height: Some(height),
666 };
667
668 Ok(tonic::Response::new(res))
669 }
670 #[tracing::instrument(skip(self), err, level = "debug")]
673 async fn unreceived_packets(
674 &self,
675 request: tonic::Request<QueryUnreceivedPacketsRequest>,
676 ) -> std::result::Result<tonic::Response<QueryUnreceivedPacketsResponse>, tonic::Status> {
677 let snapshot = self.storage.latest_snapshot();
678 let height = snapshot.version();
679 let request = request.get_ref();
680
681 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
682 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
683 let port_id: PortId = PortId::from_str(&request.port_id)
684 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
685
686 let mut unreceived_seqs = vec![];
687
688 for seq in request.packet_commitment_sequences.clone() {
689 if seq == 0 {
690 return Err(tonic::Status::aborted(format!(
691 "packet sequence {} cannot be 0",
692 seq
693 )));
694 }
695
696 if !snapshot
697 .seen_packet_by_channel(&chan_id, &port_id, seq)
698 .await.map_err(|e| {
699 tonic::Status::aborted(format!(
700 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
701 ))
702 })?
703 {
704 unreceived_seqs.push(seq);
705 }
706 }
707
708 let height = Height {
709 revision_number: 0,
710 revision_height: height,
711 };
712
713 let res = QueryUnreceivedPacketsResponse {
714 sequences: unreceived_seqs,
715 height: Some(height),
716 };
717
718 Ok(tonic::Response::new(res))
719 }
720 #[tracing::instrument(skip(self), err, level = "debug")]
723 async fn unreceived_acks(
724 &self,
725 request: tonic::Request<QueryUnreceivedAcksRequest>,
726 ) -> std::result::Result<tonic::Response<QueryUnreceivedAcksResponse>, tonic::Status> {
727 let snapshot = self.storage.latest_snapshot();
728 let height = Height {
729 revision_number: 0,
730 revision_height: snapshot.version(),
731 };
732 let request = request.get_ref();
733
734 let chan_id: ChannelId = ChannelId::from_str(&request.channel_id)
735 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
736 let port_id: PortId = PortId::from_str(&request.port_id)
737 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
738
739 let mut unreceived_seqs = vec![];
740
741 for seq in request.packet_ack_sequences.clone() {
742 if seq == 0 {
743 return Err(tonic::Status::aborted(format!(
744 "packet sequence {} cannot be 0",
745 seq
746 )));
747 }
748
749 if snapshot
750 .get_packet_commitment_by_id(&chan_id, &port_id, seq)
751 .await.map_err(|e| {
752 tonic::Status::aborted(format!(
753 "couldn't get packet commitment for channel {chan_id} and port {port_id} at index {seq}: {e}"
754 ))
755 })?
756 .is_some()
757 {
758 unreceived_seqs.push(seq);
759 }
760 }
761
762 let res = QueryUnreceivedAcksResponse {
763 sequences: unreceived_seqs,
764 height: Some(height),
765 };
766
767 Ok(tonic::Response::new(res))
768 }
769
770 #[tracing::instrument(skip(self), err, level = "debug")]
772 async fn next_sequence_receive(
773 &self,
774 request: tonic::Request<QueryNextSequenceReceiveRequest>,
775 ) -> std::result::Result<tonic::Response<QueryNextSequenceReceiveResponse>, tonic::Status> {
776 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
777 Err(err) => return Err(tonic::Status::aborted(
778 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
779 )),
780 Ok(snapshot) => snapshot,
781 };
782
783 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
784 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
785 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
786 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
787
788 let (next_recv_sequence, proof) = snapshot
789 .get_with_proof(
790 IBC_COMMITMENT_PREFIX
791 .apply_string(SeqRecvPath::new(&port_id, &channel_id).to_string())
792 .as_bytes()
793 .to_vec(),
794 )
795 .await
796 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
797
798 let next_recv_sequence = next_recv_sequence
799 .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
800 .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
801
802 Ok(tonic::Response::new(QueryNextSequenceReceiveResponse {
803 next_sequence_receive: next_recv_sequence,
804 proof: proof.encode_to_vec(),
805 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
806 revision_height: HI::get_block_height(&snapshot)
807 .await
808 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
809 + 1,
810 revision_number: HI::get_revision_number(&snapshot)
811 .await
812 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
813 }),
814 }))
815 }
816
817 #[tracing::instrument(skip(self), err, level = "debug")]
819 async fn next_sequence_send(
820 &self,
821 request: tonic::Request<QueryNextSequenceSendRequest>,
822 ) -> std::result::Result<tonic::Response<QueryNextSequenceSendResponse>, tonic::Status> {
823 let snapshot = match determine_snapshot_from_metadata(self.storage.clone(), request.metadata()) {
824 Err(err) => return Err(tonic::Status::aborted(
825 format!("could not determine the correct snapshot to open given the `\"height\"` header of the request: {err:#}")
826 )),
827 Ok(snapshot) => snapshot,
828 };
829
830 let channel_id = ChannelId::from_str(request.get_ref().channel_id.as_str())
831 .map_err(|e| tonic::Status::aborted(format!("invalid channel id: {e}")))?;
832 let port_id = PortId::from_str(request.get_ref().port_id.as_str())
833 .map_err(|e| tonic::Status::aborted(format!("invalid port id: {e}")))?;
834
835 let (next_send_sequence, proof) = snapshot
836 .get_with_proof(
837 IBC_COMMITMENT_PREFIX
838 .apply_string(SeqSendPath::new(&port_id, &channel_id).to_string())
839 .as_bytes()
840 .to_vec(),
841 )
842 .await
843 .map_err(|e| tonic::Status::aborted(format!("couldn't get channel: {e}")))?;
844
845 let next_send_sequence = next_send_sequence
846 .map(|seq_bytes| u64::from_be_bytes(seq_bytes.try_into().expect("invalid sequence")))
847 .ok_or_else(|| tonic::Status::aborted("next receive sequence not found"))?;
848
849 Ok(tonic::Response::new(QueryNextSequenceSendResponse {
850 next_sequence_send: next_send_sequence,
851 proof: proof.encode_to_vec(),
852 proof_height: Some(ibc_proto::ibc::core::client::v1::Height {
853 revision_height: HI::get_block_height(&snapshot)
854 .await
855 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?
856 + 1,
857 revision_number: HI::get_revision_number(&snapshot)
858 .await
859 .map_err(|e| tonic::Status::aborted(format!("couldn't decode height: {e}")))?,
860 }),
861 }))
862 }
863
864 #[tracing::instrument(skip(self), err, level = "debug")]
865 async fn channel_params(
866 &self,
867 _request: tonic::Request<QueryChannelParamsRequest>,
868 ) -> std::result::Result<tonic::Response<QueryChannelParamsResponse>, tonic::Status> {
869 Err(tonic::Status::unimplemented("not implemented"))
870 }
871
872 #[tracing::instrument(skip(self), err, level = "debug")]
873 async fn upgrade(
874 &self,
875 _request: tonic::Request<QueryUpgradeRequest>,
876 ) -> std::result::Result<tonic::Response<QueryUpgradeResponse>, tonic::Status> {
877 Err(tonic::Status::unimplemented("not implemented"))
878 }
879
880 #[tracing::instrument(skip(self), err, level = "debug")]
881 async fn upgrade_error(
882 &self,
883 _request: tonic::Request<QueryUpgradeErrorRequest>,
884 ) -> std::result::Result<tonic::Response<QueryUpgradeErrorResponse>, tonic::Status> {
885 Err(tonic::Status::unimplemented("not implemented"))
886 }
887}