cometindex/indexer/indexing_state.rs

use anyhow::anyhow;
use futures::{Stream, StreamExt, TryStreamExt};
use prost::Message as _;
use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction};
use std::collections::HashMap;
use tendermint::abci::{self, Event};

use crate::index::{BlockEvents, EventBatch};

/// Create a database connection pool with, for sanity, some read-only settings.
///
/// A consumer who knows what they're doing can override these settings,
/// but they prevent basic mistakes.
/// cf. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811
async fn read_only_db(url: &str) -> anyhow::Result<PgPool> {
    PgPoolOptions::new()
        .after_connect(|conn, _| {
            Box::pin(async move {
                sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
                    .execute(conn)
                    .await?;
                Ok(())
            })
        })
        .connect(url)
        .await
        .map_err(Into::into)
}

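/// Create a read-write database connection pool, with the default settings.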
async fn read_write_db(url: &str) -> anyhow::Result<PgPool> {
    PgPoolOptions::new().connect(url).await.map_err(Into::into)
}

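/// A block height, stored as a Postgres `BIGINT` in the database.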
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Height(u64);

impl Height {
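    /// The first height after genesis, i.e. height 1.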
    pub fn post_genesis() -> Self {
        Height(1)
    }
    /// Return the last height in the batch, and then the first height in the next batch.
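    ///
    /// For example, advancing from height 10 with a batch size of 5 and a max
    /// height of 100 yields `(Height(14), Height(15))`: the batch covers
    /// heights 10 through 14, and the next batch starts at 15.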
    pub fn advance(self, batch_size: u64, max_height: Height) -> (Height, Height) {
        let last = Height::from(self.0 + batch_size - 1).min(max_height);
        let next_first = Height::from(last.0 + 1);
        (last, next_first)
    }

    pub fn next(self) -> Height {
        Self(self.0 + 1)
    }
}

impl From<u64> for Height {
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl From<Height> for u64 {
    fn from(value: Height) -> Self {
        value.0
    }
}

impl TryFrom<i64> for Height {
    type Error = anyhow::Error;

    fn try_from(value: i64) -> Result<Self, Self::Error> {
        Ok(Self(u64::try_from(value)?))
    }
}

impl<'r> sqlx::Decode<'r, Postgres> for Height {
    fn decode(
        value: <Postgres as sqlx::Database>::ValueRef<'r>,
    ) -> Result<Self, sqlx::error::BoxDynError> {
        Ok(Height::try_from(
            <i64 as sqlx::Decode<'r, Postgres>>::decode(value)?,
        )?)
    }
}

impl sqlx::Type<Postgres> for Height {
    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
        <i64 as sqlx::Type<Postgres>>::type_info()
    }
}

impl<'q> sqlx::Encode<'q, Postgres> for Height {
    fn encode_by_ref(
        &self,
        buf: &mut <Postgres as sqlx::Database>::ArgumentBuffer<'q>,
    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        <i64 as sqlx::Encode<'q, Postgres>>::encode(
            i64::try_from(self.0).expect("height should never exceed i64::MAX"),
            buf,
        )
    }
}

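/// State for driving the indexing process.
///
/// This wraps a connection pool to the source (raw event) database and one to
/// the destination (derived) database, along with the watermark bookkeeping
/// each index uses to record its progress.
///
/// A rough usage sketch (not verbatim from this crate; `BATCH_SIZE` and
/// `"example_index"` are stand-ins):
///
/// ```ignore
/// let state = IndexingState::init(src_url, dst_url).await?;
/// let max = state.src_height().await?;
/// let first = Height::post_genesis();
/// let (last, _next_first) = first.advance(BATCH_SIZE, max);
/// let batch = state.event_batch(first, last).await?;
/// // ...run each index over `batch`, then record its progress:
/// let mut dbtx = state.begin_transaction().await?;
/// IndexingState::update_index_height(&mut dbtx, "example_index", last).await?;
/// dbtx.commit().await?;
/// ```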
#[derive(Debug, Clone)]
pub struct IndexingState {
    src: PgPool,
    dst: PgPool,
}

impl IndexingState {
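    /// Create the table tracking the watermark height of each index, if it does not already exist.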
    async fn create_watermark_table(&self) -> anyhow::Result<()> {
        sqlx::query(
            "
        CREATE TABLE IF NOT EXISTS index_watermarks (
            index_name TEXT PRIMARY KEY,
            height BIGINT NOT NULL
        )
        ",
        )
        .execute(&self.dst)
        .await?;
        Ok(())
    }

    /// The largest height for which we know we have all events.
    pub async fn src_height(&self) -> anyhow::Result<Height> {
        // Events for the block at MAX(height) may still be in the process of being
        // written, so only heights up to MAX(height) - 1 are guaranteed complete.
        let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
            .fetch_optional(&self.src)
            .await?;
        Ok(res.unwrap_or_default())
    }

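    /// Fetch the watermark height of each index, keyed by index name.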
    pub async fn index_heights(&self) -> anyhow::Result<HashMap<String, Height>> {
        let rows: Vec<(String, Height)> =
            sqlx::query_as("SELECT index_name, height FROM index_watermarks")
                .fetch_all(&self.dst)
                .await?;
        Ok(rows.into_iter().collect())
    }

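    /// Record that the index named `name` has processed events up to and including `height`.
    ///
    /// This inserts the watermark if it does not exist yet, and updates it otherwise.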
    pub async fn update_index_height(
        dbtx: &mut sqlx::Transaction<'_, Postgres>,
        name: &str,
        height: Height,
    ) -> anyhow::Result<()> {
        sqlx::query(
            "
        INSERT INTO index_watermarks
        VALUES ($1, $2)
        ON CONFLICT (index_name)
        DO UPDATE SET height = excluded.height
        ",
        )
        .bind(name)
        .bind(height)
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }

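    /// Stream the transactions between `first` and `last` (inclusive), as
    /// `(height, transaction hash, raw transaction bytes)` tuples.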
    fn transactions_between(
        &self,
        first: Height,
        last: Height,
    ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
        async fn parse_row(
            row: (i64, String, Vec<u8>),
        ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
            let tx_hash: [u8; 32] = hex::decode(row.1)?
                .try_into()
                .map_err(|_| anyhow!("expected 32 byte hash"))?;
            let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
            let transaction = tx_result.tx.to_vec();
            let height = Height::try_from(row.0)?;
            Ok((height, tx_hash, transaction))
        }

        sqlx::query_as::<_, (i64, String, Vec<u8>)>(
            r#"
SELECT height, tx_hash, tx_result
FROM tx_results
LEFT JOIN LATERAL (
    SELECT height FROM blocks WHERE blocks.rowid = tx_results.block_id LIMIT 1
) ON TRUE
WHERE
    block_id >= (SELECT rowid FROM blocks where height = $1)
AND
    block_id <= (SELECT rowid FROM blocks where height = $2)
"#,
        )
        .bind(first)
        .bind(last)
        .fetch(&self.src)
        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
        .and_then(parse_row)
    }

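    /// Stream the events between `first` and `last` (inclusive), along with the
    /// hash of the transaction that produced each event (if any) and the event's
    /// rowid in the source database.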
    fn events_between(
        &self,
        first: Height,
        last: Height,
    ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
        sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>(
            // This query does some shenanigans to ensure good performance.
            // The main trick is that we know that each event has 1 block and <= 1 transaction
            // associated with it, so we can "encourage" (force) Postgres to avoid doing a hash
            // join and then a sort, and instead work through the events in a linear fashion.
            // Basically, this query ends up doing:
            //
            // for event in events in the block range:
            //   attach attributes
            //   attach block height
            //   attach transaction hash?
            r#"
SELECT
    events.rowid,
    events.type,
    events.height,
    tx_results.tx_hash,
    events.attrs
FROM (
    SELECT
        (SELECT height FROM blocks WHERE blocks.rowid = block_id) as height,
        rowid,
        type,
        block_id,
        tx_id,
        jsonb_object_agg(attributes.key, attributes.value) AS attrs
    FROM
        events
    LEFT JOIN
        attributes ON rowid = attributes.event_id
    WHERE
        block_id >= (SELECT rowid FROM blocks where height = $1)
    AND
        block_id <= (SELECT rowid FROM blocks where height = $2)
    GROUP BY
        rowid,
        type,
        block_id,
        tx_id
    ORDER BY
        rowid ASC
) events
LEFT JOIN LATERAL (
    SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1
) tx_results
ON TRUE
ORDER BY
    events.rowid ASC
        "#,
        )
        .bind(first)
        .bind(last)
        .fetch(&self.src)
        .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
            tracing::debug!(?local_rowid, type_str, height, ?tx_hash);
            let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
                hex::decode(s)
                    .expect("invalid tx_hash")
                    .try_into()
                    .expect("expected 32 bytes")
            });
            let serde_json::Value::Object(attrs) = attrs else {
                // saves an allocation below bc we can take ownership
                panic!("expected JSON object");
            };

            let event = abci::Event {
                kind: type_str,
                attributes: attrs
                    .into_iter()
                    .filter_map(|(k, v)| match v {
                        serde_json::Value::String(s) => Some((k, s)),
                        // we never hit this because of how we constructed the query
                        _ => None,
                    })
                    .map(Into::into)
                    .collect(),
            };
            let height = Height::try_from(height).expect("failed to decode height");
            (height, event, tx_hash, local_rowid)
        })
        .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
    }

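    /// Gather all transactions and events between `first` and `last` (inclusive)
    /// into a single batch, grouped by block.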
    pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
        let mut out = (first.0..=last.0)
            .map(|height| BlockEvents::new(height))
            .collect::<Vec<_>>();
        let mut tx_stream = self.transactions_between(first, last).boxed();
        while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
            out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
        }
        let mut events_stream = self.events_between(first, last).boxed();
        while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
            out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
        }
        Ok(EventBatch::new(out))
    }

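    /// Connect to the source and destination databases, and make sure the
    /// watermark table exists in the destination.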
    pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
        tracing::info!(url = src_url, "connecting to raw database");
        tracing::info!(url = dst_url, "connecting to derived database");
        let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
        let out = Self { src, dst };
        out.create_watermark_table().await?;
        Ok(out)
    }

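    /// Begin a transaction on the destination (derived) database.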
    pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
        Ok(self.dst.begin().await?)
    }
}