cometindex/
index.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use async_trait::async_trait;
4pub use sqlx::PgPool;
5use sqlx::{Postgres, Transaction};
6use tendermint::abci::Event;
7
8use crate::ContextualizedEvent;
9
10pub type PgTransaction<'a> = Transaction<'a, Postgres>;
11
12#[derive(Clone, Copy, Debug)]
13struct EventReference {
14    /// Which event in the block this is.
15    pub event_index: usize,
16    pub tx_hash: Option<[u8; 32]>,
17    pub local_rowid: i64,
18}
19
20/// Represents all of the events in a given block
21#[derive(Clone, Debug)]
22pub struct BlockEvents {
23    height: u64,
24    event_refs: Vec<EventReference>,
25    events: Vec<Event>,
26    transactions: BTreeMap<[u8; 32], Vec<u8>>,
27}
28
29// The builder interface for our own crate.
30impl BlockEvents {
31    pub(crate) fn new(height: u64) -> Self {
32        const EXPECTED_EVENTS: usize = 32;
33
34        Self {
35            height,
36            event_refs: Vec::with_capacity(EXPECTED_EVENTS),
37            events: Vec::with_capacity(EXPECTED_EVENTS),
38            transactions: BTreeMap::new(),
39        }
40    }
41
42    /// Register a transaction in this block.
43    pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
44        self.transactions.insert(hash, data);
45    }
46
47    /// Register an event in this block.
48    pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
49        let event_index = self.events.len();
50        self.events.push(event);
51        self.event_refs.push(EventReference {
52            event_index,
53            tx_hash,
54            local_rowid,
55        });
56    }
57}
58
59impl BlockEvents {
60    pub fn height(&self) -> u64 {
61        self.height
62    }
63
64    fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
65        let event = &self.events[event_ref.event_index];
66        let tx = event_ref
67            .tx_hash
68            .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
69        ContextualizedEvent {
70            event,
71            block_height: self.height,
72            tx,
73            local_rowid: event_ref.local_rowid,
74        }
75    }
76
77    /// Iterate over the events in this block, in the order that they appear.
78    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
79        self.event_refs.iter().map(|x| self.contextualize(*x))
80    }
81
82    /// Iterate over transactions (and their hashes) in the order they appear in the block.
83    pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
84        self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
85    }
86}
87
88#[derive(Clone, Debug)]
89pub struct EventBatch {
90    first_height: u64,
91    last_height: u64,
92    /// The batch of events, ordered by increasing height.
93    ///
94    /// The heights are guaranteed to be increasing, and to be contiguous.
95    by_height: Arc<Vec<BlockEvents>>,
96}
97
98impl EventBatch {
99    /// Create a new [`EventBatch`].
100    pub fn new(block_events: Vec<BlockEvents>) -> Self {
101        Self {
102            first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
103            last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
104            by_height: Arc::new(block_events),
105        }
106    }
107
108    pub(crate) fn first_height(&self) -> u64 {
109        self.first_height
110    }
111
112    pub(crate) fn last_height(&self) -> u64 {
113        self.last_height
114    }
115
116    /// Check if this batch has no blocks in it.
117    ///
118    /// Most commonly, this is the result when [`start_later`] is called with a height
119    /// past that inside the batch.
120    pub fn empty(&self) -> bool {
121        self.first_height > self.last_height
122    }
123
124    /// Modify this batch to start at a greater height.
125    ///
126    /// This will have no effect if the new start height is *before* the current start height.
127    pub fn start_later(&mut self, new_start: u64) {
128        self.first_height = new_start.max(self.first_height);
129    }
130
131    pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
132        // Assuming the first height is past the first height in our vec,
133        // we need to skip the difference.
134        let skip = self
135            .by_height
136            .first()
137            .map(|x| self.first_height.saturating_sub(x.height) as usize)
138            .unwrap_or_default();
139        self.by_height.iter().skip(skip)
140    }
141
142    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
143        self.events_by_block().flat_map(|x| x.events())
144    }
145}
146
147/// Provides more information about the context a batch of event comes from.
148#[derive(Debug, Clone)]
149pub struct EventBatchContext {
150    pub(crate) is_last: bool,
151}
152
153impl EventBatchContext {
154    /// If true, then no further event batches will be sent before new blocks arrive.
155    pub fn is_last(&self) -> bool {
156        self.is_last
157    }
158}
159
160/// Represents a specific index of raw event data.
161#[async_trait]
162pub trait AppView: Send + Sync {
163    /// Return the name of this index.
164    ///
165    /// This should be unique across all of the indices.
166    fn name(&self) -> String;
167
168    /// This will be called once when processing the genesis before the first block.
169    async fn init_chain(
170        &self,
171        dbtx: &mut PgTransaction,
172        app_state: &serde_json::Value,
173    ) -> Result<(), anyhow::Error>;
174
175    /// This allows processing a batch of events, over many blocks.
176    async fn index_batch(
177        &self,
178        dbtx: &mut PgTransaction,
179        batch: EventBatch,
180        context: EventBatchContext,
181    ) -> Result<(), anyhow::Error>;
182}