pd/network/
join.rs

1//! Logic for onboarding a new `pd` node onto an existing network.
2//! Handles generation of config files for `pd` and `cometbft`.
3use anyhow::Context;
4use rand::seq::SliceRandom;
5use rand_core::OsRng;
6use std::net::{IpAddr, SocketAddr};
7use std::path::PathBuf;
8use tendermint_config::net::Address as TendermintAddress;
9use url::Url;
10
11use flate2::read::GzDecoder;
12use std::io::Write;
13use tokio_stream::StreamExt;
14
15use crate::network::config::{parse_tm_address, NetworkTendermintConfig};
16use crate::network::generate::NetworkValidator;
17
18/// Bootstrap a connection to a network, via a node on that network.
19/// Look up network peer info from the target node, and seed the tendermint
20/// p2p settings with that peer info.
21pub async fn network_join(
22    output_dir: PathBuf,
23    node: Url,
24    node_name: &str,
25    external_address: Option<TendermintAddress>,
26    tm_rpc_bind: SocketAddr,
27    tm_p2p_bind: SocketAddr,
28) -> anyhow::Result<()> {
29    let mut node_dir = output_dir;
30    node_dir.push("node0");
31    let genesis_url = node.join("genesis")?;
32    tracing::info!(%genesis_url, "fetching genesis");
33    // We need to download the genesis data and the node ID from the remote node.
34    // TODO: replace with TendermintProxyServiceClient
35    let client = reqwest::Client::new();
36    let genesis_json = client
37        .get(genesis_url)
38        .send()
39        .await?
40        .json::<serde_json::Value>()
41        .await?
42        .get_mut("result")
43        .and_then(|v| v.get_mut("genesis"))
44        .ok_or_else(|| anyhow::anyhow!("could not parse JSON from response"))?
45        .take();
46    let genesis = serde_json::value::from_value(genesis_json)?;
47    tracing::info!("fetched genesis");
48
49    // Look up more peers from the target node, so that generated tendermint config
50    // contains multiple addresses, making peering easier.
51    let mut peers = Vec::new();
52    if let Some(node_tm_address) = fetch_listen_address(&node).await {
53        peers.push(node_tm_address);
54    } else {
55        // We consider it odd that a bootstrap node has a remotely accessible
56        // RPC endpoint, but no P2P listener enabled.
57        tracing::warn!("Failed to find listenaddr for {}", &node);
58    }
59    let new_peers = fetch_peers(&node).await?;
60    peers.extend(new_peers);
61    tracing::info!(?peers, "Network peers for inclusion in generated configs");
62
63    let tm_config = NetworkTendermintConfig::new(
64        node_name,
65        peers,
66        external_address,
67        Some(tm_rpc_bind),
68        Some(tm_p2p_bind),
69    )?;
70
71    let tv = NetworkValidator::default();
72    tm_config.write_config(node_dir, &tv, &genesis)?;
73    Ok(())
74}
75
76/// Query the Tendermint node's RPC endpoint at `tm_url` and return the listener
77/// address for the P2P endpoint. Returns an [Option<TendermintAddress>] because
78/// it's possible that no p2p listener is configured.
79pub async fn fetch_listen_address(tm_url: &Url) -> Option<TendermintAddress> {
80    let client = reqwest::Client::new();
81    // We cannot assume the RPC URL is the same as the P2P address,
82    // so we use the RPC URL to look up the P2P listening address.
83    let listen_info = client
84        .get(tm_url.join("net_info").ok()?)
85        .send()
86        .await
87        .ok()?
88        .json::<serde_json::Value>()
89        .await
90        .ok()?
91        .get_mut("result")
92        .and_then(|v| v.get_mut("listeners"))
93        .and_then(|v| v.as_array())
94        .cloned()
95        .unwrap_or_default();
96
97    tracing::debug!(?listen_info, "found listener info from bootstrap node");
98    let first_entry = listen_info[0].as_str().unwrap_or_default();
99    let listen_addr = parse_tm_address_listener(first_entry)?;
100
101    // Next we'll look up the node_id, so we can assemble a self-authenticating
102    // Tendermint Address, in the form of <id>@<url>.
103    let node_id = client
104        .get(tm_url.join("status").ok()?)
105        .send()
106        .await
107        .ok()?
108        .json::<serde_json::Value>()
109        .await
110        .ok()?
111        .get_mut("result")
112        .and_then(|v| v.get_mut("node_info"))
113        .and_then(|v| v.get_mut("id"))
114        .ok_or_else(|| anyhow::anyhow!("could not parse JSON from response"))
115        .ok()?
116        .take();
117
118    let node_id: tendermint::node::Id = serde_json::value::from_value(node_id).ok()?;
119    tracing::debug!(?node_id, "fetched node id");
120
121    let listen_addr_url = Url::parse(&format!("{}", &listen_addr)).ok()?;
122    tracing::info!(
123        %listen_addr_url,
124        "fetched listener address for bootstrap node"
125    );
126    parse_tm_address(Some(&node_id), &listen_addr_url).ok()
127}
128
129/// Query the Tendermint node's RPC endpoint at `tm_url` and return a list
130/// of all known peers by their `external_address`es. Omits private/special
131/// addresses like `localhost` or `0.0.0.0`.
132pub async fn fetch_peers(tm_url: &Url) -> anyhow::Result<Vec<TendermintAddress>> {
133    let client = reqwest::Client::new();
134    let net_info_url = tm_url.join("net_info")?;
135    tracing::debug!(%net_info_url, "Fetching peers of bootstrap node");
136    let mut net_info_peers = client
137        .get(net_info_url)
138        .send()
139        .await?
140        .json::<serde_json::Value>()
141        .await?
142        .get("result")
143        .and_then(|v| v.get("peers"))
144        .and_then(|v| v.as_array())
145        .cloned()
146        .unwrap_or_default();
147
148    if net_info_peers.is_empty() {
149        tracing::warn!(
150            ?net_info_peers,
151            "bootstrap node reported 0 peers; we'll have no way to get blocks"
152        );
153    }
154
155    // Randomize the ordering of the peer candidates, so that different nodes
156    // joining are more likely to get different peers, as we select a subset.
157    net_info_peers.shuffle(&mut OsRng);
158
159    // We'll look for a handful of peers and use those in our config.
160    // We don't want to naively add all of the bootstrap node's peers,
161    // instead trusting gossip to find peers dynamically over time.
162    // We'll also special-case nodes that contain the string "seed" in their moniker:
163    // those nodes should be assumed to seed nodes. This is optimistic, but should result
164    // in more solid peering behavior than the previous strategy of declaring all fullnodes
165    // as seeds within the joining node's CometBFT config.
166    let threshold = 5;
167    let mut peers = Vec::new();
168    let mut seeds = Vec::new();
169
170    for raw_peer in net_info_peers {
171        tracing::debug!(?raw_peer, "Analyzing whether to include candidate peer");
172        let node_id: tendermint::node::Id = raw_peer
173            .get("node_info")
174            .and_then(|v| v.get("id"))
175            .and_then(|v| serde_json::value::from_value(v.clone()).ok())
176            .ok_or_else(|| anyhow::anyhow!("Could not parse node_info.id from JSON response"))?;
177
178        let listen_addr = raw_peer
179            .get("node_info")
180            .and_then(|v| v.get("listen_addr"))
181            .and_then(|v| v.as_str())
182            .ok_or_else(|| {
183                anyhow::anyhow!("Could not parse node_info.listen_addr from JSON response")
184            })?
185            // Depending on node config, there may or may not be a protocol prefix.
186            // Remove it so we can treat it as a SocketAddr when checking for internal/external.
187            .replace("tcp://", "");
188
189        let moniker = raw_peer
190            .get("node_info")
191            .and_then(|v| v.get("moniker"))
192            .and_then(|v| v.as_str())
193            .ok_or_else(|| {
194                anyhow::anyhow!("Could not parse node_info.moniker from JSON response")
195            })?;
196
197        // Filter out addresses that are obviously not external addresses.
198        if !address_could_be_external(&listen_addr) {
199            tracing::debug!(
200                ?listen_addr,
201                "Skipping candidate peer due to internal listener address"
202            );
203            continue;
204        }
205        // The API returns a str formatted as a SocketAddr; prepend protocol so we can handle
206        // as a URL. The Tendermint config template already includes the tcp:// prefix.
207        let laddr = format!("tcp://{}", listen_addr);
208        let listen_url = Url::parse(&laddr).context(format!(
209            "Failed to parse candidate tendermint addr as URL: {}",
210            listen_addr
211        ))?;
212        let peer_tm_address = parse_tm_address(Some(&node_id), &listen_url)?;
213        // A bit of optimism: any node with "seed" in its moniker gets to be a seed.
214        if moniker.contains("seed") {
215            tracing::debug!(
216                ?peer_tm_address,
217                moniker,
218                "Found self-described seed node in candidate peers"
219            );
220            seeds.push(peer_tm_address)
221        // Otherwise, we check if we've found enough.
222        } else if peers.len() <= threshold {
223            peers.push(peer_tm_address)
224        }
225    }
226    if peers.len() < threshold && seeds.is_empty() {
227        tracing::warn!(
228            "bootstrap node reported only {} peers, and 0 seeds; we may have trouble peering",
229            peers.len(),
230        );
231    }
232    // TODO handle seeds and peers differently. For now, all peers are used as seeds.
233    seeds.extend(peers);
234    Ok(seeds)
235}
236
237/// Download a gzipped tarball from a URL, and extract its contents as the starting state
238/// config for the fullnode. Allows bootstrapping from archived state, which is useful
239/// for nodes joining after a chain upgrade has been performed.
240///
241/// Supports archive files generated via `pd export`, which contain only the rocksdb dir,
242/// and via `pd migrate`, which contain the rocksdb dir, new genesis content, and a private
243/// validator state file.
244///
245/// The `output_dir` should be the same argument as passed to `pd network --network-dir <dir> join`;
246/// relative paths for pd and cometbft will be created from this base path.
247///
248/// The `leave_archive` argument allows you to keep the downloaded archive file after unpacking.
249pub async fn unpack_state_archive(
250    archive_url: Url,
251    output_dir: PathBuf,
252    leave_archive: bool,
253) -> anyhow::Result<()> {
254    let archive_filepath: std::path::PathBuf;
255    // Check whether URL points to a local file
256    if archive_url.scheme() == "file" {
257        tracing::info!(%archive_url, "extracting compressed node state from local file");
258        archive_filepath = archive_url.to_file_path().map_err(|e| {
259            tracing::error!(?e);
260            anyhow::anyhow!("failed to convert archive url to filepath")
261        })?;
262    } else {
263        // Download.
264        // Here we inspect HEAD so we can infer filename.
265        tracing::info!(%archive_url, "downloading compressed node state");
266        let response = reqwest::get(archive_url).await?;
267        let fname = response
268            .url()
269            .path_segments()
270            .and_then(|segments| segments.last())
271            .and_then(|name| if name.is_empty() { None } else { Some(name) })
272            .unwrap_or("pd-node-state-archive.tar.gz");
273
274        archive_filepath = output_dir.join(fname);
275        let mut download_opts = std::fs::OpenOptions::new();
276        download_opts.create_new(true).write(true);
277        let mut archive_file = download_opts.open(&archive_filepath)?;
278
279        // Download via stream, in case file is too large to shove into RAM.
280        let mut stream = response.bytes_stream();
281        while let Some(chunk_result) = stream.next().await {
282            let chunk = chunk_result?;
283            archive_file.write_all(&chunk)?;
284        }
285        archive_file.flush()?;
286        tracing::info!("download complete: {}", archive_filepath.display());
287    }
288
289    // Extract.
290    // Re-open downloaded file for unpacking, for a fresh filehandle.
291    let mut unpack_opts = std::fs::OpenOptions::new();
292    unpack_opts.read(true);
293    let f = unpack_opts
294        .open(&archive_filepath)
295        .context("failed to open local archive for extraction")?;
296    let tar = GzDecoder::new(f);
297    let mut archive = tar::Archive::new(tar);
298    // This dir-path building is duplicated in the config gen code.
299    let pd_home = output_dir.join("node0").join("pd");
300    archive
301        .unpack(&pd_home)
302        .context("failed to extract tar.gz archive")?;
303
304    // If the archive we consumed was generated via `pd migrate`, then it will contain
305    // a new genesis file and priv_validator_state.json, both of which should be applied
306    // over the generated cometbft config files. If the archive was generated via `pd export`,
307    // then those extra files will be missing, and only rocksdb data will be present.
308    let new_genesis = pd_home.join("genesis.json");
309    let new_val_state = pd_home.join("priv_validator_state.json");
310    let cometbft_dir = output_dir.join("node0").join("cometbft");
311    let copy_opts = fs_extra::dir::CopyOptions::new().overwrite(true);
312
313    if new_genesis.exists() {
314        tracing::info!(new_genesis = %new_genesis.display(), "copying new genesis content from archive");
315        let f = vec![new_genesis];
316        fs_extra::move_items(&f, cometbft_dir.join("config"), &copy_opts)?;
317    }
318    if new_val_state.exists() {
319        tracing::info!(new_val_state = %new_val_state.display(), "copying new priv_validator_state.json content from archive");
320        let f = vec![new_val_state];
321        fs_extra::move_items(&f, cometbft_dir.join("data"), &copy_opts)?;
322    }
323
324    tracing::info!("archived node state unpacked to {}", pd_home.display());
325
326    if !leave_archive {
327        // Post-extraction, clean up the downloaded tarball.
328        std::fs::remove_file(archive_filepath)?;
329    } else {
330        tracing::info!(path = ?archive_filepath, "leaving downloaded archive on disk");
331    }
332
333    Ok(())
334}
335
/// Check whether a `SocketAddr` spec (address *and* port, e.g. `1.2.3.4:26656`)
/// is likely to be externally-accessible.
///
/// Returns `false` for anything that does not parse as a `SocketAddr`, and for
/// addresses that are clearly not publicly routable:
/// * IPv4: RFC1918 private, loopback, unspecified (`0.0.0.0`), link-local (`169.254/16`);
/// * IPv6: loopback, unspecified (`::`), unique-local (`fc00::/7`), link-local (`fe80::/10`).
// TODO: This should return a Result, to be clearer about the expectation
// of a SocketAddr, rather than an IpAddr, as arg.
fn address_could_be_external(address: &str) -> bool {
    let addr = match address.parse::<SocketAddr>() {
        Ok(addr) => addr,
        Err(_) => return false,
    };
    match addr.ip() {
        IpAddr::V4(ip) => {
            !(ip.is_private() || ip.is_loopback() || ip.is_unspecified() || ip.is_link_local())
        }
        IpAddr::V6(ip) => {
            // Checked via the leading segment's bits, since the std helpers for these
            // ranges are not available on older toolchains.
            let leading = ip.segments()[0];
            let unique_local = (leading & 0xfe00) == 0xfc00;
            let link_local = (leading & 0xffc0) == 0xfe80;
            !(ip.is_loopback() || ip.is_unspecified() || unique_local || link_local)
        }
    }
}
350
351/// Extract a [TendermintAddress] obtained from the RPC `/net_info` endpoint.
352/// The raw value is a String formatted as:
353///
354///   * `Listener(@35.226.255.25:26656)` or
355///   * `Listener(@tcp://35.226.255.25:26656)` or
356///   * `Listener(@)`
357///
358/// It may be possible for a node [Id] to proceed the `@`.
359pub fn parse_tm_address_listener(s: &str) -> Option<TendermintAddress> {
360    let re = regex::Regex::new(r"Listener\(.*@(tcp://)?(.*)\)").ok()?;
361    let groups = re
362        .captures(s)
363        .expect("tendermint listener address from net_info endpoint is valid");
364    let r: Option<String> = groups.get(2).map(|m| m.as_str().to_string());
365    match r {
366        Some(t) => {
367            // Haven't observed a local addr in Listener field, but let's make sure
368            // it's a public addr
369            if address_could_be_external(&t) {
370                t.parse::<TendermintAddress>().ok()
371            } else {
372                None
373            }
374        }
375        None => None,
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    // The '35.226.255.25' IPv4 address used throughout these tests is a reserved
    // GCP IP, used by Penumbra Labs on the testnet cluster.
    #[test]
    fn external_address_detection() {
        // A bare IP without a port is rejected: a full SocketAddr is required.
        assert!(!address_could_be_external("127.0.0.1"));
        assert!(!address_could_be_external("0.0.0.0"));
        assert!(!address_could_be_external("0.0.0.0:80"));
        assert!(!address_could_be_external("192.168.4.1:26657"));
        assert!(!address_could_be_external("10.0.0.1:26656"));
        assert!(!address_could_be_external("[::1]:26656"));
        assert!(!address_could_be_external("35.226.255.25"));
        assert!(!address_could_be_external("not-an-address"));
        assert!(address_could_be_external("35.226.255.25:26657"));
    }

    // Some of these tests duplicate tests in the upstream Tendermint crates.
    // The underlying structs upstream are mostly just Strings, though, and since
    // our code wrangles these types for interpolation in config files from multiple
    // API endpoint sources, it's important to validate expected behavior.
    #[test]
    fn parse_tendermint_address_tcp() -> anyhow::Result<()> {
        let tm1 = parse_tm_address(None, &Url::parse("tcp://35.226.255.25:26656")?)?;
        // Fail loudly on an unexpected variant, rather than silently skipping the asserts.
        match tm1 {
            TendermintAddress::Tcp {
                peer_id,
                host,
                port,
            } => {
                assert!(peer_id.is_none());
                assert_eq!(port, 26656);
                assert_eq!(host, "35.226.255.25");
            }
            other => panic!("expected a TCP address, got {other:?}"),
        }
        Ok(())
    }

    // The Tendermint RPC net_info endpoint will return Listener information
    // formatted as:
    //
    //   * `Listener(@35.226.255.25:26656)` or
    //   * `Listener(@tcp://35.226.255.25:26656)` or
    //   * `Listener(@)`
    //
    // I've yet to observe a node_id preceding the `@`.
    #[test]
    fn parse_tendermint_address_listener() -> anyhow::Result<()> {
        let r1 = parse_tm_address_listener("Listener(@35.226.255.25:26656)");
        assert_eq!(r1, Some("35.226.255.25:26656".parse::<TendermintAddress>()?));

        let r2 = parse_tm_address_listener("Listener(@tcp://35.226.255.25:26656)");
        assert_eq!(r2, Some("tcp://35.226.255.25:26656".parse::<TendermintAddress>()?));

        // A `tcp://` scheme placed before the `@` is tolerated as well.
        let r3 = parse_tm_address_listener("Listener(tcp://@35.226.255.25:26656)");
        assert_eq!(r3, Some("tcp://35.226.255.25:26656".parse::<TendermintAddress>()?));

        let r4 = parse_tm_address_listener("Listener(@)");
        assert!(r4.is_none());

        // Internal listener addresses are never returned.
        let r5 = parse_tm_address_listener("Listener(@127.0.0.1:26656)");
        assert!(r5.is_none());

        Ok(())
    }

    #[test]
    fn parse_tendermint_address_from_listener() -> anyhow::Result<()> {
        // Most upstream Tendermint types are just String structs, so there's
        // no handling of the `Listener()` wrapper. We must regex it out.
        let l = tendermint::node::info::ListenAddress::new(
            "Listener(@35.226.255.25:26656)".to_string(),
        );
        let tm1 = TendermintAddress::from_listen_address(&l);
        assert!(tm1.is_none());
        Ok(())
    }
}