penumbra_sdk_view/
client.rs

1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12    asset::{self, Id, Metadata},
13    ValueView,
14};
15use penumbra_sdk_dex::{
16    lp::position::{self, Position},
17    TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23    self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24    BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30    txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
35pub(crate) type BroadcastStatusStream = Pin<
36    Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
37>;
38
39/// The view protocol is used by a view client, who wants to do some
40/// transaction-related actions, to request data from a view service, which is
41/// responsible for synchronizing and scanning the public chain state with one
42/// or more full viewing keys.
43///
44/// This trait is a wrapper around the proto-generated [`ViewServiceClient`]
45/// that serves two goals:
46///
47/// 1. It can use domain types rather than proto-generated types, avoiding conversions;
48/// 2. It's easier to write as a trait bound than the `CustodyProtocolClient`,
49///   which requires complex bounds on its inner type to
50///   enforce that it is a tower `Service`.
51#[allow(clippy::type_complexity)]
52pub trait ViewClient {
53    /// Query the auction state
54    fn auctions(
55        &mut self,
56        account_filter: Option<AddressIndex>,
57        include_inactive: bool,
58        query_latest_state: bool,
59    ) -> Pin<
60        Box<
61            dyn Future<
62                    Output = Result<
63                        Vec<(
64                            AuctionId,
65                            SpendableNoteRecord,
66                            u64,
67                            Option<Any>,
68                            Vec<Position>,
69                        )>,
70                    >,
71                > + Send
72                + 'static,
73        >,
74    >;
75
76    /// Get the current status of chain sync.
77    fn status(
78        &mut self,
79    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
81    /// Stream status updates on chain sync until it completes.
82    fn status_stream(
83        &mut self,
84    ) -> Pin<
85        Box<
86            dyn Future<
87                    Output = Result<
88                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
89                    >,
90                > + Send
91                + 'static,
92        >,
93    >;
94
95    /// Get a copy of the app parameters.
96    fn app_params(
97        &mut self,
98    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
100    /// Get a copy of the gas prices.
101    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
103    /// Get a copy of the FMD parameters.
104    fn fmd_parameters(
105        &mut self,
106    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
108    /// Queries for notes.
109    fn notes(
110        &mut self,
111        request: pb::NotesRequest,
112    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
114    /// Queries for notes for voting.
115    fn notes_for_voting(
116        &mut self,
117        request: pb::NotesForVotingRequest,
118    ) -> Pin<
119        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
120    >;
121
122    /// Queries for account balance by address
123    fn balances(
124        &mut self,
125        address_index: AddressIndex,
126        asset_id: Option<asset::Id>,
127    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
129    /// Queries for a specific note by commitment, returning immediately if it is not found.
130    fn note_by_commitment(
131        &mut self,
132        note_commitment: note::StateCommitment,
133    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
135    /// Queries for a specific swap by commitment, returning immediately if it is not found.
136    fn swap_by_commitment(
137        &mut self,
138        swap_commitment: penumbra_sdk_tct::StateCommitment,
139    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
141    /// Queries for a specific nullifier's status, returning immediately if it is not found.
142    fn nullifier_status(
143        &mut self,
144        nullifier: Nullifier,
145    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
147    /// Waits for a specific nullifier to be detected, returning immediately if it is already
148    /// present, but waiting otherwise.
149    fn await_nullifier(
150        &mut self,
151        nullifier: Nullifier,
152    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
154    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
155    ///
156    /// This is useful for waiting for a note to be detected by the view service.
157    fn await_note_by_commitment(
158        &mut self,
159        note_commitment: note::StateCommitment,
160    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
162    /// Returns authentication paths for the given note commitments.
163    ///
164    /// This method takes a batch of input commitments, rather than just one, so
165    /// that the client can get a consistent set of authentication paths to a
166    /// common root.  (Otherwise, if a client made multiple requests, the wallet
167    /// service could have advanced the state commitment tree state between queries).
168    fn witness(
169        &mut self,
170        plan: &TransactionPlan,
171    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
173    /// Returns a transaction built from the provided TransactionPlan and AuthorizationData
174    fn witness_and_build(
175        &mut self,
176        plan: TransactionPlan,
177        auth_data: AuthorizationData,
178    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
180    /// Queries for all known assets.
181    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
183    /// Queries for liquidity positions owned by the full viewing key.
184    fn owned_position_ids(
185        &mut self,
186        position_state: Option<position::State>,
187        trading_pair: Option<TradingPair>,
188    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
189
190    /// Generates a full perspective for a selected transaction using a full viewing key
191    fn transaction_info_by_hash(
192        &mut self,
193        id: TransactionId,
194    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
195
196    /// Queries for transactions in a range of block heights
197    fn transaction_info(
198        &mut self,
199        start_height: Option<u64>,
200        end_height: Option<u64>,
201    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
202
203    fn broadcast_transaction(
204        &mut self,
205        transaction: Transaction,
206        await_detection: bool,
207    ) -> BroadcastStatusStream;
208
209    /// Return unspent notes, grouped by address index and then by asset id.
210    #[instrument(skip(self))]
211    fn unspent_notes_by_address_and_asset(
212        &mut self,
213    ) -> Pin<
214        Box<
215            dyn Future<
216                    Output = Result<
217                        BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
218                    >,
219                > + Send
220                + 'static,
221        >,
222    > {
223        let notes = self.notes(pb::NotesRequest {
224            include_spent: false,
225            ..Default::default()
226        });
227        async move {
228            let notes = notes.await?;
229            tracing::trace!(?notes);
230
231            let mut notes_by_address_and_asset = BTreeMap::new();
232
233            for note_record in notes {
234                notes_by_address_and_asset
235                    .entry(note_record.address_index)
236                    .or_insert_with(BTreeMap::new)
237                    .entry(note_record.note.asset_id())
238                    .or_insert_with(Vec::new)
239                    .push(note_record);
240            }
241            tracing::trace!(?notes_by_address_and_asset);
242
243            Ok(notes_by_address_and_asset)
244        }
245        .boxed()
246    }
247
248    /// Return unspent notes, grouped by account ID (combining ephemeral addresses for the account) and then by asset id.
249    #[instrument(skip(self))]
250    fn unspent_notes_by_account_and_asset(
251        &mut self,
252    ) -> Pin<
253        Box<
254            dyn Future<
255                    Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
256                > + Send
257                + 'static,
258        >,
259    > {
260        let notes = self.notes(pb::NotesRequest {
261            include_spent: false,
262            ..Default::default()
263        });
264        async move {
265            let notes = notes.await?;
266            tracing::trace!(?notes);
267
268            let mut notes_by_account_and_asset = BTreeMap::new();
269
270            for note_record in notes {
271                notes_by_account_and_asset
272                    .entry(note_record.address_index.account)
273                    .or_insert_with(BTreeMap::new)
274                    .entry(note_record.note.asset_id())
275                    .or_insert_with(Vec::new)
276                    .push(note_record);
277            }
278            tracing::trace!(?notes_by_account_and_asset);
279
280            Ok(notes_by_account_and_asset)
281        }
282        .boxed()
283    }
284
285    /// Return unspent notes, grouped by denom and then by address index.
286    #[instrument(skip(self))]
287    fn unspent_notes_by_asset_and_address(
288        &mut self,
289    ) -> Pin<
290        Box<
291            dyn Future<
292                    Output = Result<
293                        BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
294                    >,
295                > + Send
296                + 'static,
297        >,
298    > {
299        let notes = self.notes(pb::NotesRequest {
300            include_spent: false,
301            ..Default::default()
302        });
303
304        async move {
305            let notes = notes.await?;
306            tracing::trace!(?notes);
307
308            let mut notes_by_asset_and_address = BTreeMap::new();
309
310            for note_record in notes {
311                notes_by_asset_and_address
312                    .entry(note_record.note.asset_id())
313                    .or_insert_with(BTreeMap::new)
314                    .entry(note_record.address_index)
315                    .or_insert_with(Vec::new)
316                    .push(note_record);
317            }
318            tracing::trace!(?notes_by_asset_and_address);
319
320            Ok(notes_by_asset_and_address)
321        }
322        .boxed()
323    }
324
325    fn address_by_index(
326        &mut self,
327        address_index: AddressIndex,
328    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
329
330    /// Queries for the index of a provided address, returning `None` if not
331    /// controlled by the view service's FVK.
332    fn index_by_address(
333        &mut self,
334        address: Address,
335    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
336
337    /// Queries for unclaimed Swaps.
338    fn unclaimed_swaps(
339        &mut self,
340    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
341
342    /// Get all of the notes that can be used for voting
343    fn lqt_voting_notes(
344        &mut self,
345        epoch: u64,
346        filter: Option<AddressIndex>,
347    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
348}
349
// The methods below clone the client and move the clone into the returned
// future. This keeps each future `Send + 'static`, as the `ViewClient` trait
// requires, because the future owns its client instead of borrowing `self`.
356impl<T> ViewClient for ViewServiceClient<T>
357where
358    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
359    T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
360    T::Error: Into<tonic::codegen::StdError>,
361    T::Future: Send + 'static,
362    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
363{
364    fn status(
365        &mut self,
366    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
367        let mut self2 = self.clone();
368        async move {
369            let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
370            let status = status.await?.into_inner();
371            Ok(status)
372        }
373        .boxed()
374    }
375
376    fn status_stream(
377        &mut self,
378    ) -> Pin<
379        Box<
380            dyn Future<
381                    Output = Result<
382                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
383                    >,
384                > + Send
385                + 'static,
386        >,
387    > {
388        let mut self2 = self.clone();
389        async move {
390            let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
391            let stream = stream.await?.into_inner();
392
393            Ok(stream
394                .map_err(|e| anyhow::anyhow!("view service error: {}", e))
395                .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
396                .boxed())
397        }
398        .boxed()
399    }
400
401    fn app_params(
402        &mut self,
403    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
404        let mut self2 = self.clone();
405        async move {
406            // We have to manually invoke the method on the type, because it has the
407            // same name as the one we're implementing.
408            let rsp = ViewServiceClient::app_parameters(
409                &mut self2,
410                tonic::Request::new(pb::AppParametersRequest {}),
411            );
412            rsp.await?.into_inner().try_into()
413        }
414        .boxed()
415    }
416
417    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
418        let mut self2 = self.clone();
419        async move {
420            // We have to manually invoke the method on the type, because it has the
421            // same name as the one we're implementing.
422            let rsp = ViewServiceClient::gas_prices(
423                &mut self2,
424                tonic::Request::new(pb::GasPricesRequest {}),
425            );
426            rsp.await?
427                .into_inner()
428                .gas_prices
429                .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
430                .try_into()
431        }
432        .boxed()
433    }
434
435    fn fmd_parameters(
436        &mut self,
437    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
438        let mut self2 = self.clone();
439        async move {
440            let parameters = ViewServiceClient::fmd_parameters(
441                &mut self2,
442                tonic::Request::new(pb::FmdParametersRequest {}),
443            );
444            let parameters = parameters.await?.into_inner().parameters;
445
446            parameters
447                .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
448                .try_into()
449        }
450        .boxed()
451    }
452
453    fn notes(
454        &mut self,
455        request: pb::NotesRequest,
456    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
457        let mut self2 = self.clone();
458        async move {
459            let req = self2.notes(tonic::Request::new(request));
460            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
461
462            pb_notes
463                .into_iter()
464                .map(|note_rsp| {
465                    let note_record = note_rsp
466                        .note_record
467                        .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
468
469                    match note_record {
470                        Ok(note) => note.try_into(),
471                        Err(e) => Err(e),
472                    }
473                })
474                .collect()
475        }
476        .boxed()
477    }
478
479    fn notes_for_voting(
480        &mut self,
481        request: pb::NotesForVotingRequest,
482    ) -> Pin<
483        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
484    > {
485        let mut self2 = self.clone();
486        async move {
487            let req = self2.notes_for_voting(tonic::Request::new(request));
488            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
489
490            pb_notes
491                .into_iter()
492                .map(|note_rsp| {
493                    let note_record = note_rsp
494                        .note_record
495                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
496                        .try_into()?;
497
498                    let identity_key = note_rsp
499                        .identity_key
500                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
501                        .try_into()?;
502
503                    Ok((note_record, identity_key))
504                })
505                .collect()
506        }
507        .boxed()
508    }
509
510    fn note_by_commitment(
511        &mut self,
512        note_commitment: note::StateCommitment,
513    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
514        let mut self2 = self.clone();
515        async move {
516            let note_commitment_response = ViewServiceClient::note_by_commitment(
517                &mut self2,
518                tonic::Request::new(pb::NoteByCommitmentRequest {
519                    note_commitment: Some(note_commitment.into()),
520                    await_detection: false,
521                }),
522            );
523            let note_commitment_response = note_commitment_response.await?.into_inner();
524
525            note_commitment_response
526                .spendable_note
527                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
528                .try_into()
529        }
530        .boxed()
531    }
532
533    fn balances(
534        &mut self,
535        address_index: AddressIndex,
536        asset_id: Option<asset::Id>,
537    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
538        let mut self2 = self.clone();
539        async move {
540            let req = ViewServiceClient::balances(
541                &mut self2,
542                tonic::Request::new(pb::BalancesRequest {
543                    account_filter: Some(address_index.into()),
544                    asset_id_filter: asset_id.map(Into::into),
545                }),
546            );
547
548            let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
549
550            balances
551                .into_iter()
552                .map(|rsp| {
553                    let pb_value_view = rsp
554                        .balance_view
555                        .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
556
557                    let value_view: ValueView = pb_value_view.try_into()?;
558                    let id = value_view.asset_id();
559                    let amount = value_view.value().amount;
560                    Ok((id, amount))
561                })
562                .collect()
563        }
564        .boxed()
565    }
566
567    fn swap_by_commitment(
568        &mut self,
569        swap_commitment: penumbra_sdk_tct::StateCommitment,
570    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
571        let mut self2 = self.clone();
572        async move {
573            let swap_commitment_response = ViewServiceClient::swap_by_commitment(
574                &mut self2,
575                tonic::Request::new(pb::SwapByCommitmentRequest {
576                    swap_commitment: Some(swap_commitment.into()),
577                    await_detection: false,
578                }),
579            );
580            let swap_commitment_response = swap_commitment_response.await?.into_inner();
581
582            swap_commitment_response
583                .swap
584                .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
585                .try_into()
586        }
587        .boxed()
588    }
589
590    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
591    ///
592    /// This is useful for waiting for a note to be detected by the view service.
593    fn await_note_by_commitment(
594        &mut self,
595        note_commitment: note::StateCommitment,
596    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
597        let mut self2 = self.clone();
598        async move {
599            let spendable_note = ViewServiceClient::note_by_commitment(
600                &mut self2,
601                tonic::Request::new(pb::NoteByCommitmentRequest {
602                    note_commitment: Some(note_commitment.into()),
603                    await_detection: true,
604                }),
605            );
606            let spendable_note = spendable_note.await?.into_inner().spendable_note;
607
608            spendable_note
609                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
610                .try_into()
611        }
612        .boxed()
613    }
614
615    /// Queries for a specific nullifier's status, returning immediately if it is not found.
616    fn nullifier_status(
617        &mut self,
618        nullifier: Nullifier,
619    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
620        let mut self2 = self.clone();
621        async move {
622            let rsp = ViewServiceClient::nullifier_status(
623                &mut self2,
624                tonic::Request::new(pb::NullifierStatusRequest {
625                    nullifier: Some(nullifier.into()),
626                    await_detection: false,
627                }),
628            );
629            Ok(rsp.await?.into_inner().spent)
630        }
631        .boxed()
632    }
633
634    /// Waits for a specific nullifier to be detected, returning immediately if it is already
635    /// present, but waiting otherwise.
636    fn await_nullifier(
637        &mut self,
638        nullifier: Nullifier,
639    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
640        let mut self2 = self.clone();
641        async move {
642            let rsp = ViewServiceClient::nullifier_status(
643                &mut self2,
644                tonic::Request::new(pb::NullifierStatusRequest {
645                    nullifier: Some(nullifier.into()),
646                    await_detection: true,
647                }),
648            );
649            rsp.await?;
650            Ok(())
651        }
652        .boxed()
653    }
654
655    fn witness(
656        &mut self,
657        plan: &TransactionPlan,
658    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
659        let request = WitnessRequest {
660            transaction_plan: Some(plan.clone().into()),
661        };
662
663        let mut self2 = self.clone();
664        async move {
665            let rsp = self2.witness(tonic::Request::new(request));
666
667            let witness_data = rsp
668                .await?
669                .into_inner()
670                .witness_data
671                .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
672                .try_into()?;
673
674            Ok(witness_data)
675        }
676        .boxed()
677    }
678
679    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
680        let mut self2 = self.clone();
681        async move {
682            // We have to manually invoke the method on the type, because it has the
683            // same name as the one we're implementing.
684            let rsp = ViewServiceClient::assets(
685                &mut self2,
686                tonic::Request::new(pb::AssetsRequest {
687                    ..Default::default()
688                }),
689            );
690
691            let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
692
693            let assets = pb_assets
694                .into_iter()
695                .map(Metadata::try_from)
696                .collect::<anyhow::Result<Vec<Metadata>>>()?;
697
698            Ok(assets.into_iter().collect())
699        }
700        .boxed()
701    }
702
703    fn owned_position_ids(
704        &mut self,
705        position_state: Option<position::State>,
706        trading_pair: Option<TradingPair>,
707    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
708        // should the return be streamed here? none of the other viewclient responses are, probably fine for now
709        // but might be an issue eventually
710        let mut self2 = self.clone();
711        async move {
712            // We have to manually invoke the method on the type, because it has the
713            // same name as the one we're implementing.
714            let rsp = ViewServiceClient::owned_position_ids(
715                &mut self2,
716                tonic::Request::new(pb::OwnedPositionIdsRequest {
717                    trading_pair: trading_pair.map(TryInto::try_into).transpose()?,
718                    position_state: position_state.map(TryInto::try_into).transpose()?,
719                    subaccount: None,
720                }),
721            );
722
723            let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
724
725            let position_ids = pb_position_ids
726                .into_iter()
727                .map(|p| {
728                    position::Id::try_from(p.position_id.ok_or_else(|| {
729                        anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
730                    })?)
731                })
732                .collect::<anyhow::Result<Vec<position::Id>>>()?;
733
734            Ok(position_ids)
735        }
736        .boxed()
737    }
738
739    fn transaction_info_by_hash(
740        &mut self,
741        id: TransactionId,
742    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
743        let mut self2 = self.clone();
744        async move {
745            let rsp = ViewServiceClient::transaction_info_by_hash(
746                &mut self2,
747                tonic::Request::new(pb::TransactionInfoByHashRequest {
748                    id: Some(id.into()),
749                }),
750            )
751            .await?
752            .into_inner()
753            .tx_info
754            .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
755
756            // Check some assumptions about response structure
757            if rsp.height == 0 {
758                anyhow::bail!("missing height");
759            }
760
761            let tx_info = TransactionInfo {
762                height: rsp.height,
763                id: rsp
764                    .id
765                    .ok_or_else(|| anyhow::anyhow!("missing id"))?
766                    .try_into()?,
767                transaction: rsp
768                    .transaction
769                    .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
770                    .try_into()?,
771                perspective: rsp
772                    .perspective
773                    .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
774                    .try_into()?,
775                view: rsp
776                    .view
777                    .ok_or_else(|| anyhow::anyhow!("missing view"))?
778                    .try_into()?,
779                summary: rsp
780                    .summary
781                    .ok_or_else(|| anyhow::anyhow!("missing summary"))?
782                    .try_into()?,
783            };
784
785            Ok(tx_info)
786        }
787        .boxed()
788    }
789
790    fn transaction_info(
791        &mut self,
792        start_height: Option<u64>,
793        end_height: Option<u64>,
794    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
795        let mut self2 = self.clone();
796        async move {
797            // Unpack optional block heights
798            let start_h = if let Some(h) = start_height { h } else { 0 };
799
800            let end_h = if let Some(h) = end_height { h } else { 0 };
801
802            let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
803                start_height: start_h,
804                end_height: end_h,
805            }));
806            let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
807
808            pb_txs
809                .into_iter()
810                .map(|rsp| {
811                    let tx_rsp = rsp
812                        .tx_info
813                        .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
814
815                    // Confirm height is populated
816                    if tx_rsp.height == 0 {
817                        anyhow::bail!("missing height");
818                    }
819
820                    let tx_info = TransactionInfo {
821                        height: tx_rsp.height,
822                        transaction: tx_rsp
823                            .transaction
824                            .ok_or_else(|| {
825                                anyhow::anyhow!("empty TransactionInfoResponse message")
826                            })?
827                            .try_into()?,
828                        id: tx_rsp
829                            .id
830                            .ok_or_else(|| anyhow::anyhow!("missing id"))?
831                            .try_into()?,
832                        perspective: tx_rsp
833                            .perspective
834                            .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
835                            .try_into()?,
836                        view: tx_rsp
837                            .view
838                            .ok_or_else(|| anyhow::anyhow!("missing view"))?
839                            .try_into()?,
840                        summary: tx_rsp
841                            .summary
842                            .ok_or_else(|| anyhow::anyhow!("missing summary"))?
843                            .try_into()?,
844                    };
845
846                    Ok(tx_info)
847                })
848                .collect()
849        }
850        .boxed()
851    }
852
853    fn broadcast_transaction(
854        &mut self,
855        transaction: Transaction,
856        await_detection: bool,
857    ) -> BroadcastStatusStream {
858        let mut self2 = self.clone();
859        async move {
860            let rsp = ViewServiceClient::broadcast_transaction(
861                &mut self2,
862                tonic::Request::new(pb::BroadcastTransactionRequest {
863                    transaction: Some(transaction.into()),
864                    await_detection,
865                }),
866            )
867            .await?
868            .into_inner();
869
870            Ok(rsp)
871        }
872        .boxed()
873    }
874
875    fn address_by_index(
876        &mut self,
877        address_index: AddressIndex,
878    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
879        let mut self2 = self.clone();
880        async move {
881            let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
882                address_index: Some(address_index.into()),
883            }));
884            let address = address
885                .await?
886                .into_inner()
887                .address
888                .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
889                .try_into()?;
890            Ok(address)
891        }
892        .boxed()
893    }
894
895    fn index_by_address(
896        &mut self,
897        address: Address,
898    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
899        let mut self2 = self.clone();
900        async move {
901            let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
902                address: Some(address.into()),
903            }));
904            let index = index
905                .await?
906                .into_inner()
907                .address_index
908                .map(|index| index.try_into())
909                .transpose()?;
910            Ok(index)
911        }
912        .boxed()
913    }
914
915    fn witness_and_build(
916        &mut self,
917        transaction_plan: TransactionPlan,
918        authorization_data: AuthorizationData,
919    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
920        let request = pb::WitnessAndBuildRequest {
921            transaction_plan: Some(transaction_plan.into()),
922            authorization_data: Some(authorization_data.into()),
923        };
924        let mut self2 = self.clone();
925        async move {
926            let mut rsp = self2
927                .witness_and_build(tonic::Request::new(request))
928                .await?
929                .into_inner();
930
931            while let Some(rsp) = rsp.try_next().await? {
932                match rsp.status {
933                    Some(status) => match status {
934                        pb::witness_and_build_response::Status::BuildProgress(_) => {
935                            // TODO: should update progress here
936                        }
937                        pb::witness_and_build_response::Status::Complete(c) => {
938                            return c.transaction
939                                .ok_or_else(|| {
940                                    anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
941                                })?
942                                .try_into();
943                        }
944                    },
945                    None => {
946                        // No status is unexpected behavior
947                        return Err(anyhow::anyhow!(
948                            "empty WitnessAndBuildResponse message"
949                        ));
950                    }
951                }
952            }
953
954            Err(anyhow::anyhow!("should have received complete status or error"))
955        }
956            .boxed()
957    }
958
959    fn unclaimed_swaps(
960        &mut self,
961    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
962        let mut self2 = self.clone();
963        async move {
964            let swaps_response = ViewServiceClient::unclaimed_swaps(
965                &mut self2,
966                tonic::Request::new(pb::UnclaimedSwapsRequest {}),
967            );
968            let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
969
970            pb_swaps
971                .into_iter()
972                .map(|swap_rsp| {
973                    let swap_record = swap_rsp
974                        .swap
975                        .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
976
977                    match swap_record {
978                        Ok(swap) => swap.try_into(),
979                        Err(e) => Err(e),
980                    }
981                })
982                .collect()
983        }
984        .boxed()
985    }
986
987    fn auctions(
988        &mut self,
989        account_filter: Option<AddressIndex>,
990        include_inactive: bool,
991        query_latest_state: bool,
992    ) -> Pin<
993        Box<
994            dyn Future<
995                    Output = Result<
996                        Vec<(
997                            AuctionId,
998                            SpendableNoteRecord,
999                            u64,
1000                            Option<Any>,
1001                            Vec<Position>,
1002                        )>,
1003                    >,
1004                > + Send
1005                + 'static,
1006        >,
1007    > {
1008        let mut client = self.clone();
1009        async move {
1010            let request = tonic::Request::new(pb::AuctionsRequest {
1011                account_filter: account_filter.map(Into::into),
1012                include_inactive,
1013                query_latest_state,
1014                auction_ids_filter: Vec::new(), // TODO: Support `auction_ids_filter`
1015            });
1016
1017            let auctions: Vec<pb::AuctionsResponse> =
1018                ViewServiceClient::auctions(&mut client, request)
1019                    .await?
1020                    .into_inner()
1021                    .try_collect()
1022                    .await?;
1023
1024            let resp: Vec<(
1025                AuctionId,
1026                SpendableNoteRecord,
1027                u64,
1028                Option<Any>,
1029                Vec<Position>,
1030            )> = auctions
1031                .into_iter()
1032                .map(|auction_rsp| {
1033                    let pb_id = auction_rsp
1034                        .id
1035                        .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1036                    let auction_id: AuctionId = pb_id.try_into()?;
1037                    let snr: SpendableNoteRecord = auction_rsp
1038                        .note_record
1039                        .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1040                        .try_into()?;
1041
1042                    let local_seq = auction_rsp.local_seq;
1043
1044                    let auction = auction_rsp.auction;
1045                    let lps: Vec<Position> = auction_rsp
1046                        .positions
1047                        .into_iter()
1048                        .map(TryInto::try_into)
1049                        .collect::<Result<Vec<_>>>()?;
1050
1051                    Ok::<
1052                        (
1053                            AuctionId,
1054                            SpendableNoteRecord,
1055                            u64, /* the local sequence number */
1056                            Option<Any>, /* the auction state if it was requested */
1057                            Vec<Position>, /* associated liquidity positions if we queried the latest state */
1058                        ),
1059                        anyhow::Error,
1060                    >((auction_id, snr, local_seq, auction, lps))
1061                })
1062                .filter_map(|res| res.ok()) // TODO: scrap this later.
1063                .collect();
1064
1065            Ok(resp)
1066        }
1067        .boxed()
1068    }
1069
1070    fn lqt_voting_notes(
1071        &mut self,
1072        epoch: u64,
1073        filter: Option<AddressIndex>,
1074    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
1075        let mut client = self.clone();
1076        async move {
1077            let request = tonic::Request::new(pb::LqtVotingNotesRequest {
1078                epoch_index: epoch,
1079                account_filter: filter.map(|x| x.into()),
1080            });
1081            let response = client.lqt_voting_notes(request).await?;
1082            let pb_notes: Vec<pb::LqtVotingNotesResponse> =
1083                response.into_inner().try_collect().await?;
1084
1085            pb_notes
1086                .into_iter()
1087                .map(|note_rsp| {
1088                    let note_record = note_rsp
1089                        .note_record
1090                        .ok_or_else(|| anyhow::anyhow!("empty LqtVotingNotesResponse message"))?
1091                        .try_into()?;
1092                    Ok(note_record)
1093                })
1094                .collect()
1095        }
1096        .boxed()
1097    }
1098}