penumbra_sdk_app/server/
events.rs

1use std::{future::Future, pin::Pin, task::Context};
2
3use anyhow::Result;
4use futures::FutureExt;
5use regex::RegexSet;
6use tendermint::abci::Event;
7use tendermint::v0_37::abci::{ConsensusRequest as Request, ConsensusResponse as Response};
8use tower::{Layer, Service};
9
10#[derive(Debug, Clone)]
11pub struct EventIndex<S> {
12    svc: S,
13    config: EventIndexLayer,
14}
15
16#[derive(Debug, Clone, Default)]
17pub struct EventIndexLayer {
18    /// A set of regexes matching event keys that should be set to `index=true`.
19    ///
20    /// Takes priority over `no_index` matches.
21    pub index: RegexSet,
22    /// A set of regexes matching event keys that should be set to `index=false`.
23    pub no_index: RegexSet,
24}
25
26impl EventIndexLayer {
27    /// Convenience constructor to force every event attribute to be indexed.
28    pub fn index_all() -> Self {
29        Self {
30            index: RegexSet::new([""]).expect("empty regex should always parse"),
31            no_index: RegexSet::empty(),
32        }
33    }
34
35    fn adjust_events(&self, events: &mut [Event]) {
36        for e in events.iter_mut() {
37            for attr in e.attributes.iter_mut() {
38                // Perform matching on a nested key in the same format used by
39                // the cosmos SDK: https://docs.cosmos.network/main/core/config
40                // e.g., "message.sender", "message.recipient"
41
42                match attr.key_str() {
43                    Ok(key) => {
44                        let nested_key = format!("{}.{}", e.kind, key);
45
46                        if self.no_index.is_match(&nested_key) {
47                            attr.set_index(false);
48                        }
49
50                        // This comes second so that explicit index requests take priority over no-index requests.
51                        if self.index.is_match(&nested_key) {
52                            attr.set_index(true);
53                        }
54                    }
55                    _ => {
56                        // The key is not valid UTF-8, so we can't match it, we skip it.
57                        // This should be unreachable, as the key is always a valid UTF-8 string
58                        // for tendermint/cometbft > 0.34
59                        tracing::warn!("event attribute key is not valid UTF-8");
60                    }
61                }
62            }
63        }
64    }
65}
66
67impl<S> Layer<S> for EventIndexLayer
68where
69    S: Service<Request, Response = Response>,
70    S::Future: Send + 'static,
71{
72    type Service = EventIndex<S>;
73
74    fn layer(&self, inner: S) -> Self::Service {
75        EventIndex {
76            svc: inner,
77            config: self.clone(),
78        }
79    }
80}
81
82impl<S> Service<Request> for EventIndex<S>
83where
84    S: Service<Request, Response = Response>,
85    S::Future: Send + 'static,
86{
87    type Error = S::Error;
88    type Response = Response;
89    type Future =
90        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
91
92    fn poll_ready(
93        &mut self,
94        cx: &mut Context<'_>,
95    ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
96        self.svc.poll_ready(cx)
97    }
98
99    fn call(&mut self, req: Request) -> Self::Future {
100        let rsp = self.svc.call(req);
101        let config = self.config.clone();
102
103        async move {
104            let mut rsp = rsp.await?;
105            match rsp {
106                // No events.
107                Response::InitChain(_) => {}
108                Response::Commit(_) => {}
109                Response::PrepareProposal(_) => {}
110                Response::ProcessProposal(_) => {}
111                // These responses have events.
112                Response::BeginBlock(ref mut msg) => config.adjust_events(&mut msg.events),
113                Response::DeliverTx(ref mut msg) => config.adjust_events(&mut msg.events),
114                Response::EndBlock(ref mut msg) => config.adjust_events(&mut msg.events),
115            }
116            Ok(rsp)
117        }
118        .boxed()
119    }
120}