1use anyhow::anyhow;
2use futures::{Stream, StreamExt, TryStreamExt};
3use prost::Message as _;
4use sqlx::{PgPool, Postgres, Transaction};
5use std::collections::HashMap;
6use tendermint::abci::{self, Event};
7
8use crate::database::{read_only_db, read_write_db};
9use crate::index::{BlockEvents, EventBatch, Version};
10
11#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
12#[sqlx(transparent)]
13pub struct Height(i64);
14
15impl Height {
16 pub fn post_genesis() -> Self {
17 Self(1)
18 }
19
20 pub fn advance(self, batch_size: u64, max_height: Self) -> (Self, Self) {
22 let last = Self::from(self.0 as u64 + batch_size - 1).min(max_height);
23 let next_first = Self(last.0 + 1);
24 (last, next_first)
25 }
26
27 pub fn next(self) -> Self {
28 Self(self.0 + 1)
29 }
30}
31
32impl From<u64> for Height {
33 fn from(value: u64) -> Self {
34 Self(value.try_into().unwrap_or(i64::MAX))
35 }
36}
37
38impl From<Height> for u64 {
39 fn from(value: Height) -> Self {
40 value.0.try_into().unwrap_or_default()
41 }
42}
43
44#[derive(Default, Debug, Clone, Copy)]
46pub struct IndexState {
47 pub version: Version,
49 pub height: Height,
51}
52
53#[derive(Debug, Clone)]
54pub struct IndexingManager {
55 src: PgPool,
56 dst: PgPool,
57}
58
59impl IndexingManager {
60 async fn create_watermark_table(&self) -> anyhow::Result<()> {
61 sqlx::query(
62 "
63 CREATE TABLE IF NOT EXISTS index_watermarks (
64 index_name TEXT PRIMARY KEY,
65 height BIGINT NOT NULL,
66 version BIGINT
67 )
68 ",
69 )
70 .execute(&self.dst)
71 .await?;
72 Ok(())
73 }
74
75 pub async fn src_height(&self) -> anyhow::Result<Height> {
77 let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
79 .fetch_optional(&self.src)
80 .await?;
81 Ok(res.unwrap_or_default())
82 }
83
84 pub async fn index_state(&self, name: &str) -> anyhow::Result<IndexState> {
85 let row: Option<(Height, Version)> =
86 sqlx::query_as("SELECT height, version FROM index_watermarks WHERE index_name = $1")
87 .bind(name)
88 .fetch_optional(&self.dst)
89 .await?;
90 Ok(row
91 .map(|(height, version)| IndexState { height, version })
92 .unwrap_or_default())
93 }
94
95 pub async fn index_states(&self) -> anyhow::Result<HashMap<String, IndexState>> {
96 let rows: Vec<(String, Height, Version)> =
97 sqlx::query_as("SELECT index_name, height, version FROM index_watermarks")
98 .fetch_all(&self.dst)
99 .await?;
100 Ok(rows
101 .into_iter()
102 .map(|(name, height, version)| (name, IndexState { height, version }))
103 .collect())
104 }
105
106 pub async fn update_index_state(
107 dbtx: &mut sqlx::Transaction<'_, Postgres>,
108 name: &str,
109 new_state: IndexState,
110 ) -> anyhow::Result<()> {
111 sqlx::query(
112 "
113 INSERT INTO index_watermarks
114 VALUES ($1, $2, $3)
115 ON CONFLICT (index_name)
116 DO UPDATE SET
117 height = excluded.height,
118 version = excluded.version
119 ",
120 )
121 .bind(name)
122 .bind(new_state.height)
123 .bind(new_state.version)
124 .execute(dbtx.as_mut())
125 .await?;
126 Ok(())
127 }
128
129 fn transactions_between(
130 &self,
131 first: Height,
132 last: Height,
133 ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
134 async fn parse_row(
135 row: (i64, String, Vec<u8>),
136 ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
137 let tx_hash: [u8; 32] = hex::decode(row.1)?
138 .try_into()
139 .map_err(|_| anyhow!("expected 32 byte hash"))?;
140 let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
141 let transaction = tx_result.tx.to_vec();
142 let height = Height(row.0);
143 Ok((height, tx_hash, transaction))
144 }
145
146 sqlx::query_as::<_, (i64, String, Vec<u8>)>(
147 r#"
148SELECT height, tx_hash, tx_result
149FROM blocks
150JOIN tx_results ON blocks.rowid = tx_results.block_id
151WHERE
152 height >= $1
153AND
154 height <= $2
155"#,
156 )
157 .bind(first)
158 .bind(last)
159 .fetch(&self.src)
160 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
161 .and_then(parse_row)
162 }
163
164 fn events_between(
165 &self,
166 first: Height,
167 last: Height,
168 ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
169 sqlx::query_as::<_, (i64, String, Height, Option<String>, serde_json::Value)>(
170 r#"
181WITH blocks AS (
182 SELECT * FROM blocks WHERE height >= $1 AND height <= $2
183),
184filtered_events AS (
185 SELECT e.block_id, e.rowid, e.type, e.tx_id, b.height
186 FROM events e
187 JOIN blocks b ON e.block_id = b.rowid
188),
189events_with_attrs AS (
190 SELECT
191 f.block_id,
192 f.rowid,
193 f.type,
194 f.tx_id,
195 f.height,
196 jsonb_object_agg(a.key, a.value) AS attrs
197 FROM filtered_events f
198 LEFT JOIN attributes a ON f.rowid = a.event_id
199 GROUP BY f.block_id, f.rowid, f.type, f.tx_id, f.height
200)
201SELECT e.rowid, e.type, e.height, tx.tx_hash, e.attrs
202FROM events_with_attrs e
203LEFT JOIN tx_results tx ON tx.rowid = e.tx_id
204ORDER BY e.height ASC, e.rowid ASC;
205"#,
206 )
207 .bind(first)
208 .bind(last)
209 .fetch(&self.src)
210 .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
211 tracing::debug!(?local_rowid, type_str, ?height, ?tx_hash);
212 let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
213 hex::decode(s)
214 .expect("invalid tx_hash")
215 .try_into()
216 .expect("expected 32 bytes")
217 });
218 let serde_json::Value::Object(attrs) = attrs else {
219 panic!("expected JSON object");
221 };
222
223 let event = abci::Event {
224 kind: type_str,
225 attributes: attrs
226 .into_iter()
227 .filter_map(|(k, v)| match v {
228 serde_json::Value::String(s) => Some((k, s)),
229 _ => None,
231 })
232 .map(Into::into)
233 .collect(),
234 };
235 (height, event, tx_hash, local_rowid)
236 })
237 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
238 }
239
240 pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
241 let mut out = (u64::from(first)..=u64::from(last))
242 .map(|height| BlockEvents::new(height))
243 .collect::<Vec<_>>();
244 let mut tx_stream = self.transactions_between(first, last).boxed();
245 while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
246 out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
247 }
248 let mut events_stream = self.events_between(first, last).boxed();
249 while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
250 out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
251 }
252 Ok(EventBatch::new(out))
253 }
254
255 pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
256 tracing::info!(url = src_url, "connecting to raw database");
257 tracing::info!(url = dst_url, "connecting to derived database");
258 let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
259 let out = Self { src, dst };
260 out.create_watermark_table().await?;
261 Ok(out)
262 }
263
264 pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
265 Ok(self.dst.begin().await?)
266 }
267}