cometindex/
integrity.rs

1use sqlx::PgPool;
2
3use crate::database::read_only_db;
4
5pub async fn missing_blocks(name: &'static str, db: PgPool) -> anyhow::Result<bool> {
6    let mut dbtx = db.begin().await?;
7    let res: Option<(i64, i64)> = sqlx::query_as(
8        "
9      WITH height_gaps AS (
10        SELECT
11          height as current_height,
12          LEAD(height) OVER (ORDER BY height) as next_height,
13          LEAD(height) OVER (ORDER BY height) - height as gap
14        FROM blocks
15      )
16      SELECT
17        current_height,
18        next_height
19      FROM height_gaps
20      WHERE gap > 1 AND next_height IS NOT NULL
21      ORDER BY current_height ASC
22      LIMIT 1;
23    ",
24    )
25    .fetch_optional(dbtx.as_mut())
26    .await?;
27    dbtx.rollback().await?;
28    if let Some((current, next)) = res {
29        println!("{}:", name);
30        println!("missing blocks between heights {} and {}", current, next);
31        return Ok(false);
32    }
33    Ok(true)
34}
35
36async fn missing_events(name: &'static str, db: PgPool) -> anyhow::Result<bool> {
37    let mut dbtx = db.begin().await?;
38    let res: Option<(i64, i64, i64)> = sqlx::query_as(
39        "
40      WITH event_heights AS (
41        SELECT e.rowid, b.height,
42               LEAD(e.rowid) OVER (PARTITION BY b.height ORDER BY e.rowid) AS next_rowid
43        FROM events e
44        JOIN blocks b ON e.block_id = b.rowid
45      )
46      SELECT height, rowid, next_rowid
47      FROM event_heights
48      WHERE next_rowid IS NOT NULL
49      AND next_rowid - rowid <> 1
50      ORDER BY height ASC, rowid ASC
51      LIMIT 1;
52    ",
53    )
54    .fetch_optional(dbtx.as_mut())
55    .await?;
56    if let Some((height, next, current)) = res {
57        println!("{}:", name);
58        println!(
59            "missing events at height {}: missing between event #{} and #{}",
60            height, current, next
61        );
62        return Ok(false);
63    };
64    dbtx.rollback().await?;
65    Ok(true)
66}
67
68pub async fn integrity_check(src_database_url: &str) -> anyhow::Result<()> {
69    let db = read_only_db(src_database_url).await?;
70    let mut tasks = Vec::new();
71    tasks.push(tokio::spawn({
72        let db = db.clone();
73        async move { missing_blocks("# 000 Missing Blocks", db).await }
74    }));
75    tasks.push(tokio::spawn({
76        let db = db.clone();
77        async move { missing_events("# 001 Missing Events", db).await }
78    }));
79    let mut failed = false;
80    for task in tasks {
81        failed |= !task.await??;
82    }
83    if failed {
84        anyhow::bail!("integrity checks failed, check logs");
85    }
86    Ok(())
87}