1use anyhow::{Context, Result};
18use camino::Utf8PathBuf;
19use clap::{self, Parser};
20use directories::ProjectDirs;
21use futures::StreamExt;
22use penumbra_sdk_asset::STAKING_TOKEN_ASSET_ID;
23use rustls::crypto::aws_lc_rs;
24use std::fs;
25use std::io::IsTerminal as _;
26use std::str::FromStr;
27use tonic::transport::Channel;
28use tracing_subscriber::{prelude::*, EnvFilter};
29use url::Url;
30use uuid::Uuid;
31
32use colored::Colorize;
33
34use pcli::config::PcliConfig;
35use penumbra_sdk_compact_block::CompactBlock;
36use penumbra_sdk_keys::FullViewingKey;
37use penumbra_sdk_num::Amount;
38use penumbra_sdk_proto::box_grpc_svc;
39use penumbra_sdk_proto::view::v1::{
40 view_service_client::ViewServiceClient, view_service_server::ViewServiceServer,
41};
42use penumbra_sdk_proto::{
43 core::component::compact_block::v1::CompactBlockRequest,
44 core::component::stake::v1::query_service_client::QueryServiceClient as StakeQueryServiceClient,
45 penumbra::core::component::compact_block::v1::query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
46};
47use penumbra_sdk_stake::rate::RateData;
48use penumbra_sdk_stake::DelegationToken;
49use penumbra_sdk_view::{Storage, ViewClient, ViewServer};
50
51mod config;
52mod genesis;
53
54use config::{parse_dest_fvk_from_memo, AccountConfig, FvkEntry, PmonitorConfig};
55
56const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;
58
59const VIEW_FILE_NAME: &str = "pcli-view.sqlite";
61
62const ALLOWED_DISCREPANCY: f64 = 0.1;
65
66fn init_tracing() -> anyhow::Result<()> {
68 let fmt_layer = tracing_subscriber::fmt::layer()
71 .with_ansi(std::io::stdout().is_terminal())
72 .with_writer(std::io::stderr)
73 .with_target(true);
74 let filter_layer = EnvFilter::try_from_default_env()
76 .or_else(|_| EnvFilter::try_new("info,penumbra_sdk_view=off"))?;
77
78 let registry = tracing_subscriber::registry()
80 .with(filter_layer)
81 .with(fmt_layer);
82 registry.init();
83 Ok(())
84}
85
86#[tokio::main]
87async fn main() -> Result<()> {
88 let opt = Opt::parse();
89 init_tracing()?;
90
91 aws_lc_rs::default_provider()
93 .install_default()
94 .expect("failed to initialize rustls support, via aws-lc-rs");
95
96 tracing::info!(?opt, version = env!("CARGO_PKG_VERSION"), "running command");
97 opt.exec().await
98}
99
100pub fn default_home() -> Utf8PathBuf {
104 let path = ProjectDirs::from("zone", "penumbra", "pmonitor")
105 .expect("Failed to get platform data dir")
106 .data_dir()
107 .to_path_buf();
108 Utf8PathBuf::from_path_buf(path).expect("Platform default data dir was not UTF-8")
109}
110
111#[derive(Debug, Parser)]
112#[clap(
113 name = "pmonitor",
114 about = "The Penumbra account activity monitor.",
115 version
116)]
117pub struct Opt {
118 #[clap(subcommand)]
120 pub cmd: Command,
121 #[clap(long, default_value_t = default_home(), env = "PENUMBRA_PMONITOR_HOME")]
123 pub home: Utf8PathBuf,
124}
125
126#[derive(Debug, clap::Subcommand)]
127pub enum Command {
128 Init {
130 #[clap(long, display_order = 200)]
132 fvks: String,
133 #[clap(
135 long,
136 display_order = 900,
137 parse(try_from_str = Url::parse)
138 )]
139 grpc_url: Url,
140 },
141 Audit {},
143 Reset {},
145}
146
147impl Opt {
148 pub async fn view(
150 &self,
151 path: Utf8PathBuf,
152 fvk: FullViewingKey,
153 grpc_url: Url,
154 ) -> Result<ViewServiceClient<box_grpc_svc::BoxGrpcService>> {
155 let registry_path = path.join("registry.json");
156 let registry_path = if registry_path.exists() {
158 Some(registry_path)
159 } else {
160 None
161 };
162 let db_path: Utf8PathBuf = path.join(VIEW_FILE_NAME);
163
164 let svc: ViewServer =
165 ViewServer::load_or_initialize(Some(db_path), registry_path, &fvk, grpc_url).await?;
166
167 let svc: ViewServiceServer<ViewServer> = ViewServiceServer::new(svc);
168 let view_service = ViewServiceClient::new(box_grpc_svc::local(svc));
169 Ok(view_service)
170 }
171
172 pub fn wallet_path(&self, wallet_id: &Uuid) -> Utf8PathBuf {
174 self.home.join(format!("wallet_{}", wallet_id))
175 }
176
177 pub async fn sync(
179 &self,
180 view_service: &mut ViewServiceClient<box_grpc_svc::BoxGrpcService>,
181 ) -> Result<()> {
182 let mut status_stream = ViewClient::status_stream(view_service).await?;
183
184 let initial_status = status_stream
185 .next()
186 .await
187 .transpose()?
188 .ok_or_else(|| anyhow::anyhow!("view service did not report sync status"))?;
189
190 tracing::debug!(
191 "scanning blocks from last sync height {} to latest height {}",
192 initial_status.full_sync_height,
193 initial_status.latest_known_block_height,
194 );
195
196 let log_every_n_blocks = 100;
210 while let Some(status) = status_stream.next().await.transpose()? {
211 if status.full_sync_height % log_every_n_blocks == 0 {
212 tracing::debug!("synced {} blocks", status.full_sync_height);
213 }
214 }
216 Ok(())
219 }
220
221 pub async fn fetch_genesis_compact_block(&self, grpc_url: Url) -> Result<CompactBlock> {
223 let height = 0;
224 let mut client = CompactBlockQueryServiceClient::connect(grpc_url.to_string())
225 .await?
226 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
227 let compact_block = client
228 .compact_block(CompactBlockRequest { height })
229 .await?
230 .into_inner()
231 .compact_block
232 .expect("response has compact block");
233 compact_block.try_into()
234 }
235
236 pub async fn create_wallet(
238 &self,
239 wallet_dir: &Utf8PathBuf,
240 fvk: &FullViewingKey,
241 grpc_url: &Url,
242 ) -> Result<()> {
243 if !wallet_dir.exists() {
245 fs::create_dir_all(&wallet_dir)?;
246 }
247
248 let pcli_config = PcliConfig {
251 grpc_url: grpc_url.clone(),
252 view_url: None,
253 governance_custody: None,
254 full_viewing_key: fvk.clone(),
255 disable_warning: true,
256 custody: pcli::config::CustodyConfig::ViewOnly,
257 };
258
259 let pcli_config_path = wallet_dir.join("config.toml");
260 pcli_config.save(pcli_config_path).with_context(|| {
261 format!("failed to initialize wallet in {}", wallet_dir.to_string())
262 })?;
263
264 Ok(())
265 }
266
267 pub async fn compute_um_equivalent_balance(
269 &self,
270 view_client: &mut ViewServiceClient<box_grpc_svc::BoxGrpcService>,
271 stake_client: &mut StakeQueryServiceClient<Channel>,
272 ) -> Result<Amount> {
273 let notes = view_client.unspent_notes_by_asset_and_address().await?;
274 let mut total_um_equivalent_amount = Amount::from(0u64);
275 for (asset_id, map) in notes.iter() {
276 if *asset_id == *STAKING_TOKEN_ASSET_ID {
277 let total_amount = map
278 .iter()
279 .map(|(_, spendable_notes)| {
280 spendable_notes
281 .iter()
282 .map(|spendable_note| spendable_note.note.amount())
283 .sum::<Amount>()
284 })
285 .sum::<Amount>();
286 total_um_equivalent_amount += total_amount;
287 } else if let Ok(delegation_token) = DelegationToken::from_str(&asset_id.to_string()) {
288 let total_amount = map
289 .iter()
290 .map(|(_, spendable_notes)| {
291 spendable_notes
292 .iter()
293 .map(|spendable_note| spendable_note.note.amount())
294 .sum::<Amount>()
295 })
296 .sum::<Amount>();
297
298 let rate_data: RateData = stake_client
300 .current_validator_rate(tonic::Request::new(
301 (delegation_token.validator()).into(),
302 ))
303 .await?
304 .into_inner()
305 .try_into()?;
306 let um_equivalent_balance = rate_data.unbonded_amount(total_amount);
307 total_um_equivalent_amount += um_equivalent_balance;
308 };
309 }
310 Ok(total_um_equivalent_amount)
311 }
312
313 pub async fn exec(&self) -> Result<()> {
315 let opt = self;
316 match &opt.cmd {
317 Command::Reset {} => {
318 fs::remove_dir_all(&opt.home)?;
320 println!(
321 "Successfully cleaned up pmonitor directory: \"{}\"",
322 opt.home
323 );
324 Ok(())
325 }
326 Command::Init { fvks, grpc_url } => {
327 let fvks_str = fs::read_to_string(fvks)?;
329
330 let fvk_string_list: Vec<String> = serde_json::from_str(&fvks_str)?;
332 let fvk_list: Vec<FullViewingKey> = fvk_string_list
333 .iter()
334 .map(|fvk| FullViewingKey::from_str(&fvk))
335 .collect::<Result<Vec<_>>>()?;
336 println!("Successfully read FVKs from provided file");
337
338 if !opt.home.exists() {
340 fs::create_dir_all(&opt.home)?;
341 } else {
342 anyhow::bail!("pmonitor home directory already exists: {}", opt.home);
343 }
344
345 let genesis_compact_block =
348 self.fetch_genesis_compact_block(grpc_url.clone()).await?;
349 println!("About to scan the genesis block... this may take a moment");
350 let genesis_filtered_block =
351 genesis::scan_genesis_block(genesis_compact_block, fvk_list.clone()).await?;
352
353 let mut accounts = Vec::new();
354
355 for fvk in fvk_list.iter() {
358 let wallet_id = Uuid::new_v4();
359 let wallet_dir = self.wallet_path(&wallet_id);
360 tracing::debug!("creating wallet at {}", wallet_dir.to_string());
361 self.create_wallet(&wallet_dir, &fvk, &grpc_url).await?;
362
363 accounts.push(AccountConfig::new(
364 FvkEntry {
365 fvk: fvk.clone(),
366 wallet_id,
367 },
368 *(genesis_filtered_block
369 .balances
370 .get(&fvk.to_string())
371 .unwrap_or(&Amount::from(0u64))),
372 ));
373 }
374
375 tracing::info!("successfully initialized {} wallets", accounts.len());
376 let pmonitor_config = PmonitorConfig::new(grpc_url.clone(), accounts);
377
378 let config_path = opt.home.join("pmonitor_config.toml");
380 fs::write(config_path, toml::to_string(&pmonitor_config)?)?;
381
382 Ok(())
383 }
384 Command::Audit {} => {
385 let config_path = opt.home.join("pmonitor_config.toml");
391 let pmonitor_config: PmonitorConfig =
392 toml::from_str(&fs::read_to_string(config_path.clone()).context(format!(
393 "failed to load pmonitor config file: {}",
394 config_path
395 ))?)?;
396
397 let mut stake_client = StakeQueryServiceClient::new(
398 ViewServer::get_pd_channel(pmonitor_config.grpc_url()).await?,
399 );
400
401 let mut updated_config = pmonitor_config.clone();
403 let mut config_updated = false;
404
405 let num_accounts = pmonitor_config.accounts().len();
406
407 let mut failures: Vec<&AccountConfig> = vec![];
409
410 for (index, config) in pmonitor_config.accounts().iter().enumerate() {
411 let active_fvk = config.active_fvk();
412 let active_path = self.wallet_path(&config.active_uuid());
413 tracing::info!(
414 "syncing wallet {}/{}: {}",
415 index + 1,
416 num_accounts,
417 active_path.to_string()
418 );
419
420 let mut view_client = self
421 .view(
422 active_path.clone(),
423 active_fvk.clone(),
424 pmonitor_config.grpc_url(),
425 )
426 .await?;
427
428 self.sync(&mut view_client).await?;
430 tracing::debug!("finished syncing wallet {}/{}", index + 1, num_accounts);
431
432 let storage = Storage::load_or_initialize(
434 Some(active_path.join(VIEW_FILE_NAME)),
435 &active_fvk,
436 pmonitor_config.grpc_url(),
437 )
438 .await?;
439
440 let migration_tx = storage
441 .transactions_matching_memo(format!(
442 "Migrating balance from {}%",
445 active_fvk.to_string()
446 ))
447 .await?;
448 if migration_tx.is_empty() {
449 tracing::debug!(
450 "account has not been migrated, continuing using existing FVK..."
451 );
452 } else if migration_tx.len() == 1 {
453 tracing::warn!(
454 "❗ account has been migrated to new FVK, continuing using new FVK..."
455 );
456 let (_, _, _tx, memo_text) = &migration_tx[0];
457 let new_fvk = parse_dest_fvk_from_memo(&memo_text)?;
458 let wallet_id = Uuid::new_v4();
459 let wallet_dir = self.wallet_path(&wallet_id);
460 self.create_wallet(&wallet_dir, &new_fvk, &pmonitor_config.grpc_url())
461 .await?;
462
463 let new_fvk_entry = FvkEntry {
464 fvk: new_fvk.clone(),
465 wallet_id,
466 };
467 config_updated = true;
469
470 let mut new_config_entry = config.clone();
473 new_config_entry.add_migration(new_fvk_entry);
474 updated_config.set_account(index, new_config_entry.clone());
475
476 view_client = self
477 .view(wallet_dir, new_fvk.clone(), pmonitor_config.grpc_url())
478 .await?;
479
480 tracing::info!("syncing migrated wallet");
481 self.sync(&mut view_client).await?;
482 tracing::info!("finished syncing migrated wallet");
483 } else {
486 anyhow::bail!(
488 "Expected a single migration tx, found {}",
489 migration_tx.len()
490 );
491 }
492
493 let current_um_equivalent_amount = self
494 .compute_um_equivalent_balance(&mut view_client, &mut stake_client)
495 .await?;
496
497 tracing::debug!("original FVK: {:?}", config.original_fvk());
498
499 let genesis_um_equivalent_amount = config.genesis_balance();
500 if check_wallet_compliance(
502 genesis_um_equivalent_amount,
503 current_um_equivalent_amount,
504 ) {
505 tracing::info!(
506 ?genesis_um_equivalent_amount,
507 ?current_um_equivalent_amount,
508 "✅ expected balance! current balance is within compliant range of the genesis balance",
509 );
510 } else {
511 tracing::error!(
512 ?genesis_um_equivalent_amount,
513 ?current_um_equivalent_amount,
514 "❌ unexpected balance! current balance is less than the genesis balance, by more than {ALLOWED_DISCREPANCY}UM",
515 );
516 failures.push(config);
517 }
518 }
519
520 if config_updated {
522 fs::write(config_path.clone(), toml::to_string(&updated_config)?)?;
523 }
524
525 emit_summary_message(pmonitor_config.accounts(), failures)?;
527
528 Ok(())
529 }
530 }
531 }
532}
533
534fn emit_summary_message(
538 all_accounts: &Vec<AccountConfig>,
539 failures: Vec<&AccountConfig>,
540) -> Result<()> {
541 println!("#######################");
542 println!("Summary of FVK scanning");
543 println!("#######################");
544 println!("Total number of FVKs scanned: {}", all_accounts.len(),);
545 let compliant_count = format!(
546 "Number deemed compliant: {}",
547 all_accounts.len() - failures.len(),
548 );
549 let failure_count = format!("Number deemed in violation: {}", failures.len(),);
550 if failures.is_empty() {
551 println!("{}", compliant_count.green());
552 println!("{}", failure_count);
553 } else {
554 println!("{}", compliant_count.yellow());
555 println!("{}", failure_count.red());
556 println!("The non-compliant FVKs are:");
557 println!("");
558 for f in &failures {
559 println!("\t* {}", f.active_fvk().to_string());
560 }
561 println!("");
562 anyhow::bail!("non-compliant balances were detected".red());
564 }
565 Ok(())
566}
567
568fn check_wallet_compliance(genesis_balance: Amount, current_balance: Amount) -> bool {
575 let allowed_discrepancy = ALLOWED_DISCREPANCY * 1_000_000 as f64;
578 let mut result = false;
579 if current_balance >= genesis_balance {
580 result = true;
581 } else {
582 let actual_discrepancy = genesis_balance - current_balance;
583 let discrepancy_formatted = f64::from(actual_discrepancy) / 1_000_000 as f64;
584 tracing::trace!("detected low balance, missing {}UM", discrepancy_formatted);
585 if f64::from(actual_discrepancy) <= allowed_discrepancy {
586 result = true
587 }
588 }
589 result
590}