pmonitor/
main.rs

1//! The `pmonitor` tool tracks the balances of Penumbra wallets, as identified
2//! by a [FullViewingKey] (FVK), in order to perform auditing. It accepts a JSON file
3//! of FVKs and a `pd` gRPC URL to initialize:
4//!
5//!     pmonitor init --grpc-url http://127.0.0.1:8080 --fvks fvks.json
6//!
7//! The audit functionality runs as a single operation, evaluating compliance up to the
8//! current block height:
9//!
10//!     pmonitor audit
11//!
12//! If regular auditing is desired, consider automating the `pmonitor audit` action via
13//! cron or similar. `pmonitor` will cache view databases for each tracked FVK, so that future
14//! `audit` actions need only inspect the blocks generated between the previous audit and the
15//! current height.
16
17use anyhow::{Context, Result};
18use camino::Utf8PathBuf;
19use clap::{self, Parser};
20use directories::ProjectDirs;
21use futures::StreamExt;
22use penumbra_sdk_asset::STAKING_TOKEN_ASSET_ID;
23use rustls::crypto::aws_lc_rs;
24use std::fs;
25use std::io::IsTerminal as _;
26use std::str::FromStr;
27use tonic::transport::Channel;
28use tracing_subscriber::{prelude::*, EnvFilter};
29use url::Url;
30use uuid::Uuid;
31
32use colored::Colorize;
33
34use pcli::config::PcliConfig;
35use penumbra_sdk_compact_block::CompactBlock;
36use penumbra_sdk_keys::FullViewingKey;
37use penumbra_sdk_num::Amount;
38use penumbra_sdk_proto::box_grpc_svc;
39use penumbra_sdk_proto::view::v1::{
40    view_service_client::ViewServiceClient, view_service_server::ViewServiceServer,
41};
42use penumbra_sdk_proto::{
43    core::component::compact_block::v1::CompactBlockRequest,
44    core::component::stake::v1::query_service_client::QueryServiceClient as StakeQueryServiceClient,
45    penumbra::core::component::compact_block::v1::query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
46};
47use penumbra_sdk_stake::rate::RateData;
48use penumbra_sdk_stake::DelegationToken;
49use penumbra_sdk_view::{Storage, ViewClient, ViewServer};
50
51mod config;
52mod genesis;
53
54use config::{parse_dest_fvk_from_memo, AccountConfig, FvkEntry, PmonitorConfig};
55
/// The maximum size of a compact block, in bytes (12MB).
///
/// Applied as the maximum decoding message size on the gRPC client used to
/// fetch the genesis compact block.
const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;

/// The name of the view database file, located inside each wallet directory.
const VIEW_FILE_NAME: &str = "pcli-view.sqlite";

