1use anyhow::anyhow;
2use futures::{Stream, StreamExt, TryStreamExt};
3use prost::Message as _;
4use sqlx::{PgPool, Postgres, Transaction};
5use std::collections::HashMap;
6use tendermint::abci::{self, Event};
7
8use crate::database::{read_only_db, read_write_db};
9use crate::index::{BlockEvents, EventBatch, Version};
10
/// A block height, stored as a signed 64-bit integer.
///
/// `#[sqlx(transparent)]` makes the wrapper encode and decode exactly like its
/// inner `i64`, so it can be bound to and fetched from the `BIGINT` height
/// columns used by the queries in this file. The `Default` value is height 0.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(transparent)]
pub struct Height(i64);
14
15impl Height {
16 pub fn post_genesis() -> Self {
17 Self(1)
18 }
19
20 pub fn advance(self, batch_size: u64, max_height: Self) -> (Self, Self) {
22 let last = Self::from(self.0 as u64 + batch_size - 1).min(max_height);
23 let next_first = Self(last.0 + 1);
24 (last, next_first)
25 }
26
27 pub fn next(self) -> Self {
28 Self(self.0 + 1)
29 }
30}
31
32impl From<u64> for Height {
33 fn from(value: u64) -> Self {
34 Self(value.try_into().unwrap_or(i64::MAX))
35 }
36}
37
38impl From<Height> for u64 {
39 fn from(value: Height) -> Self {
40 value.0.try_into().unwrap_or_default()
41 }
42}
43
/// The persisted state of a single index, mirroring one row of the
/// `index_watermarks` table.
#[derive(Default, Debug, Clone)]
pub struct IndexState {
    /// Version of the index, built from the row's `version` and `option_hash`
    /// columns via `Version::new`.
    pub version: Version,
    /// Value of the row's `height` column.
    // NOTE(review): presumably the last height this index has processed —
    // confirm against the callers that write it.
    pub height: Height,
}
52
/// Handle on the two databases involved in indexing.
///
/// Blocks, transactions and events are read from `src`; the
/// `index_watermarks` table lives in `dst`, which is also where
/// `begin_transaction` opens its transactions.
#[derive(Debug, Clone)]
pub struct IndexingManager {
    /// Pool for the source database (opened with `read_only_db` in `init`).
    src: PgPool,
    /// Pool for the destination database (opened with `read_write_db` in `init`).
    dst: PgPool,
}
58
59impl IndexingManager {
60 async fn create_watermark_table(&self) -> anyhow::Result<()> {
61 sqlx::query(
62 "
63 CREATE TABLE IF NOT EXISTS index_watermarks (
64 index_name TEXT PRIMARY KEY,
65 height BIGINT NOT NULL,
66 version BIGINT
67 )
68 ",
69 )
70 .execute(&self.dst)
71 .await?;
72 sqlx::query("ALTER TABLE index_watermarks ADD COLUMN IF NOT EXISTS option_hash BYTEA")
73 .execute(&self.dst)
74 .await?;
75 Ok(())
76 }
77
78 pub async fn src_height(&self) -> anyhow::Result<Height> {
80 let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
82 .fetch_optional(&self.src)
83 .await?;
84 Ok(res.unwrap_or_default())
85 }
86
87 pub async fn index_state(&self, name: &str) -> anyhow::Result<Option<IndexState>> {
88 let row: Option<(Height, Option<i64>, Option<[u8; 32]>)> = sqlx::query_as(
89 "SELECT height, version, option_hash FROM index_watermarks WHERE index_name = $1",
90 )
91 .bind(name)
92 .fetch_optional(&self.dst)
93 .await?;
94 Ok(row.map(|(height, major, option_hash)| IndexState {
95 height,
96 version: Version::new(major, option_hash),
97 }))
98 }
99
100 pub async fn index_states(&self) -> anyhow::Result<HashMap<String, IndexState>> {
101 let rows: Vec<(String, Height, Option<i64>, Option<[u8; 32]>)> =
102 sqlx::query_as("SELECT index_name, height, version, option_hash FROM index_watermarks")
103 .fetch_all(&self.dst)
104 .await?;
105 Ok(rows
106 .into_iter()
107 .map(|(name, height, major, option_hash)| {
108 (
109 name,
110 IndexState {
111 height,
112 version: Version::new(major, option_hash),
113 },
114 )
115 })
116 .collect())
117 }
118
119 pub async fn update_index_state(
120 dbtx: &mut sqlx::Transaction<'_, Postgres>,
121 name: &str,
122 new_state: IndexState,
123 ) -> anyhow::Result<()> {
124 sqlx::query(
125 "
126 INSERT INTO index_watermarks
127 VALUES ($1, $2, $3, $4)
128 ON CONFLICT (index_name)
129 DO UPDATE SET
130 height = excluded.height,
131 version = excluded.version,
132 option_hash = excluded.option_hash
133 ",
134 )
135 .bind(name)
136 .bind(new_state.height)
137 .bind(i64::try_from(new_state.version.major())?)
138 .bind(new_state.version.option_hash())
139 .execute(dbtx.as_mut())
140 .await?;
141 Ok(())
142 }
143
144 fn transactions_between(
145 &self,
146 first: Height,
147 last: Height,
148 ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
149 async fn parse_row(
150 row: (i64, String, Vec<u8>),
151 ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
152 let tx_hash: [u8; 32] = hex::decode(row.1)?
153 .try_into()
154 .map_err(|_| anyhow!("expected 32 byte hash"))?;
155 let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
156 let transaction = tx_result.tx.to_vec();
157 let height = Height(row.0);
158 Ok((height, tx_hash, transaction))
159 }
160
161 sqlx::query_as::<_, (i64, String, Vec<u8>)>(
162 r#"
163SELECT height, tx_hash, tx_result
164FROM blocks
165JOIN tx_results ON blocks.rowid = tx_results.block_id
166WHERE
167 height >= $1
168AND
169 height <= $2
170"#,
171 )
172 .bind(first)
173 .bind(last)
174 .fetch(&self.src)
175 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
176 .and_then(parse_row)
177 }
178
179 fn events_between(
180 &self,
181 first: Height,
182 last: Height,
183 ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
184 sqlx::query_as::<_, (i64, String, Height, Option<String>, serde_json::Value)>(
185 r#"
196WITH blocks AS (
197 SELECT * FROM blocks WHERE height >= $1 AND height <= $2
198),
199filtered_events AS (
200 SELECT e.block_id, e.rowid, e.type, e.tx_id, b.height
201 FROM events e
202 JOIN blocks b ON e.block_id = b.rowid
203),
204events_with_attrs AS (
205 SELECT
206 f.block_id,
207 f.rowid,
208 f.type,
209 f.tx_id,
210 f.height,
211 jsonb_object_agg(a.key, a.value) AS attrs
212 FROM filtered_events f
213 LEFT JOIN attributes a ON f.rowid = a.event_id
214 GROUP BY f.block_id, f.rowid, f.type, f.tx_id, f.height
215)
216SELECT e.rowid, e.type, e.height, tx.tx_hash, e.attrs
217FROM events_with_attrs e
218LEFT JOIN tx_results tx ON tx.rowid = e.tx_id
219ORDER BY e.height ASC, e.rowid ASC;
220"#,
221 )
222 .bind(first)
223 .bind(last)
224 .fetch(&self.src)
225 .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
226 tracing::debug!(?local_rowid, type_str, ?height, ?tx_hash);
227 let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
228 hex::decode(s)
229 .expect("invalid tx_hash")
230 .try_into()
231 .expect("expected 32 bytes")
232 });
233 let serde_json::Value::Object(attrs) = attrs else {
234 panic!("expected JSON object");
236 };
237
238 let event = abci::Event {
239 kind: type_str,
240 attributes: attrs
241 .into_iter()
242 .filter_map(|(k, v)| match v {
243 serde_json::Value::String(s) => Some((k, s)),
244 _ => None,
246 })
247 .map(Into::into)
248 .collect(),
249 };
250 (height, event, tx_hash, local_rowid)
251 })
252 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
253 }
254
255 pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
256 let mut out = (u64::from(first)..=u64::from(last))
257 .map(|height| BlockEvents::new(height))
258 .collect::<Vec<_>>();
259 let mut tx_stream = self.transactions_between(first, last).boxed();
260 while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
261 out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
262 }
263 let mut events_stream = self.events_between(first, last).boxed();
264 while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
265 out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
266 }
267 Ok(EventBatch::new(out))
268 }
269
270 pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
271 tracing::info!(url = src_url, "connecting to raw database");
272 tracing::info!(url = dst_url, "connecting to derived database");
273 let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
274 let out = Self { src, dst };
275 out.create_watermark_table().await?;
276 Ok(out)
277 }
278
279 pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
280 Ok(self.dst.begin().await?)
281 }
282}