measure/
main.rs

1#[macro_use]
2extern crate tracing;
3
4use std::path::PathBuf;
5
6use clap::Parser;
7use rustls::crypto::aws_lc_rs;
8use tracing::Instrument;
9use tracing_subscriber::EnvFilter;
10
11use penumbra_sdk_compact_block::CompactBlock;
12use penumbra_sdk_proto::{
13    core::component::compact_block::v1::CompactBlockRequest,
14    penumbra::{
15        core::component::compact_block::v1::{
16            query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
17            CompactBlockRangeRequest,
18        },
19        util::tendermint_proxy::v1::{
20            tendermint_proxy_service_client::TendermintProxyServiceClient, GetStatusRequest,
21        },
22    },
23    DomainType, Message,
24};
25use penumbra_sdk_view::ViewServer;
26
27use tonic::transport::Channel;
28use url::Url;
29
// The expected maximum size of a compact block message (12 MiB).
// This value is passed to `max_decoding_message_size` on the compact block
// query clients created in this file.
const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;
32
// Top-level command-line options for the `penumbra-measure` tool, parsed via clap's derive API.
// NOTE: `///` comments on the fields below are picked up by clap as help text, so explanatory
// notes for maintainers are written as plain `//` comments instead.
#[derive(Debug, Parser)]
#[clap(
    name = "penumbra-measure",
    about = "A developer tool for measuring things about Penumbra.",
    version
)]
pub struct Opt {
    /// The URL for the gRPC endpoint of the remote pd node.
    // Can also be supplied through the `PENUMBRA_NODE_PD_URL` environment variable;
    // the raw string is parsed with `url::Url::parse`.
    #[clap(
        short,
        long,
        env = "PENUMBRA_NODE_PD_URL",
        parse(try_from_str = url::Url::parse)
    )]
    node: Url,
    // The subcommand selecting which measurement to run (see `Command`).
    #[clap(subcommand)]
    pub cmd: Command,
    /// The filter for log messages.
    // Defaults to "warn,measure=info" and can be overridden with `--trace-filter` or `RUST_LOG`.
    // `init_tracing` moves this value out of the struct when installing the subscriber.
    #[clap( long, default_value_t = EnvFilter::new("warn,measure=info"), env = "RUST_LOG")]
    trace_filter: EnvFilter,
}
54
55impl Opt {
56    pub fn init_tracing(&mut self) {
57        tracing_subscriber::fmt()
58            .with_env_filter(std::mem::take(&mut self.trace_filter))
59            .init();
60    }
61}
62
// The subcommands understood by the tool. Each variant is dispatched in `Opt::run`.
// NOTE: `///` comments below double as clap help text, so maintainer notes use `//`.
#[derive(Debug, Parser)]
pub enum Command {
    /// Measure the performance of downloading compact blocks without parsing them.
    StreamBlocks {
        /// If set, skip downloading the genesis compact block.
        // When set, the requested range starts at height 1 instead of 0.
        #[clap(long)]
        skip_genesis: bool,
    },
    /// Load-test `pd` by holding open many connections subscribing to compact block updates.
    OpenConnections {
        /// The number of connections to open.
        #[clap(short, long, default_value = "100")]
        num_connections: usize,

        /// Whether to sync the entire chain state, then exit.
        // When set, each connection requests blocks from height 0 up to the chain height
        // observed at startup; otherwise it starts at that observed height.
        #[clap(long)]
        full_sync: bool,
    },
    // NOTE(review): the help text on this variant ends mid-sentence ("to create");
    // the intended ending cannot be determined from this file — confirm and complete.
    /// Load-test `pd` by holding open many connections subscribing to compact block updates,
    /// processing the messages asynchronously to create
    OpenConnectionsActive {
        /// The number of connections to open.
        #[clap(short, long, default_value = "100")]
        num_connections: usize,

        /// Whether to sync the entire chain state, then exit.
        #[clap(long)]
        full_sync: bool,
    },
    /// Fetch a specified compact block.
    FetchCompactBlock {
        /// The height of the block to fetch.
        // Positional argument (no `#[clap(...)]` attribute).
        height: u64,
        /// The output file path.
        #[clap(short, long)]
        output_file: PathBuf,
    },
}
101
102impl Opt {
103    pub async fn run(&self) -> anyhow::Result<()> {
104        match self.cmd {
105            Command::FetchCompactBlock {
106                height,
107                ref output_file,
108            } => {
109                let mut client = CompactBlockQueryServiceClient::connect(self.node.to_string())
110                    .await
111                    .unwrap()
112                    .max_decoding_message_size(MAX_CB_SIZE_BYTES);
113                let compact_block = client
114                    .compact_block(CompactBlockRequest { height })
115                    .await?
116                    .into_inner()
117                    .compact_block
118                    .expect("response has compact block");
119                let compact_block_bin = compact_block.encode_to_vec();
120                std::fs::write(output_file.clone(), compact_block_bin)?;
121                // Now read back the data and do sanity checking
122                let compact_block_bin_2 = std::fs::read(output_file.clone())?;
123                let compact_block_2 = CompactBlock::decode(compact_block_bin_2.as_ref())?;
124                println!("Fetched and saved compact block: {} bytes, height: {}, nullifiers: {}, state payloads: {}", compact_block_bin_2.len(), compact_block_2.height, compact_block_2.nullifiers.len(), compact_block_2.state_payloads.len());
125            }
126            Command::OpenConnections {
127                num_connections,
128                full_sync,
129            } => {
130                let current_height = self.latest_known_block_height().await?.0;
131                // Configure start/stop ranges on query, depending on whether we want a full sync.
132                let start_height = if full_sync { 0 } else { current_height };
133                let end_height = if full_sync { current_height } else { 0 };
134                let node = self.node.to_string();
135                let mut js = tokio::task::JoinSet::new();
136                for conn_id in 0..num_connections {
137                    let node2 = node.clone();
138                    js.spawn(
139                        async move {
140                            let mut client =
141                                CompactBlockQueryServiceClient::connect(node2).await.unwrap().max_decoding_message_size(MAX_CB_SIZE_BYTES);
142
143                            let mut stream = client
144                                .compact_block_range(tonic::Request::new(
145                                    CompactBlockRangeRequest {
146                                        start_height,
147                                        end_height,
148                                        keep_alive: true,
149                                    },
150                                ))
151                                .await
152                                .unwrap()
153                                .into_inner();
154                            while let Some(block_rsp) = stream.message().await.unwrap() {
155                                let size = block_rsp.encoded_len();
156                                let block: CompactBlock = block_rsp.try_into().unwrap();
157                                tracing::debug!(block_size = ?size, block_height = ?block.height, initial_chain_height = ?current_height);
158                                // Exit if we only wanted a single full sync per client.
159                                if full_sync && block.height >=  current_height {
160                                    break;
161                                }
162                            }
163                        }
164                        .instrument(debug_span!("open-connection", conn_id = conn_id)),
165                    );
166                }
167                while let Some(res) = js.join_next().await {
168                    res?;
169                }
170            }
171            Command::OpenConnectionsActive {
172                num_connections,
173                full_sync,
174            } => {
175                let current_height = self.latest_known_block_height().await?.0;
176                // Configure start/stop ranges on query, depending on whether we want a full sync.
177                let start_height = if full_sync { 0 } else { current_height };
178                let end_height = if full_sync { current_height } else { 0 };
179                let node = self.node.to_string();
180                let mut js = tokio::task::JoinSet::new();
181                for conn_id in 0..num_connections {
182                    let node2 = node.clone();
183                    js.spawn(async move {
184                        let mut client = CompactBlockQueryServiceClient::connect(node2)
185                            .await
186                            .unwrap()
187                            .max_decoding_message_size(MAX_CB_SIZE_BYTES);
188
189                        let mut stream = client
190                            .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
191                                start_height,
192                                end_height,
193                                keep_alive: true,
194                            }))
195                            .await
196                            .unwrap()
197                            .into_inner();
198                        let (tx_blocks, mut rx_blocks) = tokio::sync::mpsc::channel(10_000);
199                        tokio::spawn(async move {
200                            while let Some(block) = stream.message().await.transpose() {
201                                if tx_blocks.send(block).await.is_err() {
202                                    break;
203                                }
204                            }
205                        });
206
207                        while let Some(block) = rx_blocks.recv().await {
208                            let block: CompactBlock =
209                                block.expect("valid block").try_into().expect("valid block");
210                            let height = block.height;
211                            tracing::debug!(block_height = ?height, conn_id, "processing block");
212                        }
213                    });
214                }
215                while let Some(res) = js.join_next().await {
216                    res?;
217                }
218            }
219            Command::StreamBlocks { skip_genesis } => {
220                let channel = ViewServer::get_pd_channel(self.node.clone()).await?;
221
222                let mut cb_client = CompactBlockQueryServiceClient::new(channel.clone())
223                    .max_decoding_message_size(MAX_CB_SIZE_BYTES);
224
225                let end_height = self.latest_known_block_height().await?.0;
226                let start_height = if skip_genesis { 1 } else { 0 };
227
228                let mut stream = cb_client
229                    .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
230                        start_height,
231                        end_height,
232                        keep_alive: false,
233                    }))
234                    .await?
235                    .into_inner();
236
237                use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
238                let progress_bar =
239                    ProgressBar::with_draw_target(end_height, ProgressDrawTarget::stderr())
240                        .with_style(ProgressStyle::default_bar().template(
241                            "[{elapsed}] {bar:50.cyan/blue} {pos:>7}/{len:7} {per_sec} ETA: {eta}",
242                        ));
243                progress_bar.set_position(0);
244
245                let mut bytes = 0;
246                let mut cb_count = 0;
247                let mut nf_count = 0;
248                let mut sp_rolled_up_count = 0;
249                let mut sp_note_count = 0;
250                let mut sp_swap_count = 0;
251
252                use penumbra_sdk_compact_block::StatePayload;
253
254                while let Some(block_rsp) = stream.message().await? {
255                    cb_count += 1;
256                    bytes += block_rsp.encoded_len();
257                    let block: CompactBlock = block_rsp.try_into()?;
258                    nf_count += block.nullifiers.len();
259                    sp_rolled_up_count += block
260                        .state_payloads
261                        .iter()
262                        .filter(|sp| matches!(sp, StatePayload::RolledUp { .. }))
263                        .count();
264                    sp_note_count += block
265                        .state_payloads
266                        .iter()
267                        .filter(|sp| matches!(sp, StatePayload::Note { .. }))
268                        .count();
269                    sp_swap_count += block
270                        .state_payloads
271                        .iter()
272                        .filter(|sp| matches!(sp, StatePayload::Swap { .. }))
273                        .count();
274                    progress_bar.set_position(block.height);
275                }
276                progress_bar.finish();
277
278                let sp_count = sp_note_count + sp_swap_count + sp_rolled_up_count;
279                println!(
280                    "Fetched at least {}",
281                    bytesize::to_string(bytes as u64, false)
282                );
283                println!("Fetched {cb_count} compact blocks, containing:");
284                println!("\t{nf_count} nullifiers");
285                println!("\t{sp_count} state payloads, containing:");
286                println!("\t\t{sp_note_count} note payloads");
287                println!("\t\t{sp_swap_count} swap payloads");
288                println!("\t\t{sp_rolled_up_count} rolled up payloads");
289            }
290        }
291
292        Ok(())
293    }
294
295    #[instrument(skip(self))]
296    pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
297        let mut client = get_tendermint_proxy_client(self.node.clone()).await?;
298        let rsp = client.get_status(GetStatusRequest {}).await?.into_inner();
299        let sync_info = rsp
300            .sync_info
301            .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
302
303        let latest_block_height = sync_info.latest_block_height;
304        let node_catching_up = sync_info.catching_up;
305        Ok((latest_block_height, node_catching_up))
306    }
307}
308
309// Wrapper for the `get_pd_channel` method from the view crate.
310async fn get_tendermint_proxy_client(
311    pd_url: Url,
312) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
313    let pd_channel = ViewServer::get_pd_channel(pd_url).await?;
314    Ok(TendermintProxyServiceClient::new(pd_channel))
315}
316
317#[tokio::main]
318async fn main() -> anyhow::Result<()> {
319    // Initialize HTTPS support
320    aws_lc_rs::default_provider()
321        .install_default()
322        .expect("failed to initialize rustls support, via aws-lc-rs");
323
324    let mut opt = Opt::parse();
325    opt.init_tracing();
326    opt.run().await?;
327    Ok(())
328}