1use anyhow::anyhow;
2use futures::{Stream, StreamExt, TryStreamExt};
3use prost::Message as _;
4use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction};
5use std::collections::HashMap;
6use tendermint::abci::{self, Event};
7
8use crate::index::{BlockEvents, EventBatch};
9
10async fn read_only_db(url: &str) -> anyhow::Result<PgPool> {
16 PgPoolOptions::new()
17 .after_connect(|conn, _| {
18 Box::pin(async move {
19 sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
20 .execute(conn)
21 .await?;
22 Ok(())
23 })
24 })
25 .connect(url)
26 .await
27 .map_err(Into::into)
28}
29
30async fn read_write_db(url: &str) -> anyhow::Result<PgPool> {
31 PgPoolOptions::new().connect(url).await.map_err(Into::into)
32}
33
/// A block height, stored as a `u64`.
///
/// The derived [`Default`] is height `0`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Height(u64);

impl Height {
    /// Returns height `1`.
    pub fn post_genesis() -> Self {
        Height(1)
    }

    /// Computes the bounds of a batch of at most `batch_size` heights that starts at `self`.
    ///
    /// Returns `(last, next_first)`: `last` is the final height of the batch, capped at
    /// `max_height`, and `next_first` is the height immediately after `last`.
    ///
    /// The batch end is computed with saturating arithmetic, so a `batch_size` of `0` or a
    /// very large `batch_size` can neither underflow nor overflow.
    pub fn advance(self, batch_size: u64, max_height: Height) -> (Height, Height) {
        // `self + batch_size - 1`, clamped to the range of `u64`.
        let uncapped = self.0.saturating_add(batch_size).saturating_sub(1);
        let last = Height::from(uncapped).min(max_height);
        (last, last.next())
    }

    /// Returns the height immediately following this one.
    pub fn next(self) -> Height {
        Self(self.0 + 1)
    }
}

