penumbra_sdk_app/server/
events.rs1use std::{future::Future, pin::Pin, task::Context};
2
3use anyhow::Result;
4use futures::FutureExt;
5use regex::RegexSet;
6use tendermint::abci::Event;
7use tendermint::v0_37::abci::{ConsensusRequest as Request, ConsensusResponse as Response};
8use tower::{Layer, Service};
9
10#[derive(Debug, Clone)]
11pub struct EventIndex<S> {
12 svc: S,
13 config: EventIndexLayer,
14}
15
16#[derive(Debug, Clone, Default)]
17pub struct EventIndexLayer {
18 pub index: RegexSet,
22 pub no_index: RegexSet,
24}
25
26impl EventIndexLayer {
27 pub fn index_all() -> Self {
29 Self {
30 index: RegexSet::new([""]).expect("empty regex should always parse"),
31 no_index: RegexSet::empty(),
32 }
33 }
34
35 fn adjust_events(&self, events: &mut [Event]) {
36 for e in events.iter_mut() {
37 for attr in e.attributes.iter_mut() {
38 match attr.key_str() {
43 Ok(key) => {
44 let nested_key = format!("{}.{}", e.kind, key);
45
46 if self.no_index.is_match(&nested_key) {
47 attr.set_index(false);
48 }
49
50 if self.index.is_match(&nested_key) {
52 attr.set_index(true);
53 }
54 }
55 _ => {
56 tracing::warn!("event attribute key is not valid UTF-8");
60 }
61 }
62 }
63 }
64 }
65}
66
67impl<S> Layer<S> for EventIndexLayer
68where
69 S: Service<Request, Response = Response>,
70 S::Future: Send + 'static,
71{
72 type Service = EventIndex<S>;
73
74 fn layer(&self, inner: S) -> Self::Service {
75 EventIndex {
76 svc: inner,
77 config: self.clone(),
78 }
79 }
80}
81
82impl<S> Service<Request> for EventIndex<S>
83where
84 S: Service<Request, Response = Response>,
85 S::Future: Send + 'static,
86{
87 type Error = S::Error;
88 type Response = Response;
89 type Future =
90 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
91
92 fn poll_ready(
93 &mut self,
94 cx: &mut Context<'_>,
95 ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
96 self.svc.poll_ready(cx)
97 }
98
99 fn call(&mut self, req: Request) -> Self::Future {
100 let rsp = self.svc.call(req);
101 let config = self.config.clone();
102
103 async move {
104 let mut rsp = rsp.await?;
105 match rsp {
106 Response::InitChain(_) => {}
108 Response::Commit(_) => {}
109 Response::PrepareProposal(_) => {}
110 Response::ProcessProposal(_) => {}
111 Response::BeginBlock(ref mut msg) => config.adjust_events(&mut msg.events),
113 Response::DeliverTx(ref mut msg) => config.adjust_events(&mut msg.events),
114 Response::EndBlock(ref mut msg) => config.adjust_events(&mut msg.events),
115 }
116 Ok(rsp)
117 }
118 .boxed()
119 }
120}