penumbra_sdk_view/
service.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    pin::Pin,
4    sync::{Arc, Mutex},
5};
6
7use anyhow::{anyhow, Context};
8use async_stream::try_stream;
9use camino::Utf8Path;
10use decaf377::Fq;
11use futures::stream::{self, StreamExt, TryStreamExt};
12use penumbra_sdk_auction::auction::dutch::actions::view::{
13    ActionDutchAuctionScheduleView, ActionDutchAuctionWithdrawView,
14};
15use rand::Rng;
16use rand_core::OsRng;
17use tap::{Tap, TapFallible};
18use tokio::sync::{watch, RwLock};
19use tokio_stream::wrappers::WatchStream;
20use tonic::transport::channel::ClientTlsConfig;
21use tonic::transport::channel::Endpoint;
22use tonic::{async_trait, transport::Channel, Request, Response, Status};
23use tracing::{instrument, Instrument};
24use url::Url;
25
26use penumbra_sdk_asset::{asset, asset::Metadata, Value};
27use penumbra_sdk_dex::{
28    lp::{
29        position::{self, Position},
30        Reserves,
31    },
32    swap_claim::SwapClaimPlan,
33    TradingPair,
34};
35use penumbra_sdk_fee::Fee;
36use penumbra_sdk_keys::{
37    keys::WalletId,
38    keys::{AddressIndex, FullViewingKey},
39    Address, AddressView,
40};
41use penumbra_sdk_num::Amount;
42use penumbra_sdk_proto::{
43    util::tendermint_proxy::v1::{
44        tendermint_proxy_service_client::TendermintProxyServiceClient, BroadcastTxSyncRequest,
45        GetStatusRequest, GetStatusResponse, SyncInfo,
46    },
47    view::v1::{
48        self as pb,
49        broadcast_transaction_response::{BroadcastSuccess, Confirmed, Status as BroadcastStatus},
50        view_service_client::ViewServiceClient,
51        view_service_server::{ViewService, ViewServiceServer},
52        AppParametersResponse, AssetMetadataByIdRequest, AssetMetadataByIdResponse,
53        BroadcastTransactionResponse, FmdParametersResponse, GasPricesResponse,
54        NoteByCommitmentResponse, StatusResponse, SwapByCommitmentResponse,
55        TransactionPlannerResponse, WalletIdRequest, WalletIdResponse, WitnessResponse,
56    },
57    DomainType,
58};
59use penumbra_sdk_stake::rate::RateData;
60use penumbra_sdk_tct::{Proof, StateCommitment};
61use penumbra_sdk_transaction::{
62    AuthorizationData, Transaction, TransactionPerspective, TransactionPlan, WitnessData,
63};
64
65use crate::{worker::Worker, Planner, Storage};
66
67/// A [`futures::Stream`] of broadcast transaction responses.
68///
69/// See [`ViewService::broadcast_transaction()`].
70type BroadcastTransactionStream = Pin<
71    Box<dyn futures::Stream<Item = Result<pb::BroadcastTransactionResponse, tonic::Status>> + Send>,
72>;
73
74/// A service that synchronizes private chain state and responds to queries
75/// about it.
76///
77/// The [`ViewServer`] implements the Tonic-derived [`ViewService`] trait,
78/// so it can be used as a gRPC server, or called directly.  It spawns a task
79/// internally that performs synchronization and scanning.  The
80/// [`ViewServer`] can be cloned; each clone will read from the same shared
81/// state, but there will only be a single scanning task.
82#[derive(Clone)]
83pub struct ViewServer {
84    storage: Storage,
85    // A shared error slot for errors bubbled up by the worker. This is a regular Mutex
86    // rather than a Tokio Mutex because it should be uncontended.
87    error_slot: Arc<Mutex<Option<anyhow::Error>>>,
88    // A copy of the SCT used by the worker task.
89    state_commitment_tree: Arc<RwLock<penumbra_sdk_tct::Tree>>,
90    // The Url for the pd gRPC endpoint on remote node.
91    node: Url,
92    /// Used to watch for changes to the sync height.
93    sync_height_rx: watch::Receiver<u64>,
94}
95
96impl ViewServer {
97    /// Convenience method that calls [`Storage::load_or_initialize`] and then [`Self::new`].
98    pub async fn load_or_initialize(
99        storage_path: Option<impl AsRef<Utf8Path>>,
100        registry_path: Option<impl AsRef<Utf8Path>>,
101        fvk: &FullViewingKey,
102        node: Url,
103    ) -> anyhow::Result<Self> {
104        let storage = Storage::load_or_initialize(storage_path, fvk, node.clone())
105            .tap(|_| tracing::trace!("loading or initializing storage"))
106            .await?
107            .tap(|_| tracing::debug!("storage is ready"));
108
109        if let Some(registry_path) = registry_path {
110            storage.load_asset_metadata(registry_path).await?;
111        }
112
113        Self::new(storage, node)
114            .tap(|_| tracing::trace!("constructing view server"))
115            .await
116            .tap(|_| tracing::debug!("constructed view server"))
117    }
118
119    /// Constructs a new [`ViewService`], spawning a sync task internally.
120    ///
121    /// The sync task uses the provided `client` to sync with the chain.
122    ///
123    /// To create multiple [`ViewService`]s, clone the [`ViewService`] returned
124    /// by this method, rather than calling it multiple times.  That way, each clone
125    /// will be backed by the same scanning task, rather than each spawning its own.
126    pub async fn new(storage: Storage, node: Url) -> anyhow::Result<Self> {
127        let span = tracing::error_span!(parent: None, "view");
128        let channel = Self::get_pd_channel(node.clone()).await?;
129
130        let (worker, state_commitment_tree, error_slot, sync_height_rx) =
131            Worker::new(storage.clone(), channel)
132                .instrument(span.clone())
133                .tap(|_| tracing::trace!("constructing view server worker"))
134                .await?
135                .tap(|_| tracing::debug!("constructed view server worker"));
136
137        tokio::spawn(worker.run().instrument(span))
138            .tap(|_| tracing::debug!("spawned view server worker"));
139
140        Ok(Self {
141            storage,
142            error_slot,
143            sync_height_rx,
144            state_commitment_tree,
145            node,
146        })
147    }
148
149    /// Obtain a Tonic [Channel] to a remote `pd` endpoint.
150    ///
151    /// Provided as a convenience method for bootstrapping a connection.
152    /// Handles configuring TLS if the URL is HTTPS. Also adds a tracing span
153    /// to the working [Channel].
154    pub async fn get_pd_channel(node: Url) -> anyhow::Result<Channel> {
155        let endpoint = get_pd_endpoint(node).await?;
156        let span = tracing::error_span!(parent: None, "view");
157        let c: Channel = endpoint
158            .connect()
159            .instrument(span.clone())
160            .await
161            .with_context(|| "could not connect to grpc server")
162            .tap_err(|error| tracing::error!(?error, "could not connect to grpc server"))?;
163
164        Ok(c)
165    }
166
167    /// Checks if the view server worker has encountered an error.
168    ///
169    /// This function returns a gRPC [`tonic::Status`] containing the view server worker error if
170    /// any exists, otherwise it returns `Ok(())`.
171    #[instrument(level = "debug", skip_all)]
172    async fn check_worker(&self) -> Result<(), tonic::Status> {
173        // If the shared error slot is set, then an error has occurred in the worker
174        // that we should bubble up.
175        tracing::debug!("checking view server worker");
176        if let Some(error) = self
177            .error_slot
178            .lock()
179            .tap_err(|error| tracing::error!(?error, "unable to lock worker error slot"))
180            .map_err(|e| {
181                tonic::Status::unavailable(format!("unable to lock worker error slot {:#}", e))
182            })?
183            .as_ref()
184        {
185            return Err(tonic::Status::new(
186                tonic::Code::Internal,
187                format!("Worker failed: {error}"),
188            ));
189        }
190
191        // TODO: check whether the worker is still alive, else fail, when we have a way to do that
192        // (if the worker is to crash without setting the error_slot, the service should die as well)
193
194        Ok(()).tap(|_| tracing::trace!("view server worker is healthy"))
195    }
196
197    #[instrument(skip(self, transaction), fields(id = %transaction.id()))]
198    fn broadcast_transaction(
199        &self,
200        transaction: Transaction,
201        await_detection: bool,
202    ) -> BroadcastTransactionStream {
203        let self2 = self.clone();
204        try_stream! {
205                // 1. Broadcast the transaction to the network.
206                // Note that "synchronous" here means "wait for the tx to be accepted by
207                // the fullnode", not "wait for the tx to be included on chain.
208                let mut fullnode_client = self2.tendermint_proxy_client().await
209                            .map_err(|e| {
210                                tonic::Status::unavailable(format!(
211                                    "couldn't connect to fullnode: {:#?}",
212                                    e
213                                ))
214                            })?
215                        ;
216                let node_rsp = fullnode_client
217                    .broadcast_tx_sync(BroadcastTxSyncRequest {
218                        params: transaction.encode_to_vec(),
219                        req_id: OsRng.gen(),
220                    })
221                    .await
222                    .map_err(|e| {
223                        tonic::Status::unavailable(format!(
224                            "error broadcasting tx: {:#?}",
225                            e
226                        ))
227                    })?
228                    .into_inner();
229                tracing::info!(?node_rsp);
230                match node_rsp.code {
231                    0 => Ok(()),
232                    _ => Err(tonic::Status::new(
233                        tonic::Code::Internal,
234                        format!(
235                            "Error submitting transaction: code {}, log: {}",
236                            node_rsp.code,
237                            node_rsp.log,
238                        ),
239                    )),
240                }?;
241
242                // The transaction was submitted so we provide a status update
243                yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::BroadcastSuccess(BroadcastSuccess{id:Some(transaction.id().into())}))};
244
245                // 2. Optionally wait for the transaction to be detected by the view service.
246                let nullifier = if await_detection {
247                    // This needs to be only *spend* nullifiers because the nullifier detection
248                    // is broken for swaps, https://github.com/penumbra-zone/penumbra/issues/1749
249                    //
250                    // in the meantime, inline the definition from `Transaction`
251                    transaction
252                        .actions()
253                        .filter_map(|action| match action {
254                            penumbra_sdk_transaction::Action::Spend(spend) => Some(spend.body.nullifier),
255                            /*
256                            penumbra_sdk_transaction::Action::SwapClaim(swap_claim) => {
257                                Some(swap_claim.body.nullifier)
258                            }
259                             */
260                            _ => None,
261                        })
262                        .next()
263                } else {
264                    None
265                };
266
267                if let Some(nullifier) = nullifier {
268                    tracing::info!(?nullifier, "waiting for detection of nullifier");
269                    let detection = self2.storage.nullifier_status(nullifier, true);
270                    tokio::time::timeout(std::time::Duration::from_secs(20), detection)
271                        .await
272                        .map_err(|_| {
273                            tonic::Status::unavailable(
274                                "timeout waiting to detect nullifier of submitted transaction"
275                            )
276                        })?
277                        .map_err(|_| {
278                            tonic::Status::unavailable(
279                                "error while waiting for detection of submitted transaction"
280                            )
281                        })?;
282                }
283
284                let detection_height = self2.storage
285                    .transaction_by_hash(&transaction.id().0)
286                    .await
287                    .map_err(|e| tonic::Status::internal(format!("error querying storage: {:#}", e)))?
288                    .map(|(height, _tx)| height)
289                    // If we didn't find it for some reason, return 0 for unknown.
290                    // TODO: how does this change if we detach extended transaction fetch from scanning?
291                    .unwrap_or(0);
292                yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::Confirmed(Confirmed{id:Some(transaction.id().into()), detection_height}))};
293            }.boxed()
294    }
295
296    #[instrument(level = "trace", skip(self))]
297    async fn tendermint_proxy_client(
298        &self,
299    ) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
300        TendermintProxyServiceClient::connect(self.node.to_string())
301            .tap(|_| tracing::debug!("connecting to tendermint proxy"))
302            .await
303            .tap_err(|error| tracing::error!(?error, "failed to connect to tendermint proxy"))
304            .map_err(anyhow::Error::from)
305    }
306
307    /// Return the latest block height known by the fullnode or its peers, as
308    /// well as whether the fullnode is caught up with that height.
309    #[instrument(skip(self))]
310    pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
311        let mut client = self.tendermint_proxy_client().await?;
312
313        let GetStatusResponse { sync_info, .. } = client
314            .get_status(GetStatusRequest {})
315            .tap(|_| tracing::debug!("querying current status"))
316            .await
317            .tap_err(|error| tracing::debug!(?error, "failed to query current status"))?
318            .into_inner();
319
320        let SyncInfo {
321            latest_block_height,
322            catching_up,
323            ..
324        } = sync_info
325            .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
326
327        // There is a `max_peer_block_height` available in TM 0.35, however it should not be used
328        // as it does not seem to reflect the consensus height. Since clients use `latest_known_block_height`
329        // to determine the height to attempt syncing to, a validator reporting a non-consensus height
330        // can cause a DoS to clients attempting to sync if `max_peer_block_height` is used.
331        let latest_known_block_height = latest_block_height;
332
333        tracing::debug!(
334            ?latest_block_height,
335            ?catching_up,
336            ?latest_known_block_height,
337            "found latest known block height"
338        );
339
340        Ok((latest_known_block_height, catching_up))
341    }
342
343    #[instrument(skip(self))]
344    pub async fn status(&self) -> anyhow::Result<StatusResponse> {
345        let full_sync_height = self.storage.last_sync_height().await?.unwrap_or(0);
346
347        let (latest_known_block_height, node_catching_up) =
348            self.latest_known_block_height().await?;
349
350        let height_diff = latest_known_block_height
351            .checked_sub(full_sync_height)
352            .ok_or_else(|| anyhow!("sync height ahead of node height"))?;
353
354        let catching_up = match (node_catching_up, height_diff) {
355            // We're synced to the same height as the node
356            (false, 0) => false,
357            // We're one block behind, and will learn about it soon, close enough
358            (false, 1) => false,
359            // We're behind the node
360            (false, _) => true,
361            // The node is behind the network
362            (true, _) => true,
363        };
364
365        Ok(StatusResponse {
366            full_sync_height,
367            catching_up,
368            partial_sync_height: full_sync_height, // Set these as the same for backwards compatibility following adding the partial_sync_height
369        })
370    }
371}
372
373#[async_trait]
374impl ViewService for ViewServer {
375    type NotesStream =
376        Pin<Box<dyn futures::Stream<Item = Result<pb::NotesResponse, tonic::Status>> + Send>>;
377    type NotesForVotingStream = Pin<
378        Box<dyn futures::Stream<Item = Result<pb::NotesForVotingResponse, tonic::Status>> + Send>,
379    >;
380    type AssetsStream =
381        Pin<Box<dyn futures::Stream<Item = Result<pb::AssetsResponse, tonic::Status>> + Send>>;
382    type StatusStreamStream = Pin<
383        Box<dyn futures::Stream<Item = Result<pb::StatusStreamResponse, tonic::Status>> + Send>,
384    >;
385    type TransactionInfoStream = Pin<
386        Box<dyn futures::Stream<Item = Result<pb::TransactionInfoResponse, tonic::Status>> + Send>,
387    >;
388    type BalancesStream =
389        Pin<Box<dyn futures::Stream<Item = Result<pb::BalancesResponse, tonic::Status>> + Send>>;
390    type OwnedPositionIdsStream = Pin<
391        Box<dyn futures::Stream<Item = Result<pb::OwnedPositionIdsResponse, tonic::Status>> + Send>,
392    >;
393    type UnclaimedSwapsStream = Pin<
394        Box<dyn futures::Stream<Item = Result<pb::UnclaimedSwapsResponse, tonic::Status>> + Send>,
395    >;
396    type BroadcastTransactionStream = BroadcastTransactionStream;
397    type WitnessAndBuildStream = Pin<
398        Box<dyn futures::Stream<Item = Result<pb::WitnessAndBuildResponse, tonic::Status>> + Send>,
399    >;
400    type AuthorizeAndBuildStream = Pin<
401        Box<
402            dyn futures::Stream<Item = Result<pb::AuthorizeAndBuildResponse, tonic::Status>> + Send,
403        >,
404    >;
405    type DelegationsByAddressIndexStream = Pin<
406        Box<
407            dyn futures::Stream<Item = Result<pb::DelegationsByAddressIndexResponse, tonic::Status>>
408                + Send,
409        >,
410    >;
411    type UnbondingTokensByAddressIndexStream = Pin<
412        Box<
413            dyn futures::Stream<
414                    Item = Result<pb::UnbondingTokensByAddressIndexResponse, tonic::Status>,
415                > + Send,
416        >,
417    >;
418    type AuctionsStream =
419        Pin<Box<dyn futures::Stream<Item = Result<pb::AuctionsResponse, tonic::Status>> + Send>>;
420    type LatestSwapsStream =
421        Pin<Box<dyn futures::Stream<Item = Result<pb::LatestSwapsResponse, tonic::Status>> + Send>>;
422
423    #[instrument(skip_all, level = "trace")]
424    async fn auctions(
425        &self,
426        request: tonic::Request<pb::AuctionsRequest>,
427    ) -> Result<tonic::Response<Self::AuctionsStream>, tonic::Status> {
428        use penumbra_sdk_proto::core::component::auction::v1 as pb_auction;
429        use penumbra_sdk_proto::core::component::auction::v1::query_service_client::QueryServiceClient as AuctionQueryServiceClient;
430
431        let parameters = request.into_inner();
432        let query_latest_state = parameters.query_latest_state;
433        let include_inactive = parameters.include_inactive;
434
435        let account_filter = parameters
436            .account_filter
437            .to_owned()
438            .map(AddressIndex::try_from)
439            .map_or(Ok(None), |v| v.map(Some))
440            .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
441
442        let all_auctions = self
443            .storage
444            .fetch_auctions_by_account(account_filter, include_inactive)
445            .await
446            .map_err(|e| tonic::Status::internal(e.to_string()))?;
447
448        let client = if query_latest_state {
449            Some(
450                AuctionQueryServiceClient::connect(self.node.to_string())
451                    .await
452                    .map_err(|e| tonic::Status::internal(e.to_string()))?,
453            )
454        } else {
455            None
456        };
457
458        let responses = futures::future::join_all(all_auctions.into_iter().map(
459            |(auction_id, note_record, local_seq)| {
460                let maybe_client = client.clone();
461                async move {
462                    let (any_state, positions) = if let Some(mut client2) = maybe_client {
463                        let extra_data = client2
464                            .auction_state_by_id(pb_auction::AuctionStateByIdRequest {
465                                id: Some(auction_id.into()),
466                            })
467                            .await
468                            .map_err(|e| tonic::Status::internal(e.to_string()))?
469                            .into_inner();
470                        (extra_data.auction, extra_data.positions)
471                    } else {
472                        (None, vec![])
473                    };
474
475                    Result::<_, tonic::Status>::Ok(pb::AuctionsResponse {
476                        id: Some(auction_id.into()),
477                        note_record: Some(note_record.into()),
478                        auction: any_state,
479                        positions,
480                        local_seq,
481                    })
482                }
483            },
484        ))
485        .await;
486
487        let stream = stream::iter(responses)
488            .map_err(|e| tonic::Status::internal(format!("error getting auction: {e}")))
489            .boxed();
490
491        Ok(Response::new(stream))
492    }
493
494    #[instrument(skip_all, level = "trace")]
495    async fn broadcast_transaction(
496        &self,
497        request: tonic::Request<pb::BroadcastTransactionRequest>,
498    ) -> Result<tonic::Response<Self::BroadcastTransactionStream>, tonic::Status> {
499        let pb::BroadcastTransactionRequest {
500            transaction,
501            await_detection,
502        } = request.into_inner();
503
504        let transaction: Transaction = transaction
505            .ok_or_else(|| tonic::Status::invalid_argument("missing transaction"))?
506            .try_into()
507            .map_err(|e: anyhow::Error| e.context("could not decode transaction"))
508            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
509
510        let stream = self.broadcast_transaction(transaction, await_detection);
511
512        Ok(tonic::Response::new(stream))
513    }
514
515    #[instrument(skip_all, level = "trace")]
516    async fn transaction_planner(
517        &self,
518        request: tonic::Request<pb::TransactionPlannerRequest>,
519    ) -> Result<tonic::Response<pb::TransactionPlannerResponse>, tonic::Status> {
520        let prq = request.into_inner();
521
522        let app_params =
523            self.storage.app_params().await.map_err(|e| {
524                tonic::Status::internal(format!("could not get app params: {:#}", e))
525            })?;
526
527        let gas_prices =
528            self.storage.gas_prices().await.map_err(|e| {
529                tonic::Status::internal(format!("could not get gas prices: {:#}", e))
530            })?;
531
532        // TODO: need to support passing the fee _in_ to this API via the TransactionPlannerRequest
533        // meaning the requester should fetch the gas prices and estimate cost/allow the user to modify
534        // fee paid
535        let mut planner = Planner::new(OsRng);
536        planner.set_gas_prices(gas_prices);
537        planner.expiry_height(prq.expiry_height);
538
539        for output in prq.outputs {
540            let address: Address = output
541                .address
542                .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
543                .try_into()
544                .map_err(|e| {
545                    tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
546                })?;
547
548            let value: Value = output
549                .value
550                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
551                .try_into()
552                .map_err(|e| {
553                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
554                })?;
555
556            planner.output(value, address);
557        }
558
559        for swap in prq.swaps {
560            let value: Value = swap
561                .value
562                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
563                .try_into()
564                .map_err(|e| {
565                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
566                })?;
567
568            let target_asset: asset::Id = swap
569                .target_asset
570                .ok_or_else(|| tonic::Status::invalid_argument("Missing target asset"))?
571                .try_into()
572                .map_err(|e| {
573                    tonic::Status::invalid_argument(format!("Could not parse target asset: {e:#}"))
574                })?;
575
576            let fee: Fee = swap
577                .fee
578                .ok_or_else(|| tonic::Status::invalid_argument("Missing fee"))?
579                .try_into()
580                .map_err(|e| {
581                    tonic::Status::invalid_argument(format!("Could not parse fee: {e:#}"))
582                })?;
583
584            let claim_address: Address = swap
585                .claim_address
586                .ok_or_else(|| tonic::Status::invalid_argument("Missing claim address"))?
587                .try_into()
588                .map_err(|e| {
589                    tonic::Status::invalid_argument(format!("Could not parse claim address: {e:#}"))
590                })?;
591
592            planner
593                .swap(value, target_asset, fee, claim_address)
594                .map_err(|e| {
595                    tonic::Status::invalid_argument(format!("Could not plan swap: {e:#}"))
596                })?;
597        }
598
599        for swap_claim in prq.swap_claims {
600            let swap_commitment: StateCommitment = swap_claim
601                .swap_commitment
602                .ok_or_else(|| tonic::Status::invalid_argument("Missing swap commitment"))?
603                .try_into()
604                .map_err(|e| {
605                    tonic::Status::invalid_argument(format!(
606                        "Could not parse swap commitment: {e:#}"
607                    ))
608                })?;
609            let swap_record = self
610                .storage
611                // TODO: should there be a timeout on detection here instead?
612                .swap_by_commitment(swap_commitment, false)
613                .await
614                .map_err(|e| {
615                    tonic::Status::invalid_argument(format!(
616                        "Could not fetch swap by commitment: {e:#}"
617                    ))
618                })?;
619
620            planner.swap_claim(SwapClaimPlan {
621                swap_plaintext: swap_record.swap,
622                position: swap_record.position,
623                output_data: swap_record.output_data,
624                epoch_duration: app_params.sct_params.epoch_duration,
625                proof_blinding_r: Fq::rand(&mut OsRng),
626                proof_blinding_s: Fq::rand(&mut OsRng),
627            });
628        }
629
630        let current_epoch = if prq.undelegations.is_empty() && prq.delegations.is_empty() {
631            None
632        } else {
633            Some(
634                prq.epoch
635                    .ok_or_else(|| {
636                        tonic::Status::invalid_argument(
637                            "Missing current epoch in TransactionPlannerRequest",
638                        )
639                    })?
640                    .try_into()
641                    .map_err(|e| {
642                        tonic::Status::invalid_argument(format!(
643                            "Could not parse current epoch: {e:#}"
644                        ))
645                    })?,
646            )
647        };
648
649        for delegation in prq.delegations {
650            let amount: Amount = delegation
651                .amount
652                .ok_or_else(|| tonic::Status::invalid_argument("Missing amount"))?
653                .try_into()
654                .map_err(|e| {
655                    tonic::Status::invalid_argument(format!("Could not parse amount: {e:#}"))
656                })?;
657
658            let rate_data: RateData = delegation
659                .rate_data
660                .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
661                .try_into()
662                .map_err(|e| {
663                    tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
664                })?;
665
666            planner.delegate(
667                current_epoch.expect("checked that current epoch is present"),
668                amount,
669                rate_data,
670            );
671        }
672
673        for undelegation in prq.undelegations {
674            let value: Value = undelegation
675                .value
676                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
677                .try_into()
678                .map_err(|e| {
679                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
680                })?;
681
682            let rate_data: RateData = undelegation
683                .rate_data
684                .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
685                .try_into()
686                .map_err(|e| {
687                    tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
688                })?;
689
690            planner.undelegate(
691                current_epoch.expect("checked that current epoch is present"),
692                value.amount,
693                rate_data,
694            );
695        }
696
697        for position_open in prq.position_opens {
698            let position: Position = position_open
699                .position
700                .ok_or_else(|| tonic::Status::invalid_argument("Missing position"))?
701                .try_into()
702                .map_err(|e| {
703                    tonic::Status::invalid_argument(format!("Could not parse position: {e:#}"))
704                })?;
705
706            planner.position_open(position);
707        }
708
709        for position_close in prq.position_closes {
710            let position_id: position::Id = position_close
711                .position_id
712                .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
713                .try_into()
714                .map_err(|e| {
715                    tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
716                })?;
717
718            planner.position_close(position_id);
719        }
720
721        for position_withdraw in prq.position_withdraws {
722            let position_id: position::Id = position_withdraw
723                .position_id
724                .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
725                .try_into()
726                .map_err(|e| {
727                    tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
728                })?;
729
730            let reserves: Reserves = position_withdraw
731                .reserves
732                .ok_or_else(|| tonic::Status::invalid_argument("Missing reserves"))?
733                .try_into()
734                .map_err(|e| {
735                    tonic::Status::invalid_argument(format!("Could not parse reserves: {e:#}"))
736                })?;
737
738            let trading_pair: TradingPair = position_withdraw
739                .trading_pair
740                .ok_or_else(|| tonic::Status::invalid_argument("Missing pair"))?
741                .try_into()
742                .map_err(|e| {
743                    tonic::Status::invalid_argument(format!("Could not parse pair: {e:#}"))
744                })?;
745
746            planner.position_withdraw(position_id, reserves, trading_pair);
747        }
748
749        // Insert any ICS20 withdrawals.
750        for ics20_withdrawal in prq.ics20_withdrawals {
751            planner.ics20_withdrawal(
752                ics20_withdrawal
753                    .try_into()
754                    .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
755            );
756        }
757
758        // Finally, insert all the requested IBC actions.
759        for ibc_action in prq.ibc_relay_actions {
760            planner.ibc_action(
761                ibc_action
762                    .try_into()
763                    .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
764            );
765        }
766
767        let mut client_of_self = ViewServiceClient::new(ViewServiceServer::new(self.clone()));
768
769        let source = prq
770            .source
771            // If the request specified a source of funds, pass it to the planner...
772            .map(|addr_index| addr_index.account)
773            // ... or just use the default account if not.
774            .unwrap_or(0u32);
775
776        let plan = planner
777            .plan(&mut client_of_self, source.into())
778            .await
779            .context("could not plan requested transaction")
780            .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?;
781
782        Ok(tonic::Response::new(TransactionPlannerResponse {
783            plan: Some(plan.into()),
784        }))
785    }
786
787    #[instrument(skip_all, level = "trace")]
788    async fn address_by_index(
789        &self,
790        request: tonic::Request<pb::AddressByIndexRequest>,
791    ) -> Result<tonic::Response<pb::AddressByIndexResponse>, tonic::Status> {
792        let fvk =
793            self.storage.full_viewing_key().await.map_err(|_| {
794                tonic::Status::failed_precondition("Error retrieving full viewing key")
795            })?;
796
797        let address_index = request
798            .into_inner()
799            .address_index
800            .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
801            .try_into()
802            .map_err(|e| {
803                tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
804            })?;
805
806        Ok(tonic::Response::new(pb::AddressByIndexResponse {
807            address: Some(fvk.payment_address(address_index).0.into()),
808        }))
809    }
810
811    #[instrument(skip_all, level = "trace")]
812    async fn index_by_address(
813        &self,
814        request: tonic::Request<pb::IndexByAddressRequest>,
815    ) -> Result<tonic::Response<pb::IndexByAddressResponse>, tonic::Status> {
816        let fvk =
817            self.storage.full_viewing_key().await.map_err(|_| {
818                tonic::Status::failed_precondition("Error retrieving full viewing key")
819            })?;
820
821        let address: Address = request
822            .into_inner()
823            .address
824            .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
825            .try_into()
826            .map_err(|e| {
827                tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
828            })?;
829
830        Ok(tonic::Response::new(pb::IndexByAddressResponse {
831            address_index: fvk.address_index(&address).map(Into::into),
832        }))
833    }
834    async fn transparent_address(
835        &self,
836        _request: tonic::Request<pb::TransparentAddressRequest>,
837    ) -> Result<tonic::Response<pb::TransparentAddressResponse>, tonic::Status> {
838        let fvk =
839            self.storage.full_viewing_key().await.map_err(|_| {
840                tonic::Status::failed_precondition("Error retrieving full viewing key")
841            })?;
842
843        let encoding = fvk.incoming().transparent_address();
844        let address: Address = encoding
845            .parse()
846            .map_err(|_| tonic::Status::internal("could not parse newly generated address"))?;
847
848        Ok(tonic::Response::new(pb::TransparentAddressResponse {
849            address: Some(address.into()),
850            encoding,
851        }))
852    }
853
854    #[instrument(skip_all, level = "trace")]
855    async fn ephemeral_address(
856        &self,
857        request: tonic::Request<pb::EphemeralAddressRequest>,
858    ) -> Result<tonic::Response<pb::EphemeralAddressResponse>, tonic::Status> {
859        let fvk =
860            self.storage.full_viewing_key().await.map_err(|_| {
861                tonic::Status::failed_precondition("Error retrieving full viewing key")
862            })?;
863
864        let address_index = request
865            .into_inner()
866            .address_index
867            .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
868            .try_into()
869            .map_err(|e| {
870                tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
871            })?;
872
873        Ok(tonic::Response::new(pb::EphemeralAddressResponse {
874            address: Some(fvk.ephemeral_address(OsRng, address_index).0.into()),
875        }))
876    }
877
878    #[instrument(skip_all, level = "trace")]
879    async fn transaction_info_by_hash(
880        &self,
881        request: tonic::Request<pb::TransactionInfoByHashRequest>,
882    ) -> Result<tonic::Response<pb::TransactionInfoByHashResponse>, tonic::Status> {
883        self.check_worker().await?;
884
885        let request = request.into_inner();
886
887        let fvk =
888            self.storage.full_viewing_key().await.map_err(|_| {
889                tonic::Status::failed_precondition("Error retrieving full viewing key")
890            })?;
891
892        let maybe_tx = self
893            .storage
894            .transaction_by_hash(
895                &request
896                    .id
897                    .clone()
898                    .ok_or_else(|| {
899                        tonic::Status::invalid_argument(
900                            "missing transaction ID in TransactionInfoByHashRequest",
901                        )
902                    })?
903                    .inner,
904            )
905            .await
906            .map_err(|_| {
907                tonic::Status::failed_precondition(format!(
908                    "Error retrieving transaction by hash {}",
909                    hex::encode(request.id.expect("transaction id is present").inner)
910                ))
911            })?;
912
913        let Some((height, tx)) = maybe_tx else {
914            return Ok(tonic::Response::new(
915                pb::TransactionInfoByHashResponse::default(),
916            ));
917        };
918
919        // First, create a TxP with the payload keys visible to our FVK and no other data.
920        let mut txp = TransactionPerspective {
921            payload_keys: tx
922                .payload_keys(&fvk)
923                .map_err(|_| tonic::Status::failed_precondition("Error generating payload keys"))?,
924            ..Default::default()
925        };
926
927        // Next, extend the TxP with the openings of commitments known to our view server
928        // but not included in the transaction body, for instance spent notes or swap claim outputs.
929        for action in tx.actions() {
930            use penumbra_sdk_transaction::Action;
931            match action {
932                Action::Spend(spend) => {
933                    let nullifier = spend.body.nullifier;
934                    // An error here indicates we don't know the nullifier, so we omit it from the Perspective.
935                    if let Ok(spendable_note_record) =
936                        self.storage.note_by_nullifier(nullifier, false).await
937                    {
938                        txp.spend_nullifiers
939                            .insert(nullifier, spendable_note_record.note);
940                    }
941                }
942                Action::SwapClaim(claim) => {
943                    let output_1_record = self
944                        .storage
945                        .note_by_commitment(claim.body.output_1_commitment, false)
946                        .await
947                        .map_err(|e| {
948                            tonic::Status::internal(format!(
949                                "Error retrieving first SwapClaim output note record: {:#}",
950                                e
951                            ))
952                        })?;
953                    let output_2_record = self
954                        .storage
955                        .note_by_commitment(claim.body.output_2_commitment, false)
956                        .await
957                        .map_err(|e| {
958                            tonic::Status::internal(format!(
959                                "Error retrieving second SwapClaim output note record: {:#}",
960                                e
961                            ))
962                        })?;
963
964                    txp.advice_notes
965                        .insert(claim.body.output_1_commitment, output_1_record.note);
966                    txp.advice_notes
967                        .insert(claim.body.output_2_commitment, output_2_record.note);
968                }
969                _ => {}
970            }
971        }
972
973        // Now, generate a stub TxV from our minimal TxP, and inspect it to see what data we should
974        // augment the minimal TxP with to provide additional context (e.g., filling in denoms for
975        // visible asset IDs).
976        let min_view = tx.view_from_perspective(&txp);
977        let mut address_views = BTreeMap::new();
978        let mut asset_ids = BTreeSet::new();
979        for action_view in min_view.action_views() {
980            use penumbra_sdk_dex::{swap::SwapView, swap_claim::SwapClaimView};
981            use penumbra_sdk_transaction::view::action_view::{
982                ActionView, DelegatorVoteView, OutputView, SpendView,
983            };
984            match action_view {
985                ActionView::Spend(SpendView::Visible { note, .. }) => {
986                    let address = note.address();
987                    address_views.insert(address.clone(), fvk.view_address(address));
988                    asset_ids.insert(note.asset_id());
989                }
990                ActionView::Output(OutputView::Visible { note, .. }) => {
991                    let address = note.address();
992                    address_views.insert(address.clone(), fvk.view_address(address.clone()));
993                    asset_ids.insert(note.asset_id());
994
995                    // Also add an AddressView for the return address in the memo.
996                    let memo = tx.decrypt_memo(&fvk).map_err(|_| {
997                        tonic::Status::internal("Error decrypting memo for OutputView")
998                    })?;
999                    address_views.insert(memo.return_address(), fvk.view_address(address));
1000                }
1001                ActionView::Swap(SwapView::Visible { swap_plaintext, .. }) => {
1002                    let address = swap_plaintext.claim_address.clone();
1003                    address_views.insert(address.clone(), fvk.view_address(address));
1004                    asset_ids.insert(swap_plaintext.trading_pair.asset_1());
1005                    asset_ids.insert(swap_plaintext.trading_pair.asset_2());
1006                }
1007                ActionView::SwapClaim(SwapClaimView::Visible {
1008                    output_1, output_2, ..
1009                }) => {
1010                    // Both will be sent to the same address so this only needs to be added once
1011                    let address = output_1.address();
1012                    address_views.insert(address.clone(), fvk.view_address(address));
1013                    asset_ids.insert(output_1.asset_id());
1014                    asset_ids.insert(output_2.asset_id());
1015                }
1016                ActionView::DelegatorVote(DelegatorVoteView::Visible { note, .. }) => {
1017                    let address = note.address();
1018                    address_views.insert(address.clone(), fvk.view_address(address));
1019                    asset_ids.insert(note.asset_id());
1020                }
1021                ActionView::ActionDutchAuctionWithdraw(ActionDutchAuctionWithdrawView {
1022                    action: _,
1023                    reserves,
1024                }) => {
1025                    // previous comment: /* no-op for now - i'm not totally sure we have all the necessary data to attribute specific note openings to this view */
1026                    // to this cronokirby replied: well, we can however at least fill in some asset ids!
1027                    for value in reserves {
1028                        asset_ids.insert(value.asset_id());
1029                    }
1030                }
1031                // We can populate asset ids for the assets involved in the auction
1032                ActionView::ActionDutchAuctionSchedule(ActionDutchAuctionScheduleView {
1033                    action,
1034                    ..
1035                }) => {
1036                    let description = &action.description;
1037                    asset_ids.insert(description.input.asset_id);
1038                    asset_ids.insert(description.output_id);
1039                }
1040                _ => {}
1041            }
1042        }
1043
1044        // Now, extend the TxV with information helpful to understand the data it can view:
1045
1046        let mut denoms = Vec::new();
1047
1048        for id in asset_ids {
1049            if let Some(asset) = self.storage.asset_by_id(&id).await.map_err(|e| {
1050                tonic::Status::internal(format!("Error retrieving asset by id: {:#}", e))
1051            })? {
1052                denoms.push(asset);
1053            }
1054        }
1055
1056        txp.denoms.extend(denoms);
1057
1058        txp.address_views = address_views.into_values().collect();
1059
1060        // Finally, compute the full TxV from the full TxP:
1061        let txv = tx.view_from_perspective(&txp);
1062        let summary = txv.summary();
1063
1064        let response = pb::TransactionInfoByHashResponse {
1065            tx_info: Some(pb::TransactionInfo {
1066                height,
1067                id: Some(tx.id().into()),
1068                perspective: Some(txp.into()),
1069                transaction: Some(tx.into()),
1070                view: Some(txv.into()),
1071                summary: Some(summary.into()),
1072            }),
1073        };
1074
1075        Ok(tonic::Response::new(response))
1076    }
1077
1078    #[instrument(skip_all, level = "trace")]
1079    async fn swap_by_commitment(
1080        &self,
1081        request: tonic::Request<pb::SwapByCommitmentRequest>,
1082    ) -> Result<tonic::Response<pb::SwapByCommitmentResponse>, tonic::Status> {
1083        self.check_worker().await?;
1084
1085        let request = request.into_inner();
1086
1087        let swap_commitment = request
1088            .swap_commitment
1089            .ok_or_else(|| {
1090                tonic::Status::failed_precondition("Missing swap commitment in request")
1091            })?
1092            .try_into()
1093            .map_err(|_| {
1094                tonic::Status::failed_precondition("Invalid swap commitment in request")
1095            })?;
1096
1097        let swap = pb::SwapRecord::from(
1098            self.storage
1099                .swap_by_commitment(swap_commitment, request.await_detection)
1100                .await
1101                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1102        );
1103
1104        Ok(tonic::Response::new(SwapByCommitmentResponse {
1105            swap: Some(swap),
1106        }))
1107    }
1108
1109    #[allow(deprecated)]
1110    #[instrument(skip(self, request))]
1111    async fn balances(
1112        &self,
1113        request: tonic::Request<pb::BalancesRequest>,
1114    ) -> Result<tonic::Response<Self::BalancesStream>, tonic::Status> {
1115        let request = request.into_inner();
1116
1117        let account_filter = request.account_filter.and_then(|x| {
1118            AddressIndex::try_from(x)
1119                .map_err(|_| {
1120                    tonic::Status::failed_precondition("Invalid swap commitment in request")
1121                })
1122                .map_or(None, |x| x.into())
1123        });
1124
1125        let asset_id_filter = request.asset_id_filter.and_then(|x| {
1126            asset::Id::try_from(x)
1127                .map_err(|_| {
1128                    tonic::Status::failed_precondition("Invalid swap commitment in request")
1129                })
1130                .map_or(None, |x| x.into())
1131        });
1132
1133        let result = self
1134            .storage
1135            .balances(account_filter, asset_id_filter)
1136            .await
1137            .map_err(|e| tonic::Status::internal(format!("error: {e}")))?;
1138
1139        tracing::debug!(?account_filter, ?asset_id_filter, ?result);
1140
1141        let self2 = self.clone();
1142        let stream = try_stream! {
1143            // retrieve balance and address views
1144            for element in result {
1145                let metadata: Metadata = self2
1146                    .asset_metadata_by_id(Request::new(pb::AssetMetadataByIdRequest {
1147                        asset_id: Some(element.id.into()),
1148                    }))
1149                    .await?
1150                    .into_inner()
1151                    .denom_metadata
1152                    .context("denom metadata not found")?
1153                    .try_into()?;
1154
1155                 let value = Value {
1156                    asset_id: element.id,
1157                    amount: element.amount.into(),
1158                };
1159
1160                let value_view = value.view_with_denom(metadata)?;
1161
1162                let address: Address = self2
1163                  .address_by_index(Request::new(pb::AddressByIndexRequest {
1164                       address_index: account_filter.map(Into::into),
1165                   }))
1166                   .await?
1167                    .into_inner()
1168                    .address
1169                    .context("address not found")?
1170                    .try_into()?;
1171
1172                 let wallet_id: WalletId = self2
1173                            .wallet_id(Request::new(pb::WalletIdRequest {}))
1174                            .await?
1175                            .into_inner()
1176                            .wallet_id
1177                            .context("wallet id not found")?
1178                            .try_into()?;
1179
1180                let address_view = AddressView::Decoded {
1181                    address,
1182                    index: element.address_index,
1183                    wallet_id,
1184                };
1185
1186                yield pb::BalancesResponse {
1187                    account_address: Some(address_view.into()),
1188                    balance_view: Some(value_view.into()),
1189                    balance: None,
1190                    account: None,
1191                }
1192            }
1193        };
1194
1195        Ok(tonic::Response::new(
1196            stream
1197                .map_err(|e: anyhow::Error| {
1198                    tonic::Status::unavailable(format!("error getting balances: {e}"))
1199                })
1200                .boxed(),
1201        ))
1202    }
1203
1204    #[instrument(skip_all, level = "trace")]
1205    async fn note_by_commitment(
1206        &self,
1207        request: tonic::Request<pb::NoteByCommitmentRequest>,
1208    ) -> Result<tonic::Response<pb::NoteByCommitmentResponse>, tonic::Status> {
1209        self.check_worker().await?;
1210
1211        let request = request.into_inner();
1212
1213        let note_commitment = request
1214            .note_commitment
1215            .ok_or_else(|| {
1216                tonic::Status::failed_precondition("Missing note commitment in request")
1217            })?
1218            .try_into()
1219            .map_err(|_| {
1220                tonic::Status::failed_precondition("Invalid note commitment in request")
1221            })?;
1222
1223        let spendable_note = pb::SpendableNoteRecord::from(
1224            self.storage
1225                .note_by_commitment(note_commitment, request.await_detection)
1226                .await
1227                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1228        );
1229
1230        Ok(tonic::Response::new(NoteByCommitmentResponse {
1231            spendable_note: Some(spendable_note),
1232        }))
1233    }
1234
1235    #[instrument(skip_all, level = "trace")]
1236    async fn nullifier_status(
1237        &self,
1238        request: tonic::Request<pb::NullifierStatusRequest>,
1239    ) -> Result<tonic::Response<pb::NullifierStatusResponse>, tonic::Status> {
1240        self.check_worker().await?;
1241
1242        let request = request.into_inner();
1243
1244        let nullifier = request
1245            .nullifier
1246            .ok_or_else(|| tonic::Status::failed_precondition("Missing nullifier in request"))?
1247            .try_into()
1248            .map_err(|_| tonic::Status::failed_precondition("Invalid nullifier in request"))?;
1249
1250        Ok(tonic::Response::new(pb::NullifierStatusResponse {
1251            spent: self
1252                .storage
1253                .nullifier_status(nullifier, request.await_detection)
1254                .await
1255                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1256        }))
1257    }
1258
1259    #[instrument(skip_all, level = "trace")]
1260    async fn status(
1261        &self,
1262        _: tonic::Request<pb::StatusRequest>,
1263    ) -> Result<tonic::Response<pb::StatusResponse>, tonic::Status> {
1264        self.check_worker().await?;
1265
1266        Ok(tonic::Response::new(self.status().await.map_err(|e| {
1267            tonic::Status::internal(format!("error: {e}"))
1268        })?))
1269    }
1270
1271    #[instrument(skip_all, level = "trace")]
1272    async fn status_stream(
1273        &self,
1274        _: tonic::Request<pb::StatusStreamRequest>,
1275    ) -> Result<tonic::Response<Self::StatusStreamStream>, tonic::Status> {
1276        self.check_worker().await?;
1277
1278        let (latest_known_block_height, _) = self
1279            .latest_known_block_height()
1280            .await
1281            .tap_err(|error| {
1282                tracing::debug!(
1283                    ?error,
1284                    "unable to fetch latest known block height from fullnode"
1285                )
1286            })
1287            .map_err(|e| {
1288                tonic::Status::unknown(format!(
1289                    "unable to fetch latest known block height from fullnode: {e}"
1290                ))
1291            })?;
1292
1293        // Create a stream of sync height updates from our worker, and send them to the client
1294        // until we've reached the latest known block height at the time the request was made.
1295        let mut sync_height_stream = WatchStream::new(self.sync_height_rx.clone());
1296        let stream = try_stream! {
1297            while let Some(sync_height) = sync_height_stream.next().await {
1298                yield pb::StatusStreamResponse {
1299                    latest_known_block_height,
1300                    full_sync_height: sync_height,
1301                    partial_sync_height: sync_height, // Set these as the same for backwards compatibility following adding the partial_sync_height
1302                };
1303                if sync_height >= latest_known_block_height {
1304                    break;
1305                }
1306            }
1307        };
1308
1309        Ok(tonic::Response::new(stream.boxed()))
1310    }
1311
1312    #[instrument(skip_all, level = "trace")]
1313    async fn notes(
1314        &self,
1315        request: tonic::Request<pb::NotesRequest>,
1316    ) -> Result<tonic::Response<Self::NotesStream>, tonic::Status> {
1317        self.check_worker().await?;
1318
1319        let request = request.into_inner();
1320
1321        let include_spent = request.include_spent;
1322        let asset_id = request
1323            .asset_id
1324            .to_owned()
1325            .map(asset::Id::try_from)
1326            .map_or(Ok(None), |v| v.map(Some))
1327            .map_err(|_| tonic::Status::invalid_argument("invalid asset id"))?;
1328        let address_index = request
1329            .address_index
1330            .to_owned()
1331            .map(AddressIndex::try_from)
1332            .map_or(Ok(None), |v| v.map(Some))
1333            .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1334
1335        let amount_to_spend = request
1336            .amount_to_spend
1337            .map(Amount::try_from)
1338            .map_or(Ok(None), |v| v.map(Some))
1339            .map_err(|_| tonic::Status::invalid_argument("invalid amount to spend"))?;
1340
1341        let notes = self
1342            .storage
1343            .notes(include_spent, asset_id, address_index, amount_to_spend)
1344            .await
1345            .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1346
1347        let stream = try_stream! {
1348            for note in notes {
1349                yield pb::NotesResponse {
1350                    note_record: Some(note.into()),
1351                }
1352            }
1353        };
1354
1355        Ok(tonic::Response::new(
1356            stream
1357                .map_err(|e: anyhow::Error| {
1358                    tonic::Status::unavailable(format!("error getting notes: {e}"))
1359                })
1360                .boxed(),
1361        ))
1362    }
1363
1364    #[instrument(skip_all, level = "trace")]
1365    async fn notes_for_voting(
1366        &self,
1367        request: tonic::Request<pb::NotesForVotingRequest>,
1368    ) -> Result<tonic::Response<Self::NotesForVotingStream>, tonic::Status> {
1369        self.check_worker().await?;
1370
1371        let address_index = request
1372            .get_ref()
1373            .address_index
1374            .to_owned()
1375            .map(AddressIndex::try_from)
1376            .map_or(Ok(None), |v| v.map(Some))
1377            .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1378
1379        let votable_at_height = request.get_ref().votable_at_height;
1380
1381        let notes = self
1382            .storage
1383            .notes_for_voting(address_index, votable_at_height)
1384            .await
1385            .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1386
1387        let stream = try_stream! {
1388            for (note, identity_key) in notes {
1389                yield pb::NotesForVotingResponse {
1390                    note_record: Some(note.into()),
1391                    identity_key: Some(identity_key.into()),
1392                }
1393            }
1394        };
1395
1396        Ok(tonic::Response::new(
1397            stream
1398                .map_err(|e: anyhow::Error| {
1399                    tonic::Status::unavailable(format!("error getting notes: {e}"))
1400                })
1401                .boxed(),
1402        ))
1403    }
1404
1405    #[instrument(skip_all, level = "trace")]
1406    async fn assets(
1407        &self,
1408        request: tonic::Request<pb::AssetsRequest>,
1409    ) -> Result<tonic::Response<Self::AssetsStream>, tonic::Status> {
1410        self.check_worker().await?;
1411
1412        let pb::AssetsRequest {
1413            filtered,
1414            include_specific_denominations,
1415            include_delegation_tokens,
1416            include_unbonding_tokens,
1417            include_lp_nfts,
1418            include_proposal_nfts,
1419            include_voting_receipt_tokens,
1420        } = request.get_ref();
1421
1422        // Fetch assets from storage.
1423        let assets = if !filtered {
1424            self.storage
1425                .all_assets()
1426                .await
1427                .map_err(|e| tonic::Status::unavailable(format!("error fetching assets: {e}")))?
1428        } else {
1429            let mut assets = vec![];
1430            for denom in include_specific_denominations {
1431                if let Some(denom) = asset::REGISTRY.parse_denom(&denom.denom) {
1432                    assets.push(denom);
1433                }
1434            }
1435            for (include, pattern) in [
1436                (include_delegation_tokens, "_delegation\\_%"),
1437                (include_unbonding_tokens, "_unbonding\\_%"),
1438                (include_lp_nfts, "lpnft\\_%"),
1439                (include_proposal_nfts, "proposal\\_%"),
1440                (include_voting_receipt_tokens, "voted\\_on\\_%"),
1441            ] {
1442                if *include {
1443                    assets.extend(
1444                        self.storage
1445                            .assets_matching(pattern.to_string())
1446                            .await
1447                            .map_err(|e| {
1448                                tonic::Status::unavailable(format!("error fetching assets: {e}"))
1449                            })?,
1450                    );
1451                }
1452            }
1453            assets
1454        };
1455
1456        let stream = try_stream! {
1457            for asset in assets {
1458                yield
1459                    pb::AssetsResponse {
1460                        denom_metadata: Some(asset.into()),
1461                    }
1462            }
1463        };
1464
1465        Ok(tonic::Response::new(
1466            stream
1467                .map_err(|e: anyhow::Error| {
1468                    tonic::Status::unavailable(format!("error getting assets: {e}"))
1469                })
1470                .boxed(),
1471        ))
1472    }
1473
1474    #[instrument(skip_all, level = "trace")]
1475    async fn transaction_info(
1476        &self,
1477        request: tonic::Request<pb::TransactionInfoRequest>,
1478    ) -> Result<tonic::Response<Self::TransactionInfoStream>, tonic::Status> {
1479        self.check_worker().await?;
1480        // Unpack optional start/end heights.
1481        let start_height = if request.get_ref().start_height == 0 {
1482            None
1483        } else {
1484            Some(request.get_ref().start_height)
1485        };
1486        let end_height = if request.get_ref().end_height == 0 {
1487            None
1488        } else {
1489            Some(request.get_ref().end_height)
1490        };
1491
1492        // Fetch transactions from storage.
1493        let txs = self
1494            .storage
1495            .transactions(start_height, end_height)
1496            .await
1497            .map_err(|e| tonic::Status::unavailable(format!("error fetching transactions: {e}")))?;
1498
1499        let self2 = self.clone();
1500        let stream = try_stream! {
1501            for tx in txs {
1502
1503                let rsp = self2.transaction_info_by_hash(tonic::Request::new(pb::TransactionInfoByHashRequest {
1504                    id: Some(tx.2.id().into()),
1505                })).await?.into_inner();
1506
1507                yield pb::TransactionInfoResponse {
1508                    tx_info: rsp.tx_info,
1509                }
1510            }
1511        };
1512
1513        Ok(tonic::Response::new(
1514            stream
1515                .map_err(|e: anyhow::Error| {
1516                    tonic::Status::unavailable(format!("error getting transactions: {e}"))
1517                })
1518                .boxed(),
1519        ))
1520    }
1521
1522    #[instrument(skip_all, level = "trace")]
1523    async fn witness(
1524        &self,
1525        request: tonic::Request<pb::WitnessRequest>,
1526    ) -> Result<tonic::Response<WitnessResponse>, tonic::Status> {
1527        self.check_worker().await?;
1528
1529        // Acquire a read lock for the SCT that will live for the entire request,
1530        // so that all auth paths are relative to the same SCT root.
1531        let sct = self.state_commitment_tree.read().await;
1532
1533        // Read the SCT root
1534        let anchor = sct.root();
1535
1536        // Obtain an auth path for each requested note commitment
1537        let tx_plan: TransactionPlan =
1538            request
1539                .get_ref()
1540                .to_owned()
1541                .transaction_plan
1542                .map_or(TransactionPlan::default(), |x| {
1543                    x.try_into()
1544                        .expect("TransactionPlan should exist in request")
1545                });
1546
1547        let requested_note_commitments: Vec<StateCommitment> = tx_plan
1548            .spend_plans()
1549            .filter(|plan| plan.note.amount() != 0u64.into())
1550            .map(|spend| spend.note.commit().into())
1551            .chain(
1552                tx_plan
1553                    .swap_claim_plans()
1554                    .map(|swap_claim| swap_claim.swap_plaintext.swap_commitment().into()),
1555            )
1556            .chain(
1557                tx_plan
1558                    .delegator_vote_plans()
1559                    .map(|vote_plan| vote_plan.staked_note.commit().into()),
1560            )
1561            .collect();
1562
1563        tracing::debug!(?requested_note_commitments);
1564
1565        let auth_paths: Vec<Proof> = requested_note_commitments
1566            .iter()
1567            .map(|nc| {
1568                sct.witness(*nc).ok_or_else(|| {
1569                    tonic::Status::new(tonic::Code::InvalidArgument, "Note commitment missing")
1570                })
1571            })
1572            .collect::<Result<Vec<Proof>, tonic::Status>>()?;
1573
1574        // Release the read lock on the SCT
1575        drop(sct);
1576
1577        let mut witness_data = WitnessData {
1578            anchor,
1579            state_commitment_proofs: auth_paths
1580                .into_iter()
1581                .map(|proof| (proof.commitment(), proof))
1582                .collect(),
1583        };
1584
1585        tracing::debug!(?witness_data);
1586
1587        // Now we need to augment the witness data with dummy proofs such that
1588        // note commitments corresponding to dummy spends also have proofs.
1589        for nc in tx_plan
1590            .spend_plans()
1591            .filter(|plan| plan.note.amount() == 0u64.into())
1592            .map(|plan| plan.note.commit())
1593        {
1594            witness_data.add_proof(nc, Proof::dummy(&mut OsRng, nc));
1595        }
1596
1597        let witness_response = WitnessResponse {
1598            witness_data: Some(witness_data.into()),
1599        };
1600        Ok(tonic::Response::new(witness_response))
1601    }
1602
1603    #[instrument(skip_all, level = "trace")]
1604    async fn witness_and_build(
1605        &self,
1606        request: tonic::Request<pb::WitnessAndBuildRequest>,
1607    ) -> Result<tonic::Response<Self::WitnessAndBuildStream>, tonic::Status> {
1608        let pb::WitnessAndBuildRequest {
1609            transaction_plan,
1610            authorization_data,
1611        } = request.into_inner();
1612
1613        let transaction_plan: TransactionPlan = transaction_plan
1614            .ok_or_else(|| tonic::Status::invalid_argument("missing transaction plan"))?
1615            .try_into()
1616            .map_err(|e: anyhow::Error| e.context("could not decode transaction plan"))
1617            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1618
1619        let authorization_data: AuthorizationData = authorization_data
1620            .ok_or_else(|| tonic::Status::invalid_argument("missing authorization data"))?
1621            .try_into()
1622            .map_err(|e: anyhow::Error| e.context("could not decode authorization data"))
1623            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1624
1625        let witness_request = pb::WitnessRequest {
1626            transaction_plan: Some(transaction_plan.clone().into()),
1627        };
1628
1629        let witness_data: WitnessData = self
1630            .witness(tonic::Request::new(witness_request))
1631            .await?
1632            .into_inner()
1633            .witness_data
1634            .ok_or_else(|| tonic::Status::invalid_argument("missing witness data"))?
1635            .try_into()
1636            .map_err(|e: anyhow::Error| e.context("could not decode witness data"))
1637            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1638
1639        let fvk =
1640            self.storage.full_viewing_key().await.map_err(|_| {
1641                tonic::Status::failed_precondition("Error retrieving full viewing key")
1642            })?;
1643
1644        let transaction = Some(
1645            transaction_plan
1646                // TODO: calling `.build` should provide some mechanism to get progress
1647                // updates
1648                .build(&fvk, &witness_data, &authorization_data)
1649                .map_err(|_| tonic::Status::failed_precondition("Error building transaction"))?
1650                .into(),
1651        );
1652
1653        let stream = try_stream! {
1654            yield pb::WitnessAndBuildResponse {
1655                status: Some(pb::witness_and_build_response::Status::Complete(
1656                    pb::witness_and_build_response::Complete { transaction },
1657                )),
1658            }
1659        };
1660
1661        Ok(tonic::Response::new(
1662            stream
1663                .map_err(|e: anyhow::Error| {
1664                    tonic::Status::unavailable(format!("error witnessing transaction: {e}"))
1665                })
1666                .boxed(),
1667        ))
1668    }
1669
1670    #[instrument(skip_all, level = "trace")]
1671    async fn app_parameters(
1672        &self,
1673        _request: tonic::Request<pb::AppParametersRequest>,
1674    ) -> Result<tonic::Response<pb::AppParametersResponse>, tonic::Status> {
1675        self.check_worker().await?;
1676
1677        let parameters =
1678            self.storage.app_params().await.map_err(|e| {
1679                tonic::Status::unavailable(format!("error getting app params: {e}"))
1680            })?;
1681
1682        let response = AppParametersResponse {
1683            parameters: Some(parameters.into()),
1684        };
1685
1686        Ok(tonic::Response::new(response))
1687    }
1688
1689    #[instrument(skip_all, level = "trace")]
1690    async fn gas_prices(
1691        &self,
1692        _request: tonic::Request<pb::GasPricesRequest>,
1693    ) -> Result<tonic::Response<pb::GasPricesResponse>, tonic::Status> {
1694        self.check_worker().await?;
1695
1696        let gas_prices =
1697            self.storage.gas_prices().await.map_err(|e| {
1698                tonic::Status::unavailable(format!("error getting gas prices: {e}"))
1699            })?;
1700
1701        let response = GasPricesResponse {
1702            gas_prices: Some(gas_prices.into()),
1703            alt_gas_prices: Vec::new(),
1704        };
1705
1706        Ok(tonic::Response::new(response))
1707    }
1708
1709    #[instrument(skip_all, level = "trace")]
1710    async fn fmd_parameters(
1711        &self,
1712        _request: tonic::Request<pb::FmdParametersRequest>,
1713    ) -> Result<tonic::Response<pb::FmdParametersResponse>, tonic::Status> {
1714        self.check_worker().await?;
1715
1716        let parameters =
1717            self.storage.fmd_parameters().await.map_err(|e| {
1718                tonic::Status::unavailable(format!("error getting FMD params: {e}"))
1719            })?;
1720
1721        let response = FmdParametersResponse {
1722            parameters: Some(parameters.into()),
1723        };
1724
1725        Ok(tonic::Response::new(response))
1726    }
1727
1728    #[instrument(skip_all, level = "trace")]
1729    async fn owned_position_ids(
1730        &self,
1731        request: tonic::Request<pb::OwnedPositionIdsRequest>,
1732    ) -> Result<tonic::Response<Self::OwnedPositionIdsStream>, tonic::Status> {
1733        self.check_worker().await?;
1734
1735        let pb::OwnedPositionIdsRequest {
1736            position_state,
1737            trading_pair,
1738            subaccount: _,
1739        } = request.into_inner();
1740
1741        let position_state: Option<position::State> = position_state
1742            .map(|state| state.try_into())
1743            .transpose()
1744            .map_err(|e: anyhow::Error| e.context("could not decode position state"))
1745            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1746
1747        let trading_pair: Option<TradingPair> = trading_pair
1748            .map(|pair| pair.try_into())
1749            .transpose()
1750            .map_err(|e: anyhow::Error| e.context("could not decode trading pair"))
1751            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1752
1753        let ids = self
1754            .storage
1755            .owned_position_ids(position_state, trading_pair)
1756            .await
1757            .map_err(|e| tonic::Status::unavailable(format!("error getting position ids: {e}")))?;
1758
1759        let stream = try_stream! {
1760            for id in ids {
1761                yield pb::OwnedPositionIdsResponse{
1762                    position_id: Some(id.into()),
1763                    // The rust view server does not index positions by subaccount,
1764                    // so this information is invisible to it.
1765                    subaccount: None,
1766                }
1767            }
1768        };
1769
1770        Ok(tonic::Response::new(
1771            stream
1772                .map_err(|e: anyhow::Error| {
1773                    tonic::Status::unavailable(format!("error getting position ids: {e}"))
1774                })
1775                .boxed(),
1776        ))
1777    }
1778
1779    #[instrument(skip_all, level = "trace")]
1780    async fn authorize_and_build(
1781        &self,
1782        _request: tonic::Request<pb::AuthorizeAndBuildRequest>,
1783    ) -> Result<tonic::Response<Self::AuthorizeAndBuildStream>, tonic::Status> {
1784        unimplemented!("authorize_and_build")
1785    }
1786
1787    #[instrument(skip_all, level = "trace")]
1788    async fn unclaimed_swaps(
1789        &self,
1790        _: tonic::Request<pb::UnclaimedSwapsRequest>,
1791    ) -> Result<tonic::Response<Self::UnclaimedSwapsStream>, tonic::Status> {
1792        self.check_worker().await?;
1793
1794        let swaps = self.storage.unclaimed_swaps().await.map_err(|e| {
1795            tonic::Status::unavailable(format!("error fetching unclaimed swaps: {e}"))
1796        })?;
1797
1798        let stream = try_stream! {
1799            for swap in swaps {
1800                yield pb::UnclaimedSwapsResponse{
1801                    swap: Some(swap.into()),
1802                }
1803            }
1804        };
1805
1806        Ok(tonic::Response::new(
1807            stream
1808                .map_err(|e: anyhow::Error| {
1809                    tonic::Status::unavailable(format!("error getting unclaimed swaps: {e}"))
1810                })
1811                .boxed(),
1812        ))
1813    }
1814
1815    #[instrument(skip_all, level = "trace")]
1816    async fn wallet_id(
1817        &self,
1818        _: Request<WalletIdRequest>,
1819    ) -> Result<Response<WalletIdResponse>, Status> {
1820        let fvk = self.storage.full_viewing_key().await.map_err(|e| {
1821            Status::failed_precondition(format!("Error retrieving full viewing key: {e}"))
1822        })?;
1823
1824        Ok(Response::new(WalletIdResponse {
1825            wallet_id: Some(fvk.wallet_id().into()),
1826        }))
1827    }
1828
1829    #[instrument(skip_all, level = "trace")]
1830    async fn asset_metadata_by_id(
1831        &self,
1832        request: Request<AssetMetadataByIdRequest>,
1833    ) -> Result<Response<AssetMetadataByIdResponse>, Status> {
1834        let asset_id = request
1835            .into_inner()
1836            .asset_id
1837            .ok_or_else(|| Status::invalid_argument("missing asset id"))?
1838            .try_into()
1839            .map_err(|e| Status::invalid_argument(format!("{e:#}")))?;
1840
1841        let metadata = self
1842            .storage
1843            .asset_by_id(&asset_id)
1844            .await
1845            .map_err(|e| Status::internal(format!("Error retrieving asset by id: {e:#}")))?;
1846
1847        Ok(Response::new(AssetMetadataByIdResponse {
1848            denom_metadata: metadata.map(Into::into),
1849        }))
1850    }
1851
1852    #[instrument(skip_all, level = "trace")]
1853    async fn delegations_by_address_index(
1854        &self,
1855        _request: tonic::Request<pb::DelegationsByAddressIndexRequest>,
1856    ) -> Result<tonic::Response<Self::DelegationsByAddressIndexStream>, tonic::Status> {
1857        unimplemented!("delegations_by_address_index")
1858    }
1859
1860    #[instrument(skip_all, level = "trace")]
1861    async fn unbonding_tokens_by_address_index(
1862        &self,
1863        _request: tonic::Request<pb::UnbondingTokensByAddressIndexRequest>,
1864    ) -> Result<tonic::Response<Self::UnbondingTokensByAddressIndexStream>, tonic::Status> {
1865        unimplemented!("unbonding_tokens_by_address_index currently only implemented on web")
1866    }
1867
1868    #[instrument(skip_all, level = "trace")]
1869    async fn latest_swaps(
1870        &self,
1871        _request: tonic::Request<pb::LatestSwapsRequest>,
1872    ) -> Result<tonic::Response<Self::LatestSwapsStream>, tonic::Status> {
1873        unimplemented!("latest_swaps currently only implemented on web")
1874    }
1875}
1876
1877/// Convert a pd node URL to a Tonic `Endpoint`.
1878///
1879/// Required in order to configure TLS for HTTPS endpoints.
1880async fn get_pd_endpoint(node: Url) -> anyhow::Result<Endpoint> {
1881    let endpoint = match node.scheme() {
1882        "http" => Channel::from_shared(node.to_string())?,
1883        "https" => Channel::from_shared(node.to_string())?
1884            .tls_config(ClientTlsConfig::new().with_webpki_roots())?,
1885        other => anyhow::bail!("unknown url scheme {other}"),
1886    };
1887    Ok(endpoint)
1888}