impl From<u64> for Height {
    /// Wraps a raw `u64` as a [`Height`].
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl From<Height> for u64 {
    /// Unwraps a [`Height`] into its raw `u64`.
    fn from(value: Height) -> Self {
        value.0
    }
}
64
65impl TryFrom<i64> for Height {
66 type Error = anyhow::Error;
67
68 fn try_from(value: i64) -> Result<Self, Self::Error> {
69 Ok(Self(u64::try_from(value)?))
70 }
71}
72
73impl<'r> sqlx::Decode<'r, Postgres> for Height {
74 fn decode(
75 value: <Postgres as sqlx::Database>::ValueRef<'r>,
76 ) -> Result<Self, sqlx::error::BoxDynError> {
77 Ok(Height::try_from(
78 <i64 as sqlx::Decode<'r, Postgres>>::decode(value)?,
79 )?)
80 }
81}
82
83impl sqlx::Type<Postgres> for Height {
84 fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
85 <i64 as sqlx::Type<Postgres>>::type_info()
86 }
87}
88
89impl<'q> sqlx::Encode<'q, Postgres> for Height {
90 fn encode_by_ref(
91 &self,
92 buf: &mut <Postgres as sqlx::Database>::ArgumentBuffer<'q>,
93 ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
94 <i64 as sqlx::Encode<'q, Postgres>>::encode(
95 i64::try_from(self.0).expect("height should never exceed i64::MAX"),
96 buf,
97 )
98 }
99}
100
/// Handles to the two databases involved in indexing.
#[derive(Debug, Clone)]
pub struct IndexingState {
    /// Pool for the source database; `init` opens it read-only, and blocks, transactions
    /// and events are read from it.
    src: PgPool,
    /// Pool for the destination database; it holds the `index_watermarks` table and is
    /// the database that `begin_transaction` opens transactions on.
    dst: PgPool,
}
106
107impl IndexingState {
108 async fn create_watermark_table(&self) -> anyhow::Result<()> {
109 sqlx::query(
110 "
111 CREATE TABLE IF NOT EXISTS index_watermarks (
112 index_name TEXT PRIMARY KEY,
113 height BIGINT NOT NULL
114 )
115 ",
116 )
117 .execute(&self.dst)
118 .await?;
119 Ok(())
120 }
121
122 pub async fn src_height(&self) -> anyhow::Result<Height> {
124 let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
126 .fetch_optional(&self.src)
127 .await?;
128 Ok(res.unwrap_or_default())
129 }
130
131 pub async fn index_heights(&self) -> anyhow::Result<HashMap<String, Height>> {
132 let rows: Vec<(String, Height)> =
133 sqlx::query_as("SELECT index_name, height FROM index_watermarks")
134 .fetch_all(&self.dst)
135 .await?;
136 Ok(rows.into_iter().collect())
137 }
138
139 pub async fn update_index_height(
140 dbtx: &mut sqlx::Transaction<'_, Postgres>,
141 name: &str,
142 height: Height,
143 ) -> anyhow::Result<()> {
144 sqlx::query(
145 "
146 INSERT INTO index_watermarks
147 VALUES ($1, $2)
148 ON CONFLICT (index_name)
149 DO UPDATE SET height = excluded.height
150 ",
151 )
152 .bind(name)
153 .bind(height)
154 .execute(dbtx.as_mut())
155 .await?;
156 Ok(())
157 }
158
159 fn transactions_between(
160 &self,
161 first: Height,
162 last: Height,
163 ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ {
164 async fn parse_row(
165 row: (i64, String, Vec<u8>),
166 ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> {
167 let tx_hash: [u8; 32] = hex::decode(row.1)?
168 .try_into()
169 .map_err(|_| anyhow!("expected 32 byte hash"))?;
170 let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?;
171 let transaction = tx_result.tx.to_vec();
172 let height = Height::try_from(row.0)?;
173 Ok((height, tx_hash, transaction))
174 }
175
176 sqlx::query_as::<_, (i64, String, Vec<u8>)>(
177 r#"
178SELECT height, tx_hash, tx_result
179FROM tx_results
180LEFT JOIN LATERAL (
181 SELECT height FROM blocks WHERE blocks.rowid = tx_results.block_id LIMIT 1
182) ON TRUE
183WHERE
184 block_id >= (SELECT rowid FROM blocks where height = $1)
185AND
186 block_id <= (SELECT rowid FROM blocks where height = $2)
187"#,
188 )
189 .bind(first)
190 .bind(last)
191 .fetch(&self.src)
192 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
193 .and_then(parse_row)
194 }
195
196 fn events_between(
197 &self,
198 first: Height,
199 last: Height,
200 ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ {
201 sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>(
202 r#"
213SELECT
214 events.rowid,
215 events.type,
216 events.height,
217 tx_results.tx_hash,
218 events.attrs
219FROM (
220 SELECT
221 (SELECT height FROM blocks WHERE blocks.rowid = block_id) as height,
222 rowid,
223 type,
224 block_id,
225 tx_id,
226 jsonb_object_agg(attributes.key, attributes.value) AS attrs
227 FROM
228 events
229 LEFT JOIN
230 attributes ON rowid = attributes.event_id
231 WHERE
232 block_id >= (SELECT rowid FROM blocks where height = $1)
233 AND
234 block_id <= (SELECT rowid FROM blocks where height = $2)
235 GROUP BY
236 rowid,
237 type,
238 block_id,
239 tx_id
240 ORDER BY
241 rowid ASC
242) events
243LEFT JOIN LATERAL (
244 SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1
245) tx_results
246ON TRUE
247ORDER BY
248 events.rowid ASC
249 "#,
250 )
251 .bind(first)
252 .bind(last)
253 .fetch(&self.src)
254 .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| {
255 tracing::debug!(?local_rowid, type_str, height, ?tx_hash);
256 let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| {
257 hex::decode(s)
258 .expect("invalid tx_hash")
259 .try_into()
260 .expect("expected 32 bytes")
261 });
262 let serde_json::Value::Object(attrs) = attrs else {
263 panic!("expected JSON object");
265 };
266
267 let event = abci::Event {
268 kind: type_str,
269 attributes: attrs
270 .into_iter()
271 .filter_map(|(k, v)| match v {
272 serde_json::Value::String(s) => Some((k, s)),
273 _ => None,
275 })
276 .map(Into::into)
277 .collect(),
278 };
279 let height = Height::try_from(height).expect("failed to decode height");
280 (height, event, tx_hash, local_rowid)
281 })
282 .map_err(|e| anyhow::Error::from(e).context("error reading from database"))
283 }
284
285 pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
286 let mut out = (first.0..=last.0)
287 .map(|height| BlockEvents::new(height))
288 .collect::<Vec<_>>();
289 let mut tx_stream = self.transactions_between(first, last).boxed();
290 while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? {
291 out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data);
292 }
293 let mut events_stream = self.events_between(first, last).boxed();
294 while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await? {
295 out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid);
296 }
297 Ok(EventBatch::new(out))
298 }
299
300 pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {
301 tracing::info!(url = src_url, "connecting to raw database");
302 tracing::info!(url = dst_url, "connecting to derived database");
303 let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?;
304 let out = Self { src, dst };
305 out.create_watermark_table().await?;
306 Ok(out)
307 }
308
309 pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> {
310 Ok(self.dst.begin().await?)
311 }
312}