/// The permitted difference between genesis balance and current balance,
/// specified in number of staking tokens.
///
/// `check_wallet_compliance` scales this value by 1_000_000 before comparing
/// it against a balance shortfall.
const ALLOWED_DISCREPANCY: f64 = 0.1;
65
66/// Configure tracing_subscriber for logging messages
67fn init_tracing() -> anyhow::Result<()> {
68    // Instantiate tracing layers.
69    // The `FmtLayer` is used to print to the console.
70    let fmt_layer = tracing_subscriber::fmt::layer()
71        .with_ansi(std::io::stdout().is_terminal())
72        .with_writer(std::io::stderr)
73        .with_target(true);
74    // The `EnvFilter` layer is used to filter events based on `RUST_LOG`.
75    let filter_layer = EnvFilter::try_from_default_env()
76        .or_else(|_| EnvFilter::try_new("info,penumbra_sdk_view=off"))?;
77
78    // Register the tracing subscribers.
79    let registry = tracing_subscriber::registry()
80        .with(filter_layer)
81        .with(fmt_layer);
82    registry.init();
83    Ok(())
84}
85
86#[tokio::main]
87async fn main() -> Result<()> {
88    let opt = Opt::parse();
89    init_tracing()?;
90
91    // Initialize HTTPS support
92    aws_lc_rs::default_provider()
93        .install_default()
94        .expect("failed to initialize rustls support, via aws-lc-rs");
95
96    tracing::info!(?opt, version = env!("CARGO_PKG_VERSION"), "running command");
97    opt.exec().await
98}
99
100/// The path to the default `pmonitor` home directory.
101///
102/// Can be overridden on the command-line via `--home`.
103pub fn default_home() -> Utf8PathBuf {
104    let path = ProjectDirs::from("zone", "penumbra", "pmonitor")
105        .expect("Failed to get platform data dir")
106        .data_dir()
107        .to_path_buf();
108    Utf8PathBuf::from_path_buf(path).expect("Platform default data dir was not UTF-8")
109}
110
// Top-level command-line interface for `pmonitor`, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are consumed by clap as `--help`
// text, so they are user-facing strings; maintainer notes use `//` instead.
#[derive(Debug, Parser)]
#[clap(
    name = "pmonitor",
    about = "The Penumbra account activity monitor.",
    version
)]
pub struct Opt {
    /// Command to run.
    #[clap(subcommand)]
    pub cmd: Command,
    /// The path used to store pmonitor state.
    // Defaults to `default_home()`, and may also be supplied through the
    // `PENUMBRA_PMONITOR_HOME` environment variable.
    #[clap(long, default_value_t = default_home(), env = "PENUMBRA_PMONITOR_HOME")]
    pub home: Utf8PathBuf,
}
125
// Subcommands accepted by `pmonitor`; dispatched in `Opt::exec`.
//
// NOTE: the `///` comments on variants and fields are consumed by clap as
// `--help` text, so they are user-facing strings; maintainer notes use `//`.
#[derive(Debug, clap::Subcommand)]
pub enum Command {
    /// Generate configs for `pmonitor`.
    Init {
        /// Provide JSON file with the list of full viewing keys to monitor.
        // This is a filesystem path; the file is expected to hold a JSON array
        // of FVK strings (it is deserialized as `Vec<String>` in `Opt::exec`).
        #[clap(long, display_order = 200)]
        fvks: String,
        /// Sets the URL of the gRPC endpoint used to sync the wallets.
        // Parsed with `Url::parse`, so a malformed URL is rejected during
        // argument parsing.
        #[clap(
            long,
            display_order = 900,
            parse(try_from_str = Url::parse)
        )]
        grpc_url: Url,
    },
    /// Sync to latest block height and verify all configured wallets have the correct balance.
    Audit {},
    /// Delete `pmonitor` storage to reset local state.
    Reset {},
}
146
147impl Opt {
148    /// Set up the view service for a given wallet.
149    pub async fn view(
150        &self,
151        path: Utf8PathBuf,
152        fvk: FullViewingKey,
153        grpc_url: Url,
154    ) -> Result<ViewServiceClient<box_grpc_svc::BoxGrpcService>> {
155        let registry_path = path.join("registry.json");
156        // Check if the path exists or set it to none
157        let registry_path = if registry_path.exists() {
158            Some(registry_path)
159        } else {
160            None
161        };
162        let db_path: Utf8PathBuf = path.join(VIEW_FILE_NAME);
163
164        let svc: ViewServer =
165            ViewServer::load_or_initialize(Some(db_path), registry_path, &fvk, grpc_url).await?;
166
167        let svc: ViewServiceServer<ViewServer> = ViewServiceServer::new(svc);
168        let view_service = ViewServiceClient::new(box_grpc_svc::local(svc));
169        Ok(view_service)
170    }
171
172    /// Get the path to the wallet directory for a given wallet ID.
173    pub fn wallet_path(&self, wallet_id: &Uuid) -> Utf8PathBuf {
174        self.home.join(format!("wallet_{}", wallet_id))
175    }
176
177    /// Sync a given wallet to the latest block height.
178    pub async fn sync(
179        &self,
180        view_service: &mut ViewServiceClient<box_grpc_svc::BoxGrpcService>,
181    ) -> Result<()> {
182        let mut status_stream = ViewClient::status_stream(view_service).await?;
183
184        let initial_status = status_stream
185            .next()
186            .await
187            .transpose()?
188            .ok_or_else(|| anyhow::anyhow!("view service did not report sync status"))?;
189
190        tracing::debug!(
191            "scanning blocks from last sync height {} to latest height {}",
192            initial_status.full_sync_height,
193            initial_status.latest_known_block_height,
194        );
195
196        // use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
197        // let progress_bar = ProgressBar::with_draw_target(
198        //     initial_status.latest_known_block_height - initial_status.full_sync_height,
199        //     ProgressDrawTarget::stdout(),
200        // )
201        // .with_style(
202        //     ProgressStyle::default_bar()
203        //         .template("[{elapsed}] {bar:50.cyan/blue} {pos:>7}/{len:7} {per_sec} ETA: {eta}"),
204        // );
205        // progress_bar.set_position(0);
206
207        // On large networks, logging an update every 100k blocks or so seems reasonable.
208        // let log_every_n_blocks = 100000;
209        let log_every_n_blocks = 100;
210        while let Some(status) = status_stream.next().await.transpose()? {
211            if status.full_sync_height % log_every_n_blocks == 0 {
212                tracing::debug!("synced {} blocks", status.full_sync_height);
213            }
214            // progress_bar.set_position(status.full_sync_height - initial_status.full_sync_height);
215        }
216        // progress_bar.finish();
217
218        Ok(())
219    }
220
221    /// Fetch the genesis compact block
222    pub async fn fetch_genesis_compact_block(&self, grpc_url: Url) -> Result<CompactBlock> {
223        let height = 0;
224        let mut client = CompactBlockQueryServiceClient::connect(grpc_url.to_string())
225            .await?
226            .max_decoding_message_size(MAX_CB_SIZE_BYTES);
227        let compact_block = client
228            .compact_block(CompactBlockRequest { height })
229            .await?
230            .into_inner()
231            .compact_block
232            .expect("response has compact block");
233        compact_block.try_into()
234    }
235
236    /// Create wallet given a path and fvk
237    pub async fn create_wallet(
238        &self,
239        wallet_dir: &Utf8PathBuf,
240        fvk: &FullViewingKey,
241        grpc_url: &Url,
242    ) -> Result<()> {
243        // Create the wallet directory if it doesn't exist
244        if !wallet_dir.exists() {
245            fs::create_dir_all(&wallet_dir)?;
246        }
247
248        // Use FVK to build a pcli config file,
249        // which we'll reference when syncing wallets.
250        let pcli_config = PcliConfig {
251            grpc_url: grpc_url.clone(),
252            view_url: None,
253            governance_custody: None,
254            full_viewing_key: fvk.clone(),
255            disable_warning: true,
256            custody: pcli::config::CustodyConfig::ViewOnly,
257        };
258
259        let pcli_config_path = wallet_dir.join("config.toml");
260        pcli_config.save(pcli_config_path).with_context(|| {
261            format!("failed to initialize wallet in {}", wallet_dir.to_string())
262        })?;
263
264        Ok(())
265    }
266
267    /// Compute the UM-equivalent balance for a given (synced) wallet.
268    pub async fn compute_um_equivalent_balance(
269        &self,
270        view_client: &mut ViewServiceClient<box_grpc_svc::BoxGrpcService>,
271        stake_client: &mut StakeQueryServiceClient<Channel>,
272    ) -> Result<Amount> {
273        let notes = view_client.unspent_notes_by_asset_and_address().await?;
274        let mut total_um_equivalent_amount = Amount::from(0u64);
275        for (asset_id, map) in notes.iter() {
276            if *asset_id == *STAKING_TOKEN_ASSET_ID {
277                let total_amount = map
278                    .iter()
279                    .map(|(_, spendable_notes)| {
280                        spendable_notes
281                            .iter()
282                            .map(|spendable_note| spendable_note.note.amount())
283                            .sum::<Amount>()
284                    })
285                    .sum::<Amount>();
286                total_um_equivalent_amount += total_amount;
287            } else if let Ok(delegation_token) = DelegationToken::from_str(&asset_id.to_string()) {
288                let total_amount = map
289                    .iter()
290                    .map(|(_, spendable_notes)| {
291                        spendable_notes
292                            .iter()
293                            .map(|spendable_note| spendable_note.note.amount())
294                            .sum::<Amount>()
295                    })
296                    .sum::<Amount>();
297
298                // We need to convert the amount to the UM-equivalent amount using the appropriate rate data
299                let rate_data: RateData = stake_client
300                    .current_validator_rate(tonic::Request::new(
301                        (delegation_token.validator()).into(),
302                    ))
303                    .await?
304                    .into_inner()
305                    .try_into()?;
306                let um_equivalent_balance = rate_data.unbonded_amount(total_amount);
307                total_um_equivalent_amount += um_equivalent_balance;
308            };
309        }
310        Ok(total_um_equivalent_amount)
311    }
312
313    /// Execute the specified command.
314    pub async fn exec(&self) -> Result<()> {
315        let opt = self;
316        match &opt.cmd {
317            Command::Reset {} => {
318                // Delete the home directory
319                fs::remove_dir_all(&opt.home)?;
320                println!(
321                    "Successfully cleaned up pmonitor directory: \"{}\"",
322                    opt.home
323                );
324                Ok(())
325            }
326            Command::Init { fvks, grpc_url } => {
327                // Parse the JSON file into a list of full viewing keys
328                let fvks_str = fs::read_to_string(fvks)?;
329
330                // Take elements from the array and parse them into FullViewingKeys
331                let fvk_string_list: Vec<String> = serde_json::from_str(&fvks_str)?;
332                let fvk_list: Vec<FullViewingKey> = fvk_string_list
333                    .iter()
334                    .map(|fvk| FullViewingKey::from_str(&fvk))
335                    .collect::<Result<Vec<_>>>()?;
336                println!("Successfully read FVKs from provided file");
337
338                // Create the home directory if it doesn't exist
339                if !opt.home.exists() {
340                    fs::create_dir_all(&opt.home)?;
341                } else {
342                    anyhow::bail!("pmonitor home directory already exists: {}", opt.home);
343                }
344
345                // During init, we also compute and save the genesis balance for each
346                // FVK, since that won't change in the future.
347                let genesis_compact_block =
348                    self.fetch_genesis_compact_block(grpc_url.clone()).await?;
349                println!("About to scan the genesis block... this may take a moment");
350                let genesis_filtered_block =
351                    genesis::scan_genesis_block(genesis_compact_block, fvk_list.clone()).await?;
352
353                let mut accounts = Vec::new();
354
355                // Now we need to make subdirectories for each of the FVKs and setup their
356                // config files, with the selected FVK and GRPC URL.
357                for fvk in fvk_list.iter() {
358                    let wallet_id = Uuid::new_v4();
359                    let wallet_dir = self.wallet_path(&wallet_id);
360                    tracing::debug!("creating wallet at {}", wallet_dir.to_string());
361                    self.create_wallet(&wallet_dir, &fvk, &grpc_url).await?;
362
363                    accounts.push(AccountConfig::new(
364                        FvkEntry {
365                            fvk: fvk.clone(),
366                            wallet_id,
367                        },
368                        *(genesis_filtered_block
369                            .balances
370                            .get(&fvk.to_string())
371                            .unwrap_or(&Amount::from(0u64))),
372                    ));
373                }
374
375                tracing::info!("successfully initialized {} wallets", accounts.len());
376                let pmonitor_config = PmonitorConfig::new(grpc_url.clone(), accounts);
377
378                // Save the config
379                let config_path = opt.home.join("pmonitor_config.toml");
380                fs::write(config_path, toml::to_string(&pmonitor_config)?)?;
381
382                Ok(())
383            }
384            Command::Audit {} => {
385                // Parse the config file to get the accounts to monitor.
386                //
387                // Note that each logical genesis entry might now have one or more FVKs, depending on if the
388                // user migrated their account to a new FVK, i.e. if they migrated once, they'll have two
389                // FVKs. This can happen an unlimited number of times.
390                let config_path = opt.home.join("pmonitor_config.toml");
391                let pmonitor_config: PmonitorConfig =
392                    toml::from_str(&fs::read_to_string(config_path.clone()).context(format!(
393                        "failed to load pmonitor config file: {}",
394                        config_path
395                    ))?)?;
396
397                let mut stake_client = StakeQueryServiceClient::new(
398                    ViewServer::get_pd_channel(pmonitor_config.grpc_url()).await?,
399                );
400
401                // Sync each wallet to the latest block height, check for new migrations, and check the balance.
402                let mut updated_config = pmonitor_config.clone();
403                let mut config_updated = false;
404
405                let num_accounts = pmonitor_config.accounts().len();
406
407                // Create bucket for documenting non-compliant FVKs, for reporting in summary.
408                let mut failures: Vec<&AccountConfig> = vec![];
409
410                for (index, config) in pmonitor_config.accounts().iter().enumerate() {
411                    let active_fvk = config.active_fvk();
412                    let active_path = self.wallet_path(&config.active_uuid());
413                    tracing::info!(
414                        "syncing wallet {}/{}: {}",
415                        index + 1,
416                        num_accounts,
417                        active_path.to_string()
418                    );
419
420                    let mut view_client = self
421                        .view(
422                            active_path.clone(),
423                            active_fvk.clone(),
424                            pmonitor_config.grpc_url(),
425                        )
426                        .await?;
427
428                    // todo: do this in parallel?
429                    self.sync(&mut view_client).await?;
430                    tracing::debug!("finished syncing wallet {}/{}", index + 1, num_accounts);
431
432                    // Check if the account has been migrated
433                    let storage = Storage::load_or_initialize(
434                        Some(active_path.join(VIEW_FILE_NAME)),
435                        &active_fvk,
436                        pmonitor_config.grpc_url(),
437                    )
438                    .await?;
439
440                    let migration_tx = storage
441                        .transactions_matching_memo(format!(
442                            // N.B. the `%` symbol is an SQLite wildcard, required to match the
443                            // remainder of the memo field.
444                            "Migrating balance from {}%",
445                            active_fvk.to_string()
446                        ))
447                        .await?;
448                    if migration_tx.is_empty() {
449                        tracing::debug!(
450                            "account has not been migrated, continuing using existing FVK..."
451                        );
452                    } else if migration_tx.len() == 1 {
453                        tracing::warn!(
454                            "❗ account has been migrated to new FVK, continuing using new FVK..."
455                        );
456                        let (_, _, _tx, memo_text) = &migration_tx[0];
457                        let new_fvk = parse_dest_fvk_from_memo(&memo_text)?;
458                        let wallet_id = Uuid::new_v4();
459                        let wallet_dir = self.wallet_path(&wallet_id);
460                        self.create_wallet(&wallet_dir, &new_fvk, &pmonitor_config.grpc_url())
461                            .await?;
462
463                        let new_fvk_entry = FvkEntry {
464                            fvk: new_fvk.clone(),
465                            wallet_id,
466                        };
467                        // Mark that the config needs to get saved again for the next time we run the audit command.
468                        config_updated = true;
469
470                        // We need to update the config with the new FVK and path on disk
471                        // to the wallet for the next time we run the audit command.
472                        let mut new_config_entry = config.clone();
473                        new_config_entry.add_migration(new_fvk_entry);
474                        updated_config.set_account(index, new_config_entry.clone());
475
476                        view_client = self
477                            .view(wallet_dir, new_fvk.clone(), pmonitor_config.grpc_url())
478                            .await?;
479
480                        tracing::info!("syncing migrated wallet");
481                        self.sync(&mut view_client).await?;
482                        tracing::info!("finished syncing migrated wallet");
483                        // Now we can exit the else if statement and continue by computing the balance,
484                        // which will use the new migrated wallet.
485                    } else {
486                        // we expect a single migration tx per FVK, if this assumption is violated we should bail.
487                        anyhow::bail!(
488                            "Expected a single migration tx, found {}",
489                            migration_tx.len()
490                        );
491                    }
492
493                    let current_um_equivalent_amount = self
494                        .compute_um_equivalent_balance(&mut view_client, &mut stake_client)
495                        .await?;
496
497                    tracing::debug!("original FVK: {:?}", config.original_fvk());
498
499                    let genesis_um_equivalent_amount = config.genesis_balance();
500                    // Let the user know if the balance is unexpected or not
501                    if check_wallet_compliance(
502                        genesis_um_equivalent_amount,
503                        current_um_equivalent_amount,
504                    ) {
505                        tracing::info!(
506                            ?genesis_um_equivalent_amount,
507                            ?current_um_equivalent_amount,
508                            "✅ expected balance! current balance is within compliant range of the genesis balance",
509                        );
510                    } else {
511                        tracing::error!(
512                            ?genesis_um_equivalent_amount,
513                            ?current_um_equivalent_amount,
514                            "❌ unexpected balance! current balance is less than the genesis balance, by more than {ALLOWED_DISCREPANCY}UM",
515                        );
516                        failures.push(config);
517                    }
518                }
519
520                // If at any point we marked the config for updating, we need to save it.
521                if config_updated {
522                    fs::write(config_path.clone(), toml::to_string(&updated_config)?)?;
523                }
524
525                // Print summary message
526                emit_summary_message(pmonitor_config.accounts(), failures)?;
527
528                Ok(())
529            }
530        }
531    }
532}
533
534/// Prepare a human-readable text summary at the end of the audit run.
535/// This is important, as errors logged during scanning are likely to be off-screen
536/// due to backscroll.
537fn emit_summary_message(
538    all_accounts: &Vec<AccountConfig>,
539    failures: Vec<&AccountConfig>,
540) -> Result<()> {
541    println!("#######################");
542    println!("Summary of FVK scanning");
543    println!("#######################");
544    println!("Total number of FVKs scanned: {}", all_accounts.len(),);
545    let compliant_count = format!(
546        "Number deemed compliant: {}",
547        all_accounts.len() - failures.len(),
548    );
549    let failure_count = format!("Number deemed in violation: {}", failures.len(),);
550    if failures.is_empty() {
551        println!("{}", compliant_count.green());
552        println!("{}", failure_count);
553    } else {
554        println!("{}", compliant_count.yellow());
555        println!("{}", failure_count.red());
556        println!("The non-compliant FVKs are:");
557        println!("");
558        for f in &failures {
559            println!("\t* {}", f.active_fvk().to_string());
560        }
561        println!("");
562        // println!("{}", "Error: non-compliant balances were detected".red());
563        anyhow::bail!("non-compliant balances were detected".red());
564    }
565    Ok(())
566}
567
568/// Check whether the wallet is compliant.
569///
570/// Rather than a naive comparison that the current balance is greater than or
571/// equal to the genesis balance, we permit less than within a tolerance of
572/// 0.1UM. Doing so allows for discrepancies due to gas fees, for instance
573/// if `pcli migrate balance` was used.
574fn check_wallet_compliance(genesis_balance: Amount, current_balance: Amount) -> bool {
575    // Since the `Amount` of the staking token will be in millionths,
576    // we multiply 0.1 * 1_000_000.
577    let allowed_discrepancy = ALLOWED_DISCREPANCY * 1_000_000 as f64;
578    let mut result = false;
579    if current_balance >= genesis_balance {
580        result = true;
581    } else {
582        let actual_discrepancy = genesis_balance - current_balance;
583        let discrepancy_formatted = f64::from(actual_discrepancy) / 1_000_000 as f64;
584        tracing::trace!("detected low balance, missing {}UM", discrepancy_formatted);
585        if f64::from(actual_discrepancy) <= allowed_discrepancy {
586            result = true
587        }
588    }
589    result
590}