1use anyhow::{anyhow, Result};
2use penumbra_sdk_transaction::AuthorizationData;
3use rand_core::OsRng;
4use serde::Serialize;
5use tonic::{async_trait, Request, Response, Status};
6
7use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
8use penumbra_sdk_proto::{custody::v1 as pb, DomainType};
9
10use crate::{AuthorizeRequest, AuthorizeValidatorDefinitionRequest, AuthorizeValidatorVoteRequest};
11
12pub use self::config::Config;
13use self::sign::no_signature_response;
14pub use crate::terminal::{SigningRequest, Terminal};
15
16mod config;
17mod dkg;
18mod sign;
19
20#[derive(Clone, Debug)]
23pub enum SigningResponse {
24 Transaction(AuthorizationData),
26 ValidatorDefinition(decaf377_rdsa::Signature<decaf377_rdsa::SpendAuth>),
28 ValidatorVote(decaf377_rdsa::Signature<decaf377_rdsa::SpendAuth>),
30}
31
32impl From<AuthorizationData> for SigningResponse {
33 fn from(msg: AuthorizationData) -> Self {
34 Self::Transaction(msg)
35 }
36}
37
38fn to_json<T>(data: &T) -> Result<String>
39where
40 T: DomainType,
41 anyhow::Error: From<<T as TryFrom<<T as DomainType>::Proto>>::Error>,
42 <T as DomainType>::Proto: Serialize,
43{
44 Ok(serde_json::to_string(&data.to_proto())?)
45}
46
47pub async fn follow(
52 config: Option<&Config>,
53 governance_config: Option<&Config>,
54 terminal: &impl Terminal,
55) -> Result<()> {
56 terminal.explain("Paste the coordinator's first message:")?;
58 let round1_message = terminal.next_response::<sign::CoordinatorRound1>().await?;
59 let config = match round1_message.signing_request() {
61 SigningRequest::TransactionPlan(_) => config.ok_or(anyhow!(
62 "cannot threshold sign transaction using a non-threshold custody backend"
63 ))?,
64 SigningRequest::ValidatorDefinition(_) => config.ok_or(anyhow!(
65 "cannot threshold sign validator definition using a non-threshold custody backend"
66 ))?,
67 SigningRequest::ValidatorVote(_) => governance_config.ok_or(anyhow!(
68 "cannot threshold sign validator vote using a non-threshold validator governance custody backend"
69 ))?,
70 };
71 if !terminal
72 .confirm_request(round1_message.signing_request())
73 .await?
74 {
75 return Ok(());
76 }
77 let (round1_reply, round1_state) = sign::follower_round1(&mut OsRng, config, round1_message)?;
78 terminal.explain("Send this message to the coordinator:")?;
79 terminal.broadcast(&to_json(&round1_reply)?).await?;
80 terminal.explain("Paste the coordinator's second message:")?;
82 let round2_message = terminal.next_response::<sign::CoordinatorRound2>().await?;
83 let round2_reply = sign::follower_round2(config, round1_state, round2_message)?;
84 terminal.explain("Send this message to the coordinator:")?;
85 terminal.broadcast(&to_json(&round2_reply)?).await?;
86
87 Ok(())
88}
89
90pub async fn dkg(t: u16, n: u16, terminal: &impl Terminal) -> Result<Config> {
98 let expected_responses = n.saturating_sub(1) as usize;
99 let (round1_message, state) = dkg::round1(&mut OsRng, t, n)?;
101 terminal.explain("Round 1/2: Send this message to all other participants:")?;
102 terminal.broadcast(&to_json(&round1_message)?).await?;
103 terminal.explain(&format!(
105 "Round 1/2: Gather {expected_responses} messages from the other participants:"
106 ))?;
107 let round1_replies = {
108 let mut acc: Vec<dkg::Round1> = Vec::new();
109 while acc.len() < expected_responses {
110 let rsp = terminal.next_response::<dkg::Round1>().await?;
111 if acc
113 .iter()
114 .any(|existing| existing.encode_to_vec() == rsp.encode_to_vec())
116 {
117 terminal.explain("Received a duplicate message, ignoring")?;
118 continue;
119 }
120 if round1_message.encode_to_vec() == rsp.encode_to_vec() {
122 terminal.explain("Received our own outbound message by mistake, ignoring")?;
123 continue;
124 }
125 acc.push(rsp);
126 terminal.explain(&format!(
127 "Received {}/{} responses...",
128 acc.len(),
129 expected_responses
130 ))?;
131 }
132 acc
133 };
134
135 let (round2_message, state) = dkg::round2(&mut OsRng, state, round1_replies)?;
137 terminal.explain("Round 2/2: Send this message to all other participants:")?;
138 terminal.broadcast(&to_json(&round2_message)?).await?;
139 terminal.explain(&format!(
141 "Round 2/2: Gather {expected_responses} messages from the other participants:"
142 ))?;
143 let round2_replies = {
144 let mut acc: Vec<dkg::Round2> = Vec::new();
145 while acc.len() < expected_responses {
146 let rsp = terminal.next_response::<dkg::Round2>().await?;
147 if acc
149 .iter()
150 .any(|existing| existing.encode_to_vec() == rsp.encode_to_vec())
152 {
153 terminal.explain("Received a duplicate message, ignoring")?;
154 continue;
155 }
156 if round2_message.encode_to_vec() == rsp.encode_to_vec() {
158 terminal.explain("Received our own outbound message by mistake, ignoring")?;
159 continue;
160 }
161 acc.push(rsp);
162 terminal.explain(&format!(
163 "Received {}/{} responses...",
164 acc.len(),
165 expected_responses
166 ))?;
167 }
168 acc
169 };
170 dkg::round3(&mut OsRng, state, round2_replies)
171}
172
173pub struct Threshold<T> {
180 config: Config,
181 terminal: T,
182}
183
184impl<T> Threshold<T> {
185 pub fn new(config: Config, terminal: T) -> Self {
186 Threshold { config, terminal }
187 }
188}
189
190impl<T: Terminal> Threshold<T> {
191 async fn authorize(&self, request: SigningRequest) -> Result<SigningResponse> {
193 if let Some(out) = no_signature_response(self.config.fvk(), &request)? {
196 return Ok(out);
197 }
198 let (round1_message, state1) = sign::coordinator_round1(&mut OsRng, &self.config, request)?;
200 self.terminal
201 .explain("Send this message to the other signers:")?;
202 self.terminal.broadcast(&to_json(&round1_message)?).await?;
203 self.terminal.explain(&format!(
204 "Now, gather at least {} replies from the other signers, and paste them below:",
205 self.config.threshold() - 1
206 ))?;
207 let round1_replies = {
208 let mut acc = Vec::<sign::FollowerRound1>::new();
209 for _ in 1..self.config.threshold() {
211 acc.push(self.terminal.next_response().await?);
212 }
213 acc
214 };
215 let (round2_message, state2) =
217 sign::coordinator_round2(&self.config, state1, &round1_replies)?;
218 self.terminal
219 .explain("Send this message to the other signers:")?;
220 self.terminal.broadcast(&to_json(&round2_message)?).await?;
221 self.terminal.explain(
222 "Now, gather the replies from the *same* signers as Round 1, and paste them below:",
223 )?;
224 let round2_replies = {
225 let mut acc = Vec::<sign::FollowerRound2>::new();
226 for _ in 1..self.config.threshold() {
228 acc.push(self.terminal.next_response().await?);
229 }
230 acc
231 };
232 sign::coordinator_round3(&self.config, state2, &round2_replies)
234 }
235
236 fn export_full_viewing_key(&self) -> FullViewingKey {
238 self.config.fvk().clone()
239 }
240
241 fn confirm_address(&self, index: AddressIndex) -> Address {
245 self.config.fvk().payment_address(index).0
246 }
247}
248
249#[async_trait]
250impl<T: Terminal + Sync + Send + 'static> pb::custody_service_server::CustodyService
251 for Threshold<T>
252{
253 async fn authorize(
254 &self,
255 request: Request<pb::AuthorizeRequest>,
256 ) -> Result<Response<pb::AuthorizeResponse>, Status> {
257 let request: AuthorizeRequest = request
258 .into_inner()
259 .try_into()
260 .map_err(|e| Status::invalid_argument(format!("{e}")))?;
261 let data = self
262 .authorize(SigningRequest::TransactionPlan(request.plan))
263 .await
264 .map_err(|e| {
265 Status::internal(format!(
266 "Failed to process transaction authorization request: {e}"
267 ))
268 })?;
269 let SigningResponse::Transaction(data) = data else {
270 return Err(Status::internal(
271 "expected transaction authorization but custody service returned another kind of authorization data"
272 .to_string()
273 ));
274 };
275 Ok(Response::new(pb::AuthorizeResponse {
276 data: Some(data.into()),
277 }))
278 }
279
280 async fn authorize_validator_definition(
281 &self,
282 request: Request<pb::AuthorizeValidatorDefinitionRequest>,
283 ) -> Result<Response<pb::AuthorizeValidatorDefinitionResponse>, Status> {
284 let request: AuthorizeValidatorDefinitionRequest = request
285 .into_inner()
286 .try_into()
287 .map_err(|e| Status::invalid_argument(format!("{e}")))?;
288 let data = self
289 .authorize(SigningRequest::ValidatorDefinition(
290 request.validator_definition,
291 ))
292 .await
293 .map_err(|e| {
294 Status::internal(format!(
295 "Failed to process validator definition authorization request: {e}"
296 ))
297 })?;
298 let SigningResponse::ValidatorDefinition(validator_definition_auth) = data else {
299 return Err(Status::internal(
300 "expected validator definition authorization but custody service returned another kind of authorization data".to_string()
301 ));
302 };
303 Ok(Response::new(pb::AuthorizeValidatorDefinitionResponse {
304 validator_definition_auth: Some(validator_definition_auth.into()),
305 }))
306 }
307
308 async fn authorize_validator_vote(
309 &self,
310 request: Request<pb::AuthorizeValidatorVoteRequest>,
311 ) -> Result<Response<pb::AuthorizeValidatorVoteResponse>, Status> {
312 let request: AuthorizeValidatorVoteRequest = request
313 .into_inner()
314 .try_into()
315 .map_err(|e| Status::invalid_argument(format!("{e}")))?;
316 let data = self
317 .authorize(SigningRequest::ValidatorVote(request.validator_vote))
318 .await
319 .map_err(|e| {
320 Status::internal(format!(
321 "Failed to process validator vote authorization request: {e}"
322 ))
323 })?;
324 let SigningResponse::ValidatorVote(validator_vote_auth) = data else {
325 return Err(Status::internal(
326 "expected validator vote authorization but custody service returned another kind of authorization data".to_string()
327 ));
328 };
329 Ok(Response::new(pb::AuthorizeValidatorVoteResponse {
330 validator_vote_auth: Some(validator_vote_auth.into()),
331 }))
332 }
333
334 async fn export_full_viewing_key(
335 &self,
336 _request: Request<pb::ExportFullViewingKeyRequest>,
337 ) -> Result<Response<pb::ExportFullViewingKeyResponse>, Status> {
338 let fvk = self.export_full_viewing_key();
339 Ok(Response::new(pb::ExportFullViewingKeyResponse {
340 full_viewing_key: Some(fvk.into()),
341 }))
342 }
343
344 async fn confirm_address(
345 &self,
346 request: Request<pb::ConfirmAddressRequest>,
347 ) -> Result<Response<pb::ConfirmAddressResponse>, Status> {
348 let index = request
349 .into_inner()
350 .address_index
351 .ok_or(anyhow!("ConfirmAddressRequest missing address_index"))
352 .and_then(|x| x.try_into())
353 .map_err(|e| Status::invalid_argument(format!("{e}")))?;
354 let address = self.confirm_address(index);
355 Ok(Response::new(pb::ConfirmAddressResponse {
356 address: Some(address.into()),
357 }))
358 }
359}
360
361#[cfg(test)]
362mod test {
363 use std::collections::HashMap;
364
365 use penumbra_sdk_transaction::TransactionPlan;
366
367 use tokio::sync;
368
369 use super::*;
370
371 struct FollowerTerminal {
372 incoming: sync::Mutex<sync::mpsc::Receiver<String>>,
373 outgoing: sync::mpsc::Sender<String>,
374 }
375
376 #[async_trait]
377 impl Terminal for FollowerTerminal {
378 async fn confirm_request(&self, _request: &SigningRequest) -> Result<bool> {
379 Ok(true)
380 }
381
382 fn explain(&self, _msg: &str) -> Result<()> {
383 Ok(())
384 }
385
386 async fn broadcast(&self, data: &str) -> Result<()> {
387 self.outgoing.send(data.to_owned()).await?;
388 Ok(())
389 }
390
391 async fn read_line_raw(&self) -> Result<String> {
392 Ok(self.incoming.lock().await.recv().await.unwrap_or_default())
393 }
394
395 async fn get_password(&self) -> Result<String> {
396 Ok(Default::default())
397 }
398 }
399
400 struct CoordinatorTerminalInner {
401 incoming: Vec<sync::mpsc::Receiver<String>>,
402 i: usize,
403 }
404
405 impl CoordinatorTerminalInner {
406 async fn recv(&mut self) -> Option<String> {
407 let out = self.incoming[self.i].recv().await;
408 self.i = (self.i + 1) % self.incoming.len();
409 out
410 }
411 }
412
413 struct CoordinatorTerminal {
414 incoming: sync::Mutex<CoordinatorTerminalInner>,
415 outgoing: Vec<sync::mpsc::Sender<String>>,
416 }
417
418 #[async_trait]
419 impl Terminal for CoordinatorTerminal {
420 async fn confirm_request(&self, _request: &SigningRequest) -> Result<bool> {
421 Ok(true)
422 }
423
424 fn explain(&self, _msg: &str) -> Result<()> {
425 Ok(())
426 }
427
428 async fn broadcast(&self, data: &str) -> Result<()> {
429 for out in &self.outgoing {
430 out.send(data.to_owned()).await?;
431 }
432 Ok(())
433 }
434
435 async fn read_line_raw(&self) -> Result<String> {
436 Ok(self.incoming.lock().await.recv().await.unwrap_or_default())
437 }
438
439 async fn get_password(&self) -> Result<String> {
440 Ok(Default::default())
441 }
442 }
443
444 fn make_terminals(follower_count: usize) -> (CoordinatorTerminal, Vec<FollowerTerminal>) {
445 let mut followers = Vec::new();
446 let mut incoming = Vec::new();
447 let mut outgoing = Vec::new();
448 for _ in 0..follower_count {
449 let (c2f_send, c2f_recv) = sync::mpsc::channel(1);
450 let (f2c_send, f2c_recv) = sync::mpsc::channel(1);
451 followers.push(FollowerTerminal {
452 incoming: sync::Mutex::new(f2c_recv),
453 outgoing: c2f_send,
454 });
455 incoming.push(c2f_recv);
456 outgoing.push(f2c_send);
457 }
458 let coordinator = CoordinatorTerminal {
459 incoming: sync::Mutex::new(CoordinatorTerminalInner { incoming, i: 0 }),
460 outgoing,
461 };
462 (coordinator, followers)
463 }
464
465 fn make_symmetric_terminals(count: usize) -> Vec<CoordinatorTerminal> {
466 let mut sending = HashMap::new();
468 let mut recving = HashMap::new();
469 for i in 0..count {
470 for j in 0..count {
471 let (send, recv) = sync::mpsc::channel(1);
472 sending.insert((i, j), send);
473 recving.insert((i, j), recv);
474 }
475 }
476 let mut out = Vec::new();
477 for i in 0..count {
478 let incoming = (0..count)
479 .filter(|&j| j != i)
480 .map(|j| recving.remove(&(j, i)).unwrap())
481 .collect();
482 let outgoing = (0..count)
483 .filter(|&j| j != i)
484 .map(|j| sending.remove(&(i, j)).unwrap())
485 .collect();
486 let coordinator = CoordinatorTerminal {
487 incoming: sync::Mutex::new(CoordinatorTerminalInner { incoming, i: 0 }),
488 outgoing,
489 };
490 out.push(coordinator);
491 }
492 out
493 }
494
495 async fn run_dkg(t: u16, n: u16) -> Result<Vec<Config>> {
496 let terminals = make_symmetric_terminals(n as usize);
497 let mut handles = Vec::new();
498 for terminal in terminals {
499 handles.push(tokio::spawn(async move { dkg(t, n, &terminal).await }));
500 }
501 let mut out = Vec::new();
502 for handle in handles {
503 out.push(handle.await??);
504 }
505 Ok(out)
506 }
507
508 #[tokio::test]
509 async fn test_dkg_produces_identical_fvks() -> Result<()> {
510 const T: u16 = 3;
511 const N: u16 = 3;
512 let (first_config, configs) = {
513 let mut configs = run_dkg(T, N).await?;
514 let first = configs.pop().unwrap();
515 (first, configs)
516 };
517 for config in configs {
518 assert_eq!(first_config.fvk(), config.fvk());
519 }
520 Ok(())
521 }
522
523 #[tokio::test]
524 async fn test_transaction_signing() -> Result<()> {
525 const TEST_PLAN: &'static str = r#"
526{
527 "actions": [
528 {
529 "output": {
530 "value": {
531 "amount": {
532 "lo": "1000000000"
533 },
534 "assetId": {
535 "inner": "KeqcLzNx9qSH5+lcJHBB9KNW+YPrBk5dKzvPMiypahA="
536 }
537 },
538 "destAddress": {
539 "inner": "UuFEV0VoZNxNTttsJVJzRqEzW4bm0z2RCxhUneve0KTvDjQipeg/1zx0ftbDjgr6uPiSA70yJIdlpFyxeLyXfAAtmSy6BCpR3YjEkf1bI5Q="
540 },
541 "rseed": "4m4bxumA0sHuonPjr12UnI4CWKj1wuq4y6rrMRb0nw0=",
542 "valueBlinding": "HHS7tY19JuWMwdKJvtKs8AmhMVa7osSpZ+CCBszu/AE=",
543 "proofBlindingR": "FmbXZoh5Pd2mEtiAEkkAZpllWo9pdwTPlXeODBXHUxA=",
544 "proofBlindingS": "0x96kUchW8jFfnxglAoMtvzPT5/RLg2RvfkRKjlU8BA="
545 }
546 },
547 {
548 "spend": {
549 "note": {
550 "value": {
551 "amount": {
552 "lo": "1000000000000"
553 },
554 "assetId": {
555 "inner": "KeqcLzNx9qSH5+lcJHBB9KNW+YPrBk5dKzvPMiypahA="
556 }
557 },
558 "rseed": "3svSxWREwvvVzb2upQuu3Cyr56O2kRbo0nuX4+OWcdc=",
559 "address": {
560 "inner": "6146pY5upA9bQa4tag+6hXpMXa2kO5fcicSJGVEUP4HhZt7m4FpwAJ3+qwr5gpbHUON7DigyEJRpeV31FATGdfJhHBzGDWC+CIvi8dyIzGo="
561 }
562 },
563 "position": "90",
564 "randomizer": "dJvg8FGvw5rJAvtSQvlQ4imLXahVXn419+xroVMLSwA=",
565 "valueBlinding": "Ce1/hBKLEMB/bjEA06b4zUJVEstNUjkDBWM3WrVu+QM=",
566 "proofBlindingR": "gXA7M4VR48IoxKrf4w4jGae2O7OGlTecU/RBXd4g6QI=",
567 "proofBlindingS": "7+Rhrve7mdgsKbkfFq41yfq9+Mx2qRAZDtwP3VUDAAs="
568 }
569 },
570 {
571 "output": {
572 "value": {
573 "amount": {
574 "lo": "999000000000"
575 },
576 "assetId": {
577 "inner": "KeqcLzNx9qSH5+lcJHBB9KNW+YPrBk5dKzvPMiypahA="
578 }
579 },
580 "destAddress": {
581 "inner": "6146pY5upA9bQa4tag+6hXpMXa2kO5fcicSJGVEUP4HhZt7m4FpwAJ3+qwr5gpbHUON7DigyEJRpeV31FATGdfJhHBzGDWC+CIvi8dyIzGo="
582 },
583 "rseed": "rCTbPc6xWyEcDV73Pl+W6XXbACShVOM+8/vdc7RSLlo=",
584 "valueBlinding": "DP0FN5CV4g9xZN6u2W6/4o6I/Zwr38n81q4YnJ6COAA=",
585 "proofBlindingR": "KV3u8Dc+cZo0HFUIn7n95UkQVXWeYp+3vAVuIpCIZRI=",
586 "proofBlindingS": "i00KyJVklWXUhVRy37N3p9szFIvo7383to/qxBexnBE="
587 }
588 }
589 ],
590 "transactionParameters": {
591 "chainId": "penumbra-testnet-rhea-8b2dfc5c",
592 "fee": {
593 "amount": {}
594 }
595 },
596 "detectionData": {
597 "cluePlans": [
598 {
599 "address": {
600 "inner": "UuFEV0VoZNxNTttsJVJzRqEzW4bm0z2RCxhUneve0KTvDjQipeg/1zx0ftbDjgr6uPiSA70yJIdlpFyxeLyXfAAtmSy6BCpR3YjEkf1bI5Q="
601 },
602 "rseed": "1Li0Qx05txsyOrx2pfO9kD5rDSUMy9e+j/hHmucqARI="
603 },
604 {
605 "address": {
606 "inner": "6146pY5upA9bQa4tag+6hXpMXa2kO5fcicSJGVEUP4HhZt7m4FpwAJ3+qwr5gpbHUON7DigyEJRpeV31FATGdfJhHBzGDWC+CIvi8dyIzGo="
607 },
608 "rseed": "ePtCm9/tFcpLBdlgyu8bYRKV5CHbqd823UGDhG1LsGY="
609 }
610 ]
611 },
612 "memo": {
613 "plaintext": {
614 "returnAddress": {
615 "inner": "OB8AEHEehWo0o0/Dn7JtNmgdDX1VRPaDgn6MLl6n41hVjI3llljrTDCFRRjN5mkNwVwsAyJ/UdfjNIFzbGV62YVXfBJ/IMVTq2CNAHwR8Qo="
616 }
617 },
618 "key": "3plOcPZzKKj8KT3sVdKnblUUFDRzCmMWYtgwB3BqfXQ="
619 }
620}
621 "#;
622 const T: u16 = 3;
623 const N: u16 = 3;
624
625 let (coordinator_config, follower_configs) = {
626 let mut configs = run_dkg(T, N).await?;
627 (configs.pop().unwrap(), configs)
628 };
629 let (coordinator_terminal, follower_terminals) = make_terminals((N - 1) as usize);
630 for (config, terminal) in follower_configs
631 .into_iter()
632 .zip(follower_terminals.into_iter())
633 {
634 tokio::spawn(async move { follow(Some(&config), Some(&config), &terminal).await });
635 }
636 let plan = serde_json::from_str::<TransactionPlan>(TEST_PLAN)?;
637 let fvk = coordinator_config.fvk().clone();
638 let authorization_data = Threshold::new(coordinator_config, coordinator_terminal)
639 .authorize(SigningRequest::TransactionPlan(plan.clone()))
640 .await?;
641 let tx_authorization_data = match authorization_data {
642 SigningResponse::Transaction(tx) => tx,
643 _ => panic!("expected transaction authorization data"),
644 };
645 assert_eq!(
646 plan.effect_hash(&fvk)?,
647 tx_authorization_data
648 .effect_hash
649 .expect("effect hash not present")
650 );
651 for (randomizer, sig) in plan
653 .spend_plans()
654 .map(|x| x.randomizer)
655 .zip(tx_authorization_data.spend_auths)
656 {
657 fvk.spend_verification_key().randomize(&randomizer).verify(
658 tx_authorization_data
659 .effect_hash
660 .expect("effect hash not present")
661 .as_bytes(),
662 &sig,
663 )?;
664 }
665 Ok(())
666 }
667}