cometindex/
indexer.rs

1mod indexing_state;
2
3use crate::{
4    index::{EventBatch, EventBatchContext},
5    opt::Options,
6    AppView,
7};
8use anyhow::{Context as _, Result};
9use indexing_state::{Height, IndexingState};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13/// Attempt to catch up to the latest indexed block.
14///
15/// Returns whether or not we've caught up.
16#[tracing::instrument(skip_all)]
17async fn catchup(
18    state: &IndexingState,
19    indices: &[Arc<dyn AppView>],
20    genesis: Arc<serde_json::Value>,
21) -> anyhow::Result<bool> {
22    if indices.len() <= 0 {
23        tracing::info!(why = "no indices", "catchup completed");
24        return Ok(true);
25    }
26
27    let (src_height, index_heights) = tokio::try_join!(state.src_height(), state.index_heights())?;
28    tracing::info!(?src_height, ?index_heights, "catchup status");
29    let lowest_index_height = index_heights.values().copied().min().unwrap_or_default();
30    if lowest_index_height >= src_height {
31        tracing::info!(why = "already caught up", "catchup completed");
32        return Ok(true);
33    }
34
35    // Constants that influence performance.
36    const DEFAULT_BATCH_SIZE: u64 = 1000;
37    const BATCH_LOOKAHEAD: usize = 2;
38
39    let mut tasks = JoinSet::<anyhow::Result<()>>::new();
40
41    let mut txs = Vec::with_capacity(indices.len());
42    for index in indices.iter().cloned() {
43        let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
44        txs.push(tx);
45        let name = index.name();
46        let index_height = index_heights.get(&name).copied().unwrap_or_default();
47        let state_cp = state.clone();
48        let genesis_cp = genesis.clone();
49        tasks.spawn(async move {
50            if index_height == Height::default() {
51                tracing::info!(?name, "initializing index");
52                let mut dbtx = state_cp.begin_transaction().await?;
53                index.init_chain(&mut dbtx, &genesis_cp).await?;
54                tracing::info!(?name, "finished initialization");
55                IndexingState::update_index_height(&mut dbtx, &name, Height::post_genesis())
56                    .await?;
57                dbtx.commit().await?;
58            } else {
59                tracing::info!(?name, "already initialized");
60            }
61            while let Some(mut events) = rx.recv().await {
62                // We only ever want to index events past our current height.
63                // We might receive a batch with more events because other indices are behind us.
64                events.start_later(index_height.next().into());
65                if events.empty() {
66                    tracing::info!(
67                        first = events.first_height(),
68                        last = events.last_height(),
69                        index_name = &name,
70                        "skipping batch"
71                    );
72                    continue;
73                }
74                tracing::info!(
75                    first = events.first_height(),
76                    last = events.last_height(),
77                    index_name = &name,
78                    "indexing batch"
79                );
80                let last_height = events.last_height();
81                let mut dbtx = state_cp.begin_transaction().await?;
82                let context = EventBatchContext {
83                    is_last: last_height >= u64::from(src_height),
84                };
85                index.index_batch(&mut dbtx, events, context).await?;
86                tracing::debug!(index_name = &name, "committing batch");
87                IndexingState::update_index_height(&mut dbtx, &name, Height::from(last_height))
88                    .await?;
89
90                dbtx.commit().await?;
91            }
92            Ok(())
93        });
94    }
95
96    let state_cp = state.clone();
97    tasks.spawn(async move {
98        let mut height = lowest_index_height.next();
99        while height <= src_height {
100            let first = height;
101            let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
102            height = next_height;
103            tracing::debug!(?first, ?last, "fetching batch");
104            let events = state_cp.event_batch(first, last).await?;
105            tracing::info!(?first, ?last, "sending batch");
106            for tx in &txs {
107                tx.send(events.clone()).await?;
108            }
109        }
110        Ok(())
111    });
112
113    while let Some(res) = tasks.join_next().await {
114        res??;
115    }
116    Ok(false)
117}
118
/// Drives a set of [`AppView`] indices from a source event database.
///
/// Built with [`Indexer::new`], extended with [`Indexer::with_index`], and
/// executed with [`Indexer::run`].
pub struct Indexer {
    /// Runtime configuration (database URLs, polling interval, genesis path, ...).
    opts: Options,
    /// The registered app views; all of them are handed to the catch-up routine on every poll.
    indices: Vec<Arc<dyn AppView>>,
}
123
124impl Indexer {
125    pub fn new(opts: Options) -> Self {
126        Self {
127            opts,
128            indices: Vec::new(),
129        }
130    }
131
132    pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
133        self.indices.push(Arc::from(index));
134        self
135    }
136
137    pub fn with_default_tracing(self) -> Self {
138        tracing_subscriber::fmt::init();
139        self
140    }
141
142    pub async fn run(self) -> Result<(), anyhow::Error> {
143        tracing::info!(?self.opts);
144        let Self {
145            opts:
146                Options {
147                    src_database_url,
148                    dst_database_url,
149                    chain_id: _,
150                    poll_ms,
151                    genesis_json,
152                    exit_on_catchup,
153                },
154            indices: indexes,
155        } = self;
156
157        let state = IndexingState::init(&src_database_url, &dst_database_url).await?;
158        let genesis: serde_json::Value = serde_json::from_str(
159            &std::fs::read_to_string(genesis_json)
160                .context("error reading provided genesis.json file")?,
161        )
162        .context("error parsing provided genesis.json file")?;
163        let app_state = Arc::new(
164            genesis
165                .get("app_state")
166                .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
167                .clone(),
168        );
169        loop {
170            let caught_up = catchup(&state, indexes.as_slice(), app_state.clone()).await?;
171            if exit_on_catchup && caught_up {
172                tracing::info!("catchup completed, exiting as requested");
173                return Ok(());
174            }
175            tokio::time::sleep(poll_ms).await;
176        }
177    }
178}