//! Logic for handling chain upgrades.
//!
//! When consensus-breaking changes are made to the Penumbra software,
//! node operators must coordinate to perform a chain upgrade.
//! This module declares how local `pd` state should be altered, if at all,
//! in order to be compatible with the network post-chain-upgrade.
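//!
//! A minimal sketch of driving a migration from operator tooling (the chosen
//! variant and the `home` path here are illustrative):
//!
//! ```ignore
//! use std::path::PathBuf;
//!
//! async fn upgrade(home: PathBuf) -> anyhow::Result<()> {
//!     // Passing `None` for the genesis start time falls back to the current
//!     // time, which is appropriate for local testing setups.
//!     Migration::Testnet74.migrate(home, None).await
//! }
//! ```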
mod testnet72;
mod testnet74;

use anyhow::Context;
use futures::StreamExt as _;
use std::path::{Path, PathBuf};
use tracing::instrument;

use cnidarium::{StateDelta, StateRead, StateWrite, Storage};
use jmt::RootHash;
use penumbra_app::{app::StateReadExt, SUBSTORE_PREFIXES};
use penumbra_sct::component::clock::{EpochManager, EpochRead};

use crate::testnet::generate::TestnetConfig;

use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs::File;

/// The kind of migration that should be performed.
pub enum Migration {
    /// No-op migration.
    Noop,
    /// A simple migration: adds a key to the consensus state.
    /// This is useful for testing upgrade mechanisms, including in production.
    SimpleMigration,
    /// Testnet-70 migration: move swap executions from the jmt to nv-storage.
    Testnet70,
    /// Testnet-72 migration:
    /// - Migrate `BatchSwapOutputData` to new protobuf, replacing epoch height with index.
    Testnet72,
    /// Testnet-74 migration:
    /// - Update the base liquidity index to order routable pairs by descending liquidity
    /// - Update arb executions to include the amount of filled input in the output
    /// - Add `AuctionParameters` to the consensus state
    Testnet74,
}

impl Migration {
    pub async fn migrate(
        &self,
        path_to_export: PathBuf,
        genesis_start: Option<tendermint::time::Time>,
    ) -> anyhow::Result<()> {
        match self {
            Migration::Noop => Ok(()),
            Migration::SimpleMigration => {
                let rocksdb_dir = path_to_export.join("rocksdb");
                let storage = Storage::load(rocksdb_dir, SUBSTORE_PREFIXES.to_vec()).await?;
                let export_state = storage.latest_snapshot();
                let root_hash = export_state.root_hash().await.expect("can get root hash");
                let app_hash_pre_migration: RootHash = root_hash.into();
                let height = export_state
                    .get_block_height()
                    .await
                    .expect("can get block height");
                let post_upgrade_height = height.wrapping_add(1);

                /* --------- writing to the jmt  ------------ */
                tracing::info!(?app_hash_pre_migration, "app hash pre-upgrade");
                let mut delta = StateDelta::new(export_state);
                delta.put_raw(
                    "banana".to_string(),
                    "a good fruit (and migration works!)".into(),
                );
                delta.put_block_height(0u64);
                let root_hash = storage.commit_in_place(delta).await?;
                let app_hash_post_migration: RootHash = root_hash.into();
                tracing::info!(?app_hash_post_migration, "app hash post upgrade");

                /* --------- collecting genesis data -------- */
                tracing::info!("generating genesis");
                let migrated_state = storage.latest_snapshot();
                let root_hash = migrated_state.root_hash().await.expect("can get root hash");
                let app_hash: RootHash = root_hash.into();
                tracing::info!(?root_hash, "root hash from snapshot (post-upgrade)");

                /* ---------- generate genesis ------------  */
                let chain_id = migrated_state.get_chain_id().await?;
                let app_state = penumbra_app::genesis::Content {
                    chain_id,
                    ..Default::default()
                };
                let mut genesis =
                    TestnetConfig::make_genesis(app_state.clone()).expect("can make genesis");
                genesis.app_hash = app_hash
                    .0
                    .to_vec()
                    .try_into()
                    .expect("infaillible conversion");
                genesis.initial_height = post_ugprade_height as i64;
                genesis.genesis_time = genesis_start.unwrap_or_else(|| {
                    let now = tendermint::time::Time::now();
                    tracing::info!(%now, "no genesis time provided, defaulting to current time (testing setup)");
                    now
                });
                let checkpoint = app_hash.0.to_vec();
                let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));

                let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
                tracing::info!("genesis: {}", genesis_json);
                let genesis_path = path_to_export.join("genesis.json");
                std::fs::write(genesis_path, genesis_json).expect("can write genesis");

                let validator_state_path = path_to_export.join("priv_validator_state.json");
                let fresh_validator_state =
                    crate::testnet::generate::TestnetValidator::initial_state();
                std::fs::write(validator_state_path, fresh_validator_state)
                    .expect("can write validator state");
                Ok(())
            }
            Migration::Testnet70 => {
                // Our goal is to fetch all swap executions from the jmt and store them in nv-storage.
                // In particular, we want to make sure that client lookups for (height, trading pair)
                // resolve to the same value as before.
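                // (For reference, these state keys look roughly like
                // `dex/swap_execution/{height}/{trading_pair}`; the exact format
                // is defined by the dex component's state keys and is treated
                // here as an opaque prefix.)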

                // Setup:
                let start_time = std::time::SystemTime::now();
                let rocksdb_dir = path_to_export.join("rocksdb");
                let storage =
                    Storage::load(rocksdb_dir.clone(), SUBSTORE_PREFIXES.to_vec()).await?;
                let export_state = storage.latest_snapshot();
                let root_hash = export_state.root_hash().await.expect("can get root hash");
                let pre_upgrade_root_hash: RootHash = root_hash.into();
                let pre_upgrade_height = export_state
                    .get_block_height()
                    .await
                    .expect("can get block height");
                let post_upgrade_height = pre_upgrade_height.wrapping_add(1);

                // We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the
                // swap execution prefix. Then, we write each entry to the nv-storage.
                let mut delta = StateDelta::new(export_state);

                let prefix_key = "dex/swap_execution/";
                let mut swap_execution_stream = delta.prefix_raw(prefix_key);

                while let Some(r) = swap_execution_stream.next().await {
                    let (key, swap_execution) = r?;
                    tracing::info!("migrating swap execution: {}", key);
                    delta.nonverifiable_put_raw(key.into_bytes(), swap_execution);
                }

                delta.put_block_height(0u64);

                let post_upgrade_root_hash = storage.commit_in_place(delta).await?;
                tracing::info!(?post_upgrade_root_hash, "post-upgrade root hash");

                let migration_duration = start_time
                    .elapsed()
                    .expect("clock must not go backwards during migration");

                // Reload storage so we can make reads against its migrated state:
                storage.release().await;
                let storage = Storage::load(rocksdb_dir, SUBSTORE_PREFIXES.to_vec()).await?;
                let migrated_state = storage.latest_snapshot();
                // Release the storage handle; subsequent reads go through the
                // snapshot taken above.
                storage.release().await;

                // The migration is complete, now we need to generate a genesis file. To do this, we need
                // to lookup a validator view from the chain, and specify the post-upgrade app hash and
                // initial height.
                let chain_id = migrated_state.get_chain_id().await?;
                let app_state = penumbra_app::genesis::Content {
                    chain_id,
                    ..Default::default()
                };
                let mut genesis =
                    TestnetConfig::make_genesis(app_state.clone()).expect("can make genesis");
                genesis.app_hash = post_upgrade_root_hash
                    .0
                    .to_vec()
                    .try_into()
                    .expect("infaillible conversion");
                genesis.initial_height = post_upgrade_height as i64;
                genesis.genesis_time = genesis_start.unwrap_or_else(|| {
                    let now = tendermint::time::Time::now();
                    tracing::info!(%now, "no genesis time provided, defaulting to current time (testing setup)");
                    now
                });
                let checkpoint = post_upgrade_root_hash.0.to_vec();
                let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));

                let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
                tracing::info!("genesis: {}", genesis_json);
                let genesis_path = path_to_export.join("genesis.json");
                std::fs::write(genesis_path, genesis_json).expect("can write genesis");

                let validator_state_path = path_to_export.join("priv_validator_state.json");
                let fresh_validator_state =
                    crate::testnet::generate::TestnetValidator::initial_state();
                std::fs::write(validator_state_path, fresh_validator_state)
                    .expect("can write validator state");

                tracing::info!(
                    pre_upgrade_height,
                    post_upgrade_height,
                    ?pre_upgrade_root_hash,
                    ?post_upgrade_root_hash,
                    duration = migration_duration.as_secs(),
                    "successful migration!"
                );

                Ok(())
            }
            Migration::Testnet72 => testnet72::migrate(path_to_export, genesis_start).await,
            Migration::Testnet74 => testnet74::migrate(path_to_export, genesis_start).await,
        }
    }
}

/// Compress a single directory into a gzipped tar archive. Accepts an optional
/// name for the subdirectory within the archive, which defaults to ".",
/// meaning no nesting.
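///
/// # Example
///
/// A sketch with illustrative paths:
///
/// ```ignore
/// archive_directory(
///     PathBuf::from("/home/penumbra/.penumbra/testnet_data/node0/pd"),
///     PathBuf::from("/tmp/pd-export.tar.gz"),
///     // Nest the contents under "pd/" inside the archive.
///     Some("pd".to_string()),
/// )?;
/// ```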
pub fn archive_directory(
    src_directory: PathBuf,
    archive_filepath: PathBuf,
    subdir_within_archive: Option<String>,
) -> anyhow::Result<()> {
    // Don't clobber an existing target archive.
    if archive_filepath.exists() {
        tracing::error!(
            "export archive filepath already exists: {}",
            archive_filepath.display()
        );
        anyhow::bail!("refusing to overwrite existing archive");
    }

    tracing::info!(
        "creating archive {} -> {}",
        src_directory.display(),
        archive_filepath.display()
    );
    let tarball_file = File::create(&archive_filepath)
        .context("failed to create file for archive: check parent directory and permissions")?;
    let enc = GzEncoder::new(tarball_file, Compression::default());
    let mut tarball = tar::Builder::new(enc);
    let subdir_within_archive = subdir_within_archive.unwrap_or(String::from("."));
    tarball
        .append_dir_all(subdir_within_archive, src_directory.as_path())
        .context("failed to package archive contents")?;
    // Finish explicitly so flush/close errors surface here instead of being
    // swallowed when the builder is dropped.
    let enc = tarball.into_inner().context("failed to finish tar archive")?;
    enc.finish().context("failed to finish gzip stream")?;
    Ok(())
}

/// Read the last block timestamp from the pd state.
pub async fn last_block_timestamp(home: PathBuf) -> anyhow::Result<tendermint::Time> {
    let rocksdb = home.join("rocksdb");
    let storage = Storage::load(rocksdb, SUBSTORE_PREFIXES.to_vec())
        .await
        .context("error loading store for timestamp")?;
    let state = storage.latest_snapshot();
    let last_block_time = state
        .get_block_timestamp()
        .await
        .context("error reading latest block timestamp")?;
    storage.release().await;
    Ok(last_block_time)
}

#[instrument(skip_all)]
pub async fn migrate_comet_data(
    comet_home: PathBuf,
    new_genesis_file: PathBuf,
) -> anyhow::Result<()> {
    tracing::info!(?comet_home, ?new_genesis_file, "migrating comet data");

    // Read the contents of new_genesis_file into a serde_json::Value and pull out .initial_height
    let genesis_contents =
        std::fs::read_to_string(new_genesis_file).context("error reading new genesis file")?;
    let genesis_json: serde_json::Value =
        serde_json::from_str(&genesis_contents).context("error parsing new genesis file")?;
    tracing::info!(?genesis_json, "parsed genesis file");
    let initial_height = genesis_json["initial_height"]
        .as_str()
        .context("error reading initial_height from genesis file")?
        .parse::<u64>()?;

    // Write the genesis data to HOME/config/genesis.json
    let genesis_file = comet_home.join("config").join("genesis.json");
    tracing::info!(?genesis_file, "writing genesis file to comet config");
    std::fs::write(genesis_file, genesis_contents)
        .context("error writing genesis file to comet config")?;

    // Adjust the high-water mark in priv_validator_state.json but don't decrease it
    adjust_priv_validator_state(&comet_home, initial_height)?;

    // Delete other cometbft data.
    clear_comet_data(&comet_home)?;

    Ok(())
}

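/// Raise the signing high-water mark in `priv_validator_state.json` to the
/// post-upgrade initial height, refusing to lower it. The file is standard
/// CometBFT validator state, roughly `{ "height": "1234", "round": 0, "step": 0 }`
/// (illustrative values; note the height is JSON-encoded as a string).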
#[instrument(skip_all)]
fn adjust_priv_validator_state(comet_home: &Path, initial_height: u64) -> anyhow::Result<()> {
    let priv_validator_state = comet_home.join("data").join("priv_validator_state.json");
    let current_state: serde_json::Value =
        serde_json::from_str(&std::fs::read_to_string(&priv_validator_state)?)?;

    let current_height = current_state["height"]
        .as_str()
        .context("error reading height from priv_validator_state.json")?
        .parse::<u64>()?;
    if current_height < initial_height {
        tracing::info!(
            "increasing height in priv_validator_state from {} to {}",
            current_height,
            initial_height
        );
        let new_state = serde_json::json!({
            "height": initial_height.to_string(), // Important to use to_string here as if protojson
            "round": 0,
            "step": 0,
        });
        tracing::info!(?new_state, "updated priv_validator_state.json");
        std::fs::write(
            &priv_validator_state,
            &serde_json::to_string_pretty(&new_state)?,
        )?;
    } else {
        anyhow::bail!(
            "priv_validator_state height {} is already greater than or equal to initial_height {}",
            current_height,
            initial_height
        );
    }

    Ok(())
}

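/// Remove CometBFT databases that must be reset across the upgrade boundary
/// (blockstore, consensus state, evidence, and the consensus WAL), while
/// preserving `tx_index.db` so historical transactions remain queryable.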
#[instrument(skip_all)]
fn clear_comet_data(comet_home: &Path) -> anyhow::Result<()> {
    let data_dir = comet_home.join("data");

    /*
    N.B. We want to preserve the `tx_index.db` directory.
    Doing so will allow CometBFT to reference historical transactions behind the upgrade boundary.
     */
    for subdir in &["evidence.db", "state.db", "blockstore.db", "cs.wal"] {
        let path = data_dir.join(subdir);
        if path.exists() {
            tracing::info!(?path, "removing directory");
            std::fs::remove_dir_all(path)?;
        }
    }

    Ok(())
}