cometindex/indexer/
indexing_state.rs

1use anyhow::anyhow;
2use futures::{Stream, StreamExt, TryStreamExt};
3use prost::Message as _;
4use sqlx::{PgPool, Postgres, Transaction};
5use std::collections::HashMap;
6use tendermint::abci::{self, Event};
7
8use crate::database::{read_only_db, read_write_db};
9use crate::index::{BlockEvents, EventBatch, Version};
10
/// A block height, as stored in the database.
///
/// Wraps `i64` rather than `u64` to match Postgres's `BIGINT` column type;
/// `#[sqlx(transparent)]` makes it bind and decode directly as the inner
/// integer. `Default` yields `Height(0)`, used as the "nothing indexed yet"
/// sentinel by callers that read watermarks.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(transparent)]
pub struct Height(i64);
14
15impl Height {
16    pub fn post_genesis() -> Self {
17        Self(1)
18    }
19
20    /// Return the last height in the batch, and then the first height in the next batch.
21    pub fn advance(self, batch_size: u64, max_height: Self) -> (Self, Self) {
22        let last = Self::from(self.0 as u64 + batch_size - 1).min(max_height);
23        let next_first = Self(last.0 + 1);
24        (last, next_first)
25    }
26
27    pub fn next(self) -> Self {
28        Self(self.0 + 1)
29    }
30}
31
32impl From<u64> for Height {
33    fn from(value: u64) -> Self {
34        Self(value.try_into().unwrap_or(i64::MAX))
35    }
36}
37
38impl From<Height> for u64 {
39    fn from(value: Height) -> Self {
40        value.0.try_into().unwrap_or_default()
41    }
42}
43
/// The state of a particular index.
///
/// One of these corresponds to a row in the `index_watermarks` table of the
/// destination database, keyed there by the index's name.
#[derive(Default, Debug, Clone, Copy)]
pub struct IndexState {
    /// What version this particular index has been using.
    pub version: Version,
    /// What height this particular index has reached.
    pub height: Height,
}
52
/// Holds the database connections used while indexing: a source database of
/// raw events to read from, and a destination database for derived state.
#[derive(Debug, Clone)]
pub struct IndexingManager {
    /// Read-only pool for the raw event database (`blocks`, `tx_results`,
    /// `events`, `attributes` tables).
    src: PgPool,
    /// Read-write pool for the derived database, which also holds the
    /// `index_watermarks` progress table.
    dst: PgPool,
}
58
impl IndexingManager {
    /// Create the `index_watermarks` table in the destination database, if absent.
    ///
    /// One row per index: its name, the height it has fully processed, and its
    /// version (nullable at the SQL level).
    async fn create_watermark_table(&self) -> anyhow::Result<()> {
        sqlx::query(
            "
        CREATE TABLE IF NOT EXISTS index_watermarks (
            index_name TEXT PRIMARY KEY,
            height BIGINT NOT NULL,
            version BIGINT
        )
        ",
        )
        .execute(&self.dst)
        .await?;
        Ok(())
    }

    /// The largest height for which we know we have all events.
    pub async fn src_height(&self) -> anyhow::Result<Height> {
        // We may be currently indexing events for this block.
        // Hence `MAX(height) - 1`: only heights strictly below the newest block
        // are guaranteed to have all their events written.
        let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
            .fetch_optional(&self.src)
            .await?;
        // Fall back to the default height (0) when no row comes back.
        Ok(res.unwrap_or_default())
    }

    /// Read the watermark state for the index named `name` from the destination DB.
    ///
    /// An index with no watermark row gets the default state, so a brand-new
    /// index starts from the beginning.
    pub async fn index_state(&self, name: &str) -> anyhow::Result<IndexState> {
        let row: Option<(Height, Version)> =
            sqlx::query_as("SELECT height, version FROM index_watermarks WHERE index_name = $1")
                .bind(name)
                .fetch_optional(&self.dst)
                .await?;
        Ok(row
            .map(|(height, version)| IndexState { height, version })
            .unwrap_or_default())
    }

    /// Read the watermark states of every known index, keyed by index name.
    pub async fn index_states(&self) -> anyhow::Result<HashMap<String, IndexState>> {
        let rows: Vec<(String, Height, Version)> =
            sqlx::query_as("SELECT index_name, height, version FROM index_watermarks")
                .fetch_all(&self.dst)
                .await?;
        Ok(rows
            .into_iter()
            .map(|(name, height, version)| (name, IndexState { height, version }))
            .collect())
    }

    /// Upsert the watermark for index `name`.
    ///
    /// Takes an existing transaction rather than `&self`, so the watermark
    /// update can commit atomically with the index's own writes.
    pub async fn update_index_state(
        dbtx: &mut sqlx::Transaction<'_, Postgres>,
        name: &str,
        new_state: IndexState,
    ) -> anyhow::Result<()> {
        sqlx::query(
            "
        INSERT INTO index_watermarks 
        VALUES ($1, $2, $3) 
        ON CONFLICT (index_name) 
        DO UPDATE SET
            height = excluded.height,
            version = excluded.version
        ",
        )
        .bind(name)
        .bind(new_state.height)
        .bind(new_state.version)
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }

