1use std::{
2 collections::{BTreeMap, BTreeSet},
3 pin::Pin,
4 sync::{Arc, Mutex},
5};
6
7use anyhow::{anyhow, Context};
8use async_stream::try_stream;
9use camino::Utf8Path;
10use decaf377::Fq;
11use futures::stream::{self, StreamExt, TryStreamExt};
12use penumbra_sdk_auction::auction::dutch::actions::view::{
13 ActionDutchAuctionScheduleView, ActionDutchAuctionWithdrawView,
14};
15use rand::Rng;
16use rand_core::OsRng;
17use tap::{Tap, TapFallible};
18use tokio::sync::{watch, RwLock};
19use tokio_stream::wrappers::WatchStream;
20use tonic::transport::channel::ClientTlsConfig;
21use tonic::transport::channel::Endpoint;
22use tonic::{async_trait, transport::Channel, Request, Response, Status};
23use tracing::{instrument, Instrument};
24use url::Url;
25
26use penumbra_sdk_asset::{asset, asset::Metadata, Value};
27use penumbra_sdk_dex::{
28 lp::{
29 position::{self, Position},
30 Reserves,
31 },
32 swap_claim::SwapClaimPlan,
33 TradingPair,
34};
35use penumbra_sdk_fee::Fee;
36use penumbra_sdk_keys::{
37 keys::WalletId,
38 keys::{AddressIndex, FullViewingKey},
39 Address, AddressView,
40};
41use penumbra_sdk_num::Amount;
42use penumbra_sdk_proto::{
43 util::tendermint_proxy::v1::{
44 tendermint_proxy_service_client::TendermintProxyServiceClient, BroadcastTxSyncRequest,
45 GetStatusRequest, GetStatusResponse, SyncInfo,
46 },
47 view::v1::{
48 self as pb,
49 broadcast_transaction_response::{BroadcastSuccess, Confirmed, Status as BroadcastStatus},
50 view_service_client::ViewServiceClient,
51 view_service_server::{ViewService, ViewServiceServer},
52 AppParametersResponse, AssetMetadataByIdRequest, AssetMetadataByIdResponse,
53 BroadcastTransactionResponse, FmdParametersResponse, GasPricesResponse,
54 NoteByCommitmentResponse, StatusResponse, SwapByCommitmentResponse,
55 TransactionPlannerResponse, WalletIdRequest, WalletIdResponse, WitnessResponse,
56 },
57 DomainType,
58};
59use penumbra_sdk_stake::rate::RateData;
60use penumbra_sdk_tct::{Proof, StateCommitment};
61use penumbra_sdk_transaction::{
62 AuthorizationData, Transaction, TransactionPerspective, TransactionPlan, WitnessData,
63};
64
65use crate::{worker::Worker, Planner, Storage};
66
67type BroadcastTransactionStream = Pin<
71 Box<dyn futures::Stream<Item = Result<pb::BroadcastTransactionResponse, tonic::Status>> + Send>,
72>;
73
74#[derive(Clone)]
83pub struct ViewServer {
84 storage: Storage,
85 error_slot: Arc<Mutex<Option<anyhow::Error>>>,
88 state_commitment_tree: Arc<RwLock<penumbra_sdk_tct::Tree>>,
90 node: Url,
92 sync_height_rx: watch::Receiver<u64>,
94}
95
96impl ViewServer {
97 pub async fn load_or_initialize(
99 storage_path: Option<impl AsRef<Utf8Path>>,
100 registry_path: Option<impl AsRef<Utf8Path>>,
101 fvk: &FullViewingKey,
102 node: Url,
103 ) -> anyhow::Result<Self> {
104 let storage = Storage::load_or_initialize(storage_path, fvk, node.clone())
105 .tap(|_| tracing::trace!("loading or initializing storage"))
106 .await?
107 .tap(|_| tracing::debug!("storage is ready"));
108
109 if let Some(registry_path) = registry_path {
110 storage.load_asset_metadata(registry_path).await?;
111 }
112
113 Self::new(storage, node)
114 .tap(|_| tracing::trace!("constructing view server"))
115 .await
116 .tap(|_| tracing::debug!("constructed view server"))
117 }
118
119 pub async fn new(storage: Storage, node: Url) -> anyhow::Result<Self> {
127 let span = tracing::error_span!(parent: None, "view");
128 let channel = Self::get_pd_channel(node.clone()).await?;
129
130 let (worker, state_commitment_tree, error_slot, sync_height_rx) =
131 Worker::new(storage.clone(), channel)
132 .instrument(span.clone())
133 .tap(|_| tracing::trace!("constructing view server worker"))
134 .await?
135 .tap(|_| tracing::debug!("constructed view server worker"));
136
137 tokio::spawn(worker.run().instrument(span))
138 .tap(|_| tracing::debug!("spawned view server worker"));
139
140 Ok(Self {
141 storage,
142 error_slot,
143 sync_height_rx,
144 state_commitment_tree,
145 node,
146 })
147 }
148
149 pub async fn get_pd_channel(node: Url) -> anyhow::Result<Channel> {
155 let endpoint = get_pd_endpoint(node).await?;
156 let span = tracing::error_span!(parent: None, "view");
157 let c: Channel = endpoint
158 .connect()
159 .instrument(span.clone())
160 .await
161 .with_context(|| "could not connect to grpc server")
162 .tap_err(|error| tracing::error!(?error, "could not connect to grpc server"))?;
163
164 Ok(c)
165 }
166
167 #[instrument(level = "debug", skip_all)]
172 async fn check_worker(&self) -> Result<(), tonic::Status> {
173 tracing::debug!("checking view server worker");
176 if let Some(error) = self
177 .error_slot
178 .lock()
179 .tap_err(|error| tracing::error!(?error, "unable to lock worker error slot"))
180 .map_err(|e| {
181 tonic::Status::unavailable(format!("unable to lock worker error slot {:#}", e))
182 })?
183 .as_ref()
184 {
185 return Err(tonic::Status::new(
186 tonic::Code::Internal,
187 format!("Worker failed: {error}"),
188 ));
189 }
190
191 Ok(()).tap(|_| tracing::trace!("view server worker is healthy"))
195 }
196
197 #[instrument(skip(self, transaction), fields(id = %transaction.id()))]
198 fn broadcast_transaction(
199 &self,
200 transaction: Transaction,
201 await_detection: bool,
202 ) -> BroadcastTransactionStream {
203 let self2 = self.clone();
204 try_stream! {
205 let mut fullnode_client = self2.tendermint_proxy_client().await
209 .map_err(|e| {
210 tonic::Status::unavailable(format!(
211 "couldn't connect to fullnode: {:#?}",
212 e
213 ))
214 })?
215 ;
216 let node_rsp = fullnode_client
217 .broadcast_tx_sync(BroadcastTxSyncRequest {
218 params: transaction.encode_to_vec(),
219 req_id: OsRng.gen(),
220 })
221 .await
222 .map_err(|e| {
223 tonic::Status::unavailable(format!(
224 "error broadcasting tx: {:#?}",
225 e
226 ))
227 })?
228 .into_inner();
229 tracing::info!(?node_rsp);
230 match node_rsp.code {
231 0 => Ok(()),
232 _ => Err(tonic::Status::new(
233 tonic::Code::Internal,
234 format!(
235 "Error submitting transaction: code {}, log: {}",
236 node_rsp.code,
237 node_rsp.log,
238 ),
239 )),
240 }?;
241
242 yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::BroadcastSuccess(BroadcastSuccess{id:Some(transaction.id().into())}))};
244
245 let nullifier = if await_detection {
247 transaction
252 .actions()
253 .filter_map(|action| match action {
254 penumbra_sdk_transaction::Action::Spend(spend) => Some(spend.body.nullifier),
255 _ => None,
261 })
262 .next()
263 } else {
264 None
265 };
266
267 if let Some(nullifier) = nullifier {
268 tracing::info!(?nullifier, "waiting for detection of nullifier");
269 let detection = self2.storage.nullifier_status(nullifier, true);
270 tokio::time::timeout(std::time::Duration::from_secs(20), detection)
271 .await
272 .map_err(|_| {
273 tonic::Status::unavailable(
274 "timeout waiting to detect nullifier of submitted transaction"
275 )
276 })?
277 .map_err(|_| {
278 tonic::Status::unavailable(
279 "error while waiting for detection of submitted transaction"
280 )
281 })?;
282 }
283
284 let detection_height = self2.storage
285 .transaction_by_hash(&transaction.id().0)
286 .await
287 .map_err(|e| tonic::Status::internal(format!("error querying storage: {:#}", e)))?
288 .map(|(height, _tx)| height)
289 .unwrap_or(0);
292 yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::Confirmed(Confirmed{id:Some(transaction.id().into()), detection_height}))};
293 }.boxed()
294 }
295
296 #[instrument(level = "trace", skip(self))]
297 async fn tendermint_proxy_client(
298 &self,
299 ) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
300 TendermintProxyServiceClient::connect(self.node.to_string())
301 .tap(|_| tracing::debug!("connecting to tendermint proxy"))
302 .await
303 .tap_err(|error| tracing::error!(?error, "failed to connect to tendermint proxy"))
304 .map_err(anyhow::Error::from)
305 }
306
307 #[instrument(skip(self))]
310 pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
311 let mut client = self.tendermint_proxy_client().await?;
312
313 let GetStatusResponse { sync_info, .. } = client
314 .get_status(GetStatusRequest {})
315 .tap(|_| tracing::debug!("querying current status"))
316 .await
317 .tap_err(|error| tracing::debug!(?error, "failed to query current status"))?
318 .into_inner();
319
320 let SyncInfo {
321 latest_block_height,
322 catching_up,
323 ..
324 } = sync_info
325 .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
326
327 let latest_known_block_height = latest_block_height;
332
333 tracing::debug!(
334 ?latest_block_height,
335 ?catching_up,
336 ?latest_known_block_height,
337 "found latest known block height"
338 );
339
340 Ok((latest_known_block_height, catching_up))
341 }
342
343 #[instrument(skip(self))]
344 pub async fn status(&self) -> anyhow::Result<StatusResponse> {
345 let full_sync_height = self.storage.last_sync_height().await?.unwrap_or(0);
346
347 let (latest_known_block_height, node_catching_up) =
348 self.latest_known_block_height().await?;
349
350 let height_diff = latest_known_block_height
351 .checked_sub(full_sync_height)
352 .ok_or_else(|| anyhow!("sync height ahead of node height"))?;
353
354 let catching_up = match (node_catching_up, height_diff) {
355 (false, 0) => false,
357 (false, 1) => false,
359 (false, _) => true,
361 (true, _) => true,
363 };
364
365 Ok(StatusResponse {
366 full_sync_height,
367 catching_up,
368 partial_sync_height: full_sync_height, })
370 }
371}
372
373#[async_trait]
374impl ViewService for ViewServer {
375 type NotesStream =
376 Pin<Box<dyn futures::Stream<Item = Result<pb::NotesResponse, tonic::Status>> + Send>>;
377 type NotesForVotingStream = Pin<
378 Box<dyn futures::Stream<Item = Result<pb::NotesForVotingResponse, tonic::Status>> + Send>,
379 >;
380 type AssetsStream =
381 Pin<Box<dyn futures::Stream<Item = Result<pb::AssetsResponse, tonic::Status>> + Send>>;
382 type StatusStreamStream = Pin<
383 Box<dyn futures::Stream<Item = Result<pb::StatusStreamResponse, tonic::Status>> + Send>,
384 >;
385 type TransactionInfoStream = Pin<
386 Box<dyn futures::Stream<Item = Result<pb::TransactionInfoResponse, tonic::Status>> + Send>,
387 >;
388 type BalancesStream =
389 Pin<Box<dyn futures::Stream<Item = Result<pb::BalancesResponse, tonic::Status>> + Send>>;
390 type OwnedPositionIdsStream = Pin<
391 Box<dyn futures::Stream<Item = Result<pb::OwnedPositionIdsResponse, tonic::Status>> + Send>,
392 >;
393 type UnclaimedSwapsStream = Pin<
394 Box<dyn futures::Stream<Item = Result<pb::UnclaimedSwapsResponse, tonic::Status>> + Send>,
395 >;
396 type BroadcastTransactionStream = BroadcastTransactionStream;
397 type WitnessAndBuildStream = Pin<
398 Box<dyn futures::Stream<Item = Result<pb::WitnessAndBuildResponse, tonic::Status>> + Send>,
399 >;
400 type AuthorizeAndBuildStream = Pin<
401 Box<
402 dyn futures::Stream<Item = Result<pb::AuthorizeAndBuildResponse, tonic::Status>> + Send,
403 >,
404 >;
405 type DelegationsByAddressIndexStream = Pin<
406 Box<
407 dyn futures::Stream<Item = Result<pb::DelegationsByAddressIndexResponse, tonic::Status>>
408 + Send,
409 >,
410 >;
411 type UnbondingTokensByAddressIndexStream = Pin<
412 Box<
413 dyn futures::Stream<
414 Item = Result<pb::UnbondingTokensByAddressIndexResponse, tonic::Status>,
415 > + Send,
416 >,
417 >;
418 type AuctionsStream =
419 Pin<Box<dyn futures::Stream<Item = Result<pb::AuctionsResponse, tonic::Status>> + Send>>;
420 type LatestSwapsStream =
421 Pin<Box<dyn futures::Stream<Item = Result<pb::LatestSwapsResponse, tonic::Status>> + Send>>;
422
423 #[instrument(skip_all, level = "trace")]
424 async fn auctions(
425 &self,
426 request: tonic::Request<pb::AuctionsRequest>,
427 ) -> Result<tonic::Response<Self::AuctionsStream>, tonic::Status> {
428 use penumbra_sdk_proto::core::component::auction::v1 as pb_auction;
429 use penumbra_sdk_proto::core::component::auction::v1::query_service_client::QueryServiceClient as AuctionQueryServiceClient;
430
431 let parameters = request.into_inner();
432 let query_latest_state = parameters.query_latest_state;
433 let include_inactive = parameters.include_inactive;
434
435 let account_filter = parameters
436 .account_filter
437 .to_owned()
438 .map(AddressIndex::try_from)
439 .map_or(Ok(None), |v| v.map(Some))
440 .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
441
442 let all_auctions = self
443 .storage
444 .fetch_auctions_by_account(account_filter, include_inactive)
445 .await
446 .map_err(|e| tonic::Status::internal(e.to_string()))?;
447
448 let client = if query_latest_state {
449 Some(
450 AuctionQueryServiceClient::connect(self.node.to_string())
451 .await
452 .map_err(|e| tonic::Status::internal(e.to_string()))?,
453 )
454 } else {
455 None
456 };
457
458 let responses = futures::future::join_all(all_auctions.into_iter().map(
459 |(auction_id, note_record, local_seq)| {
460 let maybe_client = client.clone();
461 async move {
462 let (any_state, positions) = if let Some(mut client2) = maybe_client {
463 let extra_data = client2
464 .auction_state_by_id(pb_auction::AuctionStateByIdRequest {
465 id: Some(auction_id.into()),
466 })
467 .await
468 .map_err(|e| tonic::Status::internal(e.to_string()))?
469 .into_inner();
470 (extra_data.auction, extra_data.positions)
471 } else {
472 (None, vec![])
473 };
474
475 Result::<_, tonic::Status>::Ok(pb::AuctionsResponse {
476 id: Some(auction_id.into()),
477 note_record: Some(note_record.into()),
478 auction: any_state,
479 positions,
480 local_seq,
481 })
482 }
483 },
484 ))
485 .await;
486
487 let stream = stream::iter(responses)
488 .map_err(|e| tonic::Status::internal(format!("error getting auction: {e}")))
489 .boxed();
490
491 Ok(Response::new(stream))
492 }
493
494 #[instrument(skip_all, level = "trace")]
495 async fn broadcast_transaction(
496 &self,
497 request: tonic::Request<pb::BroadcastTransactionRequest>,
498 ) -> Result<tonic::Response<Self::BroadcastTransactionStream>, tonic::Status> {
499 let pb::BroadcastTransactionRequest {
500 transaction,
501 await_detection,
502 } = request.into_inner();
503
504 let transaction: Transaction = transaction
505 .ok_or_else(|| tonic::Status::invalid_argument("missing transaction"))?
506 .try_into()
507 .map_err(|e: anyhow::Error| e.context("could not decode transaction"))
508 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
509
510 let stream = self.broadcast_transaction(transaction, await_detection);
511
512 Ok(tonic::Response::new(stream))
513 }
514
515 #[instrument(skip_all, level = "trace")]
516 async fn transaction_planner(
517 &self,
518 request: tonic::Request<pb::TransactionPlannerRequest>,
519 ) -> Result<tonic::Response<pb::TransactionPlannerResponse>, tonic::Status> {
520 let prq = request.into_inner();
521
522 let app_params =
523 self.storage.app_params().await.map_err(|e| {
524 tonic::Status::internal(format!("could not get app params: {:#}", e))
525 })?;
526
527 let gas_prices =
528 self.storage.gas_prices().await.map_err(|e| {
529 tonic::Status::internal(format!("could not get gas prices: {:#}", e))
530 })?;
531
532 let mut planner = Planner::new(OsRng);
536 planner.set_gas_prices(gas_prices);
537 planner.expiry_height(prq.expiry_height);
538
539 for output in prq.outputs {
540 let address: Address = output
541 .address
542 .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
543 .try_into()
544 .map_err(|e| {
545 tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
546 })?;
547
548 let value: Value = output
549 .value
550 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
551 .try_into()
552 .map_err(|e| {
553 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
554 })?;
555
556 planner.output(value, address);
557 }
558
559 for swap in prq.swaps {
560 let value: Value = swap
561 .value
562 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
563 .try_into()
564 .map_err(|e| {
565 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
566 })?;
567
568 let target_asset: asset::Id = swap
569 .target_asset
570 .ok_or_else(|| tonic::Status::invalid_argument("Missing target asset"))?
571 .try_into()
572 .map_err(|e| {
573 tonic::Status::invalid_argument(format!("Could not parse target asset: {e:#}"))
574 })?;
575
576 let fee: Fee = swap
577 .fee
578 .ok_or_else(|| tonic::Status::invalid_argument("Missing fee"))?
579 .try_into()
580 .map_err(|e| {
581 tonic::Status::invalid_argument(format!("Could not parse fee: {e:#}"))
582 })?;
583
584 let claim_address: Address = swap
585 .claim_address
586 .ok_or_else(|| tonic::Status::invalid_argument("Missing claim address"))?
587 .try_into()
588 .map_err(|e| {
589 tonic::Status::invalid_argument(format!("Could not parse claim address: {e:#}"))
590 })?;
591
592 planner
593 .swap(value, target_asset, fee, claim_address)
594 .map_err(|e| {
595 tonic::Status::invalid_argument(format!("Could not plan swap: {e:#}"))
596 })?;
597 }
598
599 for swap_claim in prq.swap_claims {
600 let swap_commitment: StateCommitment = swap_claim
601 .swap_commitment
602 .ok_or_else(|| tonic::Status::invalid_argument("Missing swap commitment"))?
603 .try_into()
604 .map_err(|e| {
605 tonic::Status::invalid_argument(format!(
606 "Could not parse swap commitment: {e:#}"
607 ))
608 })?;
609 let swap_record = self
610 .storage
611 .swap_by_commitment(swap_commitment, false)
613 .await
614 .map_err(|e| {
615 tonic::Status::invalid_argument(format!(
616 "Could not fetch swap by commitment: {e:#}"
617 ))
618 })?;
619
620 planner.swap_claim(SwapClaimPlan {
621 swap_plaintext: swap_record.swap,
622 position: swap_record.position,
623 output_data: swap_record.output_data,
624 epoch_duration: app_params.sct_params.epoch_duration,
625 proof_blinding_r: Fq::rand(&mut OsRng),
626 proof_blinding_s: Fq::rand(&mut OsRng),
627 });
628 }
629
630 let current_epoch = if prq.undelegations.is_empty() && prq.delegations.is_empty() {
631 None
632 } else {
633 Some(
634 prq.epoch
635 .ok_or_else(|| {
636 tonic::Status::invalid_argument(
637 "Missing current epoch in TransactionPlannerRequest",
638 )
639 })?
640 .try_into()
641 .map_err(|e| {
642 tonic::Status::invalid_argument(format!(
643 "Could not parse current epoch: {e:#}"
644 ))
645 })?,
646 )
647 };
648
649 for delegation in prq.delegations {
650 let amount: Amount = delegation
651 .amount
652 .ok_or_else(|| tonic::Status::invalid_argument("Missing amount"))?
653 .try_into()
654 .map_err(|e| {
655 tonic::Status::invalid_argument(format!("Could not parse amount: {e:#}"))
656 })?;
657
658 let rate_data: RateData = delegation
659 .rate_data
660 .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
661 .try_into()
662 .map_err(|e| {
663 tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
664 })?;
665
666 planner.delegate(
667 current_epoch.expect("checked that current epoch is present"),
668 amount,
669 rate_data,
670 );
671 }
672
673 for undelegation in prq.undelegations {
674 let value: Value = undelegation
675 .value
676 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
677 .try_into()
678 .map_err(|e| {
679 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
680 })?;
681
682 let rate_data: RateData = undelegation
683 .rate_data
684 .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
685 .try_into()
686 .map_err(|e| {
687 tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
688 })?;
689
690 planner.undelegate(
691 current_epoch.expect("checked that current epoch is present"),
692 value.amount,
693 rate_data,
694 );
695 }
696
697 for position_open in prq.position_opens {
698 let position: Position = position_open
699 .position
700 .ok_or_else(|| tonic::Status::invalid_argument("Missing position"))?
701 .try_into()
702 .map_err(|e| {
703 tonic::Status::invalid_argument(format!("Could not parse position: {e:#}"))
704 })?;
705
706 planner.position_open(position);
707 }
708
709 for position_close in prq.position_closes {
710 let position_id: position::Id = position_close
711 .position_id
712 .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
713 .try_into()
714 .map_err(|e| {
715 tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
716 })?;
717
718 planner.position_close(position_id);
719 }
720
721 for position_withdraw in prq.position_withdraws {
722 let position_id: position::Id = position_withdraw
723 .position_id
724 .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
725 .try_into()
726 .map_err(|e| {
727 tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
728 })?;
729
730 let reserves: Reserves = position_withdraw
731 .reserves
732 .ok_or_else(|| tonic::Status::invalid_argument("Missing reserves"))?
733 .try_into()
734 .map_err(|e| {
735 tonic::Status::invalid_argument(format!("Could not parse reserves: {e:#}"))
736 })?;
737
738 let trading_pair: TradingPair = position_withdraw
739 .trading_pair
740 .ok_or_else(|| tonic::Status::invalid_argument("Missing pair"))?
741 .try_into()
742 .map_err(|e| {
743 tonic::Status::invalid_argument(format!("Could not parse pair: {e:#}"))
744 })?;
745
746 planner.position_withdraw(position_id, reserves, trading_pair);
747 }
748
749 for ics20_withdrawal in prq.ics20_withdrawals {
751 planner.ics20_withdrawal(
752 ics20_withdrawal
753 .try_into()
754 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
755 );
756 }
757
758 for ibc_action in prq.ibc_relay_actions {
760 planner.ibc_action(
761 ibc_action
762 .try_into()
763 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
764 );
765 }
766
767 let mut client_of_self = ViewServiceClient::new(ViewServiceServer::new(self.clone()));
768
769 let source = prq
770 .source
771 .map(|addr_index| addr_index.account)
773 .unwrap_or(0u32);
775
776 let plan = planner
777 .plan(&mut client_of_self, source.into())
778 .await
779 .context("could not plan requested transaction")
780 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?;
781
782 Ok(tonic::Response::new(TransactionPlannerResponse {
783 plan: Some(plan.into()),
784 }))
785 }
786
787 #[instrument(skip_all, level = "trace")]
788 async fn address_by_index(
789 &self,
790 request: tonic::Request<pb::AddressByIndexRequest>,
791 ) -> Result<tonic::Response<pb::AddressByIndexResponse>, tonic::Status> {
792 let fvk =
793 self.storage.full_viewing_key().await.map_err(|_| {
794 tonic::Status::failed_precondition("Error retrieving full viewing key")
795 })?;
796
797 let address_index = request
798 .into_inner()
799 .address_index
800 .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
801 .try_into()
802 .map_err(|e| {
803 tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
804 })?;
805
806 Ok(tonic::Response::new(pb::AddressByIndexResponse {
807 address: Some(fvk.payment_address(address_index).0.into()),
808 }))
809 }
810
811 #[instrument(skip_all, level = "trace")]
812 async fn index_by_address(
813 &self,
814 request: tonic::Request<pb::IndexByAddressRequest>,
815 ) -> Result<tonic::Response<pb::IndexByAddressResponse>, tonic::Status> {
816 let fvk =
817 self.storage.full_viewing_key().await.map_err(|_| {
818 tonic::Status::failed_precondition("Error retrieving full viewing key")
819 })?;
820
821 let address: Address = request
822 .into_inner()
823 .address
824 .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
825 .try_into()
826 .map_err(|e| {
827 tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
828 })?;
829
830 Ok(tonic::Response::new(pb::IndexByAddressResponse {
831 address_index: fvk.address_index(&address).map(Into::into),
832 }))
833 }
834 async fn transparent_address(
835 &self,
836 _request: tonic::Request<pb::TransparentAddressRequest>,
837 ) -> Result<tonic::Response<pb::TransparentAddressResponse>, tonic::Status> {
838 let fvk =
839 self.storage.full_viewing_key().await.map_err(|_| {
840 tonic::Status::failed_precondition("Error retrieving full viewing key")
841 })?;
842
843 let encoding = fvk.incoming().transparent_address();
844 let address: Address = encoding
845 .parse()
846 .map_err(|_| tonic::Status::internal("could not parse newly generated address"))?;
847
848 Ok(tonic::Response::new(pb::TransparentAddressResponse {
849 address: Some(address.into()),
850 encoding,
851 }))
852 }
853
854 #[instrument(skip_all, level = "trace")]
855 async fn ephemeral_address(
856 &self,
857 request: tonic::Request<pb::EphemeralAddressRequest>,
858 ) -> Result<tonic::Response<pb::EphemeralAddressResponse>, tonic::Status> {
859 let fvk =
860 self.storage.full_viewing_key().await.map_err(|_| {
861 tonic::Status::failed_precondition("Error retrieving full viewing key")
862 })?;
863
864 let address_index = request
865 .into_inner()
866 .address_index
867 .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
868 .try_into()
869 .map_err(|e| {
870 tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
871 })?;
872
873 Ok(tonic::Response::new(pb::EphemeralAddressResponse {
874 address: Some(fvk.ephemeral_address(OsRng, address_index).0.into()),
875 }))
876 }
877
878 #[instrument(skip_all, level = "trace")]
879 async fn transaction_info_by_hash(
880 &self,
881 request: tonic::Request<pb::TransactionInfoByHashRequest>,
882 ) -> Result<tonic::Response<pb::TransactionInfoByHashResponse>, tonic::Status> {
883 self.check_worker().await?;
884
885 let request = request.into_inner();
886
887 let fvk =
888 self.storage.full_viewing_key().await.map_err(|_| {
889 tonic::Status::failed_precondition("Error retrieving full viewing key")
890 })?;
891
892 let maybe_tx = self
893 .storage
894 .transaction_by_hash(
895 &request
896 .id
897 .clone()
898 .ok_or_else(|| {
899 tonic::Status::invalid_argument(
900 "missing transaction ID in TransactionInfoByHashRequest",
901 )
902 })?
903 .inner,
904 )
905 .await
906 .map_err(|_| {
907 tonic::Status::failed_precondition(format!(
908 "Error retrieving transaction by hash {}",
909 hex::encode(request.id.expect("transaction id is present").inner)
910 ))
911 })?;
912
913 let Some((height, tx)) = maybe_tx else {
914 return Ok(tonic::Response::new(
915 pb::TransactionInfoByHashResponse::default(),
916 ));
917 };
918
919 let mut txp = TransactionPerspective {
921 payload_keys: tx
922 .payload_keys(&fvk)
923 .map_err(|_| tonic::Status::failed_precondition("Error generating payload keys"))?,
924 ..Default::default()
925 };
926
927 for action in tx.actions() {
930 use penumbra_sdk_transaction::Action;
931 match action {
932 Action::Spend(spend) => {
933 let nullifier = spend.body.nullifier;
934 if let Ok(spendable_note_record) =
936 self.storage.note_by_nullifier(nullifier, false).await
937 {
938 txp.spend_nullifiers
939 .insert(nullifier, spendable_note_record.note);
940 }
941 }
942 Action::SwapClaim(claim) => {
943 let output_1_record = self
944 .storage
945 .note_by_commitment(claim.body.output_1_commitment, false)
946 .await
947 .map_err(|e| {
948 tonic::Status::internal(format!(
949 "Error retrieving first SwapClaim output note record: {:#}",
950 e
951 ))
952 })?;
953 let output_2_record = self
954 .storage
955 .note_by_commitment(claim.body.output_2_commitment, false)
956 .await
957 .map_err(|e| {
958 tonic::Status::internal(format!(
959 "Error retrieving second SwapClaim output note record: {:#}",
960 e
961 ))
962 })?;
963
964 txp.advice_notes
965 .insert(claim.body.output_1_commitment, output_1_record.note);
966 txp.advice_notes
967 .insert(claim.body.output_2_commitment, output_2_record.note);
968 }
969 _ => {}
970 }
971 }
972
973 let min_view = tx.view_from_perspective(&txp);
977 let mut address_views = BTreeMap::new();
978 let mut asset_ids = BTreeSet::new();
979 for action_view in min_view.action_views() {
980 use penumbra_sdk_dex::{swap::SwapView, swap_claim::SwapClaimView};
981 use penumbra_sdk_transaction::view::action_view::{
982 ActionView, DelegatorVoteView, OutputView, SpendView,
983 };
984 match action_view {
985 ActionView::Spend(SpendView::Visible { note, .. }) => {
986 let address = note.address();
987 address_views.insert(address.clone(), fvk.view_address(address));
988 asset_ids.insert(note.asset_id());
989 }
990 ActionView::Output(OutputView::Visible { note, .. }) => {
991 let address = note.address();
992 address_views.insert(address.clone(), fvk.view_address(address.clone()));
993 asset_ids.insert(note.asset_id());
994
995 let memo = tx.decrypt_memo(&fvk).map_err(|_| {
997 tonic::Status::internal("Error decrypting memo for OutputView")
998 })?;
999 address_views.insert(memo.return_address(), fvk.view_address(address));
1000 }
1001 ActionView::Swap(SwapView::Visible { swap_plaintext, .. }) => {
1002 let address = swap_plaintext.claim_address.clone();
1003 address_views.insert(address.clone(), fvk.view_address(address));
1004 asset_ids.insert(swap_plaintext.trading_pair.asset_1());
1005 asset_ids.insert(swap_plaintext.trading_pair.asset_2());
1006 }
1007 ActionView::SwapClaim(SwapClaimView::Visible {
1008 output_1, output_2, ..
1009 }) => {
1010 let address = output_1.address();
1012 address_views.insert(address.clone(), fvk.view_address(address));
1013 asset_ids.insert(output_1.asset_id());
1014 asset_ids.insert(output_2.asset_id());
1015 }
1016 ActionView::DelegatorVote(DelegatorVoteView::Visible { note, .. }) => {
1017 let address = note.address();
1018 address_views.insert(address.clone(), fvk.view_address(address));
1019 asset_ids.insert(note.asset_id());
1020 }
1021 ActionView::ActionDutchAuctionWithdraw(ActionDutchAuctionWithdrawView {
1022 action: _,
1023 reserves,
1024 }) => {
1025 for value in reserves {
1028 asset_ids.insert(value.asset_id());
1029 }
1030 }
1031 ActionView::ActionDutchAuctionSchedule(ActionDutchAuctionScheduleView {
1033 action,
1034 ..
1035 }) => {
1036 let description = &action.description;
1037 asset_ids.insert(description.input.asset_id);
1038 asset_ids.insert(description.output_id);
1039 }
1040 _ => {}
1041 }
1042 }
1043
1044 let mut denoms = Vec::new();
1047
1048 for id in asset_ids {
1049 if let Some(asset) = self.storage.asset_by_id(&id).await.map_err(|e| {
1050 tonic::Status::internal(format!("Error retrieving asset by id: {:#}", e))
1051 })? {
1052 denoms.push(asset);
1053 }
1054 }
1055
1056 txp.denoms.extend(denoms);
1057
1058 txp.address_views = address_views.into_values().collect();
1059
1060 let txv = tx.view_from_perspective(&txp);
1062 let summary = txv.summary();
1063
1064 let response = pb::TransactionInfoByHashResponse {
1065 tx_info: Some(pb::TransactionInfo {
1066 height,
1067 id: Some(tx.id().into()),
1068 perspective: Some(txp.into()),
1069 transaction: Some(tx.into()),
1070 view: Some(txv.into()),
1071 summary: Some(summary.into()),
1072 }),
1073 };
1074
1075 Ok(tonic::Response::new(response))
1076 }
1077
1078 #[instrument(skip_all, level = "trace")]
1079 async fn swap_by_commitment(
1080 &self,
1081 request: tonic::Request<pb::SwapByCommitmentRequest>,
1082 ) -> Result<tonic::Response<pb::SwapByCommitmentResponse>, tonic::Status> {
1083 self.check_worker().await?;
1084
1085 let request = request.into_inner();
1086
1087 let swap_commitment = request
1088 .swap_commitment
1089 .ok_or_else(|| {
1090 tonic::Status::failed_precondition("Missing swap commitment in request")
1091 })?
1092 .try_into()
1093 .map_err(|_| {
1094 tonic::Status::failed_precondition("Invalid swap commitment in request")
1095 })?;
1096
1097 let swap = pb::SwapRecord::from(
1098 self.storage
1099 .swap_by_commitment(swap_commitment, request.await_detection)
1100 .await
1101 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1102 );
1103
1104 Ok(tonic::Response::new(SwapByCommitmentResponse {
1105 swap: Some(swap),
1106 }))
1107 }
1108
1109 #[allow(deprecated)]
1110 #[instrument(skip(self, request))]
1111 async fn balances(
1112 &self,
1113 request: tonic::Request<pb::BalancesRequest>,
1114 ) -> Result<tonic::Response<Self::BalancesStream>, tonic::Status> {
1115 let request = request.into_inner();
1116
1117 let account_filter = request.account_filter.and_then(|x| {
1118 AddressIndex::try_from(x)
1119 .map_err(|_| {
1120 tonic::Status::failed_precondition("Invalid swap commitment in request")
1121 })
1122 .map_or(None, |x| x.into())
1123 });
1124
1125 let asset_id_filter = request.asset_id_filter.and_then(|x| {
1126 asset::Id::try_from(x)
1127 .map_err(|_| {
1128 tonic::Status::failed_precondition("Invalid swap commitment in request")
1129 })
1130 .map_or(None, |x| x.into())
1131 });
1132
1133 let result = self
1134 .storage
1135 .balances(account_filter, asset_id_filter)
1136 .await
1137 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?;
1138
1139 tracing::debug!(?account_filter, ?asset_id_filter, ?result);
1140
1141 let self2 = self.clone();
1142 let stream = try_stream! {
1143 for element in result {
1145 let metadata: Metadata = self2
1146 .asset_metadata_by_id(Request::new(pb::AssetMetadataByIdRequest {
1147 asset_id: Some(element.id.into()),
1148 }))
1149 .await?
1150 .into_inner()
1151 .denom_metadata
1152 .context("denom metadata not found")?
1153 .try_into()?;
1154
1155 let value = Value {
1156 asset_id: element.id,
1157 amount: element.amount.into(),
1158 };
1159
1160 let value_view = value.view_with_denom(metadata)?;
1161
1162 let address: Address = self2
1163 .address_by_index(Request::new(pb::AddressByIndexRequest {
1164 address_index: account_filter.map(Into::into),
1165 }))
1166 .await?
1167 .into_inner()
1168 .address
1169 .context("address not found")?
1170 .try_into()?;
1171
1172 let wallet_id: WalletId = self2
1173 .wallet_id(Request::new(pb::WalletIdRequest {}))
1174 .await?
1175 .into_inner()
1176 .wallet_id
1177 .context("wallet id not found")?
1178 .try_into()?;
1179
1180 let address_view = AddressView::Decoded {
1181 address,
1182 index: element.address_index,
1183 wallet_id,
1184 };
1185
1186 yield pb::BalancesResponse {
1187 account_address: Some(address_view.into()),
1188 balance_view: Some(value_view.into()),
1189 balance: None,
1190 account: None,
1191 }
1192 }
1193 };
1194
1195 Ok(tonic::Response::new(
1196 stream
1197 .map_err(|e: anyhow::Error| {
1198 tonic::Status::unavailable(format!("error getting balances: {e}"))
1199 })
1200 .boxed(),
1201 ))
1202 }
1203
1204 #[instrument(skip_all, level = "trace")]
1205 async fn note_by_commitment(
1206 &self,
1207 request: tonic::Request<pb::NoteByCommitmentRequest>,
1208 ) -> Result<tonic::Response<pb::NoteByCommitmentResponse>, tonic::Status> {
1209 self.check_worker().await?;
1210
1211 let request = request.into_inner();
1212
1213 let note_commitment = request
1214 .note_commitment
1215 .ok_or_else(|| {
1216 tonic::Status::failed_precondition("Missing note commitment in request")
1217 })?
1218 .try_into()
1219 .map_err(|_| {
1220 tonic::Status::failed_precondition("Invalid note commitment in request")
1221 })?;
1222
1223 let spendable_note = pb::SpendableNoteRecord::from(
1224 self.storage
1225 .note_by_commitment(note_commitment, request.await_detection)
1226 .await
1227 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1228 );
1229
1230 Ok(tonic::Response::new(NoteByCommitmentResponse {
1231 spendable_note: Some(spendable_note),
1232 }))
1233 }
1234
1235 #[instrument(skip_all, level = "trace")]
1236 async fn nullifier_status(
1237 &self,
1238 request: tonic::Request<pb::NullifierStatusRequest>,
1239 ) -> Result<tonic::Response<pb::NullifierStatusResponse>, tonic::Status> {
1240 self.check_worker().await?;
1241
1242 let request = request.into_inner();
1243
1244 let nullifier = request
1245 .nullifier
1246 .ok_or_else(|| tonic::Status::failed_precondition("Missing nullifier in request"))?
1247 .try_into()
1248 .map_err(|_| tonic::Status::failed_precondition("Invalid nullifier in request"))?;
1249
1250 Ok(tonic::Response::new(pb::NullifierStatusResponse {
1251 spent: self
1252 .storage
1253 .nullifier_status(nullifier, request.await_detection)
1254 .await
1255 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1256 }))
1257 }
1258
1259 #[instrument(skip_all, level = "trace")]
1260 async fn status(
1261 &self,
1262 _: tonic::Request<pb::StatusRequest>,
1263 ) -> Result<tonic::Response<pb::StatusResponse>, tonic::Status> {
1264 self.check_worker().await?;
1265
1266 Ok(tonic::Response::new(self.status().await.map_err(|e| {
1267 tonic::Status::internal(format!("error: {e}"))
1268 })?))
1269 }
1270
1271 #[instrument(skip_all, level = "trace")]
1272 async fn status_stream(
1273 &self,
1274 _: tonic::Request<pb::StatusStreamRequest>,
1275 ) -> Result<tonic::Response<Self::StatusStreamStream>, tonic::Status> {
1276 self.check_worker().await?;
1277
1278 let (latest_known_block_height, _) = self
1279 .latest_known_block_height()
1280 .await
1281 .tap_err(|error| {
1282 tracing::debug!(
1283 ?error,
1284 "unable to fetch latest known block height from fullnode"
1285 )
1286 })
1287 .map_err(|e| {
1288 tonic::Status::unknown(format!(
1289 "unable to fetch latest known block height from fullnode: {e}"
1290 ))
1291 })?;
1292
1293 let mut sync_height_stream = WatchStream::new(self.sync_height_rx.clone());
1296 let stream = try_stream! {
1297 while let Some(sync_height) = sync_height_stream.next().await {
1298 yield pb::StatusStreamResponse {
1299 latest_known_block_height,
1300 full_sync_height: sync_height,
1301 partial_sync_height: sync_height, };
1303 if sync_height >= latest_known_block_height {
1304 break;
1305 }
1306 }
1307 };
1308
1309 Ok(tonic::Response::new(stream.boxed()))
1310 }
1311
1312 #[instrument(skip_all, level = "trace")]
1313 async fn notes(
1314 &self,
1315 request: tonic::Request<pb::NotesRequest>,
1316 ) -> Result<tonic::Response<Self::NotesStream>, tonic::Status> {
1317 self.check_worker().await?;
1318
1319 let request = request.into_inner();
1320
1321 let include_spent = request.include_spent;
1322 let asset_id = request
1323 .asset_id
1324 .to_owned()
1325 .map(asset::Id::try_from)
1326 .map_or(Ok(None), |v| v.map(Some))
1327 .map_err(|_| tonic::Status::invalid_argument("invalid asset id"))?;
1328 let address_index = request
1329 .address_index
1330 .to_owned()
1331 .map(AddressIndex::try_from)
1332 .map_or(Ok(None), |v| v.map(Some))
1333 .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1334
1335 let amount_to_spend = request
1336 .amount_to_spend
1337 .map(Amount::try_from)
1338 .map_or(Ok(None), |v| v.map(Some))
1339 .map_err(|_| tonic::Status::invalid_argument("invalid amount to spend"))?;
1340
1341 let notes = self
1342 .storage
1343 .notes(include_spent, asset_id, address_index, amount_to_spend)
1344 .await
1345 .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1346
1347 let stream = try_stream! {
1348 for note in notes {
1349 yield pb::NotesResponse {
1350 note_record: Some(note.into()),
1351 }
1352 }
1353 };
1354
1355 Ok(tonic::Response::new(
1356 stream
1357 .map_err(|e: anyhow::Error| {
1358 tonic::Status::unavailable(format!("error getting notes: {e}"))
1359 })
1360 .boxed(),
1361 ))
1362 }
1363
1364 #[instrument(skip_all, level = "trace")]
1365 async fn notes_for_voting(
1366 &self,
1367 request: tonic::Request<pb::NotesForVotingRequest>,
1368 ) -> Result<tonic::Response<Self::NotesForVotingStream>, tonic::Status> {
1369 self.check_worker().await?;
1370
1371 let address_index = request
1372 .get_ref()
1373 .address_index
1374 .to_owned()
1375 .map(AddressIndex::try_from)
1376 .map_or(Ok(None), |v| v.map(Some))
1377 .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1378
1379 let votable_at_height = request.get_ref().votable_at_height;
1380
1381 let notes = self
1382 .storage
1383 .notes_for_voting(address_index, votable_at_height)
1384 .await
1385 .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1386
1387 let stream = try_stream! {
1388 for (note, identity_key) in notes {
1389 yield pb::NotesForVotingResponse {
1390 note_record: Some(note.into()),
1391 identity_key: Some(identity_key.into()),
1392 }
1393 }
1394 };
1395
1396 Ok(tonic::Response::new(
1397 stream
1398 .map_err(|e: anyhow::Error| {
1399 tonic::Status::unavailable(format!("error getting notes: {e}"))
1400 })
1401 .boxed(),
1402 ))
1403 }
1404
1405 #[instrument(skip_all, level = "trace")]
1406 async fn assets(
1407 &self,
1408 request: tonic::Request<pb::AssetsRequest>,
1409 ) -> Result<tonic::Response<Self::AssetsStream>, tonic::Status> {
1410 self.check_worker().await?;
1411
1412 let pb::AssetsRequest {
1413 filtered,
1414 include_specific_denominations,
1415 include_delegation_tokens,
1416 include_unbonding_tokens,
1417 include_lp_nfts,
1418 include_proposal_nfts,
1419 include_voting_receipt_tokens,
1420 } = request.get_ref();
1421
1422 let assets = if !filtered {
1424 self.storage
1425 .all_assets()
1426 .await
1427 .map_err(|e| tonic::Status::unavailable(format!("error fetching assets: {e}")))?
1428 } else {
1429 let mut assets = vec![];
1430 for denom in include_specific_denominations {
1431 if let Some(denom) = asset::REGISTRY.parse_denom(&denom.denom) {
1432 assets.push(denom);
1433 }
1434 }
1435 for (include, pattern) in [
1436 (include_delegation_tokens, "_delegation\\_%"),
1437 (include_unbonding_tokens, "_unbonding\\_%"),
1438 (include_lp_nfts, "lpnft\\_%"),
1439 (include_proposal_nfts, "proposal\\_%"),
1440 (include_voting_receipt_tokens, "voted\\_on\\_%"),
1441 ] {
1442 if *include {
1443 assets.extend(
1444 self.storage
1445 .assets_matching(pattern.to_string())
1446 .await
1447 .map_err(|e| {
1448 tonic::Status::unavailable(format!("error fetching assets: {e}"))
1449 })?,
1450 );
1451 }
1452 }
1453 assets
1454 };
1455
1456 let stream = try_stream! {
1457 for asset in assets {
1458 yield
1459 pb::AssetsResponse {
1460 denom_metadata: Some(asset.into()),
1461 }
1462 }
1463 };
1464
1465 Ok(tonic::Response::new(
1466 stream
1467 .map_err(|e: anyhow::Error| {
1468 tonic::Status::unavailable(format!("error getting assets: {e}"))
1469 })
1470 .boxed(),
1471 ))
1472 }
1473
1474 #[instrument(skip_all, level = "trace")]
1475 async fn transaction_info(
1476 &self,
1477 request: tonic::Request<pb::TransactionInfoRequest>,
1478 ) -> Result<tonic::Response<Self::TransactionInfoStream>, tonic::Status> {
1479 self.check_worker().await?;
1480 let start_height = if request.get_ref().start_height == 0 {
1482 None
1483 } else {
1484 Some(request.get_ref().start_height)
1485 };
1486 let end_height = if request.get_ref().end_height == 0 {
1487 None
1488 } else {
1489 Some(request.get_ref().end_height)
1490 };
1491
1492 let txs = self
1494 .storage
1495 .transactions(start_height, end_height)
1496 .await
1497 .map_err(|e| tonic::Status::unavailable(format!("error fetching transactions: {e}")))?;
1498
1499 let self2 = self.clone();
1500 let stream = try_stream! {
1501 for tx in txs {
1502
1503 let rsp = self2.transaction_info_by_hash(tonic::Request::new(pb::TransactionInfoByHashRequest {
1504 id: Some(tx.2.id().into()),
1505 })).await?.into_inner();
1506
1507 yield pb::TransactionInfoResponse {
1508 tx_info: rsp.tx_info,
1509 }
1510 }
1511 };
1512
1513 Ok(tonic::Response::new(
1514 stream
1515 .map_err(|e: anyhow::Error| {
1516 tonic::Status::unavailable(format!("error getting transactions: {e}"))
1517 })
1518 .boxed(),
1519 ))
1520 }
1521
1522 #[instrument(skip_all, level = "trace")]
1523 async fn witness(
1524 &self,
1525 request: tonic::Request<pb::WitnessRequest>,
1526 ) -> Result<tonic::Response<WitnessResponse>, tonic::Status> {
1527 self.check_worker().await?;
1528
1529 let sct = self.state_commitment_tree.read().await;
1532
1533 let anchor = sct.root();
1535
1536 let tx_plan: TransactionPlan =
1538 request
1539 .get_ref()
1540 .to_owned()
1541 .transaction_plan
1542 .map_or(TransactionPlan::default(), |x| {
1543 x.try_into()
1544 .expect("TransactionPlan should exist in request")
1545 });
1546
1547 let requested_note_commitments: Vec<StateCommitment> = tx_plan
1548 .spend_plans()
1549 .filter(|plan| plan.note.amount() != 0u64.into())
1550 .map(|spend| spend.note.commit().into())
1551 .chain(
1552 tx_plan
1553 .swap_claim_plans()
1554 .map(|swap_claim| swap_claim.swap_plaintext.swap_commitment().into()),
1555 )
1556 .chain(
1557 tx_plan
1558 .delegator_vote_plans()
1559 .map(|vote_plan| vote_plan.staked_note.commit().into()),
1560 )
1561 .collect();
1562
1563 tracing::debug!(?requested_note_commitments);
1564
1565 let auth_paths: Vec<Proof> = requested_note_commitments
1566 .iter()
1567 .map(|nc| {
1568 sct.witness(*nc).ok_or_else(|| {
1569 tonic::Status::new(tonic::Code::InvalidArgument, "Note commitment missing")
1570 })
1571 })
1572 .collect::<Result<Vec<Proof>, tonic::Status>>()?;
1573
1574 drop(sct);
1576
1577 let mut witness_data = WitnessData {
1578 anchor,
1579 state_commitment_proofs: auth_paths
1580 .into_iter()
1581 .map(|proof| (proof.commitment(), proof))
1582 .collect(),
1583 };
1584
1585 tracing::debug!(?witness_data);
1586
1587 for nc in tx_plan
1590 .spend_plans()
1591 .filter(|plan| plan.note.amount() == 0u64.into())
1592 .map(|plan| plan.note.commit())
1593 {
1594 witness_data.add_proof(nc, Proof::dummy(&mut OsRng, nc));
1595 }
1596
1597 let witness_response = WitnessResponse {
1598 witness_data: Some(witness_data.into()),
1599 };
1600 Ok(tonic::Response::new(witness_response))
1601 }
1602
1603 #[instrument(skip_all, level = "trace")]
1604 async fn witness_and_build(
1605 &self,
1606 request: tonic::Request<pb::WitnessAndBuildRequest>,
1607 ) -> Result<tonic::Response<Self::WitnessAndBuildStream>, tonic::Status> {
1608 let pb::WitnessAndBuildRequest {
1609 transaction_plan,
1610 authorization_data,
1611 } = request.into_inner();
1612
1613 let transaction_plan: TransactionPlan = transaction_plan
1614 .ok_or_else(|| tonic::Status::invalid_argument("missing transaction plan"))?
1615 .try_into()
1616 .map_err(|e: anyhow::Error| e.context("could not decode transaction plan"))
1617 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1618
1619 let authorization_data: AuthorizationData = authorization_data
1620 .ok_or_else(|| tonic::Status::invalid_argument("missing authorization data"))?
1621 .try_into()
1622 .map_err(|e: anyhow::Error| e.context("could not decode authorization data"))
1623 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1624
1625 let witness_request = pb::WitnessRequest {
1626 transaction_plan: Some(transaction_plan.clone().into()),
1627 };
1628
1629 let witness_data: WitnessData = self
1630 .witness(tonic::Request::new(witness_request))
1631 .await?
1632 .into_inner()
1633 .witness_data
1634 .ok_or_else(|| tonic::Status::invalid_argument("missing witness data"))?
1635 .try_into()
1636 .map_err(|e: anyhow::Error| e.context("could not decode witness data"))
1637 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1638
1639 let fvk =
1640 self.storage.full_viewing_key().await.map_err(|_| {
1641 tonic::Status::failed_precondition("Error retrieving full viewing key")
1642 })?;
1643
1644 let transaction = Some(
1645 transaction_plan
1646 .build(&fvk, &witness_data, &authorization_data)
1649 .map_err(|_| tonic::Status::failed_precondition("Error building transaction"))?
1650 .into(),
1651 );
1652
1653 let stream = try_stream! {
1654 yield pb::WitnessAndBuildResponse {
1655 status: Some(pb::witness_and_build_response::Status::Complete(
1656 pb::witness_and_build_response::Complete { transaction },
1657 )),
1658 }
1659 };
1660
1661 Ok(tonic::Response::new(
1662 stream
1663 .map_err(|e: anyhow::Error| {
1664 tonic::Status::unavailable(format!("error witnessing transaction: {e}"))
1665 })
1666 .boxed(),
1667 ))
1668 }
1669
1670 #[instrument(skip_all, level = "trace")]
1671 async fn app_parameters(
1672 &self,
1673 _request: tonic::Request<pb::AppParametersRequest>,
1674 ) -> Result<tonic::Response<pb::AppParametersResponse>, tonic::Status> {
1675 self.check_worker().await?;
1676
1677 let parameters =
1678 self.storage.app_params().await.map_err(|e| {
1679 tonic::Status::unavailable(format!("error getting app params: {e}"))
1680 })?;
1681
1682 let response = AppParametersResponse {
1683 parameters: Some(parameters.into()),
1684 };
1685
1686 Ok(tonic::Response::new(response))
1687 }
1688
1689 #[instrument(skip_all, level = "trace")]
1690 async fn gas_prices(
1691 &self,
1692 _request: tonic::Request<pb::GasPricesRequest>,
1693 ) -> Result<tonic::Response<pb::GasPricesResponse>, tonic::Status> {
1694 self.check_worker().await?;
1695
1696 let gas_prices =
1697 self.storage.gas_prices().await.map_err(|e| {
1698 tonic::Status::unavailable(format!("error getting gas prices: {e}"))
1699 })?;
1700
1701 let response = GasPricesResponse {
1702 gas_prices: Some(gas_prices.into()),
1703 alt_gas_prices: Vec::new(),
1704 };
1705
1706 Ok(tonic::Response::new(response))
1707 }
1708
1709 #[instrument(skip_all, level = "trace")]
1710 async fn fmd_parameters(
1711 &self,
1712 _request: tonic::Request<pb::FmdParametersRequest>,
1713 ) -> Result<tonic::Response<pb::FmdParametersResponse>, tonic::Status> {
1714 self.check_worker().await?;
1715
1716 let parameters =
1717 self.storage.fmd_parameters().await.map_err(|e| {
1718 tonic::Status::unavailable(format!("error getting FMD params: {e}"))
1719 })?;
1720
1721 let response = FmdParametersResponse {
1722 parameters: Some(parameters.into()),
1723 };
1724
1725 Ok(tonic::Response::new(response))
1726 }
1727
1728 #[instrument(skip_all, level = "trace")]
1729 async fn owned_position_ids(
1730 &self,
1731 request: tonic::Request<pb::OwnedPositionIdsRequest>,
1732 ) -> Result<tonic::Response<Self::OwnedPositionIdsStream>, tonic::Status> {
1733 self.check_worker().await?;
1734
1735 let pb::OwnedPositionIdsRequest {
1736 position_state,
1737 trading_pair,
1738 subaccount: _,
1739 } = request.into_inner();
1740
1741 let position_state: Option<position::State> = position_state
1742 .map(|state| state.try_into())
1743 .transpose()
1744 .map_err(|e: anyhow::Error| e.context("could not decode position state"))
1745 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1746
1747 let trading_pair: Option<TradingPair> = trading_pair
1748 .map(|pair| pair.try_into())
1749 .transpose()
1750 .map_err(|e: anyhow::Error| e.context("could not decode trading pair"))
1751 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1752
1753 let ids = self
1754 .storage
1755 .owned_position_ids(position_state, trading_pair)
1756 .await
1757 .map_err(|e| tonic::Status::unavailable(format!("error getting position ids: {e}")))?;
1758
1759 let stream = try_stream! {
1760 for id in ids {
1761 yield pb::OwnedPositionIdsResponse{
1762 position_id: Some(id.into()),
1763 subaccount: None,
1766 }
1767 }
1768 };
1769
1770 Ok(tonic::Response::new(
1771 stream
1772 .map_err(|e: anyhow::Error| {
1773 tonic::Status::unavailable(format!("error getting position ids: {e}"))
1774 })
1775 .boxed(),
1776 ))
1777 }
1778
1779 #[instrument(skip_all, level = "trace")]
1780 async fn authorize_and_build(
1781 &self,
1782 _request: tonic::Request<pb::AuthorizeAndBuildRequest>,
1783 ) -> Result<tonic::Response<Self::AuthorizeAndBuildStream>, tonic::Status> {
1784 unimplemented!("authorize_and_build")
1785 }
1786
1787 #[instrument(skip_all, level = "trace")]
1788 async fn unclaimed_swaps(
1789 &self,
1790 _: tonic::Request<pb::UnclaimedSwapsRequest>,
1791 ) -> Result<tonic::Response<Self::UnclaimedSwapsStream>, tonic::Status> {
1792 self.check_worker().await?;
1793
1794 let swaps = self.storage.unclaimed_swaps().await.map_err(|e| {
1795 tonic::Status::unavailable(format!("error fetching unclaimed swaps: {e}"))
1796 })?;
1797
1798 let stream = try_stream! {
1799 for swap in swaps {
1800 yield pb::UnclaimedSwapsResponse{
1801 swap: Some(swap.into()),
1802 }
1803 }
1804 };
1805
1806 Ok(tonic::Response::new(
1807 stream
1808 .map_err(|e: anyhow::Error| {
1809 tonic::Status::unavailable(format!("error getting unclaimed swaps: {e}"))
1810 })
1811 .boxed(),
1812 ))
1813 }
1814
1815 #[instrument(skip_all, level = "trace")]
1816 async fn wallet_id(
1817 &self,
1818 _: Request<WalletIdRequest>,
1819 ) -> Result<Response<WalletIdResponse>, Status> {
1820 let fvk = self.storage.full_viewing_key().await.map_err(|e| {
1821 Status::failed_precondition(format!("Error retrieving full viewing key: {e}"))
1822 })?;
1823
1824 Ok(Response::new(WalletIdResponse {
1825 wallet_id: Some(fvk.wallet_id().into()),
1826 }))
1827 }
1828
1829 #[instrument(skip_all, level = "trace")]
1830 async fn asset_metadata_by_id(
1831 &self,
1832 request: Request<AssetMetadataByIdRequest>,
1833 ) -> Result<Response<AssetMetadataByIdResponse>, Status> {
1834 let asset_id = request
1835 .into_inner()
1836 .asset_id
1837 .ok_or_else(|| Status::invalid_argument("missing asset id"))?
1838 .try_into()
1839 .map_err(|e| Status::invalid_argument(format!("{e:#}")))?;
1840
1841 let metadata = self
1842 .storage
1843 .asset_by_id(&asset_id)
1844 .await
1845 .map_err(|e| Status::internal(format!("Error retrieving asset by id: {e:#}")))?;
1846
1847 Ok(Response::new(AssetMetadataByIdResponse {
1848 denom_metadata: metadata.map(Into::into),
1849 }))
1850 }
1851
1852 #[instrument(skip_all, level = "trace")]
1853 async fn delegations_by_address_index(
1854 &self,
1855 _request: tonic::Request<pb::DelegationsByAddressIndexRequest>,
1856 ) -> Result<tonic::Response<Self::DelegationsByAddressIndexStream>, tonic::Status> {
1857 unimplemented!("delegations_by_address_index")
1858 }
1859
1860 #[instrument(skip_all, level = "trace")]
1861 async fn unbonding_tokens_by_address_index(
1862 &self,
1863 _request: tonic::Request<pb::UnbondingTokensByAddressIndexRequest>,
1864 ) -> Result<tonic::Response<Self::UnbondingTokensByAddressIndexStream>, tonic::Status> {
1865 unimplemented!("unbonding_tokens_by_address_index currently only implemented on web")
1866 }
1867
1868 #[instrument(skip_all, level = "trace")]
1869 async fn latest_swaps(
1870 &self,
1871 _request: tonic::Request<pb::LatestSwapsRequest>,
1872 ) -> Result<tonic::Response<Self::LatestSwapsStream>, tonic::Status> {
1873 unimplemented!("latest_swaps currently only implemented on web")
1874 }
1875}
1876
1877async fn get_pd_endpoint(node: Url) -> anyhow::Result<Endpoint> {
1881 let endpoint = match node.scheme() {
1882 "http" => Channel::from_shared(node.to_string())?,
1883 "https" => Channel::from_shared(node.to_string())?
1884 .tls_config(ClientTlsConfig::new().with_webpki_roots())?,
1885 other => anyhow::bail!("unknown url scheme {other}"),
1886 };
1887 Ok(endpoint)
1888}