1mod indexing_state;
2
3use crate::{
4 index::{EventBatch, EventBatchContext},
5 opt::Options,
6 AppView,
7};
8use anyhow::{Context, Result};
9use indexing_state::{Height, IndexState, IndexingManager};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13async fn reset_index_if_necessary(
14 index: &dyn AppView,
15 manager: &IndexingManager,
16 dbtx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
17 new_options: bool,
18) -> anyhow::Result<()> {
19 let name = index.name();
20 let state = manager.index_state(&name).await?;
21 let version = index.version();
22 let old_version = match state {
25 None => {
26 tracing::info!(?name, new_version = ?version, "initializing index state");
27 IndexingManager::update_index_state(
28 dbtx,
29 &name,
30 IndexState {
31 height: Height::default(),
32 version,
33 },
34 )
35 .await?;
36
37 return Ok(());
38 }
39 Some(s) => s.version,
40 };
41 let options_changed = version.option_hash() != old_version.option_hash();
42 if options_changed && !new_options {
43 let new = version
44 .option_hash()
45 .map(hex::encode)
46 .unwrap_or_else(|| "NULL".to_string());
47 let old = old_version
48 .option_hash()
49 .map(hex::encode)
50 .unwrap_or_else(|| "NULL".to_string());
51 anyhow::bail!(
52 r#"
53Current option hash for index {name} is {old}, but new option hash is {new}.
54Use `--new-options` to allow changing the options for app views.
55If this was not your intent, check that the options have not changed.
56"#
57 );
58 }
59 if version.major() < old_version.major() {
60 anyhow::bail!(
64 r#"
65Current version for index {name} {version:?} is lower than that recorded in the state: {0:?}.
66Are you running the right version of the code?
67If so, maybe there's a bug in this particular index.
68 "#,
69 old_version
70 );
71 } else if version.major() > old_version.major() || options_changed {
72 tracing::info!(?name, ?old_version, new_version = ?version, "resetting index");
73 index.reset(dbtx).await?;
74 IndexingManager::update_index_state(
75 dbtx,
76 &name,
77 IndexState {
78 height: Height::default(),
79 version,
80 },
81 )
82 .await?;
83 }
84 Ok(())
85}
86
87#[tracing::instrument(skip_all)]
91async fn catchup(
92 manager: &IndexingManager,
93 indices: &[Arc<dyn AppView>],
94 genesis: Arc<serde_json::Value>,
95) -> anyhow::Result<bool> {
96 if indices.len() <= 0 {
97 tracing::info!(why = "no indices", "catchup completed");
98 return Ok(true);
99 }
100
101 let (src_height, index_states) =
102 tokio::try_join!(manager.src_height(), manager.index_states())?;
103 tracing::info!(?src_height, ?index_states, "catchup status");
104 let lowest_index_height = index_states
105 .values()
106 .map(|x| x.height)
107 .min()
108 .unwrap_or_default();
109 if lowest_index_height >= src_height {
110 tracing::info!(why = "already caught up", "catchup completed");
111 return Ok(true);
112 }
113
114 const DEFAULT_BATCH_SIZE: u64 = 1000;
116 const BATCH_LOOKAHEAD: usize = 2;
117
118 let mut tasks = JoinSet::<anyhow::Result<()>>::new();
119
120 let mut txs = Vec::with_capacity(indices.len());
121 for index in indices.iter().cloned() {
122 let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
123 txs.push(tx);
124 let name = index.name();
125 let index_state = index_states.get(&name).cloned().unwrap_or_default();
126 let manager_cp = manager.clone();
127 let genesis_cp = genesis.clone();
128 tasks.spawn(async move {
129 if index_state.height == Height::default() {
130 tracing::info!(?name, "initializing index");
131 let mut dbtx = manager_cp.begin_transaction().await?;
132 index.init_chain(&mut dbtx, &genesis_cp).await?;
133 tracing::info!(?name, "finished initialization");
134 let new_state = IndexState {
135 version: index_state.version,
136 height: Height::default(),
137 };
138 IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
139 dbtx.commit().await?;
140 } else {
141 tracing::info!(?name, "already initialized");
142 }
143 while let Some(mut events) = rx.recv().await {
144 events.start_later(index_state.height.next().into());
147 if events.empty() {
148 tracing::info!(
149 first = events.first_height(),
150 last = events.last_height(),
151 index_name = &name,
152 "skipping batch"
153 );
154 continue;
155 }
156 tracing::info!(
157 first = events.first_height(),
158 last = events.last_height(),
159 index_name = &name,
160 "indexing batch"
161 );
162 let last_height = events.last_height();
163 let mut dbtx = manager_cp.begin_transaction().await?;
164 let context = EventBatchContext {
165 is_last: last_height >= u64::from(src_height),
166 };
167 index.index_batch(&mut dbtx, events, context).await?;
168 tracing::debug!(index_name = &name, "committing batch");
169 let new_state = IndexState {
170 version: index.version(),
171 height: Height::from(last_height),
172 };
173 IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
174
175 dbtx.commit().await?;
176 }
177 Ok(())
178 });
179 }
180
181 let manager_cp = manager.clone();
182 tasks.spawn(async move {
183 let mut height = lowest_index_height.next();
184 while height <= src_height {
185 let first = height;
186 let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
187 height = next_height;
188 tracing::debug!(?first, ?last, "fetching batch");
189 let events = manager_cp.event_batch(first, last).await?;
190 tracing::info!(?first, ?last, "sending batch");
191 for tx in &txs {
192 tx.send(events.clone()).await?;
193 }
194 }
195 Ok(())
196 });
197
198 while let Some(res) = tasks.join_next().await {
199 res??;
200 }
201 Ok(false)
202}
203
204pub struct Indexer {
205 opts: Options,
206 indices: Vec<Arc<dyn AppView>>,
207}
208
209impl Indexer {
210 pub fn new(opts: Options) -> Self {
211 Self {
212 opts,
213 indices: Vec::new(),
214 }
215 }
216
217 pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
218 self.indices.push(Arc::from(index));
219 self
220 }
221
222 pub fn with_default_tracing(self) -> Self {
223 tracing_subscriber::fmt::init();
224 self
225 }
226
227 pub async fn run(self) -> Result<(), anyhow::Error> {
228 tracing::info!(?self.opts);
229 let Self {
230 opts:
231 Options {
232 src_database_url,
233 dst_database_url,
234 chain_id: _,
235 poll_ms,
236 genesis_json,
237 exit_on_catchup,
238 integrity_checks_only,
239 new_options,
240 },
241 indices,
242 } = self;
243 crate::integrity::integrity_check(&src_database_url)
244 .await
245 .context("while running integrity checks")?;
246 if integrity_checks_only {
247 return Ok(());
248 }
249
250 let genesis: serde_json::Value = serde_json::from_str(
251 &std::fs::read_to_string(genesis_json)
252 .context("error reading provided genesis.json file")?,
253 )
254 .context("error parsing provided genesis.json file")?;
255 let app_state = Arc::new(
256 genesis
257 .get("app_state")
258 .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
259 .clone(),
260 );
261
262 let manager = IndexingManager::init(&src_database_url, &dst_database_url).await?;
263 {
264 let mut dbtx = manager.begin_transaction().await?;
265 for index in &indices {
266 reset_index_if_necessary(index.as_ref(), &manager, &mut dbtx, new_options).await?;
267 index.on_startup(&mut dbtx).await?;
268 }
269 dbtx.commit().await?;
270 }
271
272 loop {
273 let caught_up = catchup(&manager, indices.as_slice(), app_state.clone()).await?;
274 if exit_on_catchup && caught_up {
275 tracing::info!("catchup completed, exiting as requested");
276 return Ok(());
277 }
278 tokio::time::sleep(poll_ms).await;
279 }
280 }
281}