1use std::{collections::BTreeMap, fmt::Display, sync::Arc};
23use async_trait::async_trait;
4pub use sqlx::PgPool;
5use sqlx::{Postgres, Transaction};
6use tendermint::abci::Event;
78use crate::ContextualizedEvent;
910pub type PgTransaction<'a> = Transaction<'a, Postgres>;
1112#[derive(Clone, Copy, Debug)]
13struct EventReference {
14/// Which event in the block this is.
15pub event_index: usize,
16pub tx_hash: Option<[u8; 32]>,
17pub local_rowid: i64,
18}
1920/// Represents all of the events in a given block
21#[derive(Clone, Debug)]
22pub struct BlockEvents {
23 height: u64,
24 event_refs: Vec<EventReference>,
25 events: Vec<Event>,
26 transactions: BTreeMap<[u8; 32], Vec<u8>>,
27}
2829// The builder interface for our own crate.
30impl BlockEvents {
31pub(crate) fn new(height: u64) -> Self {
32const EXPECTED_EVENTS: usize = 32;
3334Self {
35 height,
36 event_refs: Vec::with_capacity(EXPECTED_EVENTS),
37 events: Vec::with_capacity(EXPECTED_EVENTS),
38 transactions: BTreeMap::new(),
39 }
40 }
4142/// Register a transaction in this block.
43pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
44self.transactions.insert(hash, data);
45 }
4647/// Register an event in this block.
48pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
49let event_index = self.events.len();
50self.events.push(event);
51self.event_refs.push(EventReference {
52 event_index,
53 tx_hash,
54 local_rowid,
55 });
56 }
57}
5859impl BlockEvents {
60pub fn height(&self) -> u64 {
61self.height
62 }
6364fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
65let event = &self.events[event_ref.event_index];
66let tx = event_ref
67 .tx_hash
68 .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
69 ContextualizedEvent {
70 event,
71 block_height: self.height,
72 tx,
73 local_rowid: event_ref.local_rowid,
74 }
75 }
7677/// Iterate over the events in this block, in the order that they appear.
78pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
79self.event_refs.iter().map(|x| self.contextualize(*x))
80 }
8182/// Iterate over transactions (and their hashes) in the order they appear in the block.
83pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
84self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
85 }
86}
8788#[derive(Clone, Debug)]
89pub struct EventBatch {
90 first_height: u64,
91 last_height: u64,
92/// The batch of events, ordered by increasing height.
93 ///
94 /// The heights are guaranteed to be increasing, and to be contiguous.
95by_height: Arc<Vec<BlockEvents>>,
96}
9798impl EventBatch {
99/// Create a new [`EventBatch`].
100pub fn new(block_events: Vec<BlockEvents>) -> Self {
101Self {
102 first_height: block_events.first().map(|x| x.height).unwrap_or_default(),
103 last_height: block_events.last().map(|x| x.height).unwrap_or_default(),
104 by_height: Arc::new(block_events),
105 }
106 }
107108pub(crate) fn first_height(&self) -> u64 {
109self.first_height
110 }
111112pub(crate) fn last_height(&self) -> u64 {
113self.last_height
114 }
115116/// Check if this batch has no blocks in it.
117 ///
118 /// Most commonly, this is the result when [`start_later`] is called with a height
119 /// past that inside the batch.
120pub fn empty(&self) -> bool {
121self.first_height > self.last_height
122 }
123124/// Modify this batch to start at a greater height.
125 ///
126 /// This will have no effect if the new start height is *before* the current start height.
127pub fn start_later(&mut self, new_start: u64) {
128self.first_height = new_start.max(self.first_height);
129 }
130131pub fn events_by_block(&self) -> impl Iterator<Item = &'_ BlockEvents> {
132// Assuming the first height is past the first height in our vec,
133 // we need to skip the difference.
134let skip = self
135.by_height
136 .first()
137 .map(|x| self.first_height.saturating_sub(x.height) as usize)
138 .unwrap_or_default();
139self.by_height.iter().skip(skip)
140 }
141142pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
143self.events_by_block().flat_map(|x| x.events())
144 }
145}
146147/// Provides more information about the context a batch of event comes from.
148#[derive(Debug, Clone)]
149pub struct EventBatchContext {
150pub(crate) is_last: bool,
151}
152153impl EventBatchContext {
154/// If true, then no further event batches will be sent before new blocks arrive.
155pub fn is_last(&self) -> bool {
156self.is_last
157 }
158}
159160/// Represents the version of an indexing view.
161///
162/// This allows tracking breaking change at the level of each individual view,
163/// rather than on the level of the database as a whole.
164///
165/// Versions can be compared to assess breaking changes:
166/// ```
167/// assert!(Version::with_major(3) > Version::with_major(2));
168/// ```
169#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
170pub struct Version {
171 major: Option<i64>,
172 option_hash: Option<[u8; 32]>,
173}
174175impl Version {
176/// Construct a new version by specifying a "major" / "breaking" number.
177pub fn with_major(v: u64) -> Self {
178Self {
179 major: Some(v.try_into().expect("version must fit into an i64")),
180 option_hash: None,
181 }
182 }
183184pub fn with_option_hash(self, option_hash: [u8; 32]) -> Self {
185Self {
186 major: self.major,
187 option_hash: Some(option_hash),
188 }
189 }
190191/// Get the major version, which controls breakage.
192pub fn major(&self) -> u64 {
193 u64::try_from(self.major.unwrap_or_default()).expect("major version cannot be negative")
194 }
195196pub fn option_hash(&self) -> Option<&[u8; 32]> {
197self.option_hash.as_ref()
198 }
199200pub(crate) fn new(major: Option<i64>, option_hash: Option<[u8; 32]>) -> Self {
201Self { major, option_hash }
202 }
203}
204205impl Display for Version {
206fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207write!(f, "v{}", self.major.unwrap_or_default())
208 }
209}
210211/// Represents a specific index of raw event data.
212#[async_trait]
213pub trait AppView: Send + Sync {
214/// Return the name of this index.
215 ///
216 /// This should be unique across all of the indices.
217fn name(&self) -> String;
218219/// Return the version of this index.
220 ///
221 /// As code evolves, versions should increase, only.
222 /// If one version is greater than another, that indicates that the view
223 /// needs to be reindexed.
224fn version(&self) -> Version {
225 Version::default()
226 }
227228/// Reset this app view to an empty state.
229 ///
230 /// This should delete all tables, across all versions, resetting the
231 /// app view to a blank state.
232async fn reset(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
233unimplemented!(
234r#"
235Index {} has not implemented `reset` despite being on version {}.
236For versions > v0, this method needs to be implemented, so that we know
237how to delete previous versions of the schema.
238"#,
239self.name(),
240self.version()
241 )
242 }
243244async fn on_startup(&self, _dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
245Ok(())
246 }
247248/// This will be called once when processing the genesis before the first block.
249async fn init_chain(
250&self,
251 dbtx: &mut PgTransaction,
252 app_state: &serde_json::Value,
253 ) -> Result<(), anyhow::Error>;
254255/// This allows processing a batch of events, over many blocks.
256async fn index_batch(
257&self,
258 dbtx: &mut PgTransaction,
259 batch: EventBatch,
260 context: EventBatchContext,
261 ) -> Result<(), anyhow::Error>;
262}