1use std::{
2 collections::{BTreeMap, BTreeSet},
3 pin::Pin,
4 sync::{Arc, Mutex},
5};
6
7use anyhow::{anyhow, Context};
8use async_stream::try_stream;
9use camino::Utf8Path;
10use decaf377::Fq;
11use futures::stream::{self, StreamExt, TryStreamExt};
12use penumbra_sdk_auction::auction::dutch::actions::view::{
13 ActionDutchAuctionScheduleView, ActionDutchAuctionWithdrawView,
14};
15use rand::Rng;
16use rand_core::OsRng;
17use tap::{Tap, TapFallible};
18use tokio::sync::{watch, RwLock};
19use tokio_stream::wrappers::WatchStream;
20use tonic::transport::channel::ClientTlsConfig;
21use tonic::transport::channel::Endpoint;
22use tonic::{async_trait, transport::Channel, Request, Response, Status};
23use tracing::{instrument, Instrument};
24use url::Url;
25
26use penumbra_sdk_asset::{asset, asset::Metadata, Value};
27use penumbra_sdk_dex::{
28 lp::{
29 position::{self, Position},
30 Reserves,
31 },
32 swap_claim::SwapClaimPlan,
33 TradingPair,
34};
35use penumbra_sdk_fee::Fee;
36use penumbra_sdk_keys::{
37 keys::WalletId,
38 keys::{AddressIndex, FullViewingKey},
39 Address, AddressView,
40};
41use penumbra_sdk_num::Amount;
42use penumbra_sdk_proto::{
43 util::tendermint_proxy::v1::{
44 tendermint_proxy_service_client::TendermintProxyServiceClient, BroadcastTxSyncRequest,
45 GetStatusRequest, GetStatusResponse, SyncInfo,
46 },
47 view::v1::{
48 self as pb,
49 broadcast_transaction_response::{BroadcastSuccess, Confirmed, Status as BroadcastStatus},
50 view_service_client::ViewServiceClient,
51 view_service_server::{ViewService, ViewServiceServer},
52 AppParametersResponse, AssetMetadataByIdRequest, AssetMetadataByIdResponse,
53 BroadcastTransactionResponse, FmdParametersResponse, GasPricesResponse,
54 NoteByCommitmentResponse, StatusResponse, SwapByCommitmentResponse,
55 TransactionPlannerResponse, WalletIdRequest, WalletIdResponse, WitnessResponse,
56 },
57 DomainType,
58};
59use penumbra_sdk_stake::{rate::RateData, IdentityKey};
60use penumbra_sdk_tct::{Proof, StateCommitment};
61use penumbra_sdk_transaction::{
62 AuthorizationData, Transaction, TransactionPerspective, TransactionPlan, WitnessData,
63};
64
65use crate::{worker::Worker, Planner, SpendableNoteRecord, Storage};
66
67type BroadcastTransactionStream = Pin<
71 Box<dyn futures::Stream<Item = Result<pb::BroadcastTransactionResponse, tonic::Status>> + Send>,
72>;
73
/// Server-side state shared by the view service RPC handlers.
///
/// The type is `Clone`; handlers clone it when they need an owned copy to move
/// into a stream or to wrap in an in-process client.
#[derive(Clone)]
pub struct ViewServer {
    /// Storage backend queried by the RPC handlers.
    storage: Storage,
    /// Shared slot inspected by `check_worker`; when it holds an error the
    /// handlers report the worker as failed.
    error_slot: Arc<Mutex<Option<anyhow::Error>>>,
    /// Shared state commitment tree handed back by `Worker::new`.
    state_commitment_tree: Arc<RwLock<penumbra_sdk_tct::Tree>>,
    /// URL of the node that gRPC clients created by this server connect to.
    node: Url,
    /// Watch receiver handed back by `Worker::new`.
    /// NOTE(review): presumably carries the worker's latest sync height — confirm.
    sync_height_rx: watch::Receiver<u64>,
}
95
96impl ViewServer {
97 pub async fn load_or_initialize(
99 storage_path: Option<impl AsRef<Utf8Path>>,
100 registry_path: Option<impl AsRef<Utf8Path>>,
101 fvk: &FullViewingKey,
102 node: Url,
103 ) -> anyhow::Result<Self> {
104 let storage = Storage::load_or_initialize(storage_path, fvk, node.clone())
105 .tap(|_| tracing::trace!("loading or initializing storage"))
106 .await?
107 .tap(|_| tracing::debug!("storage is ready"));
108
109 if let Some(registry_path) = registry_path {
110 storage.load_asset_metadata(registry_path).await?;
111 }
112
113 Self::new(storage, node)
114 .tap(|_| tracing::trace!("constructing view server"))
115 .await
116 .tap(|_| tracing::debug!("constructed view server"))
117 }
118
119 pub async fn new(storage: Storage, node: Url) -> anyhow::Result<Self> {
127 let span = tracing::error_span!(parent: None, "view");
128 let channel = Self::get_pd_channel(node.clone()).await?;
129
130 let (worker, state_commitment_tree, error_slot, sync_height_rx) =
131 Worker::new(storage.clone(), channel)
132 .instrument(span.clone())
133 .tap(|_| tracing::trace!("constructing view server worker"))
134 .await?
135 .tap(|_| tracing::debug!("constructed view server worker"));
136
137 tokio::spawn(worker.run().instrument(span))
138 .tap(|_| tracing::debug!("spawned view server worker"));
139
140 Ok(Self {
141 storage,
142 error_slot,
143 sync_height_rx,
144 state_commitment_tree,
145 node,
146 })
147 }
148
149 pub async fn get_pd_channel(node: Url) -> anyhow::Result<Channel> {
155 let endpoint = get_pd_endpoint(node).await?;
156 let span = tracing::error_span!(parent: None, "view");
157 let c: Channel = endpoint
158 .connect()
159 .instrument(span.clone())
160 .await
161 .with_context(|| "could not connect to grpc server")
162 .tap_err(|error| tracing::error!(?error, "could not connect to grpc server"))?;
163
164 Ok(c)
165 }
166
167 #[instrument(level = "debug", skip_all)]
172 async fn check_worker(&self) -> Result<(), tonic::Status> {
173 tracing::debug!("checking view server worker");
176 if let Some(error) = self
177 .error_slot
178 .lock()
179 .tap_err(|error| tracing::error!(?error, "unable to lock worker error slot"))
180 .map_err(|e| {
181 tonic::Status::unavailable(format!("unable to lock worker error slot {:#}", e))
182 })?
183 .as_ref()
184 {
185 return Err(tonic::Status::new(
186 tonic::Code::Internal,
187 format!("Worker failed: {error}"),
188 ));
189 }
190
191 Ok(()).tap(|_| tracing::trace!("view server worker is healthy"))
195 }
196
197 #[instrument(skip(self, transaction), fields(id = %transaction.id()))]
198 fn broadcast_transaction(
199 &self,
200 transaction: Transaction,
201 await_detection: bool,
202 ) -> BroadcastTransactionStream {
203 let self2 = self.clone();
204 try_stream! {
205 let mut fullnode_client = self2.tendermint_proxy_client().await
209 .map_err(|e| {
210 tonic::Status::unavailable(format!(
211 "couldn't connect to fullnode: {:#?}",
212 e
213 ))
214 })?
215 ;
216 let node_rsp = fullnode_client
217 .broadcast_tx_sync(BroadcastTxSyncRequest {
218 params: transaction.encode_to_vec(),
219 req_id: OsRng.gen(),
220 })
221 .await
222 .map_err(|e| {
223 tonic::Status::unavailable(format!(
224 "error broadcasting tx: {:#?}",
225 e
226 ))
227 })?
228 .into_inner();
229 tracing::info!(?node_rsp);
230 match node_rsp.code {
231 0 => Ok(()),
232 _ => Err(tonic::Status::new(
233 tonic::Code::Internal,
234 format!(
235 "Error submitting transaction: code {}, log: {}",
236 node_rsp.code,
237 node_rsp.log,
238 ),
239 )),
240 }?;
241
242 yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::BroadcastSuccess(BroadcastSuccess{id:Some(transaction.id().into())}))};
244
245 let nullifier = if await_detection {
247 transaction
252 .actions()
253 .filter_map(|action| match action {
254 penumbra_sdk_transaction::Action::Spend(spend) => Some(spend.body.nullifier),
255 _ => None,
261 })
262 .next()
263 } else {
264 None
265 };
266
267 if let Some(nullifier) = nullifier {
268 tracing::info!(?nullifier, "waiting for detection of nullifier");
269 let detection = self2.storage.nullifier_status(nullifier, true);
270 tokio::time::timeout(std::time::Duration::from_secs(20), detection)
271 .await
272 .map_err(|_| {
273 tonic::Status::unavailable(
274 "timeout waiting to detect nullifier of submitted transaction"
275 )
276 })?
277 .map_err(|_| {
278 tonic::Status::unavailable(
279 "error while waiting for detection of submitted transaction"
280 )
281 })?;
282 }
283
284 let detection_height = self2.storage
285 .transaction_by_hash(&transaction.id().0)
286 .await
287 .map_err(|e| tonic::Status::internal(format!("error querying storage: {:#}", e)))?
288 .map(|(height, _tx)| height)
289 .unwrap_or(0);
292 yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::Confirmed(Confirmed{id:Some(transaction.id().into()), detection_height}))};
293 }.boxed()
294 }
295
296 #[instrument(level = "trace", skip(self))]
297 async fn tendermint_proxy_client(
298 &self,
299 ) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
300 TendermintProxyServiceClient::connect(self.node.to_string())
301 .tap(|_| tracing::debug!("connecting to tendermint proxy"))
302 .await
303 .tap_err(|error| tracing::error!(?error, "failed to connect to tendermint proxy"))
304 .map_err(anyhow::Error::from)
305 }
306
307 #[instrument(skip(self))]
310 pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
311 let mut client = self.tendermint_proxy_client().await?;
312
313 let GetStatusResponse { sync_info, .. } = client
314 .get_status(GetStatusRequest {})
315 .tap(|_| tracing::debug!("querying current status"))
316 .await
317 .tap_err(|error| tracing::debug!(?error, "failed to query current status"))?
318 .into_inner();
319
320 let SyncInfo {
321 latest_block_height,
322 catching_up,
323 ..
324 } = sync_info
325 .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
326
327 let latest_known_block_height = latest_block_height;
332
333 tracing::debug!(
334 ?latest_block_height,
335 ?catching_up,
336 ?latest_known_block_height,
337 "found latest known block height"
338 );
339
340 Ok((latest_known_block_height, catching_up))
341 }
342
343 #[instrument(skip(self))]
344 pub async fn status(&self) -> anyhow::Result<StatusResponse> {
345 let full_sync_height = self.storage.last_sync_height().await?.unwrap_or(0);
346
347 let (latest_known_block_height, node_catching_up) =
348 self.latest_known_block_height().await?;
349
350 let height_diff = latest_known_block_height
351 .checked_sub(full_sync_height)
352 .ok_or_else(|| anyhow!("sync height ahead of node height"))?;
353
354 let catching_up = match (node_catching_up, height_diff) {
355 (false, 0) => false,
357 (false, 1) => false,
359 (false, _) => true,
361 (true, _) => true,
363 };
364
365 Ok(StatusResponse {
366 full_sync_height,
367 catching_up,
368 partial_sync_height: full_sync_height, })
370 }
371}
372
373#[async_trait]
374impl ViewService for ViewServer {
375 type NotesStream =
376 Pin<Box<dyn futures::Stream<Item = Result<pb::NotesResponse, tonic::Status>> + Send>>;
377 type NotesForVotingStream = Pin<
378 Box<dyn futures::Stream<Item = Result<pb::NotesForVotingResponse, tonic::Status>> + Send>,
379 >;
380 type AssetsStream =
381 Pin<Box<dyn futures::Stream<Item = Result<pb::AssetsResponse, tonic::Status>> + Send>>;
382 type StatusStreamStream = Pin<
383 Box<dyn futures::Stream<Item = Result<pb::StatusStreamResponse, tonic::Status>> + Send>,
384 >;
385 type TransactionInfoStream = Pin<
386 Box<dyn futures::Stream<Item = Result<pb::TransactionInfoResponse, tonic::Status>> + Send>,
387 >;
388 type BalancesStream =
389 Pin<Box<dyn futures::Stream<Item = Result<pb::BalancesResponse, tonic::Status>> + Send>>;
390 type OwnedPositionIdsStream = Pin<
391 Box<dyn futures::Stream<Item = Result<pb::OwnedPositionIdsResponse, tonic::Status>> + Send>,
392 >;
393 type UnclaimedSwapsStream = Pin<
394 Box<dyn futures::Stream<Item = Result<pb::UnclaimedSwapsResponse, tonic::Status>> + Send>,
395 >;
396 type BroadcastTransactionStream = BroadcastTransactionStream;
397 type WitnessAndBuildStream = Pin<
398 Box<dyn futures::Stream<Item = Result<pb::WitnessAndBuildResponse, tonic::Status>> + Send>,
399 >;
400 type AuthorizeAndBuildStream = Pin<
401 Box<
402 dyn futures::Stream<Item = Result<pb::AuthorizeAndBuildResponse, tonic::Status>> + Send,
403 >,
404 >;
405 type DelegationsByAddressIndexStream = Pin<
406 Box<
407 dyn futures::Stream<Item = Result<pb::DelegationsByAddressIndexResponse, tonic::Status>>
408 + Send,
409 >,
410 >;
411 type UnbondingTokensByAddressIndexStream = Pin<
412 Box<
413 dyn futures::Stream<
414 Item = Result<pb::UnbondingTokensByAddressIndexResponse, tonic::Status>,
415 > + Send,
416 >,
417 >;
418 type AuctionsStream =
419 Pin<Box<dyn futures::Stream<Item = Result<pb::AuctionsResponse, tonic::Status>> + Send>>;
420 type LatestSwapsStream =
421 Pin<Box<dyn futures::Stream<Item = Result<pb::LatestSwapsResponse, tonic::Status>> + Send>>;
422 type LqtVotingNotesStream = Pin<
423 Box<dyn futures::Stream<Item = Result<pb::LqtVotingNotesResponse, tonic::Status>> + Send>,
424 >;
425 type TournamentVotesStream = Pin<
426 Box<dyn futures::Stream<Item = Result<pb::TournamentVotesResponse, tonic::Status>> + Send>,
427 >;
428 type LpPositionBundleStream = Pin<
429 Box<dyn futures::Stream<Item = Result<pb::LpPositionBundleResponse, tonic::Status>> + Send>,
430 >;
431 type LpStrategyCatalogStream = Pin<
432 Box<
433 dyn futures::Stream<Item = Result<pb::LpStrategyCatalogResponse, tonic::Status>> + Send,
434 >,
435 >;
436
437 #[instrument(skip_all, level = "trace")]
438 async fn auctions(
439 &self,
440 request: tonic::Request<pb::AuctionsRequest>,
441 ) -> Result<tonic::Response<Self::AuctionsStream>, tonic::Status> {
442 use penumbra_sdk_proto::core::component::auction::v1 as pb_auction;
443 use penumbra_sdk_proto::core::component::auction::v1::query_service_client::QueryServiceClient as AuctionQueryServiceClient;
444
445 let parameters = request.into_inner();
446 let query_latest_state = parameters.query_latest_state;
447 let include_inactive = parameters.include_inactive;
448
449 let account_filter = parameters
450 .account_filter
451 .to_owned()
452 .map(AddressIndex::try_from)
453 .map_or(Ok(None), |v| v.map(Some))
454 .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
455
456 let all_auctions = self
457 .storage
458 .fetch_auctions_by_account(account_filter, include_inactive)
459 .await
460 .map_err(|e| tonic::Status::internal(e.to_string()))?;
461
462 let client = if query_latest_state {
463 Some(
464 AuctionQueryServiceClient::connect(self.node.to_string())
465 .await
466 .map_err(|e| tonic::Status::internal(e.to_string()))?,
467 )
468 } else {
469 None
470 };
471
472 let responses = futures::future::join_all(all_auctions.into_iter().map(
473 |(auction_id, note_record, local_seq)| {
474 let maybe_client = client.clone();
475 async move {
476 let (any_state, positions) = if let Some(mut client2) = maybe_client {
477 let extra_data = client2
478 .auction_state_by_id(pb_auction::AuctionStateByIdRequest {
479 id: Some(auction_id.into()),
480 })
481 .await
482 .map_err(|e| tonic::Status::internal(e.to_string()))?
483 .into_inner();
484 (extra_data.auction, extra_data.positions)
485 } else {
486 (None, vec![])
487 };
488
489 Result::<_, tonic::Status>::Ok(pb::AuctionsResponse {
490 id: Some(auction_id.into()),
491 note_record: Some(note_record.into()),
492 auction: any_state,
493 positions,
494 local_seq,
495 })
496 }
497 },
498 ))
499 .await;
500
501 let stream = stream::iter(responses)
502 .map_err(|e| tonic::Status::internal(format!("error getting auction: {e}")))
503 .boxed();
504
505 Ok(Response::new(stream))
506 }
507
508 #[instrument(skip_all, level = "trace")]
509 async fn broadcast_transaction(
510 &self,
511 request: tonic::Request<pb::BroadcastTransactionRequest>,
512 ) -> Result<tonic::Response<Self::BroadcastTransactionStream>, tonic::Status> {
513 let pb::BroadcastTransactionRequest {
514 transaction,
515 await_detection,
516 } = request.into_inner();
517
518 let transaction: Transaction = transaction
519 .ok_or_else(|| tonic::Status::invalid_argument("missing transaction"))?
520 .try_into()
521 .map_err(|e: anyhow::Error| e.context("could not decode transaction"))
522 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
523
524 let stream = self.broadcast_transaction(transaction, await_detection);
525
526 Ok(tonic::Response::new(stream))
527 }
528
529 #[instrument(skip_all, level = "trace")]
530 async fn transaction_planner(
531 &self,
532 request: tonic::Request<pb::TransactionPlannerRequest>,
533 ) -> Result<tonic::Response<pb::TransactionPlannerResponse>, tonic::Status> {
534 let prq = request.into_inner();
535
536 let app_params =
537 self.storage.app_params().await.map_err(|e| {
538 tonic::Status::internal(format!("could not get app params: {:#}", e))
539 })?;
540
541 let gas_prices =
542 self.storage.gas_prices().await.map_err(|e| {
543 tonic::Status::internal(format!("could not get gas prices: {:#}", e))
544 })?;
545
546 let mut planner = Planner::new(OsRng);
550 planner.set_gas_prices(gas_prices);
551 planner.expiry_height(prq.expiry_height);
552
553 for output in prq.outputs {
554 let address: Address = output
555 .address
556 .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
557 .try_into()
558 .map_err(|e| {
559 tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
560 })?;
561
562 let value: Value = output
563 .value
564 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
565 .try_into()
566 .map_err(|e| {
567 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
568 })?;
569
570 planner.output(value, address);
571 }
572
573 for swap in prq.swaps {
574 let value: Value = swap
575 .value
576 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
577 .try_into()
578 .map_err(|e| {
579 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
580 })?;
581
582 let target_asset: asset::Id = swap
583 .target_asset
584 .ok_or_else(|| tonic::Status::invalid_argument("Missing target asset"))?
585 .try_into()
586 .map_err(|e| {
587 tonic::Status::invalid_argument(format!("Could not parse target asset: {e:#}"))
588 })?;
589
590 let fee: Fee = swap
591 .fee
592 .ok_or_else(|| tonic::Status::invalid_argument("Missing fee"))?
593 .try_into()
594 .map_err(|e| {
595 tonic::Status::invalid_argument(format!("Could not parse fee: {e:#}"))
596 })?;
597
598 let claim_address: Address = swap
599 .claim_address
600 .ok_or_else(|| tonic::Status::invalid_argument("Missing claim address"))?
601 .try_into()
602 .map_err(|e| {
603 tonic::Status::invalid_argument(format!("Could not parse claim address: {e:#}"))
604 })?;
605
606 planner
607 .swap(value, target_asset, fee, claim_address)
608 .map_err(|e| {
609 tonic::Status::invalid_argument(format!("Could not plan swap: {e:#}"))
610 })?;
611 }
612
613 for swap_claim in prq.swap_claims {
614 let swap_commitment: StateCommitment = swap_claim
615 .swap_commitment
616 .ok_or_else(|| tonic::Status::invalid_argument("Missing swap commitment"))?
617 .try_into()
618 .map_err(|e| {
619 tonic::Status::invalid_argument(format!(
620 "Could not parse swap commitment: {e:#}"
621 ))
622 })?;
623 let swap_record = self
624 .storage
625 .swap_by_commitment(swap_commitment, false)
627 .await
628 .map_err(|e| {
629 tonic::Status::invalid_argument(format!(
630 "Could not fetch swap by commitment: {e:#}"
631 ))
632 })?;
633
634 planner.swap_claim(SwapClaimPlan {
635 swap_plaintext: swap_record.swap,
636 position: swap_record.position,
637 output_data: swap_record.output_data,
638 epoch_duration: app_params.sct_params.epoch_duration,
639 proof_blinding_r: Fq::rand(&mut OsRng),
640 proof_blinding_s: Fq::rand(&mut OsRng),
641 });
642 }
643
644 let current_epoch = if prq.undelegations.is_empty() && prq.delegations.is_empty() {
645 None
646 } else {
647 Some(
648 prq.epoch
649 .ok_or_else(|| {
650 tonic::Status::invalid_argument(
651 "Missing current epoch in TransactionPlannerRequest",
652 )
653 })?
654 .try_into()
655 .map_err(|e| {
656 tonic::Status::invalid_argument(format!(
657 "Could not parse current epoch: {e:#}"
658 ))
659 })?,
660 )
661 };
662
663 for delegation in prq.delegations {
664 let amount: Amount = delegation
665 .amount
666 .ok_or_else(|| tonic::Status::invalid_argument("Missing amount"))?
667 .try_into()
668 .map_err(|e| {
669 tonic::Status::invalid_argument(format!("Could not parse amount: {e:#}"))
670 })?;
671
672 let rate_data: RateData = delegation
673 .rate_data
674 .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
675 .try_into()
676 .map_err(|e| {
677 tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
678 })?;
679
680 planner.delegate(
681 current_epoch.expect("checked that current epoch is present"),
682 amount,
683 rate_data,
684 );
685 }
686
687 for undelegation in prq.undelegations {
688 let value: Value = undelegation
689 .value
690 .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
691 .try_into()
692 .map_err(|e| {
693 tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
694 })?;
695
696 let rate_data: RateData = undelegation
697 .rate_data
698 .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
699 .try_into()
700 .map_err(|e| {
701 tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
702 })?;
703
704 planner.undelegate(
705 current_epoch.expect("checked that current epoch is present"),
706 value.amount,
707 rate_data,
708 );
709 }
710
711 for position_open in prq.position_opens {
712 let position: Position = position_open
713 .position
714 .ok_or_else(|| tonic::Status::invalid_argument("Missing position"))?
715 .try_into()
716 .map_err(|e| {
717 tonic::Status::invalid_argument(format!("Could not parse position: {e:#}"))
718 })?;
719
720 planner.position_open(position);
721 }
722
723 for position_close in prq.position_closes {
724 let position_id: position::Id = position_close
725 .position_id
726 .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
727 .try_into()
728 .map_err(|e| {
729 tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
730 })?;
731
732 planner.position_close(position_id);
733 }
734
735 for position_withdraw in prq.position_withdraws {
736 let position_id: position::Id = position_withdraw
737 .position_id
738 .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
739 .try_into()
740 .map_err(|e| {
741 tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
742 })?;
743
744 let reserves: Reserves = position_withdraw
745 .reserves
746 .ok_or_else(|| tonic::Status::invalid_argument("Missing reserves"))?
747 .try_into()
748 .map_err(|e| {
749 tonic::Status::invalid_argument(format!("Could not parse reserves: {e:#}"))
750 })?;
751
752 let trading_pair: TradingPair = position_withdraw
753 .trading_pair
754 .ok_or_else(|| tonic::Status::invalid_argument("Missing pair"))?
755 .try_into()
756 .map_err(|e| {
757 tonic::Status::invalid_argument(format!("Could not parse pair: {e:#}"))
758 })?;
759
760 planner.position_withdraw(position_id, reserves, trading_pair, 0);
761 }
762
763 for ics20_withdrawal in prq.ics20_withdrawals {
765 planner.ics20_withdrawal(
766 ics20_withdrawal
767 .try_into()
768 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
769 );
770 }
771
772 for ibc_action in prq.ibc_relay_actions {
774 planner.ibc_action(
775 ibc_action
776 .try_into()
777 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
778 );
779 }
780
781 let mut client_of_self = ViewServiceClient::new(ViewServiceServer::new(self.clone()));
782
783 let source = prq
784 .source
785 .map(|addr_index| addr_index.account)
787 .unwrap_or(0u32);
789
790 let plan = planner
791 .plan(&mut client_of_self, source.into())
792 .await
793 .context("could not plan requested transaction")
794 .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?;
795
796 Ok(tonic::Response::new(TransactionPlannerResponse {
797 plan: Some(plan.into()),
798 }))
799 }
800
801 #[instrument(skip_all, level = "trace")]
802 async fn address_by_index(
803 &self,
804 request: tonic::Request<pb::AddressByIndexRequest>,
805 ) -> Result<tonic::Response<pb::AddressByIndexResponse>, tonic::Status> {
806 let fvk =
807 self.storage.full_viewing_key().await.map_err(|_| {
808 tonic::Status::failed_precondition("Error retrieving full viewing key")
809 })?;
810
811 let address_index = request
812 .into_inner()
813 .address_index
814 .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
815 .try_into()
816 .map_err(|e| {
817 tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
818 })?;
819
820 Ok(tonic::Response::new(pb::AddressByIndexResponse {
821 address: Some(fvk.payment_address(address_index).0.into()),
822 }))
823 }
824
825 #[instrument(skip_all, level = "trace")]
826 async fn index_by_address(
827 &self,
828 request: tonic::Request<pb::IndexByAddressRequest>,
829 ) -> Result<tonic::Response<pb::IndexByAddressResponse>, tonic::Status> {
830 let fvk =
831 self.storage.full_viewing_key().await.map_err(|_| {
832 tonic::Status::failed_precondition("Error retrieving full viewing key")
833 })?;
834
835 let address: Address = request
836 .into_inner()
837 .address
838 .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
839 .try_into()
840 .map_err(|e| {
841 tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
842 })?;
843
844 Ok(tonic::Response::new(pb::IndexByAddressResponse {
845 address_index: fvk.address_index(&address).map(Into::into),
846 }))
847 }
848 async fn transparent_address(
849 &self,
850 _request: tonic::Request<pb::TransparentAddressRequest>,
851 ) -> Result<tonic::Response<pb::TransparentAddressResponse>, tonic::Status> {
852 let fvk =
853 self.storage.full_viewing_key().await.map_err(|_| {
854 tonic::Status::failed_precondition("Error retrieving full viewing key")
855 })?;
856
857 let encoding = fvk.incoming().transparent_address();
858 let address: Address = encoding
859 .parse()
860 .map_err(|_| tonic::Status::internal("could not parse newly generated address"))?;
861
862 Ok(tonic::Response::new(pb::TransparentAddressResponse {
863 address: Some(address.into()),
864 encoding,
865 }))
866 }
867
868 #[instrument(skip_all, level = "trace")]
869 async fn ephemeral_address(
870 &self,
871 request: tonic::Request<pb::EphemeralAddressRequest>,
872 ) -> Result<tonic::Response<pb::EphemeralAddressResponse>, tonic::Status> {
873 let fvk =
874 self.storage.full_viewing_key().await.map_err(|_| {
875 tonic::Status::failed_precondition("Error retrieving full viewing key")
876 })?;
877
878 let address_index = request
879 .into_inner()
880 .address_index
881 .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
882 .try_into()
883 .map_err(|e| {
884 tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
885 })?;
886
887 Ok(tonic::Response::new(pb::EphemeralAddressResponse {
888 address: Some(fvk.ephemeral_address(OsRng, address_index).0.into()),
889 }))
890 }
891
892 #[instrument(skip_all, level = "trace")]
893 async fn transaction_info_by_hash(
894 &self,
895 request: tonic::Request<pb::TransactionInfoByHashRequest>,
896 ) -> Result<tonic::Response<pb::TransactionInfoByHashResponse>, tonic::Status> {
897 self.check_worker().await?;
898
899 let request = request.into_inner();
900
901 let fvk =
902 self.storage.full_viewing_key().await.map_err(|_| {
903 tonic::Status::failed_precondition("Error retrieving full viewing key")
904 })?;
905
906 let maybe_tx = self
907 .storage
908 .transaction_by_hash(
909 &request
910 .id
911 .clone()
912 .ok_or_else(|| {
913 tonic::Status::invalid_argument(
914 "missing transaction ID in TransactionInfoByHashRequest",
915 )
916 })?
917 .inner,
918 )
919 .await
920 .map_err(|_| {
921 tonic::Status::failed_precondition(format!(
922 "Error retrieving transaction by hash {}",
923 hex::encode(request.id.expect("transaction id is present").inner)
924 ))
925 })?;
926
927 let Some((height, tx)) = maybe_tx else {
928 return Ok(tonic::Response::new(
929 pb::TransactionInfoByHashResponse::default(),
930 ));
931 };
932
933 let mut txp = TransactionPerspective {
935 payload_keys: tx
936 .payload_keys(&fvk)
937 .map_err(|_| tonic::Status::failed_precondition("Error generating payload keys"))?,
938 ..Default::default()
939 };
940
941 for action in tx.actions() {
944 use penumbra_sdk_transaction::Action;
945 match action {
946 Action::Spend(spend) => {
947 let nullifier = spend.body.nullifier;
948 if let Ok(spendable_note_record) =
950 self.storage.note_by_nullifier(nullifier, false).await
951 {
952 txp.spend_nullifiers
953 .insert(nullifier, spendable_note_record.note);
954 }
955 }
956 Action::SwapClaim(claim) => {
957 let output_1_record = self
958 .storage
959 .note_by_commitment(claim.body.output_1_commitment, false)
960 .await
961 .map_err(|e| {
962 tonic::Status::internal(format!(
963 "Error retrieving first SwapClaim output note record: {:#}",
964 e
965 ))
966 })?;
967 let output_2_record = self
968 .storage
969 .note_by_commitment(claim.body.output_2_commitment, false)
970 .await
971 .map_err(|e| {
972 tonic::Status::internal(format!(
973 "Error retrieving second SwapClaim output note record: {:#}",
974 e
975 ))
976 })?;
977
978 txp.advice_notes
979 .insert(claim.body.output_1_commitment, output_1_record.note);
980 txp.advice_notes
981 .insert(claim.body.output_2_commitment, output_2_record.note);
982 }
983 _ => {}
984 }
985 }
986
987 let min_view = tx.view_from_perspective(&txp);
991 let mut address_views = BTreeMap::new();
992 let mut asset_ids = BTreeSet::new();
993 for action_view in min_view.action_views() {
994 use penumbra_sdk_dex::{swap::SwapView, swap_claim::SwapClaimView};
995 use penumbra_sdk_transaction::view::action_view::{
996 ActionView, DelegatorVoteView, OutputView, SpendView,
997 };
998 match action_view {
999 ActionView::Spend(SpendView::Visible { note, .. }) => {
1000 let address = note.address();
1001 address_views.insert(address.clone(), fvk.view_address(address));
1002 asset_ids.insert(note.asset_id());
1003 }
1004 ActionView::Output(OutputView::Visible { note, .. }) => {
1005 let address = note.address();
1006 address_views.insert(address.clone(), fvk.view_address(address.clone()));
1007 asset_ids.insert(note.asset_id());
1008
1009 let memo = tx.decrypt_memo(&fvk).map_err(|_| {
1011 tonic::Status::internal("Error decrypting memo for OutputView")
1012 })?;
1013 address_views.insert(memo.return_address(), fvk.view_address(address));
1014 }
1015 ActionView::Swap(SwapView::Visible { swap_plaintext, .. }) => {
1016 let address = swap_plaintext.claim_address.clone();
1017 address_views.insert(address.clone(), fvk.view_address(address));
1018 asset_ids.insert(swap_plaintext.trading_pair.asset_1());
1019 asset_ids.insert(swap_plaintext.trading_pair.asset_2());
1020 }
1021 ActionView::SwapClaim(SwapClaimView::Visible {
1022 output_1, output_2, ..
1023 }) => {
1024 let address = output_1.address();
1026 address_views.insert(address.clone(), fvk.view_address(address));
1027 asset_ids.insert(output_1.asset_id());
1028 asset_ids.insert(output_2.asset_id());
1029 }
1030 ActionView::DelegatorVote(DelegatorVoteView::Visible { note, .. }) => {
1031 let address = note.address();
1032 address_views.insert(address.clone(), fvk.view_address(address));
1033 asset_ids.insert(note.asset_id());
1034 }
1035 ActionView::ActionDutchAuctionWithdraw(ActionDutchAuctionWithdrawView {
1036 action: _,
1037 reserves,
1038 }) => {
1039 for value in reserves {
1042 asset_ids.insert(value.asset_id());
1043 }
1044 }
1045 ActionView::ActionDutchAuctionSchedule(ActionDutchAuctionScheduleView {
1047 action,
1048 ..
1049 }) => {
1050 let description = &action.description;
1051 asset_ids.insert(description.input.asset_id);
1052 asset_ids.insert(description.output_id);
1053 }
1054 _ => {}
1055 }
1056 }
1057
1058 let mut denoms = Vec::new();
1061
1062 for id in asset_ids {
1063 if let Some(asset) = self.storage.asset_by_id(&id).await.map_err(|e| {
1064 tonic::Status::internal(format!("Error retrieving asset by id: {:#}", e))
1065 })? {
1066 denoms.push(asset);
1067 }
1068 }
1069
1070 txp.denoms.extend(denoms);
1071
1072 txp.address_views = address_views.into_values().collect();
1073
1074 let txv = tx.view_from_perspective(&txp);
1076 let summary = txv.summary();
1077
1078 let response = pb::TransactionInfoByHashResponse {
1079 tx_info: Some(pb::TransactionInfo {
1080 height,
1081 id: Some(tx.id().into()),
1082 perspective: Some(txp.into()),
1083 transaction: Some(tx.into()),
1084 view: Some(txv.into()),
1085 summary: Some(summary.into()),
1086 }),
1087 };
1088
1089 Ok(tonic::Response::new(response))
1090 }
1091
1092 #[instrument(skip_all, level = "trace")]
1093 async fn swap_by_commitment(
1094 &self,
1095 request: tonic::Request<pb::SwapByCommitmentRequest>,
1096 ) -> Result<tonic::Response<pb::SwapByCommitmentResponse>, tonic::Status> {
1097 self.check_worker().await?;
1098
1099 let request = request.into_inner();
1100
1101 let swap_commitment = request
1102 .swap_commitment
1103 .ok_or_else(|| {
1104 tonic::Status::failed_precondition("Missing swap commitment in request")
1105 })?
1106 .try_into()
1107 .map_err(|_| {
1108 tonic::Status::failed_precondition("Invalid swap commitment in request")
1109 })?;
1110
1111 let swap = pb::SwapRecord::from(
1112 self.storage
1113 .swap_by_commitment(swap_commitment, request.await_detection)
1114 .await
1115 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1116 );
1117
1118 Ok(tonic::Response::new(SwapByCommitmentResponse {
1119 swap: Some(swap),
1120 }))
1121 }
1122
1123 #[allow(deprecated)]
1124 #[instrument(skip(self, request))]
1125 async fn balances(
1126 &self,
1127 request: tonic::Request<pb::BalancesRequest>,
1128 ) -> Result<tonic::Response<Self::BalancesStream>, tonic::Status> {
1129 let request = request.into_inner();
1130
1131 let account_filter = request.account_filter.and_then(|x| {
1132 AddressIndex::try_from(x)
1133 .map_err(|_| {
1134 tonic::Status::failed_precondition("Invalid swap commitment in request")
1135 })
1136 .map_or(None, |x| x.into())
1137 });
1138
1139 let asset_id_filter = request.asset_id_filter.and_then(|x| {
1140 asset::Id::try_from(x)
1141 .map_err(|_| {
1142 tonic::Status::failed_precondition("Invalid swap commitment in request")
1143 })
1144 .map_or(None, |x| x.into())
1145 });
1146
1147 let result = self
1148 .storage
1149 .balances(account_filter, asset_id_filter)
1150 .await
1151 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?;
1152
1153 tracing::debug!(?account_filter, ?asset_id_filter, ?result);
1154
1155 let self2 = self.clone();
1156 let stream = try_stream! {
1157 for element in result {
1159 let metadata: Metadata = self2
1160 .asset_metadata_by_id(Request::new(pb::AssetMetadataByIdRequest {
1161 asset_id: Some(element.id.into()),
1162 }))
1163 .await?
1164 .into_inner()
1165 .denom_metadata
1166 .context("denom metadata not found")?
1167 .try_into()?;
1168
1169 let value = Value {
1170 asset_id: element.id,
1171 amount: element.amount.into(),
1172 };
1173
1174 let value_view = value.view_with_denom(metadata)?;
1175
1176 let address: Address = self2
1177 .address_by_index(Request::new(pb::AddressByIndexRequest {
1178 address_index: account_filter.map(Into::into),
1179 }))
1180 .await?
1181 .into_inner()
1182 .address
1183 .context("address not found")?
1184 .try_into()?;
1185
1186 let wallet_id: WalletId = self2
1187 .wallet_id(Request::new(pb::WalletIdRequest {}))
1188 .await?
1189 .into_inner()
1190 .wallet_id
1191 .context("wallet id not found")?
1192 .try_into()?;
1193
1194 let address_view = AddressView::Decoded {
1195 address,
1196 index: element.address_index,
1197 wallet_id,
1198 };
1199
1200 yield pb::BalancesResponse {
1201 account_address: Some(address_view.into()),
1202 balance_view: Some(value_view.into()),
1203 balance: None,
1204 account: None,
1205 }
1206 }
1207 };
1208
1209 Ok(tonic::Response::new(
1210 stream
1211 .map_err(|e: anyhow::Error| {
1212 tonic::Status::unavailable(format!("error getting balances: {e}"))
1213 })
1214 .boxed(),
1215 ))
1216 }
1217
1218 #[instrument(skip_all, level = "trace")]
1219 async fn note_by_commitment(
1220 &self,
1221 request: tonic::Request<pb::NoteByCommitmentRequest>,
1222 ) -> Result<tonic::Response<pb::NoteByCommitmentResponse>, tonic::Status> {
1223 self.check_worker().await?;
1224
1225 let request = request.into_inner();
1226
1227 let note_commitment = request
1228 .note_commitment
1229 .ok_or_else(|| {
1230 tonic::Status::failed_precondition("Missing note commitment in request")
1231 })?
1232 .try_into()
1233 .map_err(|_| {
1234 tonic::Status::failed_precondition("Invalid note commitment in request")
1235 })?;
1236
1237 let spendable_note = pb::SpendableNoteRecord::from(
1238 self.storage
1239 .note_by_commitment(note_commitment, request.await_detection)
1240 .await
1241 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1242 );
1243
1244 Ok(tonic::Response::new(NoteByCommitmentResponse {
1245 spendable_note: Some(spendable_note),
1246 }))
1247 }
1248
1249 #[instrument(skip_all, level = "trace")]
1250 async fn nullifier_status(
1251 &self,
1252 request: tonic::Request<pb::NullifierStatusRequest>,
1253 ) -> Result<tonic::Response<pb::NullifierStatusResponse>, tonic::Status> {
1254 self.check_worker().await?;
1255
1256 let request = request.into_inner();
1257
1258 let nullifier = request
1259 .nullifier
1260 .ok_or_else(|| tonic::Status::failed_precondition("Missing nullifier in request"))?
1261 .try_into()
1262 .map_err(|_| tonic::Status::failed_precondition("Invalid nullifier in request"))?;
1263
1264 Ok(tonic::Response::new(pb::NullifierStatusResponse {
1265 spent: self
1266 .storage
1267 .nullifier_status(nullifier, request.await_detection)
1268 .await
1269 .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1270 }))
1271 }
1272
1273 #[instrument(skip_all, level = "trace")]
1274 async fn status(
1275 &self,
1276 _: tonic::Request<pb::StatusRequest>,
1277 ) -> Result<tonic::Response<pb::StatusResponse>, tonic::Status> {
1278 self.check_worker().await?;
1279
1280 Ok(tonic::Response::new(self.status().await.map_err(|e| {
1281 tonic::Status::internal(format!("error: {e}"))
1282 })?))
1283 }
1284
1285 #[instrument(skip_all, level = "trace")]
1286 async fn status_stream(
1287 &self,
1288 _: tonic::Request<pb::StatusStreamRequest>,
1289 ) -> Result<tonic::Response<Self::StatusStreamStream>, tonic::Status> {
1290 self.check_worker().await?;
1291
1292 let (latest_known_block_height, _) = self
1293 .latest_known_block_height()
1294 .await
1295 .tap_err(|error| {
1296 tracing::debug!(
1297 ?error,
1298 "unable to fetch latest known block height from fullnode"
1299 )
1300 })
1301 .map_err(|e| {
1302 tonic::Status::unknown(format!(
1303 "unable to fetch latest known block height from fullnode: {e}"
1304 ))
1305 })?;
1306
1307 let mut sync_height_stream = WatchStream::new(self.sync_height_rx.clone());
1310 let stream = try_stream! {
1311 while let Some(sync_height) = sync_height_stream.next().await {
1312 yield pb::StatusStreamResponse {
1313 latest_known_block_height,
1314 full_sync_height: sync_height,
1315 partial_sync_height: sync_height, };
1317 if sync_height >= latest_known_block_height {
1318 break;
1319 }
1320 }
1321 };
1322
1323 Ok(tonic::Response::new(stream.boxed()))
1324 }
1325
1326 #[instrument(skip_all, level = "trace")]
1327 async fn notes(
1328 &self,
1329 request: tonic::Request<pb::NotesRequest>,
1330 ) -> Result<tonic::Response<Self::NotesStream>, tonic::Status> {
1331 self.check_worker().await?;
1332
1333 let request = request.into_inner();
1334
1335 let include_spent = request.include_spent;
1336 let asset_id = request
1337 .asset_id
1338 .to_owned()
1339 .map(asset::Id::try_from)
1340 .map_or(Ok(None), |v| v.map(Some))
1341 .map_err(|_| tonic::Status::invalid_argument("invalid asset id"))?;
1342 let address_index = request
1343 .address_index
1344 .to_owned()
1345 .map(AddressIndex::try_from)
1346 .map_or(Ok(None), |v| v.map(Some))
1347 .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1348
1349 let amount_to_spend = request
1350 .amount_to_spend
1351 .map(Amount::try_from)
1352 .map_or(Ok(None), |v| v.map(Some))
1353 .map_err(|_| tonic::Status::invalid_argument("invalid amount to spend"))?;
1354
1355 let notes = self
1356 .storage
1357 .notes(include_spent, asset_id, address_index, amount_to_spend)
1358 .await
1359 .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1360
1361 let stream = try_stream! {
1362 for note in notes {
1363 yield pb::NotesResponse {
1364 note_record: Some(note.into()),
1365 }
1366 }
1367 };
1368
1369 Ok(tonic::Response::new(
1370 stream
1371 .map_err(|e: anyhow::Error| {
1372 tonic::Status::unavailable(format!("error getting notes: {e}"))
1373 })
1374 .boxed(),
1375 ))
1376 }
1377
1378 #[instrument(skip_all, level = "trace")]
1379 async fn notes_for_voting(
1380 &self,
1381 request: tonic::Request<pb::NotesForVotingRequest>,
1382 ) -> Result<tonic::Response<Self::NotesForVotingStream>, tonic::Status> {
1383 self.check_worker().await?;
1384
1385 let address_index = request
1386 .get_ref()
1387 .address_index
1388 .to_owned()
1389 .map(AddressIndex::try_from)
1390 .map_or(Ok(None), |v| v.map(Some))
1391 .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1392
1393 let votable_at_height = request.get_ref().votable_at_height;
1394
1395 let notes = self
1396 .storage
1397 .notes_for_voting(address_index, votable_at_height)
1398 .await
1399 .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1400
1401 let stream = try_stream! {
1402 for (note, identity_key) in notes {
1403 yield pb::NotesForVotingResponse {
1404 note_record: Some(note.into()),
1405 identity_key: Some(identity_key.into()),
1406 }
1407 }
1408 };
1409
1410 Ok(tonic::Response::new(
1411 stream
1412 .map_err(|e: anyhow::Error| {
1413 tonic::Status::unavailable(format!("error getting notes: {e}"))
1414 })
1415 .boxed(),
1416 ))
1417 }
1418
1419 #[instrument(skip_all, level = "trace")]
1420 async fn assets(
1421 &self,
1422 request: tonic::Request<pb::AssetsRequest>,
1423 ) -> Result<tonic::Response<Self::AssetsStream>, tonic::Status> {
1424 self.check_worker().await?;
1425
1426 let pb::AssetsRequest {
1427 filtered,
1428 include_specific_denominations,
1429 include_delegation_tokens,
1430 include_unbonding_tokens,
1431 include_lp_nfts,
1432 include_proposal_nfts,
1433 include_voting_receipt_tokens,
1434 } = request.get_ref();
1435
1436 let assets = if !filtered {
1438 self.storage
1439 .all_assets()
1440 .await
1441 .map_err(|e| tonic::Status::unavailable(format!("error fetching assets: {e}")))?
1442 } else {
1443 let mut assets = vec![];
1444 for denom in include_specific_denominations {
1445 if let Some(denom) = asset::REGISTRY.parse_denom(&denom.denom) {
1446 assets.push(denom);
1447 }
1448 }
1449 for (include, pattern) in [
1450 (include_delegation_tokens, "_delegation\\_%"),
1451 (include_unbonding_tokens, "_unbonding\\_%"),
1452 (include_lp_nfts, "lpnft\\_%"),
1453 (include_proposal_nfts, "proposal\\_%"),
1454 (include_voting_receipt_tokens, "voted\\_on\\_%"),
1455 ] {
1456 if *include {
1457 assets.extend(
1458 self.storage
1459 .assets_matching(pattern.to_string())
1460 .await
1461 .map_err(|e| {
1462 tonic::Status::unavailable(format!("error fetching assets: {e}"))
1463 })?,
1464 );
1465 }
1466 }
1467 assets
1468 };
1469
1470 let stream = try_stream! {
1471 for asset in assets {
1472 yield
1473 pb::AssetsResponse {
1474 denom_metadata: Some(asset.into()),
1475 }
1476 }
1477 };
1478
1479 Ok(tonic::Response::new(
1480 stream
1481 .map_err(|e: anyhow::Error| {
1482 tonic::Status::unavailable(format!("error getting assets: {e}"))
1483 })
1484 .boxed(),
1485 ))
1486 }
1487
1488 #[instrument(skip_all, level = "trace")]
1489 async fn transaction_info(
1490 &self,
1491 request: tonic::Request<pb::TransactionInfoRequest>,
1492 ) -> Result<tonic::Response<Self::TransactionInfoStream>, tonic::Status> {
1493 self.check_worker().await?;
1494 let start_height = if request.get_ref().start_height == 0 {
1496 None
1497 } else {
1498 Some(request.get_ref().start_height)
1499 };
1500 let end_height = if request.get_ref().end_height == 0 {
1501 None
1502 } else {
1503 Some(request.get_ref().end_height)
1504 };
1505
1506 let txs = self
1508 .storage
1509 .transactions(start_height, end_height)
1510 .await
1511 .map_err(|e| tonic::Status::unavailable(format!("error fetching transactions: {e}")))?;
1512
1513 let self2 = self.clone();
1514 let stream = try_stream! {
1515 for tx in txs {
1516
1517 let rsp = self2.transaction_info_by_hash(tonic::Request::new(pb::TransactionInfoByHashRequest {
1518 id: Some(tx.2.id().into()),
1519 })).await?.into_inner();
1520
1521 yield pb::TransactionInfoResponse {
1522 tx_info: rsp.tx_info,
1523 }
1524 }
1525 };
1526
1527 Ok(tonic::Response::new(
1528 stream
1529 .map_err(|e: anyhow::Error| {
1530 tonic::Status::unavailable(format!("error getting transactions: {e}"))
1531 })
1532 .boxed(),
1533 ))
1534 }
1535
1536 #[instrument(skip_all, level = "trace")]
1537 async fn witness(
1538 &self,
1539 request: tonic::Request<pb::WitnessRequest>,
1540 ) -> Result<tonic::Response<WitnessResponse>, tonic::Status> {
1541 self.check_worker().await?;
1542
1543 let sct = self.state_commitment_tree.read().await;
1546
1547 let anchor = sct.root();
1549
1550 let tx_plan: TransactionPlan =
1552 request
1553 .get_ref()
1554 .to_owned()
1555 .transaction_plan
1556 .map_or(TransactionPlan::default(), |x| {
1557 x.try_into()
1558 .expect("TransactionPlan should exist in request")
1559 });
1560
1561 let requested_note_commitments: Vec<StateCommitment> = tx_plan
1562 .spend_plans()
1563 .filter(|plan| plan.note.amount() != 0u64.into())
1564 .map(|spend| spend.note.commit().into())
1565 .chain(
1566 tx_plan
1567 .swap_claim_plans()
1568 .map(|swap_claim| swap_claim.swap_plaintext.swap_commitment().into()),
1569 )
1570 .chain(
1571 tx_plan
1572 .delegator_vote_plans()
1573 .map(|vote_plan| vote_plan.staked_note.commit().into()),
1574 )
1575 .collect();
1576
1577 tracing::debug!(?requested_note_commitments);
1578
1579 let auth_paths: Vec<Proof> = requested_note_commitments
1580 .iter()
1581 .map(|nc| {
1582 sct.witness(*nc).ok_or_else(|| {
1583 tonic::Status::new(tonic::Code::InvalidArgument, "Note commitment missing")
1584 })
1585 })
1586 .collect::<Result<Vec<Proof>, tonic::Status>>()?;
1587
1588 drop(sct);
1590
1591 let mut witness_data = WitnessData {
1592 anchor,
1593 state_commitment_proofs: auth_paths
1594 .into_iter()
1595 .map(|proof| (proof.commitment(), proof))
1596 .collect(),
1597 };
1598
1599 tracing::debug!(?witness_data);
1600
1601 for nc in tx_plan
1604 .spend_plans()
1605 .filter(|plan| plan.note.amount() == 0u64.into())
1606 .map(|plan| plan.note.commit())
1607 {
1608 witness_data.add_proof(nc, Proof::dummy(&mut OsRng, nc));
1609 }
1610
1611 let witness_response = WitnessResponse {
1612 witness_data: Some(witness_data.into()),
1613 };
1614 Ok(tonic::Response::new(witness_response))
1615 }
1616
1617 #[instrument(skip_all, level = "trace")]
1618 async fn witness_and_build(
1619 &self,
1620 request: tonic::Request<pb::WitnessAndBuildRequest>,
1621 ) -> Result<tonic::Response<Self::WitnessAndBuildStream>, tonic::Status> {
1622 let pb::WitnessAndBuildRequest {
1623 transaction_plan,
1624 authorization_data,
1625 } = request.into_inner();
1626
1627 let transaction_plan: TransactionPlan = transaction_plan
1628 .ok_or_else(|| tonic::Status::invalid_argument("missing transaction plan"))?
1629 .try_into()
1630 .map_err(|e: anyhow::Error| e.context("could not decode transaction plan"))
1631 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1632
1633 let authorization_data: AuthorizationData = authorization_data
1634 .ok_or_else(|| tonic::Status::invalid_argument("missing authorization data"))?
1635 .try_into()
1636 .map_err(|e: anyhow::Error| e.context("could not decode authorization data"))
1637 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1638
1639 let witness_request = pb::WitnessRequest {
1640 transaction_plan: Some(transaction_plan.clone().into()),
1641 };
1642
1643 let witness_data: WitnessData = self
1644 .witness(tonic::Request::new(witness_request))
1645 .await?
1646 .into_inner()
1647 .witness_data
1648 .ok_or_else(|| tonic::Status::invalid_argument("missing witness data"))?
1649 .try_into()
1650 .map_err(|e: anyhow::Error| e.context("could not decode witness data"))
1651 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1652
1653 let fvk =
1654 self.storage.full_viewing_key().await.map_err(|_| {
1655 tonic::Status::failed_precondition("Error retrieving full viewing key")
1656 })?;
1657
1658 let transaction = Some(
1659 transaction_plan
1660 .build(&fvk, &witness_data, &authorization_data)
1663 .map_err(|_| tonic::Status::failed_precondition("Error building transaction"))?
1664 .into(),
1665 );
1666
1667 let stream = try_stream! {
1668 yield pb::WitnessAndBuildResponse {
1669 status: Some(pb::witness_and_build_response::Status::Complete(
1670 pb::witness_and_build_response::Complete { transaction },
1671 )),
1672 }
1673 };
1674
1675 Ok(tonic::Response::new(
1676 stream
1677 .map_err(|e: anyhow::Error| {
1678 tonic::Status::unavailable(format!("error witnessing transaction: {e}"))
1679 })
1680 .boxed(),
1681 ))
1682 }
1683
1684 #[instrument(skip_all, level = "trace")]
1685 async fn app_parameters(
1686 &self,
1687 _request: tonic::Request<pb::AppParametersRequest>,
1688 ) -> Result<tonic::Response<pb::AppParametersResponse>, tonic::Status> {
1689 self.check_worker().await?;
1690
1691 let parameters =
1692 self.storage.app_params().await.map_err(|e| {
1693 tonic::Status::unavailable(format!("error getting app params: {e}"))
1694 })?;
1695
1696 let response = AppParametersResponse {
1697 parameters: Some(parameters.into()),
1698 };
1699
1700 Ok(tonic::Response::new(response))
1701 }
1702
1703 #[instrument(skip_all, level = "trace")]
1704 async fn gas_prices(
1705 &self,
1706 _request: tonic::Request<pb::GasPricesRequest>,
1707 ) -> Result<tonic::Response<pb::GasPricesResponse>, tonic::Status> {
1708 self.check_worker().await?;
1709
1710 let gas_prices =
1711 self.storage.gas_prices().await.map_err(|e| {
1712 tonic::Status::unavailable(format!("error getting gas prices: {e}"))
1713 })?;
1714
1715 let response = GasPricesResponse {
1716 gas_prices: Some(gas_prices.into()),
1717 alt_gas_prices: Vec::new(),
1718 };
1719
1720 Ok(tonic::Response::new(response))
1721 }
1722
1723 #[instrument(skip_all, level = "trace")]
1724 async fn fmd_parameters(
1725 &self,
1726 _request: tonic::Request<pb::FmdParametersRequest>,
1727 ) -> Result<tonic::Response<pb::FmdParametersResponse>, tonic::Status> {
1728 self.check_worker().await?;
1729
1730 let parameters =
1731 self.storage.fmd_parameters().await.map_err(|e| {
1732 tonic::Status::unavailable(format!("error getting FMD params: {e}"))
1733 })?;
1734
1735 let response = FmdParametersResponse {
1736 parameters: Some(parameters.into()),
1737 };
1738
1739 Ok(tonic::Response::new(response))
1740 }
1741
1742 #[instrument(skip_all, level = "trace")]
1743 async fn owned_position_ids(
1744 &self,
1745 request: tonic::Request<pb::OwnedPositionIdsRequest>,
1746 ) -> Result<tonic::Response<Self::OwnedPositionIdsStream>, tonic::Status> {
1747 self.check_worker().await?;
1748
1749 let pb::OwnedPositionIdsRequest {
1750 position_state,
1751 trading_pair,
1752 subaccount: _,
1753 } = request.into_inner();
1754
1755 let position_state: Option<position::State> = position_state
1756 .map(|state| state.try_into())
1757 .transpose()
1758 .map_err(|e: anyhow::Error| e.context("could not decode position state"))
1759 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1760
1761 let trading_pair: Option<TradingPair> = trading_pair
1762 .map(|pair| pair.try_into())
1763 .transpose()
1764 .map_err(|e: anyhow::Error| e.context("could not decode trading pair"))
1765 .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1766
1767 let ids = self
1768 .storage
1769 .owned_position_ids(position_state, trading_pair)
1770 .await
1771 .map_err(|e| tonic::Status::unavailable(format!("error getting position ids: {e}")))?;
1772
1773 let stream = try_stream! {
1774 for id in ids {
1775 yield pb::OwnedPositionIdsResponse{
1776 position_id: Some(id.into()),
1777 subaccount: None,
1780 }
1781 }
1782 };
1783
1784 Ok(tonic::Response::new(
1785 stream
1786 .map_err(|e: anyhow::Error| {
1787 tonic::Status::unavailable(format!("error getting position ids: {e}"))
1788 })
1789 .boxed(),
1790 ))
1791 }
1792
1793 #[instrument(skip_all, level = "trace")]
1794 async fn authorize_and_build(
1795 &self,
1796 _request: tonic::Request<pb::AuthorizeAndBuildRequest>,
1797 ) -> Result<tonic::Response<Self::AuthorizeAndBuildStream>, tonic::Status> {
1798 unimplemented!("authorize_and_build")
1799 }
1800
1801 #[instrument(skip_all, level = "trace")]
1802 async fn unclaimed_swaps(
1803 &self,
1804 _: tonic::Request<pb::UnclaimedSwapsRequest>,
1805 ) -> Result<tonic::Response<Self::UnclaimedSwapsStream>, tonic::Status> {
1806 self.check_worker().await?;
1807
1808 let swaps = self.storage.unclaimed_swaps().await.map_err(|e| {
1809 tonic::Status::unavailable(format!("error fetching unclaimed swaps: {e}"))
1810 })?;
1811
1812 let stream = try_stream! {
1813 for swap in swaps {
1814 yield pb::UnclaimedSwapsResponse{
1815 swap: Some(swap.into()),
1816 }
1817 }
1818 };
1819
1820 Ok(tonic::Response::new(
1821 stream
1822 .map_err(|e: anyhow::Error| {
1823 tonic::Status::unavailable(format!("error getting unclaimed swaps: {e}"))
1824 })
1825 .boxed(),
1826 ))
1827 }
1828
1829 #[instrument(skip_all, level = "trace")]
1830 async fn wallet_id(
1831 &self,
1832 _: Request<WalletIdRequest>,
1833 ) -> Result<Response<WalletIdResponse>, Status> {
1834 let fvk = self.storage.full_viewing_key().await.map_err(|e| {
1835 Status::failed_precondition(format!("Error retrieving full viewing key: {e}"))
1836 })?;
1837
1838 Ok(Response::new(WalletIdResponse {
1839 wallet_id: Some(fvk.wallet_id().into()),
1840 }))
1841 }
1842
1843 #[instrument(skip_all, level = "trace")]
1844 async fn asset_metadata_by_id(
1845 &self,
1846 request: Request<AssetMetadataByIdRequest>,
1847 ) -> Result<Response<AssetMetadataByIdResponse>, Status> {
1848 let asset_id = request
1849 .into_inner()
1850 .asset_id
1851 .ok_or_else(|| Status::invalid_argument("missing asset id"))?
1852 .try_into()
1853 .map_err(|e| Status::invalid_argument(format!("{e:#}")))?;
1854
1855 let metadata = self
1856 .storage
1857 .asset_by_id(&asset_id)
1858 .await
1859 .map_err(|e| Status::internal(format!("Error retrieving asset by id: {e:#}")))?;
1860
1861 Ok(Response::new(AssetMetadataByIdResponse {
1862 denom_metadata: metadata.map(Into::into),
1863 }))
1864 }
1865
1866 #[instrument(skip_all, level = "trace")]
1867 async fn delegations_by_address_index(
1868 &self,
1869 _request: tonic::Request<pb::DelegationsByAddressIndexRequest>,
1870 ) -> Result<tonic::Response<Self::DelegationsByAddressIndexStream>, tonic::Status> {
1871 unimplemented!("delegations_by_address_index")
1872 }
1873
1874 #[instrument(skip_all, level = "trace")]
1875 async fn unbonding_tokens_by_address_index(
1876 &self,
1877 _request: tonic::Request<pb::UnbondingTokensByAddressIndexRequest>,
1878 ) -> Result<tonic::Response<Self::UnbondingTokensByAddressIndexStream>, tonic::Status> {
1879 unimplemented!("unbonding_tokens_by_address_index currently only implemented on web")
1880 }
1881
1882 #[instrument(skip_all, level = "trace")]
1883 async fn latest_swaps(
1884 &self,
1885 _request: tonic::Request<pb::LatestSwapsRequest>,
1886 ) -> Result<tonic::Response<Self::LatestSwapsStream>, tonic::Status> {
1887 unimplemented!("latest_swaps currently only implemented on web")
1888 }
1889
1890 #[instrument(skip_all, level = "trace")]
1891 async fn tournament_votes(
1892 &self,
1893 _request: tonic::Request<pb::TournamentVotesRequest>,
1894 ) -> Result<tonic::Response<Self::TournamentVotesStream>, tonic::Status> {
1895 unimplemented!("tournament_votes currently only implemented on web")
1896 }
1897
1898 #[instrument(skip_all, level = "trace")]
1899 async fn lqt_voting_notes(
1900 &self,
1901 request: tonic::Request<pb::LqtVotingNotesRequest>,
1902 ) -> Result<tonic::Response<Self::LqtVotingNotesStream>, tonic::Status> {
1903 async fn inner(
1904 this: &ViewServer,
1905 epoch: u64,
1906 filter: Option<AddressIndex>,
1907 ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
1908 let (_, start_height) = this.storage.get_epoch(epoch).await?;
1909 let start_height =
1910 start_height.ok_or_else(|| anyhow!("missing height for epoch {epoch}"))?;
1911 let notes = this.storage.notes_for_voting(filter, start_height).await?;
1912 Ok(notes)
1913 }
1914
1915 let request = request.into_inner();
1916 let epoch = request.epoch_index;
1917 let filter = request
1918 .account_filter
1919 .map(|x| AddressIndex::try_from(x))
1920 .transpose()
1921 .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
1922 let notes = inner(self, epoch, filter).await.map_err(|e| {
1923 tonic::Status::internal(format!("error fetching voting notes: {:#}", e))
1924 })?;
1925 let stream = tokio_stream::iter(notes.into_iter().map(|(note, _)| {
1926 Result::<_, tonic::Status>::Ok(pb::LqtVotingNotesResponse {
1927 note_record: Some(note.into()),
1928 already_voted: false,
1929 })
1930 }));
1931 Ok(tonic::Response::new(stream.boxed()))
1932 }
1933
1934 #[instrument(skip_all, level = "trace")]
1935 async fn lp_position_bundle(
1936 &self,
1937 _request: tonic::Request<pb::LpPositionBundleRequest>,
1938 ) -> Result<tonic::Response<Self::LpPositionBundleStream>, tonic::Status> {
1939 unimplemented!("lp_position_bundle currently only implemented on web")
1940 }
1941
1942 #[instrument(skip_all, level = "trace")]
1943 async fn lp_strategy_catalog(
1944 &self,
1945 _request: tonic::Request<pb::LpStrategyCatalogRequest>,
1946 ) -> Result<tonic::Response<Self::LpStrategyCatalogStream>, tonic::Status> {
1947 unimplemented!("lp_strategy_catalog currently only implemented on web")
1948 }
1949}
1950
1951async fn get_pd_endpoint(node: Url) -> anyhow::Result<Endpoint> {
1955 let endpoint = match node.scheme() {
1956 "http" => Channel::from_shared(node.to_string())?,
1957 "https" => Channel::from_shared(node.to_string())?
1958 .tls_config(ClientTlsConfig::new().with_webpki_roots())?,
1959 other => anyhow::bail!("unknown url scheme {other}"),
1960 };
1961 Ok(endpoint)
1962}