// cometindex/indexer.rs

1mod indexing_state;
2
3use crate::{
4    index::{EventBatch, EventBatchContext},
5    opt::Options,
6    AppView,
7};
8use anyhow::{Context, Result};
9use indexing_state::{Height, IndexState, IndexingManager};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13async fn reset_index_if_necessary(
14    index: &dyn AppView,
15    manager: &IndexingManager,
16    dbtx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
17) -> anyhow::Result<()> {
18    let name = index.name();
19    let state = manager.index_state(&name).await?;
20    let version = index.version();
21    if version < state.version {
22        // My thinking is that the only reason this can happen is that:
23        // a) Someone accidentally decreased the version in their AppView.
24        // b) For some reason, we're running the wrong version of the consuming pindexer against a DB.
25        anyhow::bail!(
26            r#"
27Current version for index {name} {version:?} is lower than that recorded in the state: {0:?}.
28Are you running the right version of the code?
29If so, maybe there's a bug in this particular index.
30        "#,
31            state.version
32        );
33    } else if version > state.version {
34        tracing::info!(?name, old_version = ?state.version, new_version = ?version, "resetting index");
35        index.reset(dbtx).await?;
36        IndexingManager::update_index_state(
37            dbtx,
38            &name,
39            IndexState {
40                height: Height::default(),
41                version,
42            },
43        )
44        .await?;
45    }
46    Ok(())
47}
48
49/// Attempt to catch up to the latest indexed block.
50///
51/// Returns whether or not we've caught up.
52#[tracing::instrument(skip_all)]
53async fn catchup(
54    manager: &IndexingManager,
55    indices: &[Arc<dyn AppView>],
56    genesis: Arc<serde_json::Value>,
57) -> anyhow::Result<bool> {
58    if indices.len() <= 0 {
59        tracing::info!(why = "no indices", "catchup completed");
60        return Ok(true);
61    }
62
63    let (src_height, index_states) =
64        tokio::try_join!(manager.src_height(), manager.index_states())?;
65    tracing::info!(?src_height, ?index_states, "catchup status");
66    let lowest_index_height = index_states
67        .values()
68        .map(|x| x.height)
69        .min()
70        .unwrap_or_default();
71    if lowest_index_height >= src_height {
72        tracing::info!(why = "already caught up", "catchup completed");
73        return Ok(true);
74    }
75
76    // Constants that influence performance.
77    const DEFAULT_BATCH_SIZE: u64 = 1000;
78    const BATCH_LOOKAHEAD: usize = 2;
79
80    let mut tasks = JoinSet::<anyhow::Result<()>>::new();
81
82    let mut txs = Vec::with_capacity(indices.len());
83    for index in indices.iter().cloned() {
84        let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
85        txs.push(tx);
86        let name = index.name();
87        let index_state = index_states.get(&name).copied().unwrap_or_default();
88        let manager_cp = manager.clone();
89        let genesis_cp = genesis.clone();
90        tasks.spawn(async move {
91            if index_state.height == Height::default() {
92                tracing::info!(?name, "initializing index");
93                let mut dbtx = manager_cp.begin_transaction().await?;
94                index.init_chain(&mut dbtx, &genesis_cp).await?;
95                tracing::info!(?name, "finished initialization");
96                let new_state = IndexState {
97                    version: index_state.version,
98                    height: Height::default(),
99                };
100                IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
101                dbtx.commit().await?;
102            } else {
103                tracing::info!(?name, "already initialized");
104            }
105            while let Some(mut events) = rx.recv().await {
106                // We only ever want to index events past our current height.
107                // We might receive a batch with more events because other indices are behind us.
108                events.start_later(index_state.height.next().into());
109                if events.empty() {
110                    tracing::info!(
111                        first = events.first_height(),
112                        last = events.last_height(),
113                        index_name = &name,
114                        "skipping batch"
115                    );
116                    continue;
117                }
118                tracing::info!(
119                    first = events.first_height(),
120                    last = events.last_height(),
121                    index_name = &name,
122                    "indexing batch"
123                );
124                let last_height = events.last_height();
125                let mut dbtx = manager_cp.begin_transaction().await?;
126                let context = EventBatchContext {
127                    is_last: last_height >= u64::from(src_height),
128                };
129                index.index_batch(&mut dbtx, events, context).await?;
130                tracing::debug!(index_name = &name, "committing batch");
131                let new_state = IndexState {
132                    version: index.version(),
133                    height: Height::from(last_height),
134                };
135                IndexingManager::update_index_state(&mut dbtx, &name, new_state).await?;
136
137                dbtx.commit().await?;
138            }
139            Ok(())
140        });
141    }
142
143    let manager_cp = manager.clone();
144    tasks.spawn(async move {
145        let mut height = lowest_index_height.next();
146        while height <= src_height {
147            let first = height;
148            let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
149            height = next_height;
150            tracing::debug!(?first, ?last, "fetching batch");
151            let events = manager_cp.event_batch(first, last).await?;
152            tracing::info!(?first, ?last, "sending batch");
153            for tx in &txs {
154                tx.send(events.clone()).await?;
155            }
156        }
157        Ok(())
158    });
159
160    while let Some(res) = tasks.join_next().await {
161        res??;
162    }
163    Ok(false)
164}
165
166pub struct Indexer {
167    opts: Options,
168    indices: Vec<Arc<dyn AppView>>,
169}
170
171impl Indexer {
172    pub fn new(opts: Options) -> Self {
173        Self {
174            opts,
175            indices: Vec::new(),
176        }
177    }
178
179    pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
180        self.indices.push(Arc::from(index));
181        self
182    }
183
184    pub fn with_default_tracing(self) -> Self {
185        tracing_subscriber::fmt::init();
186        self
187    }
188
189    pub async fn run(self) -> Result<(), anyhow::Error> {
190        tracing::info!(?self.opts);
191        let Self {
192            opts:
193                Options {
194                    src_database_url,
195                    dst_database_url,
196                    chain_id: _,
197                    poll_ms,
198                    genesis_json,
199                    exit_on_catchup,
200                    integrity_checks_only,
201                },
202            indices,
203        } = self;
204        crate::integrity::integrity_check(&src_database_url)
205            .await
206            .context("while running integrity checks")?;
207        if integrity_checks_only {
208            return Ok(());
209        }
210
211        let genesis: serde_json::Value = serde_json::from_str(
212            &std::fs::read_to_string(genesis_json)
213                .context("error reading provided genesis.json file")?,
214        )
215        .context("error parsing provided genesis.json file")?;
216        let app_state = Arc::new(
217            genesis
218                .get("app_state")
219                .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
220                .clone(),
221        );
222
223        let manager = IndexingManager::init(&src_database_url, &dst_database_url).await?;
224        {
225            let mut dbtx = manager.begin_transaction().await?;
226            for index in &indices {
227                reset_index_if_necessary(index.as_ref(), &manager, &mut dbtx).await?;
228                index.on_startup(&mut dbtx).await?;
229            }
230            dbtx.commit().await?;
231        }
232
233        loop {
234            let caught_up = catchup(&manager, indices.as_slice(), app_state.clone()).await?;
235            if exit_on_catchup && caught_up {
236                tracing::info!("catchup completed, exiting as requested");
237                return Ok(());
238            }
239            tokio::time::sleep(poll_ms).await;
240        }
241    }
242}