cometindex/
index.rs

1use std::{collections::BTreeMap, fmt::Display, sync::Arc};
2
3use async_trait::async_trait;
4pub use sqlx::PgPool;
5use sqlx::{Postgres, Transaction};
6use tendermint::abci::Event;
7
8use crate::ContextualizedEvent;
9
10pub type PgTransaction<'a> = Transaction<'a, Postgres>;
11
/// Locates one event inside a [`BlockEvents`], together with its provenance.
#[derive(Clone, Copy, Debug)]
struct EventReference {
    /// Position of the event in the owning block's event list.
    pub event_index: usize,
    /// Hash of the transaction this event is associated with, if any; used to
    /// look up that transaction's bytes in the owning block.
    pub tx_hash: Option<[u8; 32]>,
    /// Row id supplied when the event was registered (presumably the rowid in
    /// the local event store — confirm against the loader).
    pub local_rowid: i64,
}
19
20/// Represents all of the events in a given block
21#[derive(Clone, Debug)]
22pub struct BlockEvents {
23    height: u64,
24    event_refs: Vec<EventReference>,
25    events: Vec<Event>,
26    transactions: BTreeMap<[u8; 32], Vec<u8>>,
27}
28
29// The builder interface for our own crate.
30impl BlockEvents {
31    pub(crate) fn new(height: u64) -> Self {
32        const EXPECTED_EVENTS: usize = 32;
33
34        Self {
35            height,
36            event_refs: Vec::with_capacity(EXPECTED_EVENTS),
37            events: Vec::with_capacity(EXPECTED_EVENTS),
38            transactions: BTreeMap::new(),
39        }
40    }
41
42    /// Register a transaction in this block.
43    pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
44        self.transactions.insert(hash, data);
45    }
46
47    /// Register an event in this block.
48    pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
49        let event_index = self.events.len();
50        self.events.push(event);
51        self.event_refs.push(EventReference {
52            event_index,
53            tx_hash,
54            local_rowid,
55        });
56    }
57}
58
59impl BlockEvents {
60    pub fn height(&self) -> u64 {
61        self.height
62    }
63
64    fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
65        let event = &self.events[event_ref.event_index];
66        let tx = event_ref
67            .tx_hash
68            .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
69        ContextualizedEvent {
70            event,
71            block_height: self.height,
72            tx,
73            local_rowid: event_ref.local_rowid,
74        }
75    }
76
77    /// Iterate over the events in this block, in the order that they appear.
78    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
79        self.event_refs.iter().map(|x| self.contextualize(*x))
80    }
81
82    /// Iterate over transactions (and their hashes) in the order they appear in the block.
83    pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
84        self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
85    }
86}
87
88#[derive(Clone, Debug)]
89pub struct EventBatch {
90    first_height: u64,
91    last_height: u64,
92    /// The batch of events, ordered by increasing height.
93    ///
94    /// The heights are guaranteed to be increasing, and to be contiguous.
95    by_height: Arc<Vec<BlockEvents>>,
96}
97
98impl EventBatch {
99    /// Create a new [`EventBatch`].
100    pub fn new(block_events: Vec<BlockEvents>) -> Self {
101        Self {
102            first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
103            last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
104            by_height: Arc::new(block_events),
105        }
106    }
107
108    pub(crate) fn first_height(&self) -> u64 {
109        self.first_height
110    }
111
112    pub(crate) fn last_height(&self) -> u64 {
113        self.last_height
114    }
115
116    /// Check if this batch has no blocks in it.
117    ///
118    /// Most commonly, this is the result when [`start_later`] is called with a height
119    /// past that inside the batch.
120    pub fn empty(&self) -> bool {
121        self.first_height > self.last_height
122    }
123
124    /// Modify this batch to start at a greater height.
125    ///
126    /// This will have no effect if the new start height is *before* the current start height.
127    pub fn start_later(&mut self, new_start: u64) {
128        self.first_height = new_start.max(self.first_height);
129    }
130
131    pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
132        // Assuming the first height is past the first height in our vec,
133        // we need to skip the difference.
134        let skip = self
135            .by_height
136            .first()
137            .map(|x| self.first_height.saturating_sub(x.height) as usize)
138            .unwrap_or_default();
139        self.by_height.iter().skip(skip)
140    }
141
142    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
143        self.events_by_block().flat_map(|x| x.events())
144    }
145}
146
/// Extra information about where a batch of events comes from.
#[derive(Debug, Clone)]
pub struct EventBatchContext {
    /// Whether this is the final batch to be sent before new blocks arrive.
    pub(crate) is_last: bool,
}

impl EventBatchContext {
    /// Returns `true` when no further event batches will be sent until new
    /// blocks arrive.
    pub fn is_last(&self) -> bool {
        let Self { is_last } = self;
        *is_last
    }
}
159
160/// Represents the version of an indexing view.
161///
162/// This allows tracking breaking change at the level of each individual view,
163/// rather than on the level of the database as a whole.
164///
165/// Versions can be compared to assess breaking changes:
166/// ```
167/// assert!(Version::with_major(3) > Version::with_major(2));
168/// ```
169#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
170#[sqlx(transparent)]
171pub struct Version(Option<i64>);
172
173impl Version {
174    /// Construct a new version by specifying a "major" / "breaking" number.
175    pub fn with_major(v: u64) -> Self {
176        Self(Some(v.try_into().expect("version must fit into an i64")))
177    }
178
179    /// Get the major version, which controls breakage.
180    pub fn major(self) -> u64 {
181        u64::try_from(self.0.unwrap_or_default()).expect("major version cannot be negative")
182    }
183}
184
185impl Display for Version {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        write!(f, "v{}", self.0.unwrap_or_default())
188    }
189}
190
191/// Represents a specific index of raw event data.
192#[async_trait]
193pub trait AppView: Send + Sync {
194    /// Return the name of this index.
195    ///
196    /// This should be unique across all of the indices.
197    fn name(&self) -> String;
198
199    /// Return the version of this index.
200    ///
201    /// As code evolves, versions should increase, only.
202    /// If one version is greater than another, that indicates that the view
203    /// needs to be reindexed.
204    fn version(&self) -> Version {
205        Version::default()
206    }
207
208    /// Reset this app view to an empty state.
209    ///
210    /// This should delete all tables, across all versions, resetting the
211    /// app view to a blank state.
212    async fn reset(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
213        unimplemented!(
214            r#"
215Index {} has not implemented `reset` despite being on version {}.
216For versions > v0, this method needs to be implemented, so that we know
217how to delete previous versions of the schema.
218"#,
219            self.name(),
220            self.version()
221        )
222    }
223
224    async fn on_startup(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
225        Ok(())
226    }
227
228    /// This will be called once when processing the genesis before the first block.
229    async fn init_chain(
230        &self,
231        dbtx: &mut PgTransaction,
232        app_state: &serde_json::Value,
233    ) -> Result<(), anyhow::Error>;
234
235    /// This allows processing a batch of events, over many blocks.
236    async fn index_batch(
237        &self,
238        dbtx: &mut PgTransaction,
239        batch: EventBatch,
240        context: EventBatchContext,
241    ) -> Result<(), anyhow::Error>;
242}