1#[macro_use]
2extern crate tracing;
3
4use std::path::PathBuf;
5
6use clap::Parser;
7use rustls::crypto::aws_lc_rs;
8use tracing::Instrument;
9use tracing_subscriber::EnvFilter;
10
11use penumbra_sdk_compact_block::CompactBlock;
12use penumbra_sdk_proto::{
13 core::component::compact_block::v1::CompactBlockRequest,
14 penumbra::{
15 core::component::compact_block::v1::{
16 query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
17 CompactBlockRangeRequest,
18 },
19 util::tendermint_proxy::v1::{
20 tendermint_proxy_service_client::TendermintProxyServiceClient, GetStatusRequest,
21 },
22 },
23 DomainType, Message,
24};
25use penumbra_sdk_view::ViewServer;
26
27use tonic::transport::Channel;
28use url::Url;
29
/// Size limit in bytes (12 MiB) handed to `max_decoding_message_size` on every
/// compact block gRPC client created in this file.
const MAX_CB_SIZE_BYTES: usize = 12 * (1 << 20);
32
33#[derive(Debug, Parser)]
34#[clap(
35 name = "penumbra-measure",
36 about = "A developer tool for measuring things about Penumbra.",
37 version
38)]
39pub struct Opt {
40 #[clap(
42 short,
43 long,
44 env = "PENUMBRA_NODE_PD_URL",
45 parse(try_from_str = url::Url::parse)
46 )]
47 node: Url,
48 #[clap(subcommand)]
49 pub cmd: Command,
50 #[clap( long, default_value_t = EnvFilter::new("warn,measure=info"), env = "RUST_LOG")]
52 trace_filter: EnvFilter,
53}
54
55impl Opt {
56 pub fn init_tracing(&mut self) {
57 tracing_subscriber::fmt()
58 .with_env_filter(std::mem::take(&mut self.trace_filter))
59 .init();
60 }
61}
62
63#[derive(Debug, Parser)]
64pub enum Command {
65 StreamBlocks {
67 #[clap(long)]
69 skip_genesis: bool,
70 },
71 OpenConnections {
73 #[clap(short, long, default_value = "100")]
75 num_connections: usize,
76
77 #[clap(long)]
79 full_sync: bool,
80 },
81 OpenConnectionsActive {
84 #[clap(short, long, default_value = "100")]
86 num_connections: usize,
87
88 #[clap(long)]
90 full_sync: bool,
91 },
92 FetchCompactBlock {
94 height: u64,
96 #[clap(short, long)]
98 output_file: PathBuf,
99 },
100}
101
102impl Opt {
103 pub async fn run(&self) -> anyhow::Result<()> {
104 match self.cmd {
105 Command::FetchCompactBlock {
106 height,
107 ref output_file,
108 } => {
109 let mut client = CompactBlockQueryServiceClient::connect(self.node.to_string())
110 .await
111 .unwrap()
112 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
113 let compact_block = client
114 .compact_block(CompactBlockRequest { height })
115 .await?
116 .into_inner()
117 .compact_block
118 .expect("response has compact block");
119 let compact_block_bin = compact_block.encode_to_vec();
120 std::fs::write(output_file.clone(), compact_block_bin)?;
121 let compact_block_bin_2 = std::fs::read(output_file.clone())?;
123 let compact_block_2 = CompactBlock::decode(compact_block_bin_2.as_ref())?;
124 println!("Fetched and saved compact block: {} bytes, height: {}, nullifiers: {}, state payloads: {}", compact_block_bin_2.len(), compact_block_2.height, compact_block_2.nullifiers.len(), compact_block_2.state_payloads.len());
125 }
126 Command::OpenConnections {
127 num_connections,
128 full_sync,
129 } => {
130 let current_height = self.latest_known_block_height().await?.0;
131 let start_height = if full_sync { 0 } else { current_height };
133 let end_height = if full_sync { current_height } else { 0 };
134 let node = self.node.to_string();
135 let mut js = tokio::task::JoinSet::new();
136 for conn_id in 0..num_connections {
137 let node2 = node.clone();
138 js.spawn(
139 async move {
140 let mut client =
141 CompactBlockQueryServiceClient::connect(node2).await.unwrap().max_decoding_message_size(MAX_CB_SIZE_BYTES);
142
143 let mut stream = client
144 .compact_block_range(tonic::Request::new(
145 CompactBlockRangeRequest {
146 start_height,
147 end_height,
148 keep_alive: true,
149 },
150 ))
151 .await
152 .unwrap()
153 .into_inner();
154 while let Some(block_rsp) = stream.message().await.unwrap() {
155 let size = block_rsp.encoded_len();
156 let block: CompactBlock = block_rsp.try_into().unwrap();
157 tracing::debug!(block_size = ?size, block_height = ?block.height, initial_chain_height = ?current_height);
158 if full_sync && block.height >= current_height {
160 break;
161 }
162 }
163 }
164 .instrument(debug_span!("open-connection", conn_id = conn_id)),
165 );
166 }
167 while let Some(res) = js.join_next().await {
168 res?;
169 }
170 }
171 Command::OpenConnectionsActive {
172 num_connections,
173 full_sync,
174 } => {
175 let current_height = self.latest_known_block_height().await?.0;
176 let start_height = if full_sync { 0 } else { current_height };
178 let end_height = if full_sync { current_height } else { 0 };
179 let node = self.node.to_string();
180 let mut js = tokio::task::JoinSet::new();
181 for conn_id in 0..num_connections {
182 let node2 = node.clone();
183 js.spawn(async move {
184 let mut client = CompactBlockQueryServiceClient::connect(node2)
185 .await
186 .unwrap()
187 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
188
189 let mut stream = client
190 .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
191 start_height,
192 end_height,
193 keep_alive: true,
194 }))
195 .await
196 .unwrap()
197 .into_inner();
198 let (tx_blocks, mut rx_blocks) = tokio::sync::mpsc::channel(10_000);
199 tokio::spawn(async move {
200 while let Some(block) = stream.message().await.transpose() {
201 if tx_blocks.send(block).await.is_err() {
202 break;
203 }
204 }
205 });
206
207 while let Some(block) = rx_blocks.recv().await {
208 let block: CompactBlock =
209 block.expect("valid block").try_into().expect("valid block");
210 let height = block.height;
211 tracing::debug!(block_height = ?height, conn_id, "processing block");
212 }
213 });
214 }
215 while let Some(res) = js.join_next().await {
216 res?;
217 }
218 }
219 Command::StreamBlocks { skip_genesis } => {
220 let channel = ViewServer::get_pd_channel(self.node.clone()).await?;
221
222 let mut cb_client = CompactBlockQueryServiceClient::new(channel.clone())
223 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
224
225 let end_height = self.latest_known_block_height().await?.0;
226 let start_height = if skip_genesis { 1 } else { 0 };
227
228 let mut stream = cb_client
229 .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
230 start_height,
231 end_height,
232 keep_alive: false,
233 }))
234 .await?
235 .into_inner();
236
237 use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
238 let progress_bar =
239 ProgressBar::with_draw_target(end_height, ProgressDrawTarget::stderr())
240 .with_style(ProgressStyle::default_bar().template(
241 "[{elapsed}] {bar:50.cyan/blue} {pos:>7}/{len:7} {per_sec} ETA: {eta}",
242 ));
243 progress_bar.set_position(0);
244
245 let mut bytes = 0;
246 let mut cb_count = 0;
247 let mut nf_count = 0;
248 let mut sp_rolled_up_count = 0;
249 let mut sp_note_count = 0;
250 let mut sp_swap_count = 0;
251
252 use penumbra_sdk_compact_block::StatePayload;
253
254 while let Some(block_rsp) = stream.message().await? {
255 cb_count += 1;
256 bytes += block_rsp.encoded_len();
257 let block: CompactBlock = block_rsp.try_into()?;
258 nf_count += block.nullifiers.len();
259 sp_rolled_up_count += block
260 .state_payloads
261 .iter()
262 .filter(|sp| matches!(sp, StatePayload::RolledUp { .. }))
263 .count();
264 sp_note_count += block
265 .state_payloads
266 .iter()
267 .filter(|sp| matches!(sp, StatePayload::Note { .. }))
268 .count();
269 sp_swap_count += block
270 .state_payloads
271 .iter()
272 .filter(|sp| matches!(sp, StatePayload::Swap { .. }))
273 .count();
274 progress_bar.set_position(block.height);
275 }
276 progress_bar.finish();
277
278 let sp_count = sp_note_count + sp_swap_count + sp_rolled_up_count;
279 println!(
280 "Fetched at least {}",
281 bytesize::to_string(bytes as u64, false)
282 );
283 println!("Fetched {cb_count} compact blocks, containing:");
284 println!("\t{nf_count} nullifiers");
285 println!("\t{sp_count} state payloads, containing:");
286 println!("\t\t{sp_note_count} note payloads");
287 println!("\t\t{sp_swap_count} swap payloads");
288 println!("\t\t{sp_rolled_up_count} rolled up payloads");
289 }
290 }
291
292 Ok(())
293 }
294
295 #[instrument(skip(self))]
296 pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
297 let mut client = get_tendermint_proxy_client(self.node.clone()).await?;
298 let rsp = client.get_status(GetStatusRequest {}).await?.into_inner();
299 let sync_info = rsp
300 .sync_info
301 .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
302
303 let latest_block_height = sync_info.latest_block_height;
304 let node_catching_up = sync_info.catching_up;
305 Ok((latest_block_height, node_catching_up))
306 }
307}
308
309async fn get_tendermint_proxy_client(
311 pd_url: Url,
312) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
313 let pd_channel = ViewServer::get_pd_channel(pd_url).await?;
314 Ok(TendermintProxyServiceClient::new(pd_channel))
315}
316
317#[tokio::main]
318async fn main() -> anyhow::Result<()> {
319 aws_lc_rs::default_provider()
321 .install_default()
322 .expect("failed to initialize rustls support, via aws-lc-rs");
323
324 let mut opt = Opt::parse();
325 opt.init_tracing();
326 opt.run().await?;
327 Ok(())
328}