cometindex/
indexer.rs

1mod indexing_state;
2
3use crate::{
4    index::{EventBatch, EventBatchContext},
5    opt::Options,
6    AppView,
7};
8use anyhow::{Context, Result};
9use indexing_state::{Height, IndexState, IndexingManager};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13async fn reset_index_if_necessary(
14    index: &dyn AppView,
15    manager: &IndexingManager,
16    dbtx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
17    new_options: bool,
18) -> anyhow::Result<()> {
19    let name = index.name();
20    let state = manager.index_state(&name).await?;
21    let version = index.version();
22    // If there's no previous version, this index is new, and we can just write its version
23    // out, without a need to reset it at all.
24    let old_version = match state {
25        None => {
26            tracing::info!(?name, new_version = ?version, "initializing index state");
27            IndexingManager::update_index_state(
28                dbtx,
29                &name,
30                IndexState {
31                    height: Height::default(),
32                    version,
33                },
34            )
35            .await?;
36
37            return Ok(());
38        }
39        Some(s) => s.version,
40    };
41    let options_changed = version.option_hash() != old_version.option_hash();
42    if options_changed && !new_options {
43        let new = version
44            .option_hash()
45            .map(hex::encode)
46            .unwrap_or_else(|| "NULL".to_string());
47        let old = old_version
48            .option_hash()
49            .map(hex::encode)
50            .unwrap_or_else(|| "NULL".to_string());
51        anyhow::bail!(
52            r#"
53Current option hash for index {name} is {old}, but new option hash is {new}.
54Use `--new-options` to allow changing the options for app views.
55If this was not your intent, check that the options have not changed.
56"#
57        );
58    }
59    if version.major() < old_version.major() {
60        // My thinking is that the only reason this can happen is that:
61        // a) Someone accidentally decreased the version in their AppView.
62        // b) For some reason, we're running the wrong version of the consuming pindexer against a DB.
63        anyhow::bail!(
64            r#"
65Current version for index {name} {version:?} is lower than that recorded in the state: {0:?}.
66Are you running the right version of the code?
67If so, maybe there's a bug in this particular index.
68        "#,
69            old_version
70        );
71    } else if version.major() > old_version.major() || options_changed {
72        tracing::info!(?name, ?old_version, new_version = ?version, "resetting index");
73        index.reset(dbtx).await?;
74        IndexingManager::update_index_state(
75            dbtx,
76            &name,
77            IndexState {
78                height: Height::default(),
79                version,
80            },
81        )
82        .await?;
83    }
84    Ok(())
85}
86
87/// Attempt to catch up to the latest indexed block.
88///
89/// Returns whether or not we've caught up.
90#[tracing::instrument(skip_all)]
91async fn catchup(
92    manager: &IndexingManager,
93    indices: &[Arc<dyn AppView>],
94    genesis: Arc<serde_json::Value>,
95) -> anyhow::Result<bool> {
96    if indices.len() <= 0 {
97        tracing::info!(why = "no indices", "catchup completed");
98        return Ok(true);
99    }
100
101    let (src_height, index_states) =
102        tokio::try_join!(manager.src_height(), manager.index_states())?;
103    tracing::info!(?src_height, ?index_states, "catchup status");
104    let lowest_index_height = index_states
105        .values()
106        .map(|x| x.height)
107        .min()
108        .unwrap_or_default();
109    if lowest_index_height >= src_height {
110        tracing::info!(why = "already caught up", "catchup completed");
111        return Ok(true);
112    }
113
114    // Constants that influence performance.
115    const DEFAULT_BATCH_SIZE: u64 = 1000;
116    const BATCH_LOOKAHEAD: usize = 2;
117
118    let mut tasks = JoinSet::<anyhow::Result<()>>::new();
119
120    let mut txs = Vec::with_capacity(indices.len());
121    for index in indices.iter().cloned() {
122        let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
123        txs.push(tx);
124        let name = index.name();
125        let index_state = index_states.get(&name).cloned().unwrap_or_default();
126        let manager_cp = manager.clone();
127        let genesis_cp = genesis.clone();
128        tasks.spawn(async move {
129            if index_state.height == Height::default() {
130                tracing::info!(?name, "initializing index");
131                let mut dbtx = manager_cp.begin_transaction().await?;
132                index.init_chain(&mut dbtx, &genesis_cp).await?;
133                tracing::info!(?name, "finished initialization");
134                let new_state = IndexState {
135                    version: index_state.version,
136                    height: Height::default(),
137                };
138                IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
139                dbtx.commit().await?;
140            } else {
141                tracing::info!(?name, "already initialized");
142            }
143            while let Some(mut events) = rx.recv().await {
144                // We only ever want to index events past our current height.
145                // We might receive a batch with more events because other indices are behind us.
146                events.start_later(index_state.height.next().into());
147                if events.empty() {
148                    tracing::info!(
149                        first = events.first_height(),
150                        last = events.last_height(),
151                        index_name = &name,
152                        "skipping batch"
153                    );
154                    continue;
155                }
156                tracing::info!(
157                    first = events.first_height(),
158                    last = events.last_height(),
159                    index_name = &name,
160                    "indexing batch"
161                );
162                let last_height = events.last_height();
163                let mut dbtx = manager_cp.begin_transaction().await?;
164                let context = EventBatchContext {
165                    is_last: last_height >= u64::from(src_height),
166                };
167                index.index_batch(&mut dbtx, events, context).await?;
168                tracing::debug!(index_name = &name, "committing batch");
169                let new_state = IndexState {
170                    version: index.version(),
171                    height: Height::from(last_height),
172                };
173                IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
174
175                dbtx.commit().await?;
176            }
177            Ok(())
178        });
179    }
180
181    let manager_cp = manager.clone();
182    tasks.spawn(async move {
183        let mut height = lowest_index_height.next();
184        while height <= src_height {
185            let first = height;
186            let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
187            height = next_height;
188            tracing::debug!(?first, ?last, "fetching batch");
189            let events = manager_cp.event_batch(first, last).await?;
190            tracing::info!(?first, ?last, "sending batch");
191            for tx in &txs {
192                tx.send(events.clone()).await?;
193            }
194        }
195        Ok(())
196    });
197
198    while let Some(res) = tasks.join_next().await {
199        res??;
200    }
201    Ok(false)
202}
203
204pub struct Indexer {
205    opts: Options,
206    indices: Vec<Arc<dyn AppView>>,
207}
208
209impl Indexer {
210    pub fn new(opts: Options) -> Self {
211        Self {
212            opts,
213            indices: Vec::new(),
214        }
215    }
216
217    pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
218        self.indices.push(Arc::from(index));
219        self
220    }
221
222    pub fn with_default_tracing(self) -> Self {
223        tracing_subscriber::fmt::init();
224        self
225    }
226
227    pub async fn run(self) -> Result<(), anyhow::Error> {
228        tracing::info!(?self.opts);
229        let Self {
230            opts:
231                Options {
232                    src_database_url,
233                    dst_database_url,
234                    chain_id: _,
235                    poll_ms,
236                    genesis_json,
237                    exit_on_catchup,
238                    integrity_checks_only,
239                    new_options,
240                },
241            indices,
242        } = self;
243        crate::integrity::integrity_check(&src_database_url)
244            .await
245            .context("while running integrity checks")?;
246        if integrity_checks_only {
247            return Ok(());
248        }
249
250        let genesis: serde_json::Value = serde_json::from_str(
251            &std::fs::read_to_string(genesis_json)
252                .context("error reading provided genesis.json file")?,
253        )
254        .context("error parsing provided genesis.json file")?;
255        let app_state = Arc::new(
256            genesis
257                .get("app_state")
258                .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
259                .clone(),
260        );
261
262        let manager = IndexingManager::init(&src_database_url, &dst_database_url).await?;
263        {
264            let mut dbtx = manager.begin_transaction().await?;
265            for index in &indices {
266                reset_index_if_necessary(index.as_ref(), &manager, &mut dbtx, new_options).await?;
267                index.on_startup(&mut dbtx).await?;
268            }
269            dbtx.commit().await?;
270        }
271
272        loop {
273            let caught_up = catchup(&manager, indices.as_slice(), app_state.clone()).await?;
274            if exit_on_catchup && caught_up {
275                tracing::info!("catchup completed, exiting as requested");
276                return Ok(());
277            }
278            tokio::time::sleep(poll_ms).await;
279        }
280    }
281}