1use std::{
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6 vec,
7};
8
9use crate::PenumbraHost;
10use anyhow::Context as _;
11use cnidarium::Storage;
12use futures::FutureExt;
13use ibc_proto::ibc::core::{
14 channel::v1::{
15 PacketState, QueryChannelsRequest, QueryChannelsResponse,
16 QueryPacketAcknowledgementsRequest, QueryPacketAcknowledgementsResponse,
17 QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse, QueryUnreceivedAcksRequest,
18 QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
19 },
20 client::v1::{IdentifiedClientState, QueryClientStatesRequest, QueryClientStatesResponse},
21};
22use ibc_proto::ibc::core::{
23 channel::v1::{QueryConnectionChannelsRequest, QueryConnectionChannelsResponse},
24 connection::v1::{QueryConnectionsRequest, QueryConnectionsResponse},
25};
26use ibc_types::core::channel::IdentifiedChannelEnd;
27use ibc_types::core::channel::{ChannelId, PortId};
28use ibc_types::core::client::ClientId;
29use ibc_types::core::connection::ConnectionId;
30use ibc_types::core::connection::IdentifiedConnectionEnd;
31use penumbra_sdk_ibc::component::ChannelStateReadExt as _;
32use penumbra_sdk_ibc::component::ClientStateReadExt as _;
33use penumbra_sdk_ibc::component::ConnectionStateReadExt as _;
34use penumbra_sdk_ibc::component::HostInterface;
35use prost::Message;
36use std::str::FromStr;
37use tendermint::v0_37::abci::{
38 request,
39 response::{self, Echo},
40 InfoRequest, InfoResponse,
41};
42use tower_abci::BoxError;
43use tracing::Instrument;
44
45use penumbra_sdk_tower_trace::v037::RequestExt;
46
47const ABCI_INFO_VERSION: &str = env!("CARGO_PKG_VERSION");
48
49#[derive(Clone, Debug)]
54pub struct Info {
55 storage: Storage,
57 }
59
60impl Info {
61 pub fn new(storage: Storage) -> Self {
62 Self { storage }
63 }
64
65 async fn info(&self, info: request::Info) -> anyhow::Result<response::Info> {
66 let state = self.storage.latest_snapshot();
67 let last_block_height = PenumbraHost::get_block_height(&state)
76 .await
77 .unwrap_or_default()
80 .try_into()?;
81
82 let app_version = crate::APP_VERSION;
83
84 tracing::info!(?info, state_version = ?state.version(), last_block_height = ?last_block_height, ?app_version,"reporting height in info query");
85
86 let last_block_app_hash = state.root_hash().await?.0.to_vec().try_into()?;
87
88 Ok(response::Info {
89 data: "penumbra".to_string(),
90 version: ABCI_INFO_VERSION.to_string(),
91 app_version,
92 last_block_height,
93 last_block_app_hash,
94 })
95 }
96
97 async fn get_snapshot_for_height(
98 &self,
99 height: u64,
100 ) -> anyhow::Result<(cnidarium::Snapshot, u64)> {
101 match height {
102 0 => {
111 let height = self.storage.latest_snapshot().version().saturating_sub(1);
112 let snapshot = self
113 .storage
114 .snapshot(height)
115 .ok_or_else(|| anyhow::anyhow!("no snapshot of height {height}"))?;
116
117 Ok((snapshot, height))
118 }
119 height => {
120 let snapshot = self
121 .storage
122 .snapshot(height)
123 .ok_or_else(|| anyhow::anyhow!("no snapshot of height {height}"))?;
124
125 Ok((snapshot, height))
126 }
127 }
128 }
129
130 async fn query(&self, query: request::Query) -> anyhow::Result<response::Query> {
131 tracing::debug!("got query");
133
134 match query.path.as_str() {
135 "state/key" => {
136 let (snapshot, height) = self
137 .get_snapshot_for_height(u64::from(query.height))
138 .await?;
139
140 let key = hex::decode(&query.data).unwrap_or_else(|_| query.data.to_vec());
141
142 let (value, proof) =
143 snapshot
144 .get_with_proof(key.clone())
145 .await
146 .with_context(|| {
147 format!("failed to get key {}", String::from_utf8_lossy(&key))
148 })?;
149
150 let mut ops = vec![];
151 for commitment_proof in proof.proofs {
152 match commitment_proof
153 .clone()
154 .proof
155 .expect("should have non empty commitment proofs")
156 {
157 ics23::commitment_proof::Proof::Exist(x_proof) => {
158 let proof_op = tendermint::merkle::proof::ProofOp {
159 field_type: "jmt:v".to_string(),
160 key: x_proof.key,
161 data: commitment_proof.encode_to_vec(),
162 };
163 ops.push(proof_op);
164 }
165 ics23::commitment_proof::Proof::Nonexist(nx_proof) => {
166 let proof_op = tendermint::merkle::proof::ProofOp {
167 field_type: "jmt:v".to_string(),
168 key: nx_proof.key,
169 data: commitment_proof.encode_to_vec(),
170 };
171 ops.push(proof_op);
172 }
173 ics23::commitment_proof::Proof::Batch(_) => {
174 anyhow::bail!("batch proofs not supported in abci query")
175 }
176 ics23::commitment_proof::Proof::Compressed(_) => {
177 anyhow::bail!("compressed proofs not supported in abci query")
178 }
179 }
180 }
181 let proof_ops = tendermint::merkle::proof::ProofOps { ops };
182 let value = value.unwrap_or_else(Vec::new);
183
184 Ok(response::Query {
185 code: 0.into(),
186 key: query.data,
187 log: "".to_string(),
188 value: value.into(),
189 proof: Some(proof_ops),
190 height: height.try_into().context("failed to convert height")?,
191 codespace: "".to_string(),
192 info: "".to_string(),
193 index: 0,
194 })
195 }
196 "/ibc.core.connection.v1.Query/Connections" => {
197 let (snapshot, height) = self
198 .get_snapshot_for_height(u64::from(query.height))
199 .await?;
200
201 let _request = QueryConnectionsRequest::decode(query.data.clone())
203 .context("failed to decode QueryConnectionsRequest")?;
204
205 let connection_counter = snapshot.get_connection_counter().await?;
206
207 let mut connections = vec![];
208 for conn_idx in 0..connection_counter.0 {
209 let conn_id = ConnectionId(format!("connection-{}", conn_idx));
210 let connection = snapshot
211 .get_connection(&conn_id)
212 .await?
213 .context("couldn't find connection")?;
214 let id_conn = IdentifiedConnectionEnd {
215 connection_id: conn_id,
216 connection_end: connection,
217 };
218 connections.push(id_conn.into());
219 }
220
221 let res_value = QueryConnectionsResponse {
222 connections,
223 pagination: None,
224 height: None,
225 }
226 .encode_to_vec();
227
228 Ok(response::Query {
229 code: 0.into(),
230 key: query.data,
231 log: "".to_string(),
232 value: res_value.into(),
233 proof: None,
234 height: height.try_into().context("failed to convert height")?,
235 codespace: "".to_string(),
236 info: "".to_string(),
237 index: 0,
238 })
239 }
240 "/ibc.core.channel.v1.Query/Channels" => {
241 let (snapshot, height) = self
242 .get_snapshot_for_height(u64::from(query.height))
243 .await?;
244
245 let _request = QueryChannelsRequest::decode(query.data.clone())
247 .context("failed to decode QueryConnectionsRequest")?;
248
249 let channel_counter = snapshot.get_channel_counter().await?;
250
251 let mut channels = vec![];
252 for chan_idx in 0..channel_counter {
253 let chan_id = ChannelId(format!("channel-{}", chan_idx));
254 let channel = snapshot
255 .get_channel(&chan_id, &PortId::transfer())
256 .await?
257 .context("couldn't find channel")?;
258 let id_chan = IdentifiedChannelEnd {
259 channel_id: chan_id,
260 port_id: PortId::transfer(),
261 channel_end: channel,
262 upgrade_sequence: 0,
263 };
264 channels.push(id_chan.into());
265 }
266
267 let res_value = QueryChannelsResponse {
268 channels,
269 pagination: None,
270 height: None,
271 }
272 .encode_to_vec();
273
274 Ok(response::Query {
275 code: 0.into(),
276 key: query.data,
277 log: "".to_string(),
278 value: res_value.into(),
279 proof: None,
280 height: height.try_into().context("failed to convert height")?,
281 codespace: "".to_string(),
282 info: "".to_string(),
283 index: 0,
284 })
285 }
286 "/ibc.core.channel.v1.Query/ConnectionChannels" => {
287 let (snapshot, height) = self
288 .get_snapshot_for_height(u64::from(query.height))
289 .await?;
290
291 let request = QueryConnectionChannelsRequest::decode(query.data.clone())
292 .context("failed to decode QueryConnectionChannelsRequest")?;
293
294 let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
295 .context("couldn't decode connection id from request")?;
296
297 let channel_counter = snapshot.get_channel_counter().await?;
299
300 let mut channels = vec![];
301 for chan_idx in 0..channel_counter {
302 let chan_id = ChannelId(format!("channel-{}", chan_idx));
303 let channel = snapshot
304 .get_channel(&chan_id, &PortId::transfer())
305 .await?
306 .context("couldn't find channel")?;
307 if channel.connection_hops.contains(&connection_id) {
308 let id_chan = IdentifiedChannelEnd {
309 channel_id: chan_id,
310 port_id: PortId::transfer(),
311 channel_end: channel,
312 upgrade_sequence: 0,
313 };
314 channels.push(id_chan.into());
315 }
316 }
317
318 let res_value = QueryConnectionChannelsResponse {
319 channels,
320 pagination: None,
321 height: None,
322 }
323 .encode_to_vec();
324
325 Ok(response::Query {
326 code: 0.into(),
327 key: query.data,
328 log: "".to_string(),
329 value: res_value.into(),
330 proof: None,
331 height: height.try_into().context("failed to convert height")?,
332 codespace: "".to_string(),
333 info: "".to_string(),
334 index: 0,
335 })
336 }
337 "/ibc.core.client.v1.Query/ClientStates" => {
338 let (snapshot, height) = self
339 .get_snapshot_for_height(u64::from(query.height))
340 .await?;
341
342 let _request = QueryClientStatesRequest::decode(query.data.clone())
344 .context("failed to decode QueryClientStatesRequest")?;
345
346 let client_counter = snapshot.client_counter().await?.0;
347
348 let mut client_states = vec![];
349 for client_idx in 0..client_counter {
350 let client_id =
352 ClientId::from_str(format!("07-tendermint-{}", client_idx).as_str())?;
353 let client_state = snapshot.get_client_state(&client_id).await;
354 let id_client = IdentifiedClientState {
355 client_id: client_id.to_string(),
356 client_state: client_state.ok().map(|state| state.into()), };
358 client_states.push(id_client);
359 }
360
361 let res_value = QueryClientStatesResponse {
362 client_states,
363 pagination: None,
364 }
365 .encode_to_vec();
366
367 Ok(response::Query {
368 code: 0.into(),
369 key: query.data,
370 log: "".to_string(),
371 value: res_value.into(),
372 proof: None,
373 height: height.try_into().context("failed to convert height")?,
374 codespace: "".to_string(),
375 info: "".to_string(),
376 index: 0,
377 })
378 }
379 "/ibc.core.channel.v1.Query/PacketCommitments" => {
380 let (snapshot, height) = self
381 .get_snapshot_for_height(u64::from(query.height))
382 .await?;
383
384 let request = QueryPacketCommitmentsRequest::decode(query.data.clone())
385 .context("failed to decode QueryPacketCommitmentsRequest")?;
386
387 let chan_id: ChannelId =
388 ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
389 let port_id: PortId =
390 PortId::from_str(&request.port_id).context("invalid port id")?;
391
392 let mut commitment_states = vec![];
393 let commitment_counter = snapshot.get_send_sequence(&chan_id, &port_id).await?;
394
395 for commitment_idx in 1..commitment_counter {
397 let commitment = snapshot
398 .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
399 .await?;
400 if commitment.is_none() {
401 continue;
402 }
403 let commitment = commitment.expect("commitment is Some");
404
405 let commitment_state = PacketState {
406 port_id: request.port_id.clone(),
407 channel_id: request.channel_id.clone(),
408 sequence: commitment_idx,
409 data: commitment.clone(),
410 };
411
412 commitment_states.push(commitment_state);
413 }
414
415 let res_value = QueryPacketCommitmentsResponse {
416 commitments: commitment_states,
417 pagination: None,
418 height: None,
419 }
420 .encode_to_vec();
421
422 Ok(response::Query {
423 code: 0.into(),
424 key: query.data,
425 log: "".to_string(),
426 value: res_value.into(),
427 proof: None,
428 height: height.try_into().context("failed to convert height")?,
429 codespace: "".to_string(),
430 info: "".to_string(),
431 index: 0,
432 })
433 }
434 "/ibc.core.channel.v1.Query/PacketAcknowledgements" => {
435 let (snapshot, height) = self
436 .get_snapshot_for_height(u64::from(query.height))
437 .await?;
438
439 let request = QueryPacketAcknowledgementsRequest::decode(query.data.clone())
440 .context("failed to decode QueryPacketAcknowledgementsRequest")?;
441
442 let chan_id: ChannelId =
443 ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
444 let port_id: PortId =
445 PortId::from_str(&request.port_id).context("invalid port id")?;
446
447 let ack_counter = snapshot.get_ack_sequence(&chan_id, &port_id).await?;
448
449 let mut acks = vec![];
450 for ack_idx in 0..ack_counter {
451 let ack = snapshot
452 .get_packet_acknowledgement(&port_id, &chan_id, ack_idx)
453 .await?
454 .ok_or_else(|| anyhow::anyhow!("couldn't find ack"))?;
455
456 let ack_state = PacketState {
457 port_id: request.port_id.clone(),
458 channel_id: request.channel_id.clone(),
459 sequence: ack_idx,
460 data: ack.clone(),
461 };
462
463 acks.push(ack_state);
464 }
465
466 let res_value = QueryPacketAcknowledgementsResponse {
467 acknowledgements: acks,
468 pagination: None,
469 height: None,
470 }
471 .encode_to_vec();
472
473 Ok(response::Query {
474 code: 0.into(),
475 key: query.data,
476 log: "".to_string(),
477 value: res_value.into(),
478 proof: None,
479 height: height.try_into().context("failed to convert height")?,
480 codespace: "".to_string(),
481 info: "".to_string(),
482 index: 0,
483 })
484 }
485
486 "/ibc.core.channel.v1.Query/UnreceivedPackets" => {
505 let (snapshot, height) = self
506 .get_snapshot_for_height(u64::from(query.height))
507 .await?;
508
509 let request = QueryUnreceivedPacketsRequest::decode(query.data.clone())
510 .context("failed to decode QueryUnreceivedPacketsRequest")?;
511
512 let chan_id: ChannelId =
513 ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
514 let port_id: PortId =
515 PortId::from_str(&request.port_id).context("invalid port id")?;
516
517 let mut unreceived_seqs = vec![];
518
519 for seq in request.packet_commitment_sequences {
520 if seq == 0 {
521 anyhow::bail!("packet sequence {} cannot be 0", seq);
522 }
523
524 if !snapshot
525 .seen_packet_by_channel(&chan_id, &port_id, seq)
526 .await?
527 {
528 unreceived_seqs.push(seq);
529 }
530 }
531
532 let res_value = QueryUnreceivedPacketsResponse {
533 sequences: unreceived_seqs,
534 height: None,
535 }
536 .encode_to_vec();
537
538 Ok(response::Query {
539 code: 0.into(),
540 key: query.data,
541 log: "".to_string(),
542 value: res_value.into(),
543 proof: None,
544 height: height.try_into().context("failed to convert height")?,
545 codespace: "".to_string(),
546 info: "".to_string(),
547 index: 0,
548 })
549 }
550
551 "/ibc.core.channel.v1.Query/UnreceivedAcks" => {
552 let (snapshot, height) = self
553 .get_snapshot_for_height(u64::from(query.height))
554 .await?;
555
556 let request = QueryUnreceivedAcksRequest::decode(query.data.clone())
557 .context("failed to decode QueryUnreceivedAcksRequest")?;
558
559 let chan_id: ChannelId =
560 ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
561 let port_id: PortId =
562 PortId::from_str(&request.port_id).context("invalid port id")?;
563
564 let mut unreceived_seqs = vec![];
565
566 for seq in request.packet_ack_sequences {
567 if seq == 0 {
568 anyhow::bail!("packet sequence {} cannot be 0", seq);
569 }
570
571 if snapshot
572 .get_packet_commitment_by_id(&chan_id, &port_id, seq)
573 .await?
574 .is_some()
575 {
576 unreceived_seqs.push(seq);
577 }
578 }
579
580 let res_value = QueryUnreceivedAcksResponse {
581 sequences: unreceived_seqs,
582 height: None,
583 }
584 .encode_to_vec();
585
586 Ok(response::Query {
587 code: 0.into(),
588 key: query.data,
589 log: "".to_string(),
590 value: res_value.into(),
591 proof: None,
592 height: height.try_into().context("failed to convert height")?,
593 codespace: "".to_string(),
594 info: "".to_string(),
595 index: 0,
596 })
597 }
598
599 _ => Err(anyhow::anyhow!(
600 "requested unrecognized path in ABCI query: {}",
601 query.path
602 )),
603 }
604 }
605}
606
607impl tower_service::Service<InfoRequest> for Info {
608 type Response = InfoResponse;
609 type Error = BoxError;
610 type Future = Pin<Box<dyn Future<Output = Result<InfoResponse, BoxError>> + Send + 'static>>;
611
612 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
613 Poll::Ready(Ok(()))
614 }
615
616 fn call(&mut self, req: InfoRequest) -> Self::Future {
617 let span = req.create_span();
618 let self2 = self.clone();
619
620 async move {
621 match req {
622 InfoRequest::Info(info) => self2
623 .info(info)
624 .await
625 .map(InfoResponse::Info)
626 .map_err(Into::into),
627 InfoRequest::Query(query) => match self2.query(query).await {
628 Ok(rsp) => {
629 tracing::debug!(value = ?rsp.value);
630 Ok(InfoResponse::Query(rsp))
631 }
632 Err(e) => {
633 tracing::debug!(error = ?e);
634 Ok(InfoResponse::Query(response::Query {
635 code: 1.into(),
636 log: format!("{:#}", e),
637 ..Default::default()
638 }))
639 }
640 },
641 InfoRequest::Echo(echo) => Ok(InfoResponse::Echo(Echo {
642 message: echo.message,
643 })),
644 }
645 }
646 .instrument(span)
647 .boxed()
648 }
649}