cometindex/index.rs

use std::{collections::BTreeMap, fmt::Display, sync::Arc};

use async_trait::async_trait;
pub use sqlx::PgPool;
use sqlx::{Postgres, Transaction};
use tendermint::abci::Event;

use crate::ContextualizedEvent;

pub type PgTransaction<'a> = Transaction<'a, Postgres>;

#[derive(Clone, Copy, Debug)]
struct EventReference {
    /// Which event in the block this is.
    pub event_index: usize,
    pub tx_hash: Option<[u8; 32]>,
    pub local_rowid: i64,
}

/// Represents all of the events in a given block.
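///
/// Events can be iterated over with [`BlockEvents::events`], and the raw
/// transactions with [`BlockEvents::transactions`].
///
/// A minimal sketch of consuming a block (`handle` is a hypothetical callback,
/// not part of this crate):
///
/// ```ignore
/// fn scan(block: &BlockEvents) {
///     for event in block.events() {
///         // Each item is a `ContextualizedEvent`, carrying the block height
///         // and, if the event came from a transaction, its hash and raw bytes.
///         handle(event);
///     }
/// }
/// ```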
#[derive(Clone, Debug)]
pub struct BlockEvents {
    height: u64,
    event_refs: Vec<EventReference>,
    events: Vec<Event>,
    transactions: BTreeMap<[u8; 32], Vec<u8>>,
}

// The builder interface for our own crate.
impl BlockEvents {
    pub(crate) fn new(height: u64) -> Self {
        const EXPECTED_EVENTS: usize = 32;

        Self {
            height,
            event_refs: Vec::with_capacity(EXPECTED_EVENTS),
            events: Vec::with_capacity(EXPECTED_EVENTS),
            transactions: BTreeMap::new(),
        }
    }

    /// Register a transaction in this block.
    pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
        self.transactions.insert(hash, data);
    }

    /// Register an event in this block.
    pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
        let event_index = self.events.len();
        self.events.push(event);
        self.event_refs.push(EventReference {
            event_index,
            tx_hash,
            local_rowid,
        });
    }
}

impl BlockEvents {
    pub fn height(&self) -> u64 {
        self.height
    }

    fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
        let event = &self.events[event_ref.event_index];
        let tx = event_ref
            .tx_hash
            .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
        ContextualizedEvent {
            event,
            block_height: self.height,
            tx,
            local_rowid: event_ref.local_rowid,
        }
    }

    /// Iterate over the events in this block, in the order that they appear.
    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
        self.event_refs.iter().map(|x| self.contextualize(*x))
    }

    /// Iterate over transactions (and their hashes) in the order they appear in the block.
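    ///
    /// A minimal usage sketch (`block` is a [`BlockEvents`] value):
    ///
    /// ```ignore
    /// for (tx_hash, raw_tx) in block.transactions() {
    ///     // `tx_hash` is the transaction's hash, `raw_tx` its raw bytes.
    /// }
    /// ```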
    pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
        self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
    }
}

#[derive(Clone, Debug)]
pub struct EventBatch {
    first_height: u64,
    last_height: u64,
    /// The batch of events, ordered by increasing height.
    ///
    /// The heights are guaranteed to be increasing, and to be contiguous.
    by_height: Arc<Vec<BlockEvents>>,
}

impl EventBatch {
    /// Create a new [`EventBatch`].
    pub fn new(block_events: Vec<BlockEvents>) -> Self {
        Self {
            first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
            last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
            by_height: Arc::new(block_events),
        }
    }

    pub(crate) fn first_height(&self) -> u64 {
        self.first_height
    }

    pub(crate) fn last_height(&self) -> u64 {
        self.last_height
    }

    /// Check if this batch has no blocks in it.
    ///
    /// Most commonly, this happens when [`Self::start_later`] is called with a height
    /// past the last height in the batch.
    pub fn empty(&self) -> bool {
        self.first_height > self.last_height
    }

    /// Modify this batch to start at a greater height.
    ///
    /// This will have no effect if the new start height is *before* the current start height.
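    ///
    /// For example (a sketch; `last_indexed_height` is a hypothetical value tracked
    /// by the caller):
    ///
    /// ```ignore
    /// batch.start_later(last_indexed_height + 1);
    /// ```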
    pub fn start_later(&mut self, new_start: u64) {
        self.first_height = new_start.max(self.first_height);
    }

    /// Iterate over the blocks in this batch, in order of increasing height.
    pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
        // If `first_height` has been advanced past the height of the first block
        // in our vec, we need to skip the difference.
        let skip = self
            .by_height
            .first()
            .map(|x| self.first_height.saturating_sub(x.height) as usize)
            .unwrap_or_default();
        self.by_height.iter().skip(skip)
    }

    /// Iterate over all of the events in this batch, in order.
    pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
        self.events_by_block().flat_map(|x| x.events())
    }
}

/// Provides more information about the context a batch of events comes from.
#[derive(Debug, Clone)]
pub struct EventBatchContext {
    pub(crate) is_last: bool,
}

impl EventBatchContext {
    /// If true, then no further event batches will be sent before new blocks arrive.
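    ///
    /// For example (a sketch; deferring expensive work until the index has caught
    /// up with the chain is one possible use):
    ///
    /// ```ignore
    /// if context.is_last() {
    ///     // Run work that should only happen once indexing has caught up.
    /// }
    /// ```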
    pub fn is_last(&self) -> bool {
        self.is_last
    }
}

/// Represents the version of an indexing view.
///
/// This allows tracking breaking changes at the level of each individual view,
/// rather than on the level of the database as a whole.
///
/// Versions can be compared to assess breaking changes:
/// ```
/// assert!(Version::with_major(3) > Version::with_major(2));
/// ```
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
pub struct Version {
    major: Option<i64>,
    option_hash: Option<[u8; 32]>,
}

impl Version {
    /// Construct a new version by specifying a "major" / "breaking" number.
    pub fn with_major(v: u64) -> Self {
        Self {
            major: Some(v.try_into().expect("version must fit into an i64")),
            option_hash: None,
        }
    }

    /// Attach an option hash to this version, leaving the major version unchanged.
    pub fn with_option_hash(self, option_hash: [u8; 32]) -> Self {
        Self {
            major: self.major,
            option_hash: Some(option_hash),
        }
    }

    /// Get the major version, which controls breakage.
    pub fn major(&self) -> u64 {
        u64::try_from(self.major.unwrap_or_default()).expect("major version cannot be negative")
    }

    /// Get the option hash, if any.
    pub fn option_hash(&self) -> Option<&[u8; 32]> {
        self.option_hash.as_ref()
    }

    pub(crate) fn new(major: Option<i64>, option_hash: Option<[u8; 32]>) -> Self {
        Self { major, option_hash }
    }
}

impl Display for Version {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "v{}", self.major.unwrap_or_default())
    }
}

/// Represents a specific index of raw event data.
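///
/// A minimal sketch of an implementation. The view name, table, and SQL here are
/// hypothetical, and the `use` line assumes the usual re-exports from the crate
/// root; adjust both to the actual crate layout.
///
/// ```ignore
/// use cometindex::{AppView, EventBatch, EventBatchContext, PgTransaction};
///
/// struct ExampleView;
///
/// #[async_trait::async_trait]
/// impl AppView for ExampleView {
///     fn name(&self) -> String {
///         "example/main".to_string()
///     }
///
///     async fn init_chain(
///         &self,
///         dbtx: &mut PgTransaction,
///         _app_state: &serde_json::Value,
///     ) -> Result<(), anyhow::Error> {
///         // Create this view's tables from scratch.
///         sqlx::query("CREATE TABLE IF NOT EXISTS example (height BIGINT NOT NULL)")
///             .execute(&mut **dbtx)
///             .await?;
///         Ok(())
///     }
///
///     async fn index_batch(
///         &self,
///         dbtx: &mut PgTransaction,
///         batch: EventBatch,
///         _context: EventBatchContext,
///     ) -> Result<(), anyhow::Error> {
///         // Walk the batch block by block, then event by event.
///         for block in batch.events_by_block() {
///             for event in block.events() {
///                 // Filter on `event.event.kind` and write rows as needed.
///                 let _ = event;
///             }
///             sqlx::query("INSERT INTO example (height) VALUES ($1)")
///                 .bind(i64::try_from(block.height())?)
///                 .execute(&mut **dbtx)
///                 .await?;
///         }
///         Ok(())
///     }
/// }
/// ```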
#[async_trait]
pub trait AppView: Send + Sync {
    /// Return the name of this index.
    ///
    /// This should be unique across all of the indices.
    fn name(&self) -> String;

    /// Return the version of this index.
    ///
    /// As code evolves, versions should only increase.
    /// If one version is greater than another, that indicates that the view
    /// needs to be reindexed.
    fn version(&self) -> Version {
        Version::default()
    }

    /// Reset this app view to an empty state.
    ///
    /// This should delete all tables, across all versions, resetting the
    /// app view to a blank state.
    async fn reset(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
        unimplemented!(
            r#"
Index {} has not implemented `reset` despite being on version {}.
For versions > v0, this method needs to be implemented, so that we know
how to delete previous versions of the schema.
"#,
            self.name(),
            self.version()
        )
    }

    /// A hook called once on startup, before any batches are processed.
    async fn on_startup(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
        Ok(())
    }

    /// This will be called once, when processing the genesis app state, before the first block.
    async fn init_chain(
        &self,
        dbtx: &mut PgTransaction,
        app_state: &serde_json::Value,
    ) -> Result<(), anyhow::Error>;

    /// This allows processing a batch of events, potentially spanning many blocks.
    async fn index_batch(
        &self,
        dbtx: &mut PgTransaction,
        batch: EventBatch,
        context: EventBatchContext,
    ) -> Result<(), anyhow::Error>;
}