penumbra_sdk_view/
service.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    pin::Pin,
4    sync::{Arc, Mutex},
5};
6
7use anyhow::{anyhow, Context};
8use async_stream::try_stream;
9use camino::Utf8Path;
10use decaf377::Fq;
11use futures::stream::{self, StreamExt, TryStreamExt};
12use penumbra_sdk_auction::auction::dutch::actions::view::{
13    ActionDutchAuctionScheduleView, ActionDutchAuctionWithdrawView,
14};
15use rand::Rng;
16use rand_core::OsRng;
17use tap::{Tap, TapFallible};
18use tokio::sync::{watch, RwLock};
19use tokio_stream::wrappers::WatchStream;
20use tonic::transport::channel::ClientTlsConfig;
21use tonic::transport::channel::Endpoint;
22use tonic::{async_trait, transport::Channel, Request, Response, Status};
23use tracing::{instrument, Instrument};
24use url::Url;
25
26use penumbra_sdk_asset::{asset, asset::Metadata, Value};
27use penumbra_sdk_dex::{
28    lp::{
29        position::{self, Position},
30        Reserves,
31    },
32    swap_claim::SwapClaimPlan,
33    TradingPair,
34};
35use penumbra_sdk_fee::Fee;
36use penumbra_sdk_keys::{
37    keys::WalletId,
38    keys::{AddressIndex, FullViewingKey},
39    Address, AddressView,
40};
41use penumbra_sdk_num::Amount;
42use penumbra_sdk_proto::{
43    util::tendermint_proxy::v1::{
44        tendermint_proxy_service_client::TendermintProxyServiceClient, BroadcastTxSyncRequest,
45        GetStatusRequest, GetStatusResponse, SyncInfo,
46    },
47    view::v1::{
48        self as pb,
49        broadcast_transaction_response::{BroadcastSuccess, Confirmed, Status as BroadcastStatus},
50        view_service_client::ViewServiceClient,
51        view_service_server::{ViewService, ViewServiceServer},
52        AppParametersResponse, AssetMetadataByIdRequest, AssetMetadataByIdResponse,
53        BroadcastTransactionResponse, FmdParametersResponse, GasPricesResponse,
54        NoteByCommitmentResponse, StatusResponse, SwapByCommitmentResponse,
55        TransactionPlannerResponse, WalletIdRequest, WalletIdResponse, WitnessResponse,
56    },
57    DomainType,
58};
59use penumbra_sdk_stake::{rate::RateData, IdentityKey};
60use penumbra_sdk_tct::{Proof, StateCommitment};
61use penumbra_sdk_transaction::{
62    AuthorizationData, Transaction, TransactionPerspective, TransactionPlan, WitnessData,
63};
64
65use crate::{worker::Worker, Planner, SpendableNoteRecord, Storage};
66
67/// A [`futures::Stream`] of broadcast transaction responses.
68///
69/// See [`ViewService::broadcast_transaction()`].
70type BroadcastTransactionStream = Pin<
71    Box<dyn futures::Stream<Item = Result<pb::BroadcastTransactionResponse, tonic::Status>> + Send>,
72>;
73
74/// A service that synchronizes private chain state and responds to queries
75/// about it.
76///
77/// The [`ViewServer`] implements the Tonic-derived [`ViewService`] trait,
78/// so it can be used as a gRPC server, or called directly.  It spawns a task
79/// internally that performs synchronization and scanning.  The
80/// [`ViewServer`] can be cloned; each clone will read from the same shared
81/// state, but there will only be a single scanning task.
82#[derive(Clone)]
83pub struct ViewServer {
84    storage: Storage,
85    // A shared error slot for errors bubbled up by the worker. This is a regular Mutex
86    // rather than a Tokio Mutex because it should be uncontended.
87    error_slot: Arc<Mutex<Option<anyhow::Error>>>,
88    // A copy of the SCT used by the worker task.
89    state_commitment_tree: Arc<RwLock<penumbra_sdk_tct::Tree>>,
90    // The Url for the pd gRPC endpoint on remote node.
91    node: Url,
92    /// Used to watch for changes to the sync height.
93    sync_height_rx: watch::Receiver<u64>,
94}
95
96impl ViewServer {
97    /// Convenience method that calls [`Storage::load_or_initialize`] and then [`Self::new`].
98    pub async fn load_or_initialize(
99        storage_path: Option<impl AsRef<Utf8Path>>,
100        registry_path: Option<impl AsRef<Utf8Path>>,
101        fvk: &FullViewingKey,
102        node: Url,
103    ) -> anyhow::Result<Self> {
104        let storage = Storage::load_or_initialize(storage_path, fvk, node.clone())
105            .tap(|_| tracing::trace!("loading or initializing storage"))
106            .await?
107            .tap(|_| tracing::debug!("storage is ready"));
108
109        if let Some(registry_path) = registry_path {
110            storage.load_asset_metadata(registry_path).await?;
111        }
112
113        Self::new(storage, node)
114            .tap(|_| tracing::trace!("constructing view server"))
115            .await
116            .tap(|_| tracing::debug!("constructed view server"))
117    }
118
119    /// Constructs a new [`ViewService`], spawning a sync task internally.
120    ///
121    /// The sync task uses the provided `client` to sync with the chain.
122    ///
123    /// To create multiple [`ViewService`]s, clone the [`ViewService`] returned
124    /// by this method, rather than calling it multiple times.  That way, each clone
125    /// will be backed by the same scanning task, rather than each spawning its own.
126    pub async fn new(storage: Storage, node: Url) -> anyhow::Result<Self> {
127        let span = tracing::error_span!(parent: None, "view");
128        let channel = Self::get_pd_channel(node.clone()).await?;
129
130        let (worker, state_commitment_tree, error_slot, sync_height_rx) =
131            Worker::new(storage.clone(), channel)
132                .instrument(span.clone())
133                .tap(|_| tracing::trace!("constructing view server worker"))
134                .await?
135                .tap(|_| tracing::debug!("constructed view server worker"));
136
137        tokio::spawn(worker.run().instrument(span))
138            .tap(|_| tracing::debug!("spawned view server worker"));
139
140        Ok(Self {
141            storage,
142            error_slot,
143            sync_height_rx,
144            state_commitment_tree,
145            node,
146        })
147    }
148
149    /// Obtain a Tonic [Channel] to a remote `pd` endpoint.
150    ///
151    /// Provided as a convenience method for bootstrapping a connection.
152    /// Handles configuring TLS if the URL is HTTPS. Also adds a tracing span
153    /// to the working [Channel].
154    pub async fn get_pd_channel(node: Url) -> anyhow::Result<Channel> {
155        let endpoint = get_pd_endpoint(node).await?;
156        let span = tracing::error_span!(parent: None, "view");
157        let c: Channel = endpoint
158            .connect()
159            .instrument(span.clone())
160            .await
161            .with_context(|| "could not connect to grpc server")
162            .tap_err(|error| tracing::error!(?error, "could not connect to grpc server"))?;
163
164        Ok(c)
165    }
166
167    /// Checks if the view server worker has encountered an error.
168    ///
169    /// This function returns a gRPC [`tonic::Status`] containing the view server worker error if
170    /// any exists, otherwise it returns `Ok(())`.
171    #[instrument(level = "debug", skip_all)]
172    async fn check_worker(&self) -> Result<(), tonic::Status> {
173        // If the shared error slot is set, then an error has occurred in the worker
174        // that we should bubble up.
175        tracing::debug!("checking view server worker");
176        if let Some(error) = self
177            .error_slot
178            .lock()
179            .tap_err(|error| tracing::error!(?error, "unable to lock worker error slot"))
180            .map_err(|e| {
181                tonic::Status::unavailable(format!("unable to lock worker error slot {:#}", e))
182            })?
183            .as_ref()
184        {
185            return Err(tonic::Status::new(
186                tonic::Code::Internal,
187                format!("Worker failed: {error}"),
188            ));
189        }
190
191        // TODO: check whether the worker is still alive, else fail, when we have a way to do that
192        // (if the worker is to crash without setting the error_slot, the service should die as well)
193
194        Ok(()).tap(|_| tracing::trace!("view server worker is healthy"))
195    }
196
197    #[instrument(skip(self, transaction), fields(id = %transaction.id()))]
198    fn broadcast_transaction(
199        &self,
200        transaction: Transaction,
201        await_detection: bool,
202    ) -> BroadcastTransactionStream {
203        let self2 = self.clone();
204        try_stream! {
205                // 1. Broadcast the transaction to the network.
206                // Note that "synchronous" here means "wait for the tx to be accepted by
207                // the fullnode", not "wait for the tx to be included on chain.
208                let mut fullnode_client = self2.tendermint_proxy_client().await
209                            .map_err(|e| {
210                                tonic::Status::unavailable(format!(
211                                    "couldn't connect to fullnode: {:#?}",
212                                    e
213                                ))
214                            })?
215                        ;
216                let node_rsp = fullnode_client
217                    .broadcast_tx_sync(BroadcastTxSyncRequest {
218                        params: transaction.encode_to_vec(),
219                        req_id: OsRng.gen(),
220                    })
221                    .await
222                    .map_err(|e| {
223                        tonic::Status::unavailable(format!(
224                            "error broadcasting tx: {:#?}",
225                            e
226                        ))
227                    })?
228                    .into_inner();
229                tracing::info!(?node_rsp);
230                match node_rsp.code {
231                    0 => Ok(()),
232                    _ => Err(tonic::Status::new(
233                        tonic::Code::Internal,
234                        format!(
235                            "Error submitting transaction: code {}, log: {}",
236                            node_rsp.code,
237                            node_rsp.log,
238                        ),
239                    )),
240                }?;
241
242                // The transaction was submitted so we provide a status update
243                yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::BroadcastSuccess(BroadcastSuccess{id:Some(transaction.id().into())}))};
244
245                // 2. Optionally wait for the transaction to be detected by the view service.
246                let nullifier = if await_detection {
247                    // This needs to be only *spend* nullifiers because the nullifier detection
248                    // is broken for swaps, https://github.com/penumbra-zone/penumbra/issues/1749
249                    //
250                    // in the meantime, inline the definition from `Transaction`
251                    transaction
252                        .actions()
253                        .filter_map(|action| match action {
254                            penumbra_sdk_transaction::Action::Spend(spend) => Some(spend.body.nullifier),
255                            /*
256                            penumbra_sdk_transaction::Action::SwapClaim(swap_claim) => {
257                                Some(swap_claim.body.nullifier)
258                            }
259                             */
260                            _ => None,
261                        })
262                        .next()
263                } else {
264                    None
265                };
266
267                if let Some(nullifier) = nullifier {
268                    tracing::info!(?nullifier, "waiting for detection of nullifier");
269                    let detection = self2.storage.nullifier_status(nullifier, true);
270                    tokio::time::timeout(std::time::Duration::from_secs(20), detection)
271                        .await
272                        .map_err(|_| {
273                            tonic::Status::unavailable(
274                                "timeout waiting to detect nullifier of submitted transaction"
275                            )
276                        })?
277                        .map_err(|_| {
278                            tonic::Status::unavailable(
279                                "error while waiting for detection of submitted transaction"
280                            )
281                        })?;
282                }
283
284                let detection_height = self2.storage
285                    .transaction_by_hash(&transaction.id().0)
286                    .await
287                    .map_err(|e| tonic::Status::internal(format!("error querying storage: {:#}", e)))?
288                    .map(|(height, _tx)| height)
289                    // If we didn't find it for some reason, return 0 for unknown.
290                    // TODO: how does this change if we detach extended transaction fetch from scanning?
291                    .unwrap_or(0);
292                yield BroadcastTransactionResponse{ status: Some(BroadcastStatus::Confirmed(Confirmed{id:Some(transaction.id().into()), detection_height}))};
293            }.boxed()
294    }
295
296    #[instrument(level = "trace", skip(self))]
297    async fn tendermint_proxy_client(
298        &self,
299    ) -> anyhow::Result<TendermintProxyServiceClient<Channel>> {
300        TendermintProxyServiceClient::connect(self.node.to_string())
301            .tap(|_| tracing::debug!("connecting to tendermint proxy"))
302            .await
303            .tap_err(|error| tracing::error!(?error, "failed to connect to tendermint proxy"))
304            .map_err(anyhow::Error::from)
305    }
306
307    /// Return the latest block height known by the fullnode or its peers, as
308    /// well as whether the fullnode is caught up with that height.
309    #[instrument(skip(self))]
310    pub async fn latest_known_block_height(&self) -> anyhow::Result<(u64, bool)> {
311        let mut client = self.tendermint_proxy_client().await?;
312
313        let GetStatusResponse { sync_info, .. } = client
314            .get_status(GetStatusRequest {})
315            .tap(|_| tracing::debug!("querying current status"))
316            .await
317            .tap_err(|error| tracing::debug!(?error, "failed to query current status"))?
318            .into_inner();
319
320        let SyncInfo {
321            latest_block_height,
322            catching_up,
323            ..
324        } = sync_info
325            .ok_or_else(|| anyhow::anyhow!("could not parse sync_info in gRPC response"))?;
326
327        // There is a `max_peer_block_height` available in TM 0.35, however it should not be used
328        // as it does not seem to reflect the consensus height. Since clients use `latest_known_block_height`
329        // to determine the height to attempt syncing to, a validator reporting a non-consensus height
330        // can cause a DoS to clients attempting to sync if `max_peer_block_height` is used.
331        let latest_known_block_height = latest_block_height;
332
333        tracing::debug!(
334            ?latest_block_height,
335            ?catching_up,
336            ?latest_known_block_height,
337            "found latest known block height"
338        );
339
340        Ok((latest_known_block_height, catching_up))
341    }
342
343    #[instrument(skip(self))]
344    pub async fn status(&self) -> anyhow::Result<StatusResponse> {
345        let full_sync_height = self.storage.last_sync_height().await?.unwrap_or(0);
346
347        let (latest_known_block_height, node_catching_up) =
348            self.latest_known_block_height().await?;
349
350        let height_diff = latest_known_block_height
351            .checked_sub(full_sync_height)
352            .ok_or_else(|| anyhow!("sync height ahead of node height"))?;
353
354        let catching_up = match (node_catching_up, height_diff) {
355            // We're synced to the same height as the node
356            (false, 0) => false,
357            // We're one block behind, and will learn about it soon, close enough
358            (false, 1) => false,
359            // We're behind the node
360            (false, _) => true,
361            // The node is behind the network
362            (true, _) => true,
363        };
364
365        Ok(StatusResponse {
366            full_sync_height,
367            catching_up,
368            partial_sync_height: full_sync_height, // Set these as the same for backwards compatibility following adding the partial_sync_height
369        })
370    }
371}
372
373#[async_trait]
374impl ViewService for ViewServer {
375    type NotesStream =
376        Pin<Box<dyn futures::Stream<Item = Result<pb::NotesResponse, tonic::Status>> + Send>>;
377    type NotesForVotingStream = Pin<
378        Box<dyn futures::Stream<Item = Result<pb::NotesForVotingResponse, tonic::Status>> + Send>,
379    >;
380    type AssetsStream =
381        Pin<Box<dyn futures::Stream<Item = Result<pb::AssetsResponse, tonic::Status>> + Send>>;
382    type StatusStreamStream = Pin<
383        Box<dyn futures::Stream<Item = Result<pb::StatusStreamResponse, tonic::Status>> + Send>,
384    >;
385    type TransactionInfoStream = Pin<
386        Box<dyn futures::Stream<Item = Result<pb::TransactionInfoResponse, tonic::Status>> + Send>,
387    >;
388    type BalancesStream =
389        Pin<Box<dyn futures::Stream<Item = Result<pb::BalancesResponse, tonic::Status>> + Send>>;
390    type OwnedPositionIdsStream = Pin<
391        Box<dyn futures::Stream<Item = Result<pb::OwnedPositionIdsResponse, tonic::Status>> + Send>,
392    >;
393    type UnclaimedSwapsStream = Pin<
394        Box<dyn futures::Stream<Item = Result<pb::UnclaimedSwapsResponse, tonic::Status>> + Send>,
395    >;
396    type BroadcastTransactionStream = BroadcastTransactionStream;
397    type WitnessAndBuildStream = Pin<
398        Box<dyn futures::Stream<Item = Result<pb::WitnessAndBuildResponse, tonic::Status>> + Send>,
399    >;
400    type AuthorizeAndBuildStream = Pin<
401        Box<
402            dyn futures::Stream<Item = Result<pb::AuthorizeAndBuildResponse, tonic::Status>> + Send,
403        >,
404    >;
405    type DelegationsByAddressIndexStream = Pin<
406        Box<
407            dyn futures::Stream<Item = Result<pb::DelegationsByAddressIndexResponse, tonic::Status>>
408                + Send,
409        >,
410    >;
411    type UnbondingTokensByAddressIndexStream = Pin<
412        Box<
413            dyn futures::Stream<
414                    Item = Result<pb::UnbondingTokensByAddressIndexResponse, tonic::Status>,
415                > + Send,
416        >,
417    >;
418    type AuctionsStream =
419        Pin<Box<dyn futures::Stream<Item = Result<pb::AuctionsResponse, tonic::Status>> + Send>>;
420    type LatestSwapsStream =
421        Pin<Box<dyn futures::Stream<Item = Result<pb::LatestSwapsResponse, tonic::Status>> + Send>>;
422    type LqtVotingNotesStream = Pin<
423        Box<dyn futures::Stream<Item = Result<pb::LqtVotingNotesResponse, tonic::Status>> + Send>,
424    >;
425    type TournamentVotesStream = Pin<
426        Box<dyn futures::Stream<Item = Result<pb::TournamentVotesResponse, tonic::Status>> + Send>,
427    >;
428    type LpPositionBundleStream = Pin<
429        Box<dyn futures::Stream<Item = Result<pb::LpPositionBundleResponse, tonic::Status>> + Send>,
430    >;
431    type LpStrategyCatalogStream = Pin<
432        Box<
433            dyn futures::Stream<Item = Result<pb::LpStrategyCatalogResponse, tonic::Status>> + Send,
434        >,
435    >;
436
437    #[instrument(skip_all, level = "trace")]
438    async fn auctions(
439        &self,
440        request: tonic::Request<pb::AuctionsRequest>,
441    ) -> Result<tonic::Response<Self::AuctionsStream>, tonic::Status> {
442        use penumbra_sdk_proto::core::component::auction::v1 as pb_auction;
443        use penumbra_sdk_proto::core::component::auction::v1::query_service_client::QueryServiceClient as AuctionQueryServiceClient;
444
445        let parameters = request.into_inner();
446        let query_latest_state = parameters.query_latest_state;
447        let include_inactive = parameters.include_inactive;
448
449        let account_filter = parameters
450            .account_filter
451            .to_owned()
452            .map(AddressIndex::try_from)
453            .map_or(Ok(None), |v| v.map(Some))
454            .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
455
456        let all_auctions = self
457            .storage
458            .fetch_auctions_by_account(account_filter, include_inactive)
459            .await
460            .map_err(|e| tonic::Status::internal(e.to_string()))?;
461
462        let client = if query_latest_state {
463            Some(
464                AuctionQueryServiceClient::connect(self.node.to_string())
465                    .await
466                    .map_err(|e| tonic::Status::internal(e.to_string()))?,
467            )
468        } else {
469            None
470        };
471
472        let responses = futures::future::join_all(all_auctions.into_iter().map(
473            |(auction_id, note_record, local_seq)| {
474                let maybe_client = client.clone();
475                async move {
476                    let (any_state, positions) = if let Some(mut client2) = maybe_client {
477                        let extra_data = client2
478                            .auction_state_by_id(pb_auction::AuctionStateByIdRequest {
479                                id: Some(auction_id.into()),
480                            })
481                            .await
482                            .map_err(|e| tonic::Status::internal(e.to_string()))?
483                            .into_inner();
484                        (extra_data.auction, extra_data.positions)
485                    } else {
486                        (None, vec![])
487                    };
488
489                    Result::<_, tonic::Status>::Ok(pb::AuctionsResponse {
490                        id: Some(auction_id.into()),
491                        note_record: Some(note_record.into()),
492                        auction: any_state,
493                        positions,
494                        local_seq,
495                    })
496                }
497            },
498        ))
499        .await;
500
501        let stream = stream::iter(responses)
502            .map_err(|e| tonic::Status::internal(format!("error getting auction: {e}")))
503            .boxed();
504
505        Ok(Response::new(stream))
506    }
507
508    #[instrument(skip_all, level = "trace")]
509    async fn broadcast_transaction(
510        &self,
511        request: tonic::Request<pb::BroadcastTransactionRequest>,
512    ) -> Result<tonic::Response<Self::BroadcastTransactionStream>, tonic::Status> {
513        let pb::BroadcastTransactionRequest {
514            transaction,
515            await_detection,
516        } = request.into_inner();
517
518        let transaction: Transaction = transaction
519            .ok_or_else(|| tonic::Status::invalid_argument("missing transaction"))?
520            .try_into()
521            .map_err(|e: anyhow::Error| e.context("could not decode transaction"))
522            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
523
524        let stream = self.broadcast_transaction(transaction, await_detection);
525
526        Ok(tonic::Response::new(stream))
527    }
528
529    #[instrument(skip_all, level = "trace")]
530    async fn transaction_planner(
531        &self,
532        request: tonic::Request<pb::TransactionPlannerRequest>,
533    ) -> Result<tonic::Response<pb::TransactionPlannerResponse>, tonic::Status> {
534        let prq = request.into_inner();
535
536        let app_params =
537            self.storage.app_params().await.map_err(|e| {
538                tonic::Status::internal(format!("could not get app params: {:#}", e))
539            })?;
540
541        let gas_prices =
542            self.storage.gas_prices().await.map_err(|e| {
543                tonic::Status::internal(format!("could not get gas prices: {:#}", e))
544            })?;
545
546        // TODO: need to support passing the fee _in_ to this API via the TransactionPlannerRequest
547        // meaning the requester should fetch the gas prices and estimate cost/allow the user to modify
548        // fee paid
549        let mut planner = Planner::new(OsRng);
550        planner.set_gas_prices(gas_prices);
551        planner.expiry_height(prq.expiry_height);
552
553        for output in prq.outputs {
554            let address: Address = output
555                .address
556                .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
557                .try_into()
558                .map_err(|e| {
559                    tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
560                })?;
561
562            let value: Value = output
563                .value
564                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
565                .try_into()
566                .map_err(|e| {
567                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
568                })?;
569
570            planner.output(value, address);
571        }
572
573        for swap in prq.swaps {
574            let value: Value = swap
575                .value
576                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
577                .try_into()
578                .map_err(|e| {
579                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
580                })?;
581
582            let target_asset: asset::Id = swap
583                .target_asset
584                .ok_or_else(|| tonic::Status::invalid_argument("Missing target asset"))?
585                .try_into()
586                .map_err(|e| {
587                    tonic::Status::invalid_argument(format!("Could not parse target asset: {e:#}"))
588                })?;
589
590            let fee: Fee = swap
591                .fee
592                .ok_or_else(|| tonic::Status::invalid_argument("Missing fee"))?
593                .try_into()
594                .map_err(|e| {
595                    tonic::Status::invalid_argument(format!("Could not parse fee: {e:#}"))
596                })?;
597
598            let claim_address: Address = swap
599                .claim_address
600                .ok_or_else(|| tonic::Status::invalid_argument("Missing claim address"))?
601                .try_into()
602                .map_err(|e| {
603                    tonic::Status::invalid_argument(format!("Could not parse claim address: {e:#}"))
604                })?;
605
606            planner
607                .swap(value, target_asset, fee, claim_address)
608                .map_err(|e| {
609                    tonic::Status::invalid_argument(format!("Could not plan swap: {e:#}"))
610                })?;
611        }
612
613        for swap_claim in prq.swap_claims {
614            let swap_commitment: StateCommitment = swap_claim
615                .swap_commitment
616                .ok_or_else(|| tonic::Status::invalid_argument("Missing swap commitment"))?
617                .try_into()
618                .map_err(|e| {
619                    tonic::Status::invalid_argument(format!(
620                        "Could not parse swap commitment: {e:#}"
621                    ))
622                })?;
623            let swap_record = self
624                .storage
625                // TODO: should there be a timeout on detection here instead?
626                .swap_by_commitment(swap_commitment, false)
627                .await
628                .map_err(|e| {
629                    tonic::Status::invalid_argument(format!(
630                        "Could not fetch swap by commitment: {e:#}"
631                    ))
632                })?;
633
634            planner.swap_claim(SwapClaimPlan {
635                swap_plaintext: swap_record.swap,
636                position: swap_record.position,
637                output_data: swap_record.output_data,
638                epoch_duration: app_params.sct_params.epoch_duration,
639                proof_blinding_r: Fq::rand(&mut OsRng),
640                proof_blinding_s: Fq::rand(&mut OsRng),
641            });
642        }
643
644        let current_epoch = if prq.undelegations.is_empty() && prq.delegations.is_empty() {
645            None
646        } else {
647            Some(
648                prq.epoch
649                    .ok_or_else(|| {
650                        tonic::Status::invalid_argument(
651                            "Missing current epoch in TransactionPlannerRequest",
652                        )
653                    })?
654                    .try_into()
655                    .map_err(|e| {
656                        tonic::Status::invalid_argument(format!(
657                            "Could not parse current epoch: {e:#}"
658                        ))
659                    })?,
660            )
661        };
662
663        for delegation in prq.delegations {
664            let amount: Amount = delegation
665                .amount
666                .ok_or_else(|| tonic::Status::invalid_argument("Missing amount"))?
667                .try_into()
668                .map_err(|e| {
669                    tonic::Status::invalid_argument(format!("Could not parse amount: {e:#}"))
670                })?;
671
672            let rate_data: RateData = delegation
673                .rate_data
674                .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
675                .try_into()
676                .map_err(|e| {
677                    tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
678                })?;
679
680            planner.delegate(
681                current_epoch.expect("checked that current epoch is present"),
682                amount,
683                rate_data,
684            );
685        }
686
687        for undelegation in prq.undelegations {
688            let value: Value = undelegation
689                .value
690                .ok_or_else(|| tonic::Status::invalid_argument("Missing value"))?
691                .try_into()
692                .map_err(|e| {
693                    tonic::Status::invalid_argument(format!("Could not parse value: {e:#}"))
694                })?;
695
696            let rate_data: RateData = undelegation
697                .rate_data
698                .ok_or_else(|| tonic::Status::invalid_argument("Missing rate data"))?
699                .try_into()
700                .map_err(|e| {
701                    tonic::Status::invalid_argument(format!("Could not parse rate data: {e:#}"))
702                })?;
703
704            planner.undelegate(
705                current_epoch.expect("checked that current epoch is present"),
706                value.amount,
707                rate_data,
708            );
709        }
710
711        for position_open in prq.position_opens {
712            let position: Position = position_open
713                .position
714                .ok_or_else(|| tonic::Status::invalid_argument("Missing position"))?
715                .try_into()
716                .map_err(|e| {
717                    tonic::Status::invalid_argument(format!("Could not parse position: {e:#}"))
718                })?;
719
720            planner.position_open(position);
721        }
722
723        for position_close in prq.position_closes {
724            let position_id: position::Id = position_close
725                .position_id
726                .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
727                .try_into()
728                .map_err(|e| {
729                    tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
730                })?;
731
732            planner.position_close(position_id);
733        }
734
735        for position_withdraw in prq.position_withdraws {
736            let position_id: position::Id = position_withdraw
737                .position_id
738                .ok_or_else(|| tonic::Status::invalid_argument("Missing position_id"))?
739                .try_into()
740                .map_err(|e| {
741                    tonic::Status::invalid_argument(format!("Could not parse position ID: {e:#}"))
742                })?;
743
744            let reserves: Reserves = position_withdraw
745                .reserves
746                .ok_or_else(|| tonic::Status::invalid_argument("Missing reserves"))?
747                .try_into()
748                .map_err(|e| {
749                    tonic::Status::invalid_argument(format!("Could not parse reserves: {e:#}"))
750                })?;
751
752            let trading_pair: TradingPair = position_withdraw
753                .trading_pair
754                .ok_or_else(|| tonic::Status::invalid_argument("Missing pair"))?
755                .try_into()
756                .map_err(|e| {
757                    tonic::Status::invalid_argument(format!("Could not parse pair: {e:#}"))
758                })?;
759
760            planner.position_withdraw(position_id, reserves, trading_pair, 0);
761        }
762
763        // Insert any ICS20 withdrawals.
764        for ics20_withdrawal in prq.ics20_withdrawals {
765            planner.ics20_withdrawal(
766                ics20_withdrawal
767                    .try_into()
768                    .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
769            );
770        }
771
772        // Finally, insert all the requested IBC actions.
773        for ibc_action in prq.ibc_relay_actions {
774            planner.ibc_action(
775                ibc_action
776                    .try_into()
777                    .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?,
778            );
779        }
780
781        let mut client_of_self = ViewServiceClient::new(ViewServiceServer::new(self.clone()));
782
783        let source = prq
784            .source
785            // If the request specified a source of funds, pass it to the planner...
786            .map(|addr_index| addr_index.account)
787            // ... or just use the default account if not.
788            .unwrap_or(0u32);
789
790        let plan = planner
791            .plan(&mut client_of_self, source.into())
792            .await
793            .context("could not plan requested transaction")
794            .map_err(|e| tonic::Status::invalid_argument(format!("{e:#}")))?;
795
796        Ok(tonic::Response::new(TransactionPlannerResponse {
797            plan: Some(plan.into()),
798        }))
799    }
800
801    #[instrument(skip_all, level = "trace")]
802    async fn address_by_index(
803        &self,
804        request: tonic::Request<pb::AddressByIndexRequest>,
805    ) -> Result<tonic::Response<pb::AddressByIndexResponse>, tonic::Status> {
806        let fvk =
807            self.storage.full_viewing_key().await.map_err(|_| {
808                tonic::Status::failed_precondition("Error retrieving full viewing key")
809            })?;
810
811        let address_index = request
812            .into_inner()
813            .address_index
814            .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
815            .try_into()
816            .map_err(|e| {
817                tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
818            })?;
819
820        Ok(tonic::Response::new(pb::AddressByIndexResponse {
821            address: Some(fvk.payment_address(address_index).0.into()),
822        }))
823    }
824
825    #[instrument(skip_all, level = "trace")]
826    async fn index_by_address(
827        &self,
828        request: tonic::Request<pb::IndexByAddressRequest>,
829    ) -> Result<tonic::Response<pb::IndexByAddressResponse>, tonic::Status> {
830        let fvk =
831            self.storage.full_viewing_key().await.map_err(|_| {
832                tonic::Status::failed_precondition("Error retrieving full viewing key")
833            })?;
834
835        let address: Address = request
836            .into_inner()
837            .address
838            .ok_or_else(|| tonic::Status::invalid_argument("Missing address"))?
839            .try_into()
840            .map_err(|e| {
841                tonic::Status::invalid_argument(format!("Could not parse address: {e:#}"))
842            })?;
843
844        Ok(tonic::Response::new(pb::IndexByAddressResponse {
845            address_index: fvk.address_index(&address).map(Into::into),
846        }))
847    }
848    async fn transparent_address(
849        &self,
850        _request: tonic::Request<pb::TransparentAddressRequest>,
851    ) -> Result<tonic::Response<pb::TransparentAddressResponse>, tonic::Status> {
852        let fvk =
853            self.storage.full_viewing_key().await.map_err(|_| {
854                tonic::Status::failed_precondition("Error retrieving full viewing key")
855            })?;
856
857        let encoding = fvk.incoming().transparent_address();
858        let address: Address = encoding
859            .parse()
860            .map_err(|_| tonic::Status::internal("could not parse newly generated address"))?;
861
862        Ok(tonic::Response::new(pb::TransparentAddressResponse {
863            address: Some(address.into()),
864            encoding,
865        }))
866    }
867
868    #[instrument(skip_all, level = "trace")]
869    async fn ephemeral_address(
870        &self,
871        request: tonic::Request<pb::EphemeralAddressRequest>,
872    ) -> Result<tonic::Response<pb::EphemeralAddressResponse>, tonic::Status> {
873        let fvk =
874            self.storage.full_viewing_key().await.map_err(|_| {
875                tonic::Status::failed_precondition("Error retrieving full viewing key")
876            })?;
877
878        let address_index = request
879            .into_inner()
880            .address_index
881            .ok_or_else(|| tonic::Status::invalid_argument("Missing address index"))?
882            .try_into()
883            .map_err(|e| {
884                tonic::Status::invalid_argument(format!("Could not parse address index: {e:#}"))
885            })?;
886
887        Ok(tonic::Response::new(pb::EphemeralAddressResponse {
888            address: Some(fvk.ephemeral_address(OsRng, address_index).0.into()),
889        }))
890    }
891
892    #[instrument(skip_all, level = "trace")]
893    async fn transaction_info_by_hash(
894        &self,
895        request: tonic::Request<pb::TransactionInfoByHashRequest>,
896    ) -> Result<tonic::Response<pb::TransactionInfoByHashResponse>, tonic::Status> {
897        self.check_worker().await?;
898
899        let request = request.into_inner();
900
901        let fvk =
902            self.storage.full_viewing_key().await.map_err(|_| {
903                tonic::Status::failed_precondition("Error retrieving full viewing key")
904            })?;
905
906        let maybe_tx = self
907            .storage
908            .transaction_by_hash(
909                &request
910                    .id
911                    .clone()
912                    .ok_or_else(|| {
913                        tonic::Status::invalid_argument(
914                            "missing transaction ID in TransactionInfoByHashRequest",
915                        )
916                    })?
917                    .inner,
918            )
919            .await
920            .map_err(|_| {
921                tonic::Status::failed_precondition(format!(
922                    "Error retrieving transaction by hash {}",
923                    hex::encode(request.id.expect("transaction id is present").inner)
924                ))
925            })?;
926
927        let Some((height, tx)) = maybe_tx else {
928            return Ok(tonic::Response::new(
929                pb::TransactionInfoByHashResponse::default(),
930            ));
931        };
932
933        // First, create a TxP with the payload keys visible to our FVK and no other data.
934        let mut txp = TransactionPerspective {
935            payload_keys: tx
936                .payload_keys(&fvk)
937                .map_err(|_| tonic::Status::failed_precondition("Error generating payload keys"))?,
938            ..Default::default()
939        };
940
941        // Next, extend the TxP with the openings of commitments known to our view server
942        // but not included in the transaction body, for instance spent notes or swap claim outputs.
943        for action in tx.actions() {
944            use penumbra_sdk_transaction::Action;
945            match action {
946                Action::Spend(spend) => {
947                    let nullifier = spend.body.nullifier;
948                    // An error here indicates we don't know the nullifier, so we omit it from the Perspective.
949                    if let Ok(spendable_note_record) =
950                        self.storage.note_by_nullifier(nullifier, false).await
951                    {
952                        txp.spend_nullifiers
953                            .insert(nullifier, spendable_note_record.note);
954                    }
955                }
956                Action::SwapClaim(claim) => {
957                    let output_1_record = self
958                        .storage
959                        .note_by_commitment(claim.body.output_1_commitment, false)
960                        .await
961                        .map_err(|e| {
962                            tonic::Status::internal(format!(
963                                "Error retrieving first SwapClaim output note record: {:#}",
964                                e
965                            ))
966                        })?;
967                    let output_2_record = self
968                        .storage
969                        .note_by_commitment(claim.body.output_2_commitment, false)
970                        .await
971                        .map_err(|e| {
972                            tonic::Status::internal(format!(
973                                "Error retrieving second SwapClaim output note record: {:#}",
974                                e
975                            ))
976                        })?;
977
978                    txp.advice_notes
979                        .insert(claim.body.output_1_commitment, output_1_record.note);
980                    txp.advice_notes
981                        .insert(claim.body.output_2_commitment, output_2_record.note);
982                }
983                _ => {}
984            }
985        }
986
987        // Now, generate a stub TxV from our minimal TxP, and inspect it to see what data we should
988        // augment the minimal TxP with to provide additional context (e.g., filling in denoms for
989        // visible asset IDs).
990        let min_view = tx.view_from_perspective(&txp);
991        let mut address_views = BTreeMap::new();
992        let mut asset_ids = BTreeSet::new();
993        for action_view in min_view.action_views() {
994            use penumbra_sdk_dex::{swap::SwapView, swap_claim::SwapClaimView};
995            use penumbra_sdk_transaction::view::action_view::{
996                ActionView, DelegatorVoteView, OutputView, SpendView,
997            };
998            match action_view {
999                ActionView::Spend(SpendView::Visible { note, .. }) => {
1000                    let address = note.address();
1001                    address_views.insert(address.clone(), fvk.view_address(address));
1002                    asset_ids.insert(note.asset_id());
1003                }
1004                ActionView::Output(OutputView::Visible { note, .. }) => {
1005                    let address = note.address();
1006                    address_views.insert(address.clone(), fvk.view_address(address.clone()));
1007                    asset_ids.insert(note.asset_id());
1008
1009                    // Also add an AddressView for the return address in the memo.
1010                    let memo = tx.decrypt_memo(&fvk).map_err(|_| {
1011                        tonic::Status::internal("Error decrypting memo for OutputView")
1012                    })?;
1013                    address_views.insert(memo.return_address(), fvk.view_address(address));
1014                }
1015                ActionView::Swap(SwapView::Visible { swap_plaintext, .. }) => {
1016                    let address = swap_plaintext.claim_address.clone();
1017                    address_views.insert(address.clone(), fvk.view_address(address));
1018                    asset_ids.insert(swap_plaintext.trading_pair.asset_1());
1019                    asset_ids.insert(swap_plaintext.trading_pair.asset_2());
1020                }
1021                ActionView::SwapClaim(SwapClaimView::Visible {
1022                    output_1, output_2, ..
1023                }) => {
1024                    // Both will be sent to the same address so this only needs to be added once
1025                    let address = output_1.address();
1026                    address_views.insert(address.clone(), fvk.view_address(address));
1027                    asset_ids.insert(output_1.asset_id());
1028                    asset_ids.insert(output_2.asset_id());
1029                }
1030                ActionView::DelegatorVote(DelegatorVoteView::Visible { note, .. }) => {
1031                    let address = note.address();
1032                    address_views.insert(address.clone(), fvk.view_address(address));
1033                    asset_ids.insert(note.asset_id());
1034                }
1035                ActionView::ActionDutchAuctionWithdraw(ActionDutchAuctionWithdrawView {
1036                    action: _,
1037                    reserves,
1038                }) => {
1039                    // previous comment: /* no-op for now - i'm not totally sure we have all the necessary data to attribute specific note openings to this view */
1040                    // to this cronokirby replied: well, we can however at least fill in some asset ids!
1041                    for value in reserves {
1042                        asset_ids.insert(value.asset_id());
1043                    }
1044                }
1045                // We can populate asset ids for the assets involved in the auction
1046                ActionView::ActionDutchAuctionSchedule(ActionDutchAuctionScheduleView {
1047                    action,
1048                    ..
1049                }) => {
1050                    let description = &action.description;
1051                    asset_ids.insert(description.input.asset_id);
1052                    asset_ids.insert(description.output_id);
1053                }
1054                _ => {}
1055            }
1056        }
1057
1058        // Now, extend the TxV with information helpful to understand the data it can view:
1059
1060        let mut denoms = Vec::new();
1061
1062        for id in asset_ids {
1063            if let Some(asset) = self.storage.asset_by_id(&id).await.map_err(|e| {
1064                tonic::Status::internal(format!("Error retrieving asset by id: {:#}", e))
1065            })? {
1066                denoms.push(asset);
1067            }
1068        }
1069
1070        txp.denoms.extend(denoms);
1071
1072        txp.address_views = address_views.into_values().collect();
1073
1074        // Finally, compute the full TxV from the full TxP:
1075        let txv = tx.view_from_perspective(&txp);
1076        let summary = txv.summary();
1077
1078        let response = pb::TransactionInfoByHashResponse {
1079            tx_info: Some(pb::TransactionInfo {
1080                height,
1081                id: Some(tx.id().into()),
1082                perspective: Some(txp.into()),
1083                transaction: Some(tx.into()),
1084                view: Some(txv.into()),
1085                summary: Some(summary.into()),
1086            }),
1087        };
1088
1089        Ok(tonic::Response::new(response))
1090    }
1091
1092    #[instrument(skip_all, level = "trace")]
1093    async fn swap_by_commitment(
1094        &self,
1095        request: tonic::Request<pb::SwapByCommitmentRequest>,
1096    ) -> Result<tonic::Response<pb::SwapByCommitmentResponse>, tonic::Status> {
1097        self.check_worker().await?;
1098
1099        let request = request.into_inner();
1100
1101        let swap_commitment = request
1102            .swap_commitment
1103            .ok_or_else(|| {
1104                tonic::Status::failed_precondition("Missing swap commitment in request")
1105            })?
1106            .try_into()
1107            .map_err(|_| {
1108                tonic::Status::failed_precondition("Invalid swap commitment in request")
1109            })?;
1110
1111        let swap = pb::SwapRecord::from(
1112            self.storage
1113                .swap_by_commitment(swap_commitment, request.await_detection)
1114                .await
1115                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1116        );
1117
1118        Ok(tonic::Response::new(SwapByCommitmentResponse {
1119            swap: Some(swap),
1120        }))
1121    }
1122
1123    #[allow(deprecated)]
1124    #[instrument(skip(self, request))]
1125    async fn balances(
1126        &self,
1127        request: tonic::Request<pb::BalancesRequest>,
1128    ) -> Result<tonic::Response<Self::BalancesStream>, tonic::Status> {
1129        let request = request.into_inner();
1130
1131        let account_filter = request.account_filter.and_then(|x| {
1132            AddressIndex::try_from(x)
1133                .map_err(|_| {
1134                    tonic::Status::failed_precondition("Invalid swap commitment in request")
1135                })
1136                .map_or(None, |x| x.into())
1137        });
1138
1139        let asset_id_filter = request.asset_id_filter.and_then(|x| {
1140            asset::Id::try_from(x)
1141                .map_err(|_| {
1142                    tonic::Status::failed_precondition("Invalid swap commitment in request")
1143                })
1144                .map_or(None, |x| x.into())
1145        });
1146
1147        let result = self
1148            .storage
1149            .balances(account_filter, asset_id_filter)
1150            .await
1151            .map_err(|e| tonic::Status::internal(format!("error: {e}")))?;
1152
1153        tracing::debug!(?account_filter, ?asset_id_filter, ?result);
1154
1155        let self2 = self.clone();
1156        let stream = try_stream! {
1157            // retrieve balance and address views
1158            for element in result {
1159                let metadata: Metadata = self2
1160                    .asset_metadata_by_id(Request::new(pb::AssetMetadataByIdRequest {
1161                        asset_id: Some(element.id.into()),
1162                    }))
1163                    .await?
1164                    .into_inner()
1165                    .denom_metadata
1166                    .context("denom metadata not found")?
1167                    .try_into()?;
1168
1169                 let value = Value {
1170                    asset_id: element.id,
1171                    amount: element.amount.into(),
1172                };
1173
1174                let value_view = value.view_with_denom(metadata)?;
1175
1176                let address: Address = self2
1177                  .address_by_index(Request::new(pb::AddressByIndexRequest {
1178                       address_index: account_filter.map(Into::into),
1179                   }))
1180                   .await?
1181                    .into_inner()
1182                    .address
1183                    .context("address not found")?
1184                    .try_into()?;
1185
1186                 let wallet_id: WalletId = self2
1187                            .wallet_id(Request::new(pb::WalletIdRequest {}))
1188                            .await?
1189                            .into_inner()
1190                            .wallet_id
1191                            .context("wallet id not found")?
1192                            .try_into()?;
1193
1194                let address_view = AddressView::Decoded {
1195                    address,
1196                    index: element.address_index,
1197                    wallet_id,
1198                };
1199
1200                yield pb::BalancesResponse {
1201                    account_address: Some(address_view.into()),
1202                    balance_view: Some(value_view.into()),
1203                    balance: None,
1204                    account: None,
1205                }
1206            }
1207        };
1208
1209        Ok(tonic::Response::new(
1210            stream
1211                .map_err(|e: anyhow::Error| {
1212                    tonic::Status::unavailable(format!("error getting balances: {e}"))
1213                })
1214                .boxed(),
1215        ))
1216    }
1217
1218    #[instrument(skip_all, level = "trace")]
1219    async fn note_by_commitment(
1220        &self,
1221        request: tonic::Request<pb::NoteByCommitmentRequest>,
1222    ) -> Result<tonic::Response<pb::NoteByCommitmentResponse>, tonic::Status> {
1223        self.check_worker().await?;
1224
1225        let request = request.into_inner();
1226
1227        let note_commitment = request
1228            .note_commitment
1229            .ok_or_else(|| {
1230                tonic::Status::failed_precondition("Missing note commitment in request")
1231            })?
1232            .try_into()
1233            .map_err(|_| {
1234                tonic::Status::failed_precondition("Invalid note commitment in request")
1235            })?;
1236
1237        let spendable_note = pb::SpendableNoteRecord::from(
1238            self.storage
1239                .note_by_commitment(note_commitment, request.await_detection)
1240                .await
1241                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1242        );
1243
1244        Ok(tonic::Response::new(NoteByCommitmentResponse {
1245            spendable_note: Some(spendable_note),
1246        }))
1247    }
1248
1249    #[instrument(skip_all, level = "trace")]
1250    async fn nullifier_status(
1251        &self,
1252        request: tonic::Request<pb::NullifierStatusRequest>,
1253    ) -> Result<tonic::Response<pb::NullifierStatusResponse>, tonic::Status> {
1254        self.check_worker().await?;
1255
1256        let request = request.into_inner();
1257
1258        let nullifier = request
1259            .nullifier
1260            .ok_or_else(|| tonic::Status::failed_precondition("Missing nullifier in request"))?
1261            .try_into()
1262            .map_err(|_| tonic::Status::failed_precondition("Invalid nullifier in request"))?;
1263
1264        Ok(tonic::Response::new(pb::NullifierStatusResponse {
1265            spent: self
1266                .storage
1267                .nullifier_status(nullifier, request.await_detection)
1268                .await
1269                .map_err(|e| tonic::Status::internal(format!("error: {e}")))?,
1270        }))
1271    }
1272
1273    #[instrument(skip_all, level = "trace")]
1274    async fn status(
1275        &self,
1276        _: tonic::Request<pb::StatusRequest>,
1277    ) -> Result<tonic::Response<pb::StatusResponse>, tonic::Status> {
1278        self.check_worker().await?;
1279
1280        Ok(tonic::Response::new(self.status().await.map_err(|e| {
1281            tonic::Status::internal(format!("error: {e}"))
1282        })?))
1283    }
1284
1285    #[instrument(skip_all, level = "trace")]
1286    async fn status_stream(
1287        &self,
1288        _: tonic::Request<pb::StatusStreamRequest>,
1289    ) -> Result<tonic::Response<Self::StatusStreamStream>, tonic::Status> {
1290        self.check_worker().await?;
1291
1292        let (latest_known_block_height, _) = self
1293            .latest_known_block_height()
1294            .await
1295            .tap_err(|error| {
1296                tracing::debug!(
1297                    ?error,
1298                    "unable to fetch latest known block height from fullnode"
1299                )
1300            })
1301            .map_err(|e| {
1302                tonic::Status::unknown(format!(
1303                    "unable to fetch latest known block height from fullnode: {e}"
1304                ))
1305            })?;
1306
1307        // Create a stream of sync height updates from our worker, and send them to the client
1308        // until we've reached the latest known block height at the time the request was made.
1309        let mut sync_height_stream = WatchStream::new(self.sync_height_rx.clone());
1310        let stream = try_stream! {
1311            while let Some(sync_height) = sync_height_stream.next().await {
1312                yield pb::StatusStreamResponse {
1313                    latest_known_block_height,
1314                    full_sync_height: sync_height,
1315                    partial_sync_height: sync_height, // Set these as the same for backwards compatibility following adding the partial_sync_height
1316                };
1317                if sync_height >= latest_known_block_height {
1318                    break;
1319                }
1320            }
1321        };
1322
1323        Ok(tonic::Response::new(stream.boxed()))
1324    }
1325
1326    #[instrument(skip_all, level = "trace")]
1327    async fn notes(
1328        &self,
1329        request: tonic::Request<pb::NotesRequest>,
1330    ) -> Result<tonic::Response<Self::NotesStream>, tonic::Status> {
1331        self.check_worker().await?;
1332
1333        let request = request.into_inner();
1334
1335        let include_spent = request.include_spent;
1336        let asset_id = request
1337            .asset_id
1338            .to_owned()
1339            .map(asset::Id::try_from)
1340            .map_or(Ok(None), |v| v.map(Some))
1341            .map_err(|_| tonic::Status::invalid_argument("invalid asset id"))?;
1342        let address_index = request
1343            .address_index
1344            .to_owned()
1345            .map(AddressIndex::try_from)
1346            .map_or(Ok(None), |v| v.map(Some))
1347            .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1348
1349        let amount_to_spend = request
1350            .amount_to_spend
1351            .map(Amount::try_from)
1352            .map_or(Ok(None), |v| v.map(Some))
1353            .map_err(|_| tonic::Status::invalid_argument("invalid amount to spend"))?;
1354
1355        let notes = self
1356            .storage
1357            .notes(include_spent, asset_id, address_index, amount_to_spend)
1358            .await
1359            .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1360
1361        let stream = try_stream! {
1362            for note in notes {
1363                yield pb::NotesResponse {
1364                    note_record: Some(note.into()),
1365                }
1366            }
1367        };
1368
1369        Ok(tonic::Response::new(
1370            stream
1371                .map_err(|e: anyhow::Error| {
1372                    tonic::Status::unavailable(format!("error getting notes: {e}"))
1373                })
1374                .boxed(),
1375        ))
1376    }
1377
1378    #[instrument(skip_all, level = "trace")]
1379    async fn notes_for_voting(
1380        &self,
1381        request: tonic::Request<pb::NotesForVotingRequest>,
1382    ) -> Result<tonic::Response<Self::NotesForVotingStream>, tonic::Status> {
1383        self.check_worker().await?;
1384
1385        let address_index = request
1386            .get_ref()
1387            .address_index
1388            .to_owned()
1389            .map(AddressIndex::try_from)
1390            .map_or(Ok(None), |v| v.map(Some))
1391            .map_err(|_| tonic::Status::invalid_argument("invalid address index"))?;
1392
1393        let votable_at_height = request.get_ref().votable_at_height;
1394
1395        let notes = self
1396            .storage
1397            .notes_for_voting(address_index, votable_at_height)
1398            .await
1399            .map_err(|e| tonic::Status::unavailable(format!("error fetching notes: {e}")))?;
1400
1401        let stream = try_stream! {
1402            for (note, identity_key) in notes {
1403                yield pb::NotesForVotingResponse {
1404                    note_record: Some(note.into()),
1405                    identity_key: Some(identity_key.into()),
1406                }
1407            }
1408        };
1409
1410        Ok(tonic::Response::new(
1411            stream
1412                .map_err(|e: anyhow::Error| {
1413                    tonic::Status::unavailable(format!("error getting notes: {e}"))
1414                })
1415                .boxed(),
1416        ))
1417    }
1418
1419    #[instrument(skip_all, level = "trace")]
1420    async fn assets(
1421        &self,
1422        request: tonic::Request<pb::AssetsRequest>,
1423    ) -> Result<tonic::Response<Self::AssetsStream>, tonic::Status> {
1424        self.check_worker().await?;
1425
1426        let pb::AssetsRequest {
1427            filtered,
1428            include_specific_denominations,
1429            include_delegation_tokens,
1430            include_unbonding_tokens,
1431            include_lp_nfts,
1432            include_proposal_nfts,
1433            include_voting_receipt_tokens,
1434        } = request.get_ref();
1435
1436        // Fetch assets from storage.
1437        let assets = if !filtered {
1438            self.storage
1439                .all_assets()
1440                .await
1441                .map_err(|e| tonic::Status::unavailable(format!("error fetching assets: {e}")))?
1442        } else {
1443            let mut assets = vec![];
1444            for denom in include_specific_denominations {
1445                if let Some(denom) = asset::REGISTRY.parse_denom(&denom.denom) {
1446                    assets.push(denom);
1447                }
1448            }
1449            for (include, pattern) in [
1450                (include_delegation_tokens, "_delegation\\_%"),
1451                (include_unbonding_tokens, "_unbonding\\_%"),
1452                (include_lp_nfts, "lpnft\\_%"),
1453                (include_proposal_nfts, "proposal\\_%"),
1454                (include_voting_receipt_tokens, "voted\\_on\\_%"),
1455            ] {
1456                if *include {
1457                    assets.extend(
1458                        self.storage
1459                            .assets_matching(pattern.to_string())
1460                            .await
1461                            .map_err(|e| {
1462                                tonic::Status::unavailable(format!("error fetching assets: {e}"))
1463                            })?,
1464                    );
1465                }
1466            }
1467            assets
1468        };
1469
1470        let stream = try_stream! {
1471            for asset in assets {
1472                yield
1473                    pb::AssetsResponse {
1474                        denom_metadata: Some(asset.into()),
1475                    }
1476            }
1477        };
1478
1479        Ok(tonic::Response::new(
1480            stream
1481                .map_err(|e: anyhow::Error| {
1482                    tonic::Status::unavailable(format!("error getting assets: {e}"))
1483                })
1484                .boxed(),
1485        ))
1486    }
1487
1488    #[instrument(skip_all, level = "trace")]
1489    async fn transaction_info(
1490        &self,
1491        request: tonic::Request<pb::TransactionInfoRequest>,
1492    ) -> Result<tonic::Response<Self::TransactionInfoStream>, tonic::Status> {
1493        self.check_worker().await?;
1494        // Unpack optional start/end heights.
1495        let start_height = if request.get_ref().start_height == 0 {
1496            None
1497        } else {
1498            Some(request.get_ref().start_height)
1499        };
1500        let end_height = if request.get_ref().end_height == 0 {
1501            None
1502        } else {
1503            Some(request.get_ref().end_height)
1504        };
1505
1506        // Fetch transactions from storage.
1507        let txs = self
1508            .storage
1509            .transactions(start_height, end_height)
1510            .await
1511            .map_err(|e| tonic::Status::unavailable(format!("error fetching transactions: {e}")))?;
1512
1513        let self2 = self.clone();
1514        let stream = try_stream! {
1515            for tx in txs {
1516
1517                let rsp = self2.transaction_info_by_hash(tonic::Request::new(pb::TransactionInfoByHashRequest {
1518                    id: Some(tx.2.id().into()),
1519                })).await?.into_inner();
1520
1521                yield pb::TransactionInfoResponse {
1522                    tx_info: rsp.tx_info,
1523                }
1524            }
1525        };
1526
1527        Ok(tonic::Response::new(
1528            stream
1529                .map_err(|e: anyhow::Error| {
1530                    tonic::Status::unavailable(format!("error getting transactions: {e}"))
1531                })
1532                .boxed(),
1533        ))
1534    }
1535
1536    #[instrument(skip_all, level = "trace")]
1537    async fn witness(
1538        &self,
1539        request: tonic::Request<pb::WitnessRequest>,
1540    ) -> Result<tonic::Response<WitnessResponse>, tonic::Status> {
1541        self.check_worker().await?;
1542
1543        // Acquire a read lock for the SCT that will live for the entire request,
1544        // so that all auth paths are relative to the same SCT root.
1545        let sct = self.state_commitment_tree.read().await;
1546
1547        // Read the SCT root
1548        let anchor = sct.root();
1549
1550        // Obtain an auth path for each requested note commitment
1551        let tx_plan: TransactionPlan =
1552            request
1553                .get_ref()
1554                .to_owned()
1555                .transaction_plan
1556                .map_or(TransactionPlan::default(), |x| {
1557                    x.try_into()
1558                        .expect("TransactionPlan should exist in request")
1559                });
1560
1561        let requested_note_commitments: Vec<StateCommitment> = tx_plan
1562            .spend_plans()
1563            .filter(|plan| plan.note.amount() != 0u64.into())
1564            .map(|spend| spend.note.commit().into())
1565            .chain(
1566                tx_plan
1567                    .swap_claim_plans()
1568                    .map(|swap_claim| swap_claim.swap_plaintext.swap_commitment().into()),
1569            )
1570            .chain(
1571                tx_plan
1572                    .delegator_vote_plans()
1573                    .map(|vote_plan| vote_plan.staked_note.commit().into()),
1574            )
1575            .collect();
1576
1577        tracing::debug!(?requested_note_commitments);
1578
1579        let auth_paths: Vec<Proof> = requested_note_commitments
1580            .iter()
1581            .map(|nc| {
1582                sct.witness(*nc).ok_or_else(|| {
1583                    tonic::Status::new(tonic::Code::InvalidArgument, "Note commitment missing")
1584                })
1585            })
1586            .collect::<Result<Vec<Proof>, tonic::Status>>()?;
1587
1588        // Release the read lock on the SCT
1589        drop(sct);
1590
1591        let mut witness_data = WitnessData {
1592            anchor,
1593            state_commitment_proofs: auth_paths
1594                .into_iter()
1595                .map(|proof| (proof.commitment(), proof))
1596                .collect(),
1597        };
1598
1599        tracing::debug!(?witness_data);
1600
1601        // Now we need to augment the witness data with dummy proofs such that
1602        // note commitments corresponding to dummy spends also have proofs.
1603        for nc in tx_plan
1604            .spend_plans()
1605            .filter(|plan| plan.note.amount() == 0u64.into())
1606            .map(|plan| plan.note.commit())
1607        {
1608            witness_data.add_proof(nc, Proof::dummy(&mut OsRng, nc));
1609        }
1610
1611        let witness_response = WitnessResponse {
1612            witness_data: Some(witness_data.into()),
1613        };
1614        Ok(tonic::Response::new(witness_response))
1615    }
1616
1617    #[instrument(skip_all, level = "trace")]
1618    async fn witness_and_build(
1619        &self,
1620        request: tonic::Request<pb::WitnessAndBuildRequest>,
1621    ) -> Result<tonic::Response<Self::WitnessAndBuildStream>, tonic::Status> {
1622        let pb::WitnessAndBuildRequest {
1623            transaction_plan,
1624            authorization_data,
1625        } = request.into_inner();
1626
1627        let transaction_plan: TransactionPlan = transaction_plan
1628            .ok_or_else(|| tonic::Status::invalid_argument("missing transaction plan"))?
1629            .try_into()
1630            .map_err(|e: anyhow::Error| e.context("could not decode transaction plan"))
1631            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1632
1633        let authorization_data: AuthorizationData = authorization_data
1634            .ok_or_else(|| tonic::Status::invalid_argument("missing authorization data"))?
1635            .try_into()
1636            .map_err(|e: anyhow::Error| e.context("could not decode authorization data"))
1637            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1638
1639        let witness_request = pb::WitnessRequest {
1640            transaction_plan: Some(transaction_plan.clone().into()),
1641        };
1642
1643        let witness_data: WitnessData = self
1644            .witness(tonic::Request::new(witness_request))
1645            .await?
1646            .into_inner()
1647            .witness_data
1648            .ok_or_else(|| tonic::Status::invalid_argument("missing witness data"))?
1649            .try_into()
1650            .map_err(|e: anyhow::Error| e.context("could not decode witness data"))
1651            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1652
1653        let fvk =
1654            self.storage.full_viewing_key().await.map_err(|_| {
1655                tonic::Status::failed_precondition("Error retrieving full viewing key")
1656            })?;
1657
1658        let transaction = Some(
1659            transaction_plan
1660                // TODO: calling `.build` should provide some mechanism to get progress
1661                // updates
1662                .build(&fvk, &witness_data, &authorization_data)
1663                .map_err(|_| tonic::Status::failed_precondition("Error building transaction"))?
1664                .into(),
1665        );
1666
1667        let stream = try_stream! {
1668            yield pb::WitnessAndBuildResponse {
1669                status: Some(pb::witness_and_build_response::Status::Complete(
1670                    pb::witness_and_build_response::Complete { transaction },
1671                )),
1672            }
1673        };
1674
1675        Ok(tonic::Response::new(
1676            stream
1677                .map_err(|e: anyhow::Error| {
1678                    tonic::Status::unavailable(format!("error witnessing transaction: {e}"))
1679                })
1680                .boxed(),
1681        ))
1682    }
1683
1684    #[instrument(skip_all, level = "trace")]
1685    async fn app_parameters(
1686        &self,
1687        _request: tonic::Request<pb::AppParametersRequest>,
1688    ) -> Result<tonic::Response<pb::AppParametersResponse>, tonic::Status> {
1689        self.check_worker().await?;
1690
1691        let parameters =
1692            self.storage.app_params().await.map_err(|e| {
1693                tonic::Status::unavailable(format!("error getting app params: {e}"))
1694            })?;
1695
1696        let response = AppParametersResponse {
1697            parameters: Some(parameters.into()),
1698        };
1699
1700        Ok(tonic::Response::new(response))
1701    }
1702
1703    #[instrument(skip_all, level = "trace")]
1704    async fn gas_prices(
1705        &self,
1706        _request: tonic::Request<pb::GasPricesRequest>,
1707    ) -> Result<tonic::Response<pb::GasPricesResponse>, tonic::Status> {
1708        self.check_worker().await?;
1709
1710        let gas_prices =
1711            self.storage.gas_prices().await.map_err(|e| {
1712                tonic::Status::unavailable(format!("error getting gas prices: {e}"))
1713            })?;
1714
1715        let response = GasPricesResponse {
1716            gas_prices: Some(gas_prices.into()),
1717            alt_gas_prices: Vec::new(),
1718        };
1719
1720        Ok(tonic::Response::new(response))
1721    }
1722
1723    #[instrument(skip_all, level = "trace")]
1724    async fn fmd_parameters(
1725        &self,
1726        _request: tonic::Request<pb::FmdParametersRequest>,
1727    ) -> Result<tonic::Response<pb::FmdParametersResponse>, tonic::Status> {
1728        self.check_worker().await?;
1729
1730        let parameters =
1731            self.storage.fmd_parameters().await.map_err(|e| {
1732                tonic::Status::unavailable(format!("error getting FMD params: {e}"))
1733            })?;
1734
1735        let response = FmdParametersResponse {
1736            parameters: Some(parameters.into()),
1737        };
1738
1739        Ok(tonic::Response::new(response))
1740    }
1741
1742    #[instrument(skip_all, level = "trace")]
1743    async fn owned_position_ids(
1744        &self,
1745        request: tonic::Request<pb::OwnedPositionIdsRequest>,
1746    ) -> Result<tonic::Response<Self::OwnedPositionIdsStream>, tonic::Status> {
1747        self.check_worker().await?;
1748
1749        let pb::OwnedPositionIdsRequest {
1750            position_state,
1751            trading_pair,
1752            subaccount: _,
1753        } = request.into_inner();
1754
1755        let position_state: Option<position::State> = position_state
1756            .map(|state| state.try_into())
1757            .transpose()
1758            .map_err(|e: anyhow::Error| e.context("could not decode position state"))
1759            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1760
1761        let trading_pair: Option<TradingPair> = trading_pair
1762            .map(|pair| pair.try_into())
1763            .transpose()
1764            .map_err(|e: anyhow::Error| e.context("could not decode trading pair"))
1765            .map_err(|e| tonic::Status::invalid_argument(format!("{:#}", e)))?;
1766
1767        let ids = self
1768            .storage
1769            .owned_position_ids(position_state, trading_pair)
1770            .await
1771            .map_err(|e| tonic::Status::unavailable(format!("error getting position ids: {e}")))?;
1772
1773        let stream = try_stream! {
1774            for id in ids {
1775                yield pb::OwnedPositionIdsResponse{
1776                    position_id: Some(id.into()),
1777                    // The rust view server does not index positions by subaccount,
1778                    // so this information is invisible to it.
1779                    subaccount: None,
1780                }
1781            }
1782        };
1783
1784        Ok(tonic::Response::new(
1785            stream
1786                .map_err(|e: anyhow::Error| {
1787                    tonic::Status::unavailable(format!("error getting position ids: {e}"))
1788                })
1789                .boxed(),
1790        ))
1791    }
1792
1793    #[instrument(skip_all, level = "trace")]
1794    async fn authorize_and_build(
1795        &self,
1796        _request: tonic::Request<pb::AuthorizeAndBuildRequest>,
1797    ) -> Result<tonic::Response<Self::AuthorizeAndBuildStream>, tonic::Status> {
1798        unimplemented!("authorize_and_build")
1799    }
1800
1801    #[instrument(skip_all, level = "trace")]
1802    async fn unclaimed_swaps(
1803        &self,
1804        _: tonic::Request<pb::UnclaimedSwapsRequest>,
1805    ) -> Result<tonic::Response<Self::UnclaimedSwapsStream>, tonic::Status> {
1806        self.check_worker().await?;
1807
1808        let swaps = self.storage.unclaimed_swaps().await.map_err(|e| {
1809            tonic::Status::unavailable(format!("error fetching unclaimed swaps: {e}"))
1810        })?;
1811
1812        let stream = try_stream! {
1813            for swap in swaps {
1814                yield pb::UnclaimedSwapsResponse{
1815                    swap: Some(swap.into()),
1816                }
1817            }
1818        };
1819
1820        Ok(tonic::Response::new(
1821            stream
1822                .map_err(|e: anyhow::Error| {
1823                    tonic::Status::unavailable(format!("error getting unclaimed swaps: {e}"))
1824                })
1825                .boxed(),
1826        ))
1827    }
1828
1829    #[instrument(skip_all, level = "trace")]
1830    async fn wallet_id(
1831        &self,
1832        _: Request<WalletIdRequest>,
1833    ) -> Result<Response<WalletIdResponse>, Status> {
1834        let fvk = self.storage.full_viewing_key().await.map_err(|e| {
1835            Status::failed_precondition(format!("Error retrieving full viewing key: {e}"))
1836        })?;
1837
1838        Ok(Response::new(WalletIdResponse {
1839            wallet_id: Some(fvk.wallet_id().into()),
1840        }))
1841    }
1842
1843    #[instrument(skip_all, level = "trace")]
1844    async fn asset_metadata_by_id(
1845        &self,
1846        request: Request<AssetMetadataByIdRequest>,
1847    ) -> Result<Response<AssetMetadataByIdResponse>, Status> {
1848        let asset_id = request
1849            .into_inner()
1850            .asset_id
1851            .ok_or_else(|| Status::invalid_argument("missing asset id"))?
1852            .try_into()
1853            .map_err(|e| Status::invalid_argument(format!("{e:#}")))?;
1854
1855        let metadata = self
1856            .storage
1857            .asset_by_id(&asset_id)
1858            .await
1859            .map_err(|e| Status::internal(format!("Error retrieving asset by id: {e:#}")))?;
1860
1861        Ok(Response::new(AssetMetadataByIdResponse {
1862            denom_metadata: metadata.map(Into::into),
1863        }))
1864    }
1865
1866    #[instrument(skip_all, level = "trace")]
1867    async fn delegations_by_address_index(
1868        &self,
1869        _request: tonic::Request<pb::DelegationsByAddressIndexRequest>,
1870    ) -> Result<tonic::Response<Self::DelegationsByAddressIndexStream>, tonic::Status> {
1871        unimplemented!("delegations_by_address_index")
1872    }
1873
1874    #[instrument(skip_all, level = "trace")]
1875    async fn unbonding_tokens_by_address_index(
1876        &self,
1877        _request: tonic::Request<pb::UnbondingTokensByAddressIndexRequest>,
1878    ) -> Result<tonic::Response<Self::UnbondingTokensByAddressIndexStream>, tonic::Status> {
1879        unimplemented!("unbonding_tokens_by_address_index currently only implemented on web")
1880    }
1881
1882    #[instrument(skip_all, level = "trace")]
1883    async fn latest_swaps(
1884        &self,
1885        _request: tonic::Request<pb::LatestSwapsRequest>,
1886    ) -> Result<tonic::Response<Self::LatestSwapsStream>, tonic::Status> {
1887        unimplemented!("latest_swaps currently only implemented on web")
1888    }
1889
1890    #[instrument(skip_all, level = "trace")]
1891    async fn tournament_votes(
1892        &self,
1893        _request: tonic::Request<pb::TournamentVotesRequest>,
1894    ) -> Result<tonic::Response<Self::TournamentVotesStream>, tonic::Status> {
1895        unimplemented!("tournament_votes currently only implemented on web")
1896    }
1897
1898    #[instrument(skip_all, level = "trace")]
1899    async fn lqt_voting_notes(
1900        &self,
1901        request: tonic::Request<pb::LqtVotingNotesRequest>,
1902    ) -> Result<tonic::Response<Self::LqtVotingNotesStream>, tonic::Status> {
1903        async fn inner(
1904            this: &ViewServer,
1905            epoch: u64,
1906            filter: Option<AddressIndex>,
1907        ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
1908            let (_, start_height) = this.storage.get_epoch(epoch).await?;
1909            let start_height =
1910                start_height.ok_or_else(|| anyhow!("missing height for epoch {epoch}"))?;
1911            let notes = this.storage.notes_for_voting(filter, start_height).await?;
1912            Ok(notes)
1913        }
1914
1915        let request = request.into_inner();
1916        let epoch = request.epoch_index;
1917        let filter = request
1918            .account_filter
1919            .map(|x| AddressIndex::try_from(x))
1920            .transpose()
1921            .map_err(|_| tonic::Status::invalid_argument("invalid account filter"))?;
1922        let notes = inner(self, epoch, filter).await.map_err(|e| {
1923            tonic::Status::internal(format!("error fetching voting notes: {:#}", e))
1924        })?;
1925        let stream = tokio_stream::iter(notes.into_iter().map(|(note, _)| {
1926            Result::<_, tonic::Status>::Ok(pb::LqtVotingNotesResponse {
1927                note_record: Some(note.into()),
1928                already_voted: false,
1929            })
1930        }));
1931        Ok(tonic::Response::new(stream.boxed()))
1932    }
1933
1934    #[instrument(skip_all, level = "trace")]
1935    async fn lp_position_bundle(
1936        &self,
1937        _request: tonic::Request<pb::LpPositionBundleRequest>,
1938    ) -> Result<tonic::Response<Self::LpPositionBundleStream>, tonic::Status> {
1939        unimplemented!("lp_position_bundle currently only implemented on web")
1940    }
1941
1942    #[instrument(skip_all, level = "trace")]
1943    async fn lp_strategy_catalog(
1944        &self,
1945        _request: tonic::Request<pb::LpStrategyCatalogRequest>,
1946    ) -> Result<tonic::Response<Self::LpStrategyCatalogStream>, tonic::Status> {
1947        unimplemented!("lp_strategy_catalog currently only implemented on web")
1948    }
1949}
1950
1951/// Convert a pd node URL to a Tonic `Endpoint`.
1952///
1953/// Required in order to configure TLS for HTTPS endpoints.
1954async fn get_pd_endpoint(node: Url) -> anyhow::Result<Endpoint> {
1955    let endpoint = match node.scheme() {
1956        "http" => Channel::from_shared(node.to_string())?,
1957        "https" => Channel::from_shared(node.to_string())?
1958            .tls_config(ClientTlsConfig::new().with_webpki_roots())?,
1959        other => anyhow::bail!("unknown url scheme {other}"),
1960    };
1961    Ok(endpoint)
1962}