cometindex/indexer/
indexing_state.rs

1use anyhow::anyhow;
2use futures::{Stream, StreamExt, TryStreamExt};
3use prost::Message as _;
4use sqlx::{PgPool, Postgres, Transaction};
5use std::collections::HashMap;
6use tendermint::abci::{self, Event};
7
8use crate::database::{read_only_db, read_write_db};
9use crate::index::{BlockEvents, EventBatch, Version};
10
11#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
12#[sqlx(transparent)]
13pub struct Height(i64);
14
15impl Height {
16    pub fn post_genesis() -> Self {
17        Self(1)
18    }
19
20    /// Return the last height in the batch, and then the first height in the next batch.
21    pub fn advance(self, batch_size: u64, max_height: Self) -> (Self, Self) {
22        let last = Self::from(self.0 as u64 + batch_size - 1).min(max_height);
23        let next_first = Self(last.0 + 1);
24        (last, next_first)
25    }
26
27    pub fn next(self) -> Self {
28        Self(self.0 + 1)
29    }
30}
31
32impl From<u64> for Height {
33    fn from(value: u64) -> Self {
34        Self(value.try_into().unwrap_or(i64::MAX))
35    }
36}
37
38impl From<Height> for u64 {
39    fn from(value: Height) -> Self {
40        value.0.try_into().unwrap_or_default()
41    }
42}
43
44/// The state of a particular index.
45#[derive(Default, Debug, Clone)]
46pub struct IndexState {
47    /// What version this particular index has been using.
48    pub version: Version,
49    /// What height this particular index has reached.
50    pub height: Height,
51}
52
53#[derive(Debug, Clone)]
54pub struct IndexingManager {
55    src: PgPool,
56    dst: PgPool,
57}
58
59impl IndexingManager {
60    async fn create_watermark_table(&self) -> anyhow::Result<()> {
61        sqlx::query(
62            "
63        CREATE TABLE IF NOT EXISTS index_watermarks (
64            index_name TEXT PRIMARY KEY,
65            height BIGINT NOT NULL,
66            version BIGINT
67        )
68        ",
69        )
70        .execute(&self.dst)
71        .await?;
72        sqlx::query("ALTER TABLE index_watermarks ADD COLUMN IF NOT EXISTS option_hash BYTEA")
73            .execute(&self.dst)
74            .await?;
75        Ok(())
76    }
77
78    /// The largest height for which we know we have all events.
79    pub async fn src_height(&self) -> anyhow::Result<Height> {
80        // We may be currently indexing events for this block.
81        let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
82            .fetch_optional(&self.src)
83            .await?;
84        Ok(res.unwrap_or_default())
85    }
86
87    pub async fn index_state(&self, name: &str) -> anyhow::Result<Option<IndexState>> {
88        let row: Option<(Height, Option<i64>, Option<[u8; 32]>)> = sqlx::query_as(
89            "SELECT height, version, option_hash FROM index_watermarks WHERE index_name = $1",
90        )
91        .bind(name)
92        .fetch_optional(&self.dst)
93        .await?;
94        Ok(row.map(|(height, major, option_hash)| IndexState {
95            height,
96            version: Version::new(major, option_hash),
97        }))
98    }
99
100    pub async fn index_states(&self) -> anyhow::Result<HashMap<String, IndexState>> {
101        let rows: Vec<(String, Height, Option<i64>, Option<[u8; 32]>)> =
102            sqlx::query_as("SELECT index_name, height, version, option_hash FROM index_watermarks")
103                .fetch_all(&self.dst)
104                .await?;
105        Ok(rows
106            .into_iter()
107            .map(|(name, height, major, option_hash)| {
108                (
109                    name,
110                    IndexState {
111                        height,
112                        version: Version::new(major, option_hash),
113                    },
114                )
115            })
116            .collect())
117    }
118
119    pub async fn update_index_state(
120        dbtx: &mut sqlx::Transaction<'_, Postgres>,
121        name: &str,
122        new_state: IndexState,
123    ) -> anyhow::Result<()> {
124        sqlx::query(
125            "
126        INSERT INTO index_watermarks 
127        VALUES ($1, $2, $3, $4) 
128        ON CONFLICT (index_name) 
129        DO UPDATE SET
130            height = excluded.height,
131            version = excluded.version,
132            option_hash = excluded.option_hash
133        ",
134        )
135        .bind(name)
136        .bind(new_state.height)
137        .bind(i64::try_from(new_state.version.major())?)
138        .bind(new_state.version.option_hash())
139        .execute(dbtx.as_mut())
140        .await?;
141        Ok(())
142    }
143
144    fn transactions_between(
145        &self,
146        first: Height,
147        last: Height,
148    ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
149        async fn parse_row(
150            row: (i64, String, Vec<u8>),
151        ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
152            let tx_hash: [u8; 32] = hex::decode(row.1)?
153                .try_into()
154                .map_err(|_| anyhow!("expected 32 byte hash"))?;
155            let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
156            let transaction = tx_result.tx.to_vec();
157            let height = Height(row.0);
158            Ok((height, tx_hash, transaction))
159        }
160
161        sqlx::query_as::<_, (i64, String, Vec<u8>)>(
162            r#"
163SELECT height, tx_hash, tx_result
164FROM blocks
165JOIN tx_results ON blocks.rowid = tx_results.block_id
166WHERE
167    height >= $1
168AND
169    height <= $2
170"#,
171        )
172        .bind(first)
173        .bind(last)
174        .fetch(&self.src)
175        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
176        .and_then(parse_row)
177    }
178
179    fn events_between(
180        &self,
181        first: Height,
182        last: Height,
183    ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
184        sqlx::query_as::<_, (i64, String, Height, Option<String>, serde_json::Value)>(
185            // This query does some shenanigans to ensure good performance.
186            // The main trick is that we know that each event has 1 block and <= 1 transaction associated
187            // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and
188            // then a sort, and instead work from the events in a linear fashion.
189            // Basically, this query ends up doing:
190            //
191            // for event in events >= id:
192            //   attach attributes
193            //   attach block
194            //   attach transaction hash?
195            r#"
196WITH blocks AS (
197  SELECT * FROM blocks WHERE height >= $1 AND height <= $2
198),
199filtered_events AS (
200  SELECT e.block_id, e.rowid, e.type, e.tx_id, b.height
201  FROM events e
202  JOIN blocks b ON e.block_id = b.rowid
203),
204events_with_attrs AS (
205  SELECT
206      f.block_id,
207      f.rowid,
208      f.type,
209      f.tx_id,
210      f.height,
211      jsonb_object_agg(a.key, a.value) AS attrs
212  FROM filtered_events f
213  LEFT JOIN attributes a ON f.rowid = a.event_id
214  GROUP BY f.block_id, f.rowid, f.type, f.tx_id, f.height
215)
216SELECT e.rowid, e.type, e.height, tx.tx_hash, e.attrs
217FROM events_with_attrs e
218LEFT JOIN tx_results tx ON tx.rowid = e.tx_id
219ORDER BY e.height ASC, e.rowid ASC;
220"#,
221        )
222        .bind(first)
223        .bind(last)
224        .fetch(&self.src)
225        .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
226            tracing::debug!(?local_rowid, type_str, ?height, ?tx_hash);
227            let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
228                hex::decode(s)
229                    .expect("invalid tx_hash")
230                    .try_into()
231                    .expect("expected 32 bytes")
232            });
233            let serde_json::Value::Object(attrs) = attrs else {
234                // saves an allocation below bc we can take ownership
235                panic!("expected JSON object");
236            };
237
238            let event = abci::Event {
239                kind: type_str,
240                attributes: attrs
241                    .into_iter()
242                    .filter_map(|(k, v)| match v {
243                        serde_json::Value::String(s) => Some((k, s)),
244                        // we never hit this because of how we constructed the query
245                        _ => None,
246                    })
247                    .map(Into::into)
248                    .collect(),
249            };
250            (height, event, tx_hash, local_rowid)
251        })
252        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
253    }
254
255    pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
256        let mut out = (u64::from(first)..=u64::from(last))
257            .map(|height| BlockEvents::new(height))
258            .collect::<Vec<_>>();
259        let mut tx_stream = self.transactions_between(first, last).boxed();
260        while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
261            out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
262        }
263        let mut events_stream = self.events_between(first, last).boxed();
264        while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
265            out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
266        }
267        Ok(EventBatch::new(out))
268    }
269
270    pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
271        tracing::info!(url = src_url, "connecting to raw database");
272        tracing::info!(url = dst_url, "connecting to derived database");
273        let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
274        let out = Self { src, dst };
275        out.create_watermark_table().await?;
276        Ok(out)
277    }
278
279    pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
280        Ok(self.dst.begin().await?)
281    }
282}