    /// Stream the transactions in the inclusive height range `first..=last`
    /// from the source database, as (height, tx hash, raw tx bytes) tuples.
    fn transactions_between(
        &self,
        first: Height,
        last: Height,
    ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
        // An `async fn` (rather than a plain fn) so it can be passed directly
        // to `TryStreamExt::and_then`, which expects a future-returning function.
        async fn parse_row(
            row: (i64, String, Vec<u8>),
        ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
            // The tx hash is stored hex-encoded, and must decode to exactly 32 bytes.
            let tx_hash: [u8; 32] = hex::decode(row.1)?
                .try_into()
                .map_err(|_| anyhow!("expected 32 byte hash"))?;
            // `tx_result` holds a protobuf-encoded TxResult (CometBFT v0.37
            // schema); we only keep the raw transaction bytes from it.
            let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
            let transaction = tx_result.tx.to_vec();
            let height = Height(row.0);
            Ok((height, tx_hash, transaction))
        }

        sqlx::query_as::<_, (i64, String, Vec<u8>)>(
            r#"
SELECT height, tx_hash, tx_result
FROM blocks
JOIN tx_results ON blocks.rowid = tx_results.block_id
WHERE
    height >= $1
AND
    height <= $2
"#,
        )
        .bind(first)
        .bind(last)
        .fetch(&self.src)
        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
        .and_then(parse_row)
    }

    /// Stream the events in the inclusive height range `first..=last` from the
    /// source database, ordered by (height, event rowid), each with the hash of
    /// its enclosing transaction (if any) and its local rowid.
    fn events_between(
        &self,
        first: Height,
        last: Height,
    ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
        sqlx::query_as::<_, (i64, String, Height, Option<String>, serde_json::Value)>(
            // This query does some shenanigans to ensure good performance.
            // The main trick is that we know that each event has 1 block and <= 1 transaction associated
            // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and
            // then a sort, and instead work from the events in a linear fashion.
            // Basically, this query ends up doing:
            //
            // for event in events >= id:
            //   attach attributes
            //   attach block
            //   attach transaction hash?
            r#"
WITH blocks AS (
  SELECT * FROM blocks WHERE height >= $1 AND height <= $2
),
filtered_events AS (
  SELECT e.block_id, e.rowid, e.type, e.tx_id, b.height
  FROM events e
  JOIN blocks b ON e.block_id = b.rowid
),
events_with_attrs AS (
  SELECT
      f.block_id,
      f.rowid,
      f.type,
      f.tx_id,
      f.height,
      jsonb_object_agg(a.key, a.value) AS attrs
  FROM filtered_events f
  LEFT JOIN attributes a ON f.rowid = a.event_id
  GROUP BY f.block_id, f.rowid, f.type, f.tx_id, f.height
)
SELECT e.rowid, e.type, e.height, tx.tx_hash, e.attrs
FROM events_with_attrs e
LEFT JOIN tx_results tx ON tx.rowid = e.tx_id
ORDER BY e.height ASC, e.rowid ASC;
"#,
        )
        .bind(first)
        .bind(last)
        .fetch(&self.src)
        .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
            tracing::debug!(?local_rowid, type_str, ?height, ?tx_hash);
            // A panic here indicates corrupt source data: hashes are stored
            // hex-encoded and must decode to exactly 32 bytes.
            let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
                hex::decode(s)
                    .expect("invalid tx_hash")
                    .try_into()
                    .expect("expected 32 bytes")
            });
            // `attrs` is expected to be the JSON object built by
            // `jsonb_object_agg` in the query above.
            let serde_json::Value::Object(attrs) = attrs else {
                // saves an allocation below bc we can take ownership
                panic!("expected JSON object");
            };

            let event = abci::Event {
                kind: type_str,
                attributes: attrs
                    .into_iter()
                    .filter_map(|(k, v)| match v {
                        serde_json::Value::String(s) => Some((k, s)),
                        // we never hit this because of how we constructed the query
                        _ => None,
                    })
                    .map(Into::into)
                    .collect(),
            };
            (height, event, tx_hash, local_rowid)
        })
        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
    }

    /// Collect all transactions and events for heights `first..=last` into a
    /// single [`EventBatch`], with one [`BlockEvents`] bucket per height.
    pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
        // Pre-create one bucket per height; rows below are slotted in by the
        // offset of their height from `first`.
        let mut out = (u64::from(first)..=u64::from(last))
            .map(|height| BlockEvents::new(height))
            .collect::<Vec<_>>();
        let mut tx_stream = self.transactions_between(first, last).boxed();
        while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
            out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
        }
        let mut events_stream = self.events_between(first, last).boxed();
        while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
            out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
        }
        Ok(EventBatch::new(out))
    }

    /// Connect to the source (read-only) and destination (read-write) databases
    /// concurrently, and ensure the watermark table exists in the destination.
    pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
        tracing::info!(url = src_url, "connecting to raw database");
        tracing::info!(url = dst_url, "connecting to derived database");
        let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
        let out = Self { src, dst };
        out.create_watermark_table().await?;
        Ok(out)
    }

    /// Begin a transaction on the destination database.
    pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
        Ok(self.dst.begin().await?)
    }
}