1mod indexing_state;
2
3use crate::{
4 index::{EventBatch, EventBatchContext},
5 opt::Options,
6 AppView,
7};
8use anyhow::{Context as _, Result};
9use indexing_state::{Height, IndexingState};
10use std::sync::Arc;
11use tokio::{sync::mpsc, task::JoinSet};
12
13#[tracing::instrument(skip_all)]
17async fn catchup(
18 state: &IndexingState,
19 indices: &[Arc<dyn AppView>],
20 genesis: Arc<serde_json::Value>,
21) -> anyhow::Result<bool> {
22 if indices.len() <= 0 {
23 tracing::info!(why = "no indices", "catchup completed");
24 return Ok(true);
25 }
26
27 let (src_height, index_heights) = tokio::try_join!(state.src_height(), state.index_heights())?;
28 tracing::info!(?src_height, ?index_heights, "catchup status");
29 let lowest_index_height = index_heights.values().copied().min().unwrap_or_default();
30 if lowest_index_height >= src_height {
31 tracing::info!(why = "already caught up", "catchup completed");
32 return Ok(true);
33 }
34
35 const DEFAULT_BATCH_SIZE: u64 = 1000;
37 const BATCH_LOOKAHEAD: usize = 2;
38
39 let mut tasks = JoinSet::<anyhow::Result<()>>::new();
40
41 let mut txs = Vec::with_capacity(indices.len());
42 for index in indices.iter().cloned() {
43 let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
44 txs.push(tx);
45 let name = index.name();
46 let index_height = index_heights.get(&name).copied().unwrap_or_default();
47 let state_cp = state.clone();
48 let genesis_cp = genesis.clone();
49 tasks.spawn(async move {
50 if index_height == Height::default() {
51 tracing::info!(?name, "initializing index");
52 let mut dbtx = state_cp.begin_transaction().await?;
53 index.init_chain(&mut dbtx, &genesis_cp).await?;
54 tracing::info!(?name, "finished initialization");
55 IndexingState::update_index_height(&mut dbtx, &name, Height::post_genesis())
56 .await?;
57 dbtx.commit().await?;
58 } else {
59 tracing::info!(?name, "already initialized");
60 }
61 while let Some(mut events) = rx.recv().await {
62 events.start_later(index_height.next().into());
65 if events.empty() {
66 tracing::info!(
67 first = events.first_height(),
68 last = events.last_height(),
69 index_name = &name,
70 "skipping batch"
71 );
72 continue;
73 }
74 tracing::info!(
75 first = events.first_height(),
76 last = events.last_height(),
77 index_name = &name,
78 "indexing batch"
79 );
80 let last_height = events.last_height();
81 let mut dbtx = state_cp.begin_transaction().await?;
82 let context = EventBatchContext {
83 is_last: last_height >= u64::from(src_height),
84 };
85 index.index_batch(&mut dbtx, events, context).await?;
86 tracing::debug!(index_name = &name, "committing batch");
87 IndexingState::update_index_height(&mut dbtx, &name, Height::from(last_height))
88 .await?;
89
90 dbtx.commit().await?;
91 }
92 Ok(())
93 });
94 }
95
96 let state_cp = state.clone();
97 tasks.spawn(async move {
98 let mut height = lowest_index_height.next();
99 while height <= src_height {
100 let first = height;
101 let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
102 height = next_height;
103 tracing::debug!(?first, ?last, "fetching batch");
104 let events = state_cp.event_batch(first, last).await?;
105 tracing::info!(?first, ?last, "sending batch");
106 for tx in &txs {
107 tx.send(events.clone()).await?;
108 }
109 }
110 Ok(())
111 });
112
113 while let Some(res) = tasks.join_next().await {
114 res??;
115 }
116 Ok(false)
117}
118
119pub struct Indexer {
120 opts: Options,
121 indices: Vec<Arc<dyn AppView>>,
122}
123
124impl Indexer {
125 pub fn new(opts: Options) -> Self {
126 Self {
127 opts,
128 indices: Vec::new(),
129 }
130 }
131
132 pub fn with_index(mut self, index: Box<dyn AppView + 'static>) -> Self {
133 self.indices.push(Arc::from(index));
134 self
135 }
136
137 pub fn with_default_tracing(self) -> Self {
138 tracing_subscriber::fmt::init();
139 self
140 }
141
142 pub async fn run(self) -> Result<(), anyhow::Error> {
143 tracing::info!(?self.opts);
144 let Self {
145 opts:
146 Options {
147 src_database_url,
148 dst_database_url,
149 chain_id: _,
150 poll_ms,
151 genesis_json,
152 exit_on_catchup,
153 },
154 indices: indexes,
155 } = self;
156
157 let state = IndexingState::init(&src_database_url, &dst_database_url).await?;
158 let genesis: serde_json::Value = serde_json::from_str(
159 &std::fs::read_to_string(genesis_json)
160 .context("error reading provided genesis.json file")?,
161 )
162 .context("error parsing provided genesis.json file")?;
163 let app_state = Arc::new(
164 genesis
165 .get("app_state")
166 .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))?
167 .clone(),
168 );
169 loop {
170 let caught_up = catchup(&state, indexes.as_slice(), app_state.clone()).await?;
171 if exit_on_catchup && caught_up {
172 tracing::info!("catchup completed, exiting as requested");
173 return Ok(());
174 }
175 tokio::time::sleep(poll_ms).await;
176 }
177 }
178}