1mod indexing_state;
2
3use crate::{
4 index::{EventBatch, EventBatchContext},
5 opt::Options,
6 AppView,
7};
8use anyhow::{Context, Result};
9use indexing_state::{Height, IndexState, IndexingManager};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13async fn reset_index_if_necessary(
14 index: &dyn AppView,
15 manager: &IndexingManager,
16 dbtx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
17) -> anyhow::Result<()> {
18 let name = index.name();
19 let state = manager.index_state(&name).await?;
20 let version = index.version();
21 if version < state.version {
22 anyhow::bail!(
26 r#"
27Current version for index {name} {version:?} is lower than that recorded in the state: {0:?}.
28Are you running the right version of the code?
29If so, maybe there's a bug in this particular index.
30 "#,
31 state.version
32 );
33 } else if version > state.version {
34 tracing::info!(?name, old_version = ?state.version, new_version = ?version, "resetting index");
35 index.reset(dbtx).await?;
36 IndexingManager::update_index_state(
37 dbtx,
38 &name,
39 IndexState {
40 height: Height::default(),
41 version,
42 },
43 )
44 .await?;
45 }
46 Ok(())
47}
48
49#[tracing::instrument(skip_all)]
53async fn catchup(
54 manager: &IndexingManager,
55 indices: &[Arc<dyn AppView>],
56 genesis: Arc<serde_json::Value>,
57) -> anyhow::Result<bool> {
58 if indices.len() <= 0 {
59 tracing::info!(why = "no indices", "catchup completed");
60 return Ok(true);
61 }
62
63 let (src_height, index_states) =
64 tokio::try_join!(manager.src_height(), manager.index_states())?;
65 tracing::info!(?src_height, ?index_states, "catchup status");
66 let lowest_index_height = index_states
67 .values()
68 .map(|x| x.height)
69 .min()
70 .unwrap_or_default();
71 if lowest_index_height >= src_height {
72 tracing::info!(why = "already caught up", "catchup completed");
73 return Ok(true);
74 }
75
76 const DEFAULT_BATCH_SIZE: u64 = 1000;
78 const BATCH_LOOKAHEAD: usize = 2;
79
80 let mut tasks = JoinSet::<anyhow::Result<()>>::new();
81
82 let mut txs = Vec::with_capacity(indices.len());
83 for index in indices.iter().cloned() {
84 let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
85 txs.push(tx);
86 let name = index.name();
87 let index_state = index_states.get(&name).copied().unwrap_or_default();
88 let manager_cp = manager.clone();
89 let genesis_cp = genesis.clone();
90 tasks.spawn(async move {
91 if index_state.height == Height::default() {
92 tracing::info!(?name, "initializing index");
93 let mut dbtx = manager_cp.begin_transaction().await?;
94 index.init_chain(&mut dbtx, &genesis_cp).await?;
95 tracing::info!(?name, "finished initialization");
96 let new_state = IndexState {
97 version: index_state.version,
98 height: Height::default(),
99 };
100 IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
101 dbtx.commit().await?;
102 } else {
103 tracing::info!(?name, "already initialized");
104 }
105 while let Some(mut events) = rx.recv().await {
106 events.start_later(index_state.height.next().into());
109 if events.empty() {
110 tracing::info!(
111 first = events.first_height(),
112 last = events.last_height(),
113 index_name = &name,
114 "skipping batch"
115 );
116 continue;
117 }
118 tracing::info!(
119 first = events.first_height(),
120 last = events.last_height(),
121 index_name = &name,
122 "indexing batch"
123 );
124 let last_height = events.last_height();
125 let mut dbtx = manager_cp.begin_transaction().await?;
126 let context = EventBatchContext {
127 is_last: last_height >= u64::from(src_height),
128 };
129 index.index_batch(&mut dbtx, events, context).await?;
130 tracing::debug!(index_name = &name, "committing batch");
131 let new_state = IndexState {
132 version: index.version(),
133 height: Height::from(last_height),
134 };
135 IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
136
137 dbtx.commit().await?;
138 }
139 Ok(())
140 });
141 }
142
143 let manager_cp = manager.clone();
144 tasks.spawn(async move {
145 let mut height = lowest_index_height.next();
146 while height <= src_height {
147 let first = height;
148 let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
149 height = next_height;
150 tracing::debug!(?first, ?last, "fetching batch");
151 let events = manager_cp.event_batch(first, last).await?;
152 tracing::info!(?first, ?last, "sending batch");
153 for tx in &txs {
154 tx.send(events.clone()).await?;
155 }
156 }
157 Ok(())
158 });
159
160 while let Some(res) = tasks.join_next().await {
161 res??;
162 }
163 Ok(false)
164}
165
166pub struct Indexer {
167 opts: Options,
168 indices: Vec<Arc<dyn AppView>>,
169}
170
171impl Indexer {
172 pub fn new(opts: Options) -> Self {
173 Self {
174 opts,
175 indices: Vec::new(),
176 }
177 }
178
179 pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
180 self.indices.push(Arc::from(index));
181 self
182 }
183
184 pub fn with_default_tracing(self) -> Self {
185 tracing_subscriber::fmt::init();
186 self
187 }
188
189 pub async fn run(self) -> Result<(), anyhow::Error> {
190 tracing::info!(?self.opts);
191 let Self {
192 opts:
193 Options {
194 src_database_url,
195 dst_database_url,
196 chain_id: _,
197 poll_ms,
198 genesis_json,
199 exit_on_catchup,
200 integrity_checks_only,
201 },
202 indices,
203 } = self;
204 crate::integrity::integrity_check(&src_database_url)
205 .await
206 .context("while running integrity checks")?;
207 if integrity_checks_only {
208 return Ok(());
209 }
210
211 let genesis: serde_json::Value = serde_json::from_str(
212 &std::fs::read_to_string(genesis_json)
213 .context("error reading provided genesis.json file")?,
214 )
215 .context("error parsing provided genesis.json file")?;
216 let app_state = Arc::new(
217 genesis
218 .get("app_state")
219 .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
220 .clone(),
221 );
222
223 let manager = IndexingManager::init(&src_database_url, &dst_database_url).await?;
224 {
225 let mut dbtx = manager.begin_transaction().await?;
226 for index in &indices {
227 reset_index_if_necessary(index.as_ref(), &manager, &mut dbtx).await?;
228 index.on_startup(&mut dbtx).await?;
229 }
230 dbtx.commit().await?;
231 }
232
233 loop {
234 let caught_up = catchup(&manager, indices.as_slice(), app_state.clone()).await?;
235 if exit_on_catchup && caught_up {
236 tracing::info!("catchup completed, exiting as requested");
237 return Ok(());
238 }
239 tokio::time::sleep(poll_ms).await;
240 }
241 }
242}