penumbra_sdk_app/server/
info.rs

1//! Logic for enabling `pd` to interact with chain state.
2use std::{
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6    vec,
7};
8
9use crate::PenumbraHost;
10use anyhow::Context as _;
11use cnidarium::Storage;
12use futures::FutureExt;
13use ibc_proto::ibc::core::{
14    channel::v1::{
15        PacketState, QueryChannelsRequest, QueryChannelsResponse,
16        QueryPacketAcknowledgementsRequest, QueryPacketAcknowledgementsResponse,
17        QueryPacketCommitmentsRequest, QueryPacketCommitmentsResponse, QueryUnreceivedAcksRequest,
18        QueryUnreceivedAcksResponse, QueryUnreceivedPacketsRequest, QueryUnreceivedPacketsResponse,
19    },
20    client::v1::{IdentifiedClientState, QueryClientStatesRequest, QueryClientStatesResponse},
21};
22use ibc_proto::ibc::core::{
23    channel::v1::{QueryConnectionChannelsRequest, QueryConnectionChannelsResponse},
24    connection::v1::{QueryConnectionsRequest, QueryConnectionsResponse},
25};
26use ibc_types::core::channel::IdentifiedChannelEnd;
27use ibc_types::core::channel::{ChannelId, PortId};
28use ibc_types::core::client::ClientId;
29use ibc_types::core::connection::ConnectionId;
30use ibc_types::core::connection::IdentifiedConnectionEnd;
31use penumbra_sdk_ibc::component::ChannelStateReadExt as _;
32use penumbra_sdk_ibc::component::ClientStateReadExt as _;
33use penumbra_sdk_ibc::component::ConnectionStateReadExt as _;
34use penumbra_sdk_ibc::component::HostInterface;
35use prost::Message;
36use std::str::FromStr;
37use tendermint::v0_37::abci::{
38    request,
39    response::{self, Echo},
40    InfoRequest, InfoResponse,
41};
42use tower_abci::BoxError;
43use tracing::Instrument;
44
45use penumbra_sdk_tower_trace::v037::RequestExt;
46
47const ABCI_INFO_VERSION: &str = env!("CARGO_PKG_VERSION");
48
49/// Implements service traits for Tonic gRPC services.
50///
51/// The fields of this struct are the configuration and data
52/// necessary to the gRPC services.
53#[derive(Clone, Debug)]
54pub struct Info {
55    /// Storage interface for retrieving chain state.
56    storage: Storage,
57    // height_rx: watch::Receiver<block::Height>,
58}
59
60impl Info {
61    pub fn new(storage: Storage) -> Self {
62        Self { storage }
63    }
64
65    async fn info(&self, info: request::Info) -> anyhow::Result<response::Info> {
66        let state = self.storage.latest_snapshot();
67        // Previously, we used the latest version of the JMT state to
68        // report the current block height. This worked well because
69        // the JMT version and the current height were always aligned.
70        // However, adding support for genesis migrations breaks this
71        // invariant because there is a pregenesis (aka. phantom)
72        // block that occurs immediately after an upgrade. This block
73        // has height 0. To support this case, we report back the height
74        // that is stored in the state store.
75        let last_block_height = PenumbraHost::get_block_height(&state)
76            .await
77            // if we can't get the block height, we're in pregenesis
78            // in that case we want to report 0 to cometbft.
79            .unwrap_or_default()
80            .try_into()?;
81
82        let app_version = crate::APP_VERSION;
83
84        tracing::info!(?info, state_version = ?state.version(), last_block_height = ?last_block_height, ?app_version,"reporting height in info query");
85
86        let last_block_app_hash = state.root_hash().await?.0.to_vec().try_into()?;
87
88        Ok(response::Info {
89            data: "penumbra".to_string(),
90            version: ABCI_INFO_VERSION.to_string(),
91            app_version,
92            last_block_height,
93            last_block_app_hash,
94        })
95    }
96
97    async fn get_snapshot_for_height(
98        &self,
99        height: u64,
100    ) -> anyhow::Result<(cnidarium::Snapshot, u64)> {
101        match height {
102            // ABCI docs say:
103            //
104            // The default `0` returns data for the latest committed block. Note that
105            // this is the height of the block containing the application's Merkle root
106            // hash, which represents the state as it was after committing the block at
107            // `height - 1`.
108            //
109            // Try to do this behavior by querying at height = latest-1.
110            0 => {
111                let height = self.storage.latest_snapshot().version().saturating_sub(1);
112                let snapshot = self
113                    .storage
114                    .snapshot(height)
115                    .ok_or_else(|| anyhow::anyhow!("no snapshot of height {height}"))?;
116
117                Ok((snapshot, height))
118            }
119            height => {
120                let snapshot = self
121                    .storage
122                    .snapshot(height)
123                    .ok_or_else(|| anyhow::anyhow!("no snapshot of height {height}"))?;
124
125                Ok((snapshot, height))
126            }
127        }
128    }
129
130    async fn query(&self, query: request::Query) -> anyhow::Result<response::Query> {
131        // The other query params are already in the span, so we just need to emit an event.
132        tracing::debug!("got query");
133
134        match query.path.as_str() {
135            "state/key" => {
136                let (snapshot, height) = self
137                    .get_snapshot_for_height(u64::from(query.height))
138                    .await?;
139
140                let key = hex::decode(&query.data).unwrap_or_else(|_| query.data.to_vec());
141
142                let (value, proof) =
143                    snapshot
144                        .get_with_proof(key.clone())
145                        .await
146                        .with_context(|| {
147                            format!("failed to get key {}", String::from_utf8_lossy(&key))
148                        })?;
149
150                let mut ops = vec![];
151                for commitment_proof in proof.proofs {
152                    match commitment_proof
153                        .clone()
154                        .proof
155                        .expect("should have non empty commitment proofs")
156                    {
157                        ics23::commitment_proof::Proof::Exist(x_proof) => {
158                            let proof_op = tendermint::merkle::proof::ProofOp {
159                                field_type: "jmt:v".to_string(),
160                                key: x_proof.key,
161                                data: commitment_proof.encode_to_vec(),
162                            };
163                            ops.push(proof_op);
164                        }
165                        ics23::commitment_proof::Proof::Nonexist(nx_proof) => {
166                            let proof_op = tendermint::merkle::proof::ProofOp {
167                                field_type: "jmt:v".to_string(),
168                                key: nx_proof.key,
169                                data: commitment_proof.encode_to_vec(),
170                            };
171                            ops.push(proof_op);
172                        }
173                        ics23::commitment_proof::Proof::Batch(_) => {
174                            anyhow::bail!("batch proofs not supported in abci query")
175                        }
176                        ics23::commitment_proof::Proof::Compressed(_) => {
177                            anyhow::bail!("compressed proofs not supported in abci query")
178                        }
179                    }
180                }
181                let proof_ops = tendermint::merkle::proof::ProofOps { ops };
182                let value = value.unwrap_or_else(Vec::new);
183
184                Ok(response::Query {
185                    code: 0.into(),
186                    key: query.data,
187                    log: "".to_string(),
188                    value: value.into(),
189                    proof: Some(proof_ops),
190                    height: height.try_into().context("failed to convert height")?,
191                    codespace: "".to_string(),
192                    info: "".to_string(),
193                    index: 0,
194                })
195            }
196            "/ibc.core.connection.v1.Query/Connections" => {
197                let (snapshot, height) = self
198                    .get_snapshot_for_height(u64::from(query.height))
199                    .await?;
200
201                // TODO: handle request.pagination
202                let _request = QueryConnectionsRequest::decode(query.data.clone())
203                    .context("failed to decode QueryConnectionsRequest")?;
204
205                let connection_counter = snapshot.get_connection_counter().await?;
206
207                let mut connections = vec![];
208                for conn_idx in 0..connection_counter.0 {
209                    let conn_id = ConnectionId(format!("connection-{}", conn_idx));
210                    let connection = snapshot
211                        .get_connection(&conn_id)
212                        .await?
213                        .context("couldn't find connection")?;
214                    let id_conn = IdentifiedConnectionEnd {
215                        connection_id: conn_id,
216                        connection_end: connection,
217                    };
218                    connections.push(id_conn.into());
219                }
220
221                let res_value = QueryConnectionsResponse {
222                    connections,
223                    pagination: None,
224                    height: None,
225                }
226                .encode_to_vec();
227
228                Ok(response::Query {
229                    code: 0.into(),
230                    key: query.data,
231                    log: "".to_string(),
232                    value: res_value.into(),
233                    proof: None,
234                    height: height.try_into().context("failed to convert height")?,
235                    codespace: "".to_string(),
236                    info: "".to_string(),
237                    index: 0,
238                })
239            }
240            "/ibc.core.channel.v1.Query/Channels" => {
241                let (snapshot, height) = self
242                    .get_snapshot_for_height(u64::from(query.height))
243                    .await?;
244
245                // TODO: handle request.pagination
246                let _request = QueryChannelsRequest::decode(query.data.clone())
247                    .context("failed to decode QueryConnectionsRequest")?;
248
249                let channel_counter = snapshot.get_channel_counter().await?;
250
251                let mut channels = vec![];
252                for chan_idx in 0..channel_counter {
253                    let chan_id = ChannelId(format!("channel-{}", chan_idx));
254                    let channel = snapshot
255                        .get_channel(&chan_id, &PortId::transfer())
256                        .await?
257                        .context("couldn't find channel")?;
258                    let id_chan = IdentifiedChannelEnd {
259                        channel_id: chan_id,
260                        port_id: PortId::transfer(),
261                        channel_end: channel,
262                        upgrade_sequence: 0,
263                    };
264                    channels.push(id_chan.into());
265                }
266
267                let res_value = QueryChannelsResponse {
268                    channels,
269                    pagination: None,
270                    height: None,
271                }
272                .encode_to_vec();
273
274                Ok(response::Query {
275                    code: 0.into(),
276                    key: query.data,
277                    log: "".to_string(),
278                    value: res_value.into(),
279                    proof: None,
280                    height: height.try_into().context("failed to convert height")?,
281                    codespace: "".to_string(),
282                    info: "".to_string(),
283                    index: 0,
284                })
285            }
286            "/ibc.core.channel.v1.Query/ConnectionChannels" => {
287                let (snapshot, height) = self
288                    .get_snapshot_for_height(u64::from(query.height))
289                    .await?;
290
291                let request = QueryConnectionChannelsRequest::decode(query.data.clone())
292                    .context("failed to decode QueryConnectionChannelsRequest")?;
293
294                let connection_id: ConnectionId = ConnectionId::from_str(&request.connection)
295                    .context("couldn't decode connection id from request")?;
296
297                // look up all of the channels for this connection
298                let channel_counter = snapshot.get_channel_counter().await?;
299
300                let mut channels = vec![];
301                for chan_idx in 0..channel_counter {
302                    let chan_id = ChannelId(format!("channel-{}", chan_idx));
303                    let channel = snapshot
304                        .get_channel(&chan_id, &PortId::transfer())
305                        .await?
306                        .context("couldn't find channel")?;
307                    if channel.connection_hops.contains(&connection_id) {
308                        let id_chan = IdentifiedChannelEnd {
309                            channel_id: chan_id,
310                            port_id: PortId::transfer(),
311                            channel_end: channel,
312                            upgrade_sequence: 0,
313                        };
314                        channels.push(id_chan.into());
315                    }
316                }
317
318                let res_value = QueryConnectionChannelsResponse {
319                    channels,
320                    pagination: None,
321                    height: None,
322                }
323                .encode_to_vec();
324
325                Ok(response::Query {
326                    code: 0.into(),
327                    key: query.data,
328                    log: "".to_string(),
329                    value: res_value.into(),
330                    proof: None,
331                    height: height.try_into().context("failed to convert height")?,
332                    codespace: "".to_string(),
333                    info: "".to_string(),
334                    index: 0,
335                })
336            }
337            "/ibc.core.client.v1.Query/ClientStates" => {
338                let (snapshot, height) = self
339                    .get_snapshot_for_height(u64::from(query.height))
340                    .await?;
341
342                // TODO; handle request.pagination
343                let _request = QueryClientStatesRequest::decode(query.data.clone())
344                    .context("failed to decode QueryClientStatesRequest")?;
345
346                let client_counter = snapshot.client_counter().await?.0;
347
348                let mut client_states = vec![];
349                for client_idx in 0..client_counter {
350                    // NOTE: currently, we only look up tendermint clients, because we only support tendermint clients.
351                    let client_id =
352                        ClientId::from_str(format!("07-tendermint-{}", client_idx).as_str())?;
353                    let client_state = snapshot.get_client_state(&client_id).await;
354                    let id_client = IdentifiedClientState {
355                        client_id: client_id.to_string(),
356                        client_state: client_state.ok().map(|state| state.into()), // send None if we couldn't find the client state
357                    };
358                    client_states.push(id_client);
359                }
360
361                let res_value = QueryClientStatesResponse {
362                    client_states,
363                    pagination: None,
364                }
365                .encode_to_vec();
366
367                Ok(response::Query {
368                    code: 0.into(),
369                    key: query.data,
370                    log: "".to_string(),
371                    value: res_value.into(),
372                    proof: None,
373                    height: height.try_into().context("failed to convert height")?,
374                    codespace: "".to_string(),
375                    info: "".to_string(),
376                    index: 0,
377                })
378            }
379            "/ibc.core.channel.v1.Query/PacketCommitments" => {
380                let (snapshot, height) = self
381                    .get_snapshot_for_height(u64::from(query.height))
382                    .await?;
383
384                let request = QueryPacketCommitmentsRequest::decode(query.data.clone())
385                    .context("failed to decode QueryPacketCommitmentsRequest")?;
386
387                let chan_id: ChannelId =
388                    ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
389                let port_id: PortId =
390                    PortId::from_str(&request.port_id).context("invalid port id")?;
391
392                let mut commitment_states = vec![];
393                let commitment_counter = snapshot.get_send_sequence(&chan_id, &port_id).await?;
394
395                // this starts at 1; the first commitment index is 1 (from ibc spec)
396                for commitment_idx in 1..commitment_counter {
397                    let commitment = snapshot
398                        .get_packet_commitment_by_id(&chan_id, &port_id, commitment_idx)
399                        .await?;
400                    if commitment.is_none() {
401                        continue;
402                    }
403                    let commitment = commitment.expect("commitment is Some");
404
405                    let commitment_state = PacketState {
406                        port_id: request.port_id.clone(),
407                        channel_id: request.channel_id.clone(),
408                        sequence: commitment_idx,
409                        data: commitment.clone(),
410                    };
411
412                    commitment_states.push(commitment_state);
413                }
414
415                let res_value = QueryPacketCommitmentsResponse {
416                    commitments: commitment_states,
417                    pagination: None,
418                    height: None,
419                }
420                .encode_to_vec();
421
422                Ok(response::Query {
423                    code: 0.into(),
424                    key: query.data,
425                    log: "".to_string(),
426                    value: res_value.into(),
427                    proof: None,
428                    height: height.try_into().context("failed to convert height")?,
429                    codespace: "".to_string(),
430                    info: "".to_string(),
431                    index: 0,
432                })
433            }
434            "/ibc.core.channel.v1.Query/PacketAcknowledgements" => {
435                let (snapshot, height) = self
436                    .get_snapshot_for_height(u64::from(query.height))
437                    .await?;
438
439                let request = QueryPacketAcknowledgementsRequest::decode(query.data.clone())
440                    .context("failed to decode QueryPacketAcknowledgementsRequest")?;
441
442                let chan_id: ChannelId =
443                    ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
444                let port_id: PortId =
445                    PortId::from_str(&request.port_id).context("invalid port id")?;
446
447                let ack_counter = snapshot.get_ack_sequence(&chan_id, &port_id).await?;
448
449                let mut acks = vec![];
450                for ack_idx in 0..ack_counter {
451                    let ack = snapshot
452                        .get_packet_acknowledgement(&port_id, &chan_id, ack_idx)
453                        .await?
454                        .ok_or_else(|| anyhow::anyhow!("couldn't find ack"))?;
455
456                    let ack_state = PacketState {
457                        port_id: request.port_id.clone(),
458                        channel_id: request.channel_id.clone(),
459                        sequence: ack_idx,
460                        data: ack.clone(),
461                    };
462
463                    acks.push(ack_state);
464                }
465
466                let res_value = QueryPacketAcknowledgementsResponse {
467                    acknowledgements: acks,
468                    pagination: None,
469                    height: None,
470                }
471                .encode_to_vec();
472
473                Ok(response::Query {
474                    code: 0.into(),
475                    key: query.data,
476                    log: "".to_string(),
477                    value: res_value.into(),
478                    proof: None,
479                    height: height.try_into().context("failed to convert height")?,
480                    codespace: "".to_string(),
481                    info: "".to_string(),
482                    index: 0,
483                })
484            }
485
486            // docstring from ibc-go:
487            //
488            // UnreceivedPackets implements the Query/UnreceivedPackets gRPC method. Given
489            // a list of counterparty packet commitments, the querier checks if the packet
490            // has already been received by checking if a receipt exists on this
491            // chain for the packet sequence. All packets that haven't been received yet
492            // are returned in the response
493            // Usage: To use this method correctly, first query all packet commitments on
494            // the sending chain using the Query/PacketCommitments gRPC method.
495            // Then input the returned sequences into the QueryUnreceivedPacketsRequest
496            // and send the request to this Query/UnreceivedPackets on the **receiving**
497            // chain. This gRPC method will then return the list of packet sequences that
498            // are yet to be received on the receiving chain.
499            //
500            // NOTE: The querier makes the assumption that the provided list of packet
501            // commitments is correct and will not function properly if the list
502            // is not up to date. Ideally the query height should equal the latest height
503            // on the counterparty's client which represents this chain
504            "/ibc.core.channel.v1.Query/UnreceivedPackets" => {
505                let (snapshot, height) = self
506                    .get_snapshot_for_height(u64::from(query.height))
507                    .await?;
508
509                let request = QueryUnreceivedPacketsRequest::decode(query.data.clone())
510                    .context("failed to decode QueryUnreceivedPacketsRequest")?;
511
512                let chan_id: ChannelId =
513                    ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
514                let port_id: PortId =
515                    PortId::from_str(&request.port_id).context("invalid port id")?;
516
517                let mut unreceived_seqs = vec![];
518
519                for seq in request.packet_commitment_sequences {
520                    if seq == 0 {
521                        anyhow::bail!("packet sequence {} cannot be 0", seq);
522                    }
523
524                    if !snapshot
525                        .seen_packet_by_channel(&chan_id, &port_id, seq)
526                        .await?
527                    {
528                        unreceived_seqs.push(seq);
529                    }
530                }
531
532                let res_value = QueryUnreceivedPacketsResponse {
533                    sequences: unreceived_seqs,
534                    height: None,
535                }
536                .encode_to_vec();
537
538                Ok(response::Query {
539                    code: 0.into(),
540                    key: query.data,
541                    log: "".to_string(),
542                    value: res_value.into(),
543                    proof: None,
544                    height: height.try_into().context("failed to convert height")?,
545                    codespace: "".to_string(),
546                    info: "".to_string(),
547                    index: 0,
548                })
549            }
550
551            "/ibc.core.channel.v1.Query/UnreceivedAcks" => {
552                let (snapshot, height) = self
553                    .get_snapshot_for_height(u64::from(query.height))
554                    .await?;
555
556                let request = QueryUnreceivedAcksRequest::decode(query.data.clone())
557                    .context("failed to decode QueryUnreceivedAcksRequest")?;
558
559                let chan_id: ChannelId =
560                    ChannelId::from_str(&request.channel_id).context("invalid channel id")?;
561                let port_id: PortId =
562                    PortId::from_str(&request.port_id).context("invalid port id")?;
563
564                let mut unreceived_seqs = vec![];
565
566                for seq in request.packet_ack_sequences {
567                    if seq == 0 {
568                        anyhow::bail!("packet sequence {} cannot be 0", seq);
569                    }
570
571                    if snapshot
572                        .get_packet_commitment_by_id(&chan_id, &port_id, seq)
573                        .await?
574                        .is_some()
575                    {
576                        unreceived_seqs.push(seq);
577                    }
578                }
579
580                let res_value = QueryUnreceivedAcksResponse {
581                    sequences: unreceived_seqs,
582                    height: None,
583                }
584                .encode_to_vec();
585
586                Ok(response::Query {
587                    code: 0.into(),
588                    key: query.data,
589                    log: "".to_string(),
590                    value: res_value.into(),
591                    proof: None,
592                    height: height.try_into().context("failed to convert height")?,
593                    codespace: "".to_string(),
594                    info: "".to_string(),
595                    index: 0,
596                })
597            }
598
599            _ => Err(anyhow::anyhow!(
600                "requested unrecognized path in ABCI query: {}",
601                query.path
602            )),
603        }
604    }
605}
606
607impl tower_service::Service<InfoRequest> for Info {
608    type Response = InfoResponse;
609    type Error = BoxError;
610    type Future = Pin<Box<dyn Future<Output = Result<InfoResponse, BoxError>> + Send + 'static>>;
611
612    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
613        Poll::Ready(Ok(()))
614    }
615
616    fn call(&mut self, req: InfoRequest) -> Self::Future {
617        let span = req.create_span();
618        let self2 = self.clone();
619
620        async move {
621            match req {
622                InfoRequest::Info(info) => self2
623                    .info(info)
624                    .await
625                    .map(InfoResponse::Info)
626                    .map_err(Into::into),
627                InfoRequest::Query(query) => match self2.query(query).await {
628                    Ok(rsp) => {
629                        tracing::debug!(value = ?rsp.value);
630                        Ok(InfoResponse::Query(rsp))
631                    }
632                    Err(e) => {
633                        tracing::debug!(error = ?e);
634                        Ok(InfoResponse::Query(response::Query {
635                            code: 1.into(),
636                            log: format!("{:#}", e),
637                            ..Default::default()
638                        }))
639                    }
640                },
641                InfoRequest::Echo(echo) => Ok(InfoResponse::Echo(Echo {
642                    message: echo.message,
643                })),
644            }
645        }
646        .instrument(span)
647        .boxed()
648    }
649}