1use std::{collections::BTreeMap, sync::Arc};
2
3use async_trait::async_trait;
4pub use sqlx::PgPool;
5use sqlx::{Postgres, Transaction};
6use tendermint::abci::Event;
7
8use crate::ContextualizedEvent;
9
10pub type PgTransaction<'a> = Transaction<'a, Postgres>;
11
/// Lightweight, copyable handle describing one event stored in a [`BlockEvents`].
#[derive(Debug, Clone, Copy)]
struct EventReference {
    /// Position of the event inside `BlockEvents::events`.
    pub event_index: usize,
    /// Hash of the transaction the event is associated with, if any.
    pub tx_hash: Option<[u8; 32]>,
    /// Row identifier supplied by the caller when the event was pushed.
    pub local_rowid: i64,
}
19
20#[derive(Clone, Debug)]
22pub struct BlockEvents {
23 height: u64,
24 event_refs: Vec<EventReference>,
25 events: Vec<Event>,
26 transactions: BTreeMap<[u8; 32], Vec<u8>>,
27}
28
29impl BlockEvents {
31 pub(crate) fn new(height: u64) -> Self {
32 const EXPECTED_EVENTS: usize = 32;
33
34 Self {
35 height,
36 event_refs: Vec::with_capacity(EXPECTED_EVENTS),
37 events: Vec::with_capacity(EXPECTED_EVENTS),
38 transactions: BTreeMap::new(),
39 }
40 }
41
42 pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
44 self.transactions.insert(hash, data);
45 }
46
47 pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
49 let event_index = self.events.len();
50 self.events.push(event);
51 self.event_refs.push(EventReference {
52 event_index,
53 tx_hash,
54 local_rowid,
55 });
56 }
57}
58
59impl BlockEvents {
60 pub fn height(&self) -> u64 {
61 self.height
62 }
63
64 fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
65 let event = &self.events[event_ref.event_index];
66 let tx = event_ref
67 .tx_hash
68 .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
69 ContextualizedEvent {
70 event,
71 block_height: self.height,
72 tx,
73 local_rowid: event_ref.local_rowid,
74 }
75 }
76
77 pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
79 self.event_refs.iter().map(|x| self.contextualize(*x))
80 }
81
82 pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
84 self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
85 }
86}
87
88#[derive(Clone, Debug)]
89pub struct EventBatch {
90 first_height: u64,
91 last_height: u64,
92 by_height: Arc<Vec<BlockEvents>>,
96}
97
98impl EventBatch {
99 pub fn new(block_events: Vec<BlockEvents>) -> Self {
101 Self {
102 first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
103 last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
104 by_height: Arc::new(block_events),
105 }
106 }
107
108 pub(crate) fn first_height(&self) -> u64 {
109 self.first_height
110 }
111
112 pub(crate) fn last_height(&self) -> u64 {
113 self.last_height
114 }
115
116 pub fn empty(&self) -> bool {
121 self.first_height > self.last_height
122 }
123
124 pub fn start_later(&mut self, new_start: u64) {
128 self.first_height = new_start.max(self.first_height);
129 }
130
131 pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
132 let skip = self
135 .by_height
136 .first()
137 .map(|x| self.first_height.saturating_sub(x.height) as usize)
138 .unwrap_or_default();
139 self.by_height.iter().skip(skip)
140 }
141
142 pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
143 self.events_by_block().flat_map(|x| x.events())
144 }
145}
146
/// Extra information handed to an app view alongside an [`EventBatch`].
#[derive(Clone, Debug)]
pub struct EventBatchContext {
    /// Value reported by [`EventBatchContext::is_last`].
    ///
    /// NOTE(review): presumably marks the final batch of an indexing pass — confirm
    /// with the code that constructs this context.
    pub(crate) is_last: bool,
}

impl EventBatchContext {
    /// Returns the `is_last` flag this context was created with.
    pub fn is_last(&self) -> bool {
        let Self { is_last } = self;
        *is_last
    }
}
159
160#[async_trait]
162pub trait AppView: Send + Sync {
163 fn name(&self) -> String;
167
168 async fn init_chain(
170 &self,
171 dbtx: &mut PgTransaction,
172 app_state: &serde_json::Value,
173 ) -> Result<(), anyhow::Error>;
174
175 async fn index_batch(
177 &self,
178 dbtx: &mut PgTransaction,
179 batch: EventBatch,
180 context: EventBatchContext,
181 ) -> Result<(), anyhow::Error>;
182}