1use anyhow::Context;
4use rand::seq::SliceRandom;
5use rand_core::OsRng;
6use std::net::{IpAddr, SocketAddr};
7use std::path::PathBuf;
8use tendermint_config::net::Address as TendermintAddress;
9use url::Url;
10
11use flate2::read::GzDecoder;
12use std::io::Write;
13use tokio_stream::StreamExt;
14
15use crate::network::config::{parse_tm_address, NetworkTendermintConfig};
16use crate::network::generate::NetworkValidator;
17
18pub async fn network_join(
22 output_dir: PathBuf,
23 node: Url,
24 node_name: &str,
25 external_address: Option<TendermintAddress>,
26 tm_rpc_bind: SocketAddr,
27 tm_p2p_bind: SocketAddr,
28) -> anyhow::Result<()> {
29 let mut node_dir = output_dir;
30 node_dir.push("node0");
31 let genesis_url = node.join("genesis")?;
32 tracing::info!(%genesis_url, "fetching genesis");
33 let client = reqwest::Client::new();
36 let genesis_json = client
37 .get(genesis_url)
38 .send()
39 .await?
40 .json::<serde_json::Value>()
41 .await?
42 .get_mut("result")
43 .and_then(|v| v.get_mut("genesis"))
44 .ok_or_else(|| anyhow::anyhow!("could not parse JSON from response"))?
45 .take();
46 let genesis = serde_json::value::from_value(genesis_json)?;
47 tracing::info!("fetched genesis");
48
49 let mut peers = Vec::new();
52 if let Some(node_tm_address) = fetch_listen_address(&node).await {
53 peers.push(node_tm_address);
54 } else {
55 tracing::warn!("Failed to find listenaddr for {}", &node);
58 }
59 let new_peers = fetch_peers(&node).await?;
60 peers.extend(new_peers);
61 tracing::info!(?peers, "Network peers for inclusion in generated configs");
62
63 let tm_config = NetworkTendermintConfig::new(
64 node_name,
65 peers,
66 external_address,
67 Some(tm_rpc_bind),
68 Some(tm_p2p_bind),
69 )?;
70
71 let tv = NetworkValidator::default();
72 tm_config.write_config(node_dir, &tv, &genesis)?;
73 Ok(())
74}
75
76pub async fn fetch_listen_address(tm_url: &Url) -> Option<TendermintAddress> {
80 let client = reqwest::Client::new();
81 let listen_info = client
84 .get(tm_url.join("net_info").ok()?)
85 .send()
86 .await
87 .ok()?
88 .json::<serde_json::Value>()
89 .await
90 .ok()?
91 .get_mut("result")
92 .and_then(|v| v.get_mut("listeners"))
93 .and_then(|v| v.as_array())
94 .cloned()
95 .unwrap_or_default();
96
97 tracing::debug!(?listen_info, "found listener info from bootstrap node");
98 let first_entry = listen_info[0].as_str().unwrap_or_default();
99 let listen_addr = parse_tm_address_listener(first_entry)?;
100
101 let node_id = client
104 .get(tm_url.join("status").ok()?)
105 .send()
106 .await
107 .ok()?
108 .json::<serde_json::Value>()
109 .await
110 .ok()?
111 .get_mut("result")
112 .and_then(|v| v.get_mut("node_info"))
113 .and_then(|v| v.get_mut("id"))
114 .ok_or_else(|| anyhow::anyhow!("could not parse JSON from response"))
115 .ok()?
116 .take();
117
118 let node_id: tendermint::node::Id = serde_json::value::from_value(node_id).ok()?;
119 tracing::debug!(?node_id, "fetched node id");
120
121 let listen_addr_url = Url::parse(&format!("{}", &listen_addr)).ok()?;
122 tracing::info!(
123 %listen_addr_url,
124 "fetched listener address for bootstrap node"
125 );
126 parse_tm_address(Some(&node_id), &listen_addr_url).ok()
127}
128
129pub async fn fetch_peers(tm_url: &Url) -> anyhow::Result<Vec<TendermintAddress>> {
133 let client = reqwest::Client::new();
134 let net_info_url = tm_url.join("net_info")?;
135 tracing::debug!(%net_info_url, "Fetching peers of bootstrap node");
136 let mut net_info_peers = client
137 .get(net_info_url)
138 .send()
139 .await?
140 .json::<serde_json::Value>()
141 .await?
142 .get("result")
143 .and_then(|v| v.get("peers"))
144 .and_then(|v| v.as_array())
145 .cloned()
146 .unwrap_or_default();
147
148 if net_info_peers.is_empty() {
149 tracing::warn!(
150 ?net_info_peers,
151 "bootstrap node reported 0 peers; we'll have no way to get blocks"
152 );
153 }
154
155 net_info_peers.shuffle(&mut OsRng);
158
159 let threshold = 5;
167 let mut peers = Vec::new();
168 let mut seeds = Vec::new();
169
170 for raw_peer in net_info_peers {
171 tracing::debug!(?raw_peer, "Analyzing whether to include candidate peer");
172 let node_id: tendermint::node::Id = raw_peer
173 .get("node_info")
174 .and_then(|v| v.get("id"))
175 .and_then(|v| serde_json::value::from_value(v.clone()).ok())
176 .ok_or_else(|| anyhow::anyhow!("Could not parse node_info.id from JSON response"))?;
177
178 let listen_addr = raw_peer
179 .get("node_info")
180 .and_then(|v| v.get("listen_addr"))
181 .and_then(|v| v.as_str())
182 .ok_or_else(|| {
183 anyhow::anyhow!("Could not parse node_info.listen_addr from JSON response")
184 })?
185 .replace("tcp://", "");
188
189 let moniker = raw_peer
190 .get("node_info")
191 .and_then(|v| v.get("moniker"))
192 .and_then(|v| v.as_str())
193 .ok_or_else(|| {
194 anyhow::anyhow!("Could not parse node_info.moniker from JSON response")
195 })?;
196
197 if !address_could_be_external(&listen_addr) {
199 tracing::debug!(
200 ?listen_addr,
201 "Skipping candidate peer due to internal listener address"
202 );
203 continue;
204 }
205 let laddr = format!("tcp://{}", listen_addr);
208 let listen_url = Url::parse(&laddr).context(format!(
209 "Failed to parse candidate tendermint addr as URL: {}",
210 listen_addr
211 ))?;
212 let peer_tm_address = parse_tm_address(Some(&node_id), &listen_url)?;
213 if moniker.contains("seed") {
215 tracing::debug!(
216 ?peer_tm_address,
217 moniker,
218 "Found self-described seed node in candidate peers"
219 );
220 seeds.push(peer_tm_address)
221 } else if peers.len() <= threshold {
223 peers.push(peer_tm_address)
224 }
225 }
226 if peers.len() < threshold && seeds.is_empty() {
227 tracing::warn!(
228 "bootstrap node reported only {} peers, and 0 seeds; we may have trouble peering",
229 peers.len(),
230 );
231 }
232 seeds.extend(peers);
234 Ok(seeds)
235}
236
237pub async fn unpack_state_archive(
250 archive_url: Url,
251 output_dir: PathBuf,
252 leave_archive: bool,
253) -> anyhow::Result<()> {
254 let archive_filepath: std::path::PathBuf;
255 if archive_url.scheme() == "file" {
257 tracing::info!(%archive_url, "extracting compressed node state from local file");
258 archive_filepath = archive_url.to_file_path().map_err(|e| {
259 tracing::error!(?e);
260 anyhow::anyhow!("failed to convert archive url to filepath")
261 })?;
262 } else {
263 tracing::info!(%archive_url, "downloading compressed node state");
266 let response = reqwest::get(archive_url).await?;
267 let fname = response
268 .url()
269 .path_segments()
270 .and_then(|segments| segments.last())
271 .and_then(|name| if name.is_empty() { None } else { Some(name) })
272 .unwrap_or("pd-node-state-archive.tar.gz");
273
274 archive_filepath = output_dir.join(fname);
275 let mut download_opts = std::fs::OpenOptions::new();
276 download_opts.create_new(true).write(true);
277 let mut archive_file = download_opts.open(&archive_filepath)?;
278
279 let mut stream = response.bytes_stream();
281 while let Some(chunk_result) = stream.next().await {
282 let chunk = chunk_result?;
283 archive_file.write_all(&chunk)?;
284 }
285 archive_file.flush()?;
286 tracing::info!("download complete: {}", archive_filepath.display());
287 }
288
289 let mut unpack_opts = std::fs::OpenOptions::new();
292 unpack_opts.read(true);
293 let f = unpack_opts
294 .open(&archive_filepath)
295 .context("failed to open local archive for extraction")?;
296 let tar = GzDecoder::new(f);
297 let mut archive = tar::Archive::new(tar);
298 let pd_home = output_dir.join("node0").join("pd");
300 archive
301 .unpack(&pd_home)
302 .context("failed to extract tar.gz archive")?;
303
304 let new_genesis = pd_home.join("genesis.json");
309 let new_val_state = pd_home.join("priv_validator_state.json");
310 let cometbft_dir = output_dir.join("node0").join("cometbft");
311 let copy_opts = fs_extra::dir::CopyOptions::new().overwrite(true);
312
313 if new_genesis.exists() {
314 tracing::info!(new_genesis = %new_genesis.display(), "copying new genesis content from archive");
315 let f = vec![new_genesis];
316 fs_extra::move_items(&f, cometbft_dir.join("config"), ©_opts)?;
317 }
318 if new_val_state.exists() {
319 tracing::info!(new_val_state = %new_val_state.display(), "copying new priv_validator_state.json content from archive");
320 let f = vec![new_val_state];
321 fs_extra::move_items(&f, cometbft_dir.join("data"), ©_opts)?;
322 }
323
324 tracing::info!("archived node state unpacked to {}", pd_home.display());
325
326 if !leave_archive {
327 std::fs::remove_file(archive_filepath)?;
329 } else {
330 tracing::info!(path = ?archive_filepath, "leaving downloaded archive on disk");
331 }
332
333 Ok(())
334}
335
/// Reports whether `address` (an `ip:port` socket address) could plausibly be
/// reachable from outside the local host or network.
///
/// Returns `false` when `address` does not parse as a [`SocketAddr`] (for
/// example when the port is missing), and for addresses that are never
/// publicly routable:
///
/// * IPv4: private, loopback, unspecified (`0.0.0.0`) and link-local (`169.254.0.0/16`);
/// * IPv6: loopback, unspecified (`::`), unique-local (`fc00::/7`) and
///   link-local (`fe80::/10`).
fn address_could_be_external(address: &str) -> bool {
    match address.parse::<SocketAddr>().map(|addr| addr.ip()) {
        Ok(IpAddr::V4(ip)) => {
            !(ip.is_private() || ip.is_loopback() || ip.is_unspecified() || ip.is_link_local())
        }
        Ok(IpAddr::V6(ip)) => {
            // Range checks are done on the leading 16-bit group directly, so
            // no newer standard-library helpers are required.
            let prefix = ip.segments()[0];
            let is_unique_local = (prefix & 0xfe00) == 0xfc00;
            let is_link_local = (prefix & 0xffc0) == 0xfe80;
            !(ip.is_loopback() || ip.is_unspecified() || is_unique_local || is_link_local)
        }
        Err(_) => false,
    }
}
350
351pub fn parse_tm_address_listener(s: &str) -> Option<TendermintAddress> {
360 let re = regex::Regex::new(r"Listener\(.*@(tcp://)?(.*)\)").ok()?;
361 let groups = re
362 .captures(s)
363 .expect("tendermint listener address from net_info endpoint is valid");
364 let r: Option<String> = groups.get(2).map(|m| m.as_str().to_string());
365 match r {
366 Some(t) => {
367 if address_could_be_external(&t) {
370 t.parse::<TendermintAddress>().ok()
371 } else {
372 None
373 }
374 }
375 None => None,
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Only full `ip:port` pairs with a non-private, non-loopback,
    /// non-unspecified address are considered possibly external.
    #[test]
    fn external_address_detection() {
        assert!(!address_could_be_external("127.0.0.1"));
        assert!(!address_could_be_external("0.0.0.0"));
        assert!(!address_could_be_external("0.0.0.0:80"));
        assert!(!address_could_be_external("192.168.4.1:26657"));
        // A public IP without a port is not a socket address.
        assert!(!address_could_be_external("35.226.255.25"));
        assert!(address_could_be_external("35.226.255.25:26657"));
    }

    /// A `tcp://` URL without a node id parses into a TCP address with no peer id.
    #[test]
    fn parse_tendermint_address_tcp() -> anyhow::Result<()> {
        let tm1 = parse_tm_address(None, &Url::parse("tcp://35.226.255.25:26656")?)?;
        if let TendermintAddress::Tcp {
            peer_id,
            host,
            port,
        } = tm1
        {
            assert!(peer_id.is_none());
            assert_eq!(port, 26656);
            assert_eq!(host, "35.226.255.25");
        } else {
            // Without this branch the test would pass vacuously for any
            // other address variant.
            panic!("expected a TCP tendermint address");
        }
        Ok(())
    }

    /// Listener strings are accepted with or without a `tcp://` scheme, and an
    /// empty address is rejected.
    #[test]
    fn parse_tendermint_address_listener() -> anyhow::Result<()> {
        let l1 = "Listener(@35.226.255.25:26656)";
        let r1 = parse_tm_address_listener(l1);
        assert_eq!(r1, Some("35.226.255.25:26656".parse::<TendermintAddress>()?));

        let l2 = "Listener(tcp://@35.226.255.25:26656)";
        let r2 = parse_tm_address_listener(l2);
        assert_eq!(
            r2,
            Some("tcp://35.226.255.25:26656".parse::<TendermintAddress>()?)
        );

        let l3 = "Listener(@)";
        let r3 = parse_tm_address_listener(l3);
        assert!(r3.is_none());

        Ok(())
    }

    /// Pins the behavior of the library conversion for the `Listener(...)`
    /// format: it yields `None` for this input.
    #[test]
    fn parse_tendermint_address_from_listener() -> anyhow::Result<()> {
        let l = tendermint::node::info::ListenAddress::new(
            "Listener(@35.226.255.25:26656)".to_string(),
        );
        let tm1 = TendermintAddress::from_listen_address(&l);
        assert!(tm1.is_none());
        Ok(())
    }
}
448}