penumbra_app/server/
events.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::{future::Future, pin::Pin, task::Context};

use anyhow::Result;
use futures::FutureExt;
use regex::RegexSet;
use tendermint::abci::Event;
use tendermint::v0_37::abci::{ConsensusRequest as Request, ConsensusResponse as Response};
use tower::{Layer, Service};

/// A service wrapper that post-processes the events carried by consensus
/// responses, overriding the `index` flag on their attributes according to
/// an [`EventIndexLayer`] configuration.
///
/// Values of this type are produced by [`EventIndexLayer`]'s `Layer`
/// implementation.
#[derive(Debug, Clone)]
pub struct EventIndex<S> {
    /// The wrapped inner service that actually handles each request.
    svc: S,
    /// The index / no-index rules applied to events in the inner service's responses.
    config: EventIndexLayer,
}

/// Configuration describing which event attributes should have their
/// `index` flag forced on or off.
///
/// Patterns are matched against a nested key of the form
/// `<event kind>.<attribute key>` (e.g., `message.sender`). Attributes whose
/// nested key matches neither set keep whatever `index` value they already had.
#[derive(Debug, Clone, Default)]
pub struct EventIndexLayer {
    /// A set of regexes matching nested event keys that should be set to `index=true`.
    ///
    /// Takes priority over `no_index` matches.
    pub index: RegexSet,
    /// A set of regexes matching nested event keys that should be set to `index=false`.
    pub no_index: RegexSet,
}

impl EventIndexLayer {
    /// Builds a configuration under which every event attribute is indexed.
    ///
    /// The `index` set holds a single empty pattern and the `no_index` set
    /// holds no patterns at all.
    pub fn index_all() -> Self {
        let match_everything = RegexSet::new([""]).expect("empty regex should always parse");
        let match_nothing = RegexSet::empty();

        Self {
            index: match_everything,
            no_index: match_nothing,
        }
    }

    /// Overrides the `index` flag of each attribute of each event in `events`.
    ///
    /// An attribute whose nested key matches the `index` set is marked as
    /// indexed; otherwise, if it matches the `no_index` set, it is marked as
    /// not indexed. Attributes matching neither set are left untouched.
    fn adjust_events(&self, events: &mut [Event]) {
        for event in events {
            let kind = &event.kind;

            for attribute in &mut event.attributes {
                // Matching is done on a nested key, in the same format used by
                // the cosmos SDK: https://docs.cosmos.network/main/core/config
                // e.g., "message.sender", "message.recipient"
                let qualified_key = format!("{}.{}", kind, attribute.key);

                // Explicit index requests win over no-index requests, so the
                // `index` set is consulted first and `no_index` only applies
                // when `index` did not match.
                if self.index.is_match(&qualified_key) {
                    attribute.index = true;
                } else if self.no_index.is_match(&qualified_key) {
                    attribute.index = false;
                }
            }
        }
    }
}

/// Allows an [`EventIndexLayer`] to be used as a tower `Layer`, wrapping an
/// inner consensus service in an [`EventIndex`].
impl<S> Layer<S> for EventIndexLayer
where
    S: Service<Request, Response = Response>,
    S::Future: Send + 'static,
{
    type Service = EventIndex<S>;

    /// Wraps `inner`, handing the resulting service its own copy of this
    /// configuration.
    fn layer(&self, inner: S) -> Self::Service {
        let config = self.clone();
        EventIndex { svc: inner, config }
    }
}

/// Forwards consensus requests to the inner service, then rewrites the
/// `index` flags on any events present in the response.
impl<S> Service<Request> for EventIndex<S>
where
    S: Service<Request, Response = Response>,
    S::Future: Send + 'static,
{
    type Response = Response;
    type Error = S::Error;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    /// Readiness is delegated entirely to the inner service.
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
        self.svc.poll_ready(cx)
    }

    /// Calls the inner service and post-processes its response.
    ///
    /// Errors from the inner service are returned unchanged. On success, the
    /// events of `BeginBlock`, `DeliverTx`, and `EndBlock` responses are
    /// adjusted according to the configuration; all other responses pass
    /// through untouched.
    fn call(&mut self, req: Request) -> Self::Future {
        let pending = self.svc.call(req);
        // The returned future must be `'static`, so it owns a copy of the config.
        let rules = self.config.clone();

        async move {
            let mut response = pending.await?;

            // Pull out the event list, if this kind of response carries one.
            let events = match &mut response {
                Response::BeginBlock(msg) => Some(&mut msg.events),
                Response::DeliverTx(msg) => Some(&mut msg.events),
                Response::EndBlock(msg) => Some(&mut msg.events),
                // These responses have no events.
                Response::InitChain(_)
                | Response::Commit(_)
                | Response::PrepareProposal(_)
                | Response::ProcessProposal(_) => None,
            };

            if let Some(events) = events {
                rules.adjust_events(events);
            }

            Ok(response)
        }
        .boxed()
    }
}