penumbra_sdk_view/
client.rs

1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12    asset::{self, Id, Metadata},
13    ValueView,
14};
15use penumbra_sdk_dex::{
16    lp::position::{self, Position},
17    TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23    self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24    BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30    txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
/// A boxed, `Send` future that resolves either to a tonic [`Streaming`] of
/// [`BroadcastTransactionResponse`] messages or to an [`anyhow::Error`].
///
/// This is the return type of [`ViewClient::broadcast_transaction`].
pub(crate) type BroadcastStatusStream = Pin<
    Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
>;
38
39/// The view protocol is used by a view client, who wants to do some
40/// transaction-related actions, to request data from a view service, which is
41/// responsible for synchronizing and scanning the public chain state with one
42/// or more full viewing keys.
43///
44/// This trait is a wrapper around the proto-generated [`ViewServiceClient`]
45/// that serves two goals:
46///
47/// 1. It can use domain types rather than proto-generated types, avoiding conversions;
48/// 2. It's easier to write as a trait bound than the `CustodyProtocolClient`,
49///   which requires complex bounds on its inner type to
50///   enforce that it is a tower `Service`.
51#[allow(clippy::type_complexity)]
52pub trait ViewClient {
53    /// Query the auction state
54    fn auctions(
55        &mut self,
56        account_filter: Option<AddressIndex>,
57        include_inactive: bool,
58        query_latest_state: bool,
59    ) -> Pin<
60        Box<
61            dyn Future<
62                    Output = Result<
63                        Vec<(
64                            AuctionId,
65                            SpendableNoteRecord,
66                            u64,
67                            Option<Any>,
68                            Vec<Position>,
69                        )>,
70                    >,
71                > + Send
72                + 'static,
73        >,
74    >;
75
76    /// Get the current status of chain sync.
77    fn status(
78        &mut self,
79    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
81    /// Stream status updates on chain sync until it completes.
82    fn status_stream(
83        &mut self,
84    ) -> Pin<
85        Box<
86            dyn Future<
87                    Output = Result<
88                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
89                    >,
90                > + Send
91                + 'static,
92        >,
93    >;
94
95    /// Get a copy of the app parameters.
96    fn app_params(
97        &mut self,
98    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
100    /// Get a copy of the gas prices.
101    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
103    /// Get a copy of the FMD parameters.
104    fn fmd_parameters(
105        &mut self,
106    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
108    /// Queries for notes.
109    fn notes(
110        &mut self,
111        request: pb::NotesRequest,
112    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
114    /// Queries for notes for voting.
115    fn notes_for_voting(
116        &mut self,
117        request: pb::NotesForVotingRequest,
118    ) -> Pin<
119        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
120    >;
121
122    /// Queries for account balance by address
123    fn balances(
124        &mut self,
125        address_index: AddressIndex,
126        asset_id: Option<asset::Id>,
127    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
129    /// Queries for a specific note by commitment, returning immediately if it is not found.
130    fn note_by_commitment(
131        &mut self,
132        note_commitment: note::StateCommitment,
133    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
135    /// Queries for a specific swap by commitment, returning immediately if it is not found.
136    fn swap_by_commitment(
137        &mut self,
138        swap_commitment: penumbra_sdk_tct::StateCommitment,
139    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
141    /// Queries for a specific nullifier's status, returning immediately if it is not found.
142    fn nullifier_status(
143        &mut self,
144        nullifier: Nullifier,
145    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
147    /// Waits for a specific nullifier to be detected, returning immediately if it is already
148    /// present, but waiting otherwise.
149    fn await_nullifier(
150        &mut self,
151        nullifier: Nullifier,
152    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
154    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
155    ///
156    /// This is useful for waiting for a note to be detected by the view service.
157    fn await_note_by_commitment(
158        &mut self,
159        note_commitment: note::StateCommitment,
160    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
162    /// Returns authentication paths for the given note commitments.
163    ///
164    /// This method takes a batch of input commitments, rather than just one, so
165    /// that the client can get a consistent set of authentication paths to a
166    /// common root.  (Otherwise, if a client made multiple requests, the wallet
167    /// service could have advanced the state commitment tree state between queries).
168    fn witness(
169        &mut self,
170        plan: &TransactionPlan,
171    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
173    /// Returns a transaction built from the provided TransactionPlan and AuthorizationData
174    fn witness_and_build(
175        &mut self,
176        plan: TransactionPlan,
177        auth_data: AuthorizationData,
178    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
180    /// Queries for all known assets.
181    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
183    /// Queries for liquidity positions owned by the full viewing key.
184    fn owned_position_ids(
185        &mut self,
186        position_state: Option<position::State>,
187        trading_pair: Option<TradingPair>,
188    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
189
190    /// Generates a full perspective for a selected transaction using a full viewing key
191    fn transaction_info_by_hash(
192        &mut self,
193        id: TransactionId,
194    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
195
196    /// Queries for transactions in a range of block heights
197    fn transaction_info(
198        &mut self,
199        start_height: Option<u64>,
200        end_height: Option<u64>,
201    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
202
203    fn broadcast_transaction(
204        &mut self,
205        transaction: Transaction,
206        await_detection: bool,
207    ) -> BroadcastStatusStream;
208
209    /// Return unspent notes, grouped by address index and then by asset id.
210    #[instrument(skip(self))]
211    fn unspent_notes_by_address_and_asset(
212        &mut self,
213    ) -> Pin<
214        Box<
215            dyn Future<
216                    Output = Result<
217                        BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
218                    >,
219                > + Send
220                + 'static,
221        >,
222    > {
223        let notes = self.notes(pb::NotesRequest {
224            include_spent: false,
225            ..Default::default()
226        });
227        async move {
228            let notes = notes.await?;
229            tracing::trace!(?notes);
230
231            let mut notes_by_address_and_asset = BTreeMap::new();
232
233            for note_record in notes {
234                notes_by_address_and_asset
235                    .entry(note_record.address_index)
236                    .or_insert_with(BTreeMap::new)
237                    .entry(note_record.note.asset_id())
238                    .or_insert_with(Vec::new)
239                    .push(note_record);
240            }
241            tracing::trace!(?notes_by_address_and_asset);
242
243            Ok(notes_by_address_and_asset)
244        }
245        .boxed()
246    }
247
248    /// Return unspent notes, grouped by account ID (combining ephemeral addresses for the account) and then by asset id.
249    #[instrument(skip(self))]
250    fn unspent_notes_by_account_and_asset(
251        &mut self,
252    ) -> Pin<
253        Box<
254            dyn Future<
255                    Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
256                > + Send
257                + 'static,
258        >,
259    > {
260        let notes = self.notes(pb::NotesRequest {
261            include_spent: false,
262            ..Default::default()
263        });
264        async move {
265            let notes = notes.await?;
266            tracing::trace!(?notes);
267
268            let mut notes_by_account_and_asset = BTreeMap::new();
269
270            for note_record in notes {
271                notes_by_account_and_asset
272                    .entry(note_record.address_index.account)
273                    .or_insert_with(BTreeMap::new)
274                    .entry(note_record.note.asset_id())
275                    .or_insert_with(Vec::new)
276                    .push(note_record);
277            }
278            tracing::trace!(?notes_by_account_and_asset);
279
280            Ok(notes_by_account_and_asset)
281        }
282        .boxed()
283    }
284
285    /// Return unspent notes, grouped by denom and then by address index.
286    #[instrument(skip(self))]
287    fn unspent_notes_by_asset_and_address(
288        &mut self,
289    ) -> Pin<
290        Box<
291            dyn Future<
292                    Output = Result<
293                        BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
294                    >,
295                > + Send
296                + 'static,
297        >,
298    > {
299        let notes = self.notes(pb::NotesRequest {
300            include_spent: false,
301            ..Default::default()
302        });
303
304        async move {
305            let notes = notes.await?;
306            tracing::trace!(?notes);
307
308            let mut notes_by_asset_and_address = BTreeMap::new();
309
310            for note_record in notes {
311                notes_by_asset_and_address
312                    .entry(note_record.note.asset_id())
313                    .or_insert_with(BTreeMap::new)
314                    .entry(note_record.address_index)
315                    .or_insert_with(Vec::new)
316                    .push(note_record);
317            }
318            tracing::trace!(?notes_by_asset_and_address);
319
320            Ok(notes_by_asset_and_address)
321        }
322        .boxed()
323    }
324
325    fn address_by_index(
326        &mut self,
327        address_index: AddressIndex,
328    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
329
330    /// Queries for the index of a provided address, returning `None` if not
331    /// controlled by the view service's FVK.
332    fn index_by_address(
333        &mut self,
334        address: Address,
335    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
336
337    /// Queries for unclaimed Swaps.
338    fn unclaimed_swaps(
339        &mut self,
340    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
341}
342
// Each method below returns a boxed future that is `Send + 'static`. To make
// that possible while only holding `&mut self`, every method first clones the
// `ViewServiceClient` and moves the clone into the `async move` block, so the
// returned future never borrows `self`.
//
// NOTE(review): the comment previously found here described `async_trait`,
// non-`Send` futures and the `CustodyProtocolClient::authorize` method, none of
// which are used by this impl; it appears to have been carried over from the
// custody client.
349impl<T> ViewClient for ViewServiceClient<T>
350where
351    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
352    T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
353    T::Error: Into<tonic::codegen::StdError>,
354    T::Future: Send + 'static,
355    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
356{
357    fn status(
358        &mut self,
359    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
360        let mut self2 = self.clone();
361        async move {
362            let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
363            let status = status.await?.into_inner();
364            Ok(status)
365        }
366        .boxed()
367    }
368
369    fn status_stream(
370        &mut self,
371    ) -> Pin<
372        Box<
373            dyn Future<
374                    Output = Result<
375                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
376                    >,
377                > + Send
378                + 'static,
379        >,
380    > {
381        let mut self2 = self.clone();
382        async move {
383            let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
384            let stream = stream.await?.into_inner();
385
386            Ok(stream
387                .map_err(|e| anyhow::anyhow!("view service error: {}", e))
388                .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
389                .boxed())
390        }
391        .boxed()
392    }
393
394    fn app_params(
395        &mut self,
396    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
397        let mut self2 = self.clone();
398        async move {
399            // We have to manually invoke the method on the type, because it has the
400            // same name as the one we're implementing.
401            let rsp = ViewServiceClient::app_parameters(
402                &mut self2,
403                tonic::Request::new(pb::AppParametersRequest {}),
404            );
405            rsp.await?.into_inner().try_into()
406        }
407        .boxed()
408    }
409
410    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
411        let mut self2 = self.clone();
412        async move {
413            // We have to manually invoke the method on the type, because it has the
414            // same name as the one we're implementing.
415            let rsp = ViewServiceClient::gas_prices(
416                &mut self2,
417                tonic::Request::new(pb::GasPricesRequest {}),
418            );
419            rsp.await?
420                .into_inner()
421                .gas_prices
422                .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
423                .try_into()
424        }
425        .boxed()
426    }
427
428    fn fmd_parameters(
429        &mut self,
430    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
431        let mut self2 = self.clone();
432        async move {
433            let parameters = ViewServiceClient::fmd_parameters(
434                &mut self2,
435                tonic::Request::new(pb::FmdParametersRequest {}),
436            );
437            let parameters = parameters.await?.into_inner().parameters;
438
439            parameters
440                .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
441                .try_into()
442        }
443        .boxed()
444    }
445
446    fn notes(
447        &mut self,
448        request: pb::NotesRequest,
449    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
450        let mut self2 = self.clone();
451        async move {
452            let req = self2.notes(tonic::Request::new(request));
453            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
454
455            pb_notes
456                .into_iter()
457                .map(|note_rsp| {
458                    let note_record = note_rsp
459                        .note_record
460                        .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
461
462                    match note_record {
463                        Ok(note) => note.try_into(),
464                        Err(e) => Err(e),
465                    }
466                })
467                .collect()
468        }
469        .boxed()
470    }
471
472    fn notes_for_voting(
473        &mut self,
474        request: pb::NotesForVotingRequest,
475    ) -> Pin<
476        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
477    > {
478        let mut self2 = self.clone();
479        async move {
480            let req = self2.notes_for_voting(tonic::Request::new(request));
481            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
482
483            pb_notes
484                .into_iter()
485                .map(|note_rsp| {
486                    let note_record = note_rsp
487                        .note_record
488                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
489                        .try_into()?;
490
491                    let identity_key = note_rsp
492                        .identity_key
493                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
494                        .try_into()?;
495
496                    Ok((note_record, identity_key))
497                })
498                .collect()
499        }
500        .boxed()
501    }
502
503    fn note_by_commitment(
504        &mut self,
505        note_commitment: note::StateCommitment,
506    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
507        let mut self2 = self.clone();
508        async move {
509            let note_commitment_response = ViewServiceClient::note_by_commitment(
510                &mut self2,
511                tonic::Request::new(pb::NoteByCommitmentRequest {
512                    note_commitment: Some(note_commitment.into()),
513                    await_detection: false,
514                }),
515            );
516            let note_commitment_response = note_commitment_response.await?.into_inner();
517
518            note_commitment_response
519                .spendable_note
520                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
521                .try_into()
522        }
523        .boxed()
524    }
525
526    fn balances(
527        &mut self,
528        address_index: AddressIndex,
529        asset_id: Option<asset::Id>,
530    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
531        let mut self2 = self.clone();
532        async move {
533            let req = ViewServiceClient::balances(
534                &mut self2,
535                tonic::Request::new(pb::BalancesRequest {
536                    account_filter: Some(address_index.into()),
537                    asset_id_filter: asset_id.map(Into::into),
538                }),
539            );
540
541            let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
542
543            balances
544                .into_iter()
545                .map(|rsp| {
546                    let pb_value_view = rsp
547                        .balance_view
548                        .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
549
550                    let value_view: ValueView = pb_value_view.try_into()?;
551                    let id = value_view.asset_id();
552                    let amount = value_view.value().amount;
553                    Ok((id, amount))
554                })
555                .collect()
556        }
557        .boxed()
558    }
559
560    fn swap_by_commitment(
561        &mut self,
562        swap_commitment: penumbra_sdk_tct::StateCommitment,
563    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
564        let mut self2 = self.clone();
565        async move {
566            let swap_commitment_response = ViewServiceClient::swap_by_commitment(
567                &mut self2,
568                tonic::Request::new(pb::SwapByCommitmentRequest {
569                    swap_commitment: Some(swap_commitment.into()),
570                    await_detection: false,
571                }),
572            );
573            let swap_commitment_response = swap_commitment_response.await?.into_inner();
574
575            swap_commitment_response
576                .swap
577                .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
578                .try_into()
579        }
580        .boxed()
581    }
582
583    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
584    ///
585    /// This is useful for waiting for a note to be detected by the view service.
586    fn await_note_by_commitment(
587        &mut self,
588        note_commitment: note::StateCommitment,
589    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
590        let mut self2 = self.clone();
591        async move {
592            let spendable_note = ViewServiceClient::note_by_commitment(
593                &mut self2,
594                tonic::Request::new(pb::NoteByCommitmentRequest {
595                    note_commitment: Some(note_commitment.into()),
596                    await_detection: true,
597                }),
598            );
599            let spendable_note = spendable_note.await?.into_inner().spendable_note;
600
601            spendable_note
602                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
603                .try_into()
604        }
605        .boxed()
606    }
607
608    /// Queries for a specific nullifier's status, returning immediately if it is not found.
609    fn nullifier_status(
610        &mut self,
611        nullifier: Nullifier,
612    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
613        let mut self2 = self.clone();
614        async move {
615            let rsp = ViewServiceClient::nullifier_status(
616                &mut self2,
617                tonic::Request::new(pb::NullifierStatusRequest {
618                    nullifier: Some(nullifier.into()),
619                    await_detection: false,
620                }),
621            );
622            Ok(rsp.await?.into_inner().spent)
623        }
624        .boxed()
625    }
626
627    /// Waits for a specific nullifier to be detected, returning immediately if it is already
628    /// present, but waiting otherwise.
629    fn await_nullifier(
630        &mut self,
631        nullifier: Nullifier,
632    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
633        let mut self2 = self.clone();
634        async move {
635            let rsp = ViewServiceClient::nullifier_status(
636                &mut self2,
637                tonic::Request::new(pb::NullifierStatusRequest {
638                    nullifier: Some(nullifier.into()),
639                    await_detection: true,
640                }),
641            );
642            rsp.await?;
643            Ok(())
644        }
645        .boxed()
646    }
647
648    fn witness(
649        &mut self,
650        plan: &TransactionPlan,
651    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
652        let request = WitnessRequest {
653            transaction_plan: Some(plan.clone().into()),
654        };
655
656        let mut self2 = self.clone();
657        async move {
658            let rsp = self2.witness(tonic::Request::new(request));
659
660            let witness_data = rsp
661                .await?
662                .into_inner()
663                .witness_data
664                .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
665                .try_into()?;
666
667            Ok(witness_data)
668        }
669        .boxed()
670    }
671
672    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
673        let mut self2 = self.clone();
674        async move {
675            // We have to manually invoke the method on the type, because it has the
676            // same name as the one we're implementing.
677            let rsp = ViewServiceClient::assets(
678                &mut self2,
679                tonic::Request::new(pb::AssetsRequest {
680                    ..Default::default()
681                }),
682            );
683
684            let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
685
686            let assets = pb_assets
687                .into_iter()
688                .map(Metadata::try_from)
689                .collect::<anyhow::Result<Vec<Metadata>>>()?;
690
691            Ok(assets.into_iter().collect())
692        }
693        .boxed()
694    }
695
696    fn owned_position_ids(
697        &mut self,
698        position_state: Option<position::State>,
699        trading_pair: Option<TradingPair>,
700    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
701        // should the return be streamed here? none of the other viewclient responses are, probably fine for now
702        // but might be an issue eventually
703        let mut self2 = self.clone();
704        async move {
705            // We have to manually invoke the method on the type, because it has the
706            // same name as the one we're implementing.
707            let rsp = ViewServiceClient::owned_position_ids(
708                &mut self2,
709                tonic::Request::new(pb::OwnedPositionIdsRequest {
710                    trading_pair: trading_pair.map(TryInto::try_into).transpose()?,
711                    position_state: position_state.map(TryInto::try_into).transpose()?,
712                    subaccount: None,
713                }),
714            );
715
716            let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
717
718            let position_ids = pb_position_ids
719                .into_iter()
720                .map(|p| {
721                    position::Id::try_from(p.position_id.ok_or_else(|| {
722                        anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
723                    })?)
724                })
725                .collect::<anyhow::Result<Vec<position::Id>>>()?;
726
727            Ok(position_ids)
728        }
729        .boxed()
730    }
731
732    fn transaction_info_by_hash(
733        &mut self,
734        id: TransactionId,
735    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
736        let mut self2 = self.clone();
737        async move {
738            let rsp = ViewServiceClient::transaction_info_by_hash(
739                &mut self2,
740                tonic::Request::new(pb::TransactionInfoByHashRequest {
741                    id: Some(id.into()),
742                }),
743            )
744            .await?
745            .into_inner()
746            .tx_info
747            .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
748
749            // Check some assumptions about response structure
750            if rsp.height == 0 {
751                anyhow::bail!("missing height");
752            }
753
754            let tx_info = TransactionInfo {
755                height: rsp.height,
756                id: rsp
757                    .id
758                    .ok_or_else(|| anyhow::anyhow!("missing id"))?
759                    .try_into()?,
760                transaction: rsp
761                    .transaction
762                    .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
763                    .try_into()?,
764                perspective: rsp
765                    .perspective
766                    .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
767                    .try_into()?,
768                view: rsp
769                    .view
770                    .ok_or_else(|| anyhow::anyhow!("missing view"))?
771                    .try_into()?,
772                summary: rsp
773                    .summary
774                    .ok_or_else(|| anyhow::anyhow!("missing summary"))?
775                    .try_into()?,
776            };
777
778            Ok(tx_info)
779        }
780        .boxed()
781    }
782
783    fn transaction_info(
784        &mut self,
785        start_height: Option<u64>,
786        end_height: Option<u64>,
787    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
788        let mut self2 = self.clone();
789        async move {
790            // Unpack optional block heights
791            let start_h = if let Some(h) = start_height { h } else { 0 };
792
793            let end_h = if let Some(h) = end_height { h } else { 0 };
794
795            let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
796                start_height: start_h,
797                end_height: end_h,
798            }));
799            let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
800
801            pb_txs
802                .into_iter()
803                .map(|rsp| {
804                    let tx_rsp = rsp
805                        .tx_info
806                        .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
807
808                    // Confirm height is populated
809                    if tx_rsp.height == 0 {
810                        anyhow::bail!("missing height");
811                    }
812
813                    let tx_info = TransactionInfo {
814                        height: tx_rsp.height,
815                        transaction: tx_rsp
816                            .transaction
817                            .ok_or_else(|| {
818                                anyhow::anyhow!("empty TransactionInfoResponse message")
819                            })?
820                            .try_into()?,
821                        id: tx_rsp
822                            .id
823                            .ok_or_else(|| anyhow::anyhow!("missing id"))?
824                            .try_into()?,
825                        perspective: tx_rsp
826                            .perspective
827                            .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
828                            .try_into()?,
829                        view: tx_rsp
830                            .view
831                            .ok_or_else(|| anyhow::anyhow!("missing view"))?
832                            .try_into()?,
833                        summary: tx_rsp
834                            .summary
835                            .ok_or_else(|| anyhow::anyhow!("missing summary"))?
836                            .try_into()?,
837                    };
838
839                    Ok(tx_info)
840                })
841                .collect()
842        }
843        .boxed()
844    }
845
846    fn broadcast_transaction(
847        &mut self,
848        transaction: Transaction,
849        await_detection: bool,
850    ) -> BroadcastStatusStream {
851        let mut self2 = self.clone();
852        async move {
853            let rsp = ViewServiceClient::broadcast_transaction(
854                &mut self2,
855                tonic::Request::new(pb::BroadcastTransactionRequest {
856                    transaction: Some(transaction.into()),
857                    await_detection,
858                }),
859            )
860            .await?
861            .into_inner();
862
863            Ok(rsp)
864        }
865        .boxed()
866    }
867
868    fn address_by_index(
869        &mut self,
870        address_index: AddressIndex,
871    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
872        let mut self2 = self.clone();
873        async move {
874            let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
875                address_index: Some(address_index.into()),
876            }));
877            let address = address
878                .await?
879                .into_inner()
880                .address
881                .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
882                .try_into()?;
883            Ok(address)
884        }
885        .boxed()
886    }
887
888    fn index_by_address(
889        &mut self,
890        address: Address,
891    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
892        let mut self2 = self.clone();
893        async move {
894            let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
895                address: Some(address.into()),
896            }));
897            let index = index
898                .await?
899                .into_inner()
900                .address_index
901                .map(|index| index.try_into())
902                .transpose()?;
903            Ok(index)
904        }
905        .boxed()
906    }
907
908    fn witness_and_build(
909        &mut self,
910        transaction_plan: TransactionPlan,
911        authorization_data: AuthorizationData,
912    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
913        let request = pb::WitnessAndBuildRequest {
914            transaction_plan: Some(transaction_plan.into()),
915            authorization_data: Some(authorization_data.into()),
916        };
917        let mut self2 = self.clone();
918        async move {
919            let mut rsp = self2
920                .witness_and_build(tonic::Request::new(request))
921                .await?
922                .into_inner();
923
924            while let Some(rsp) = rsp.try_next().await? {
925                match rsp.status {
926                    Some(status) => match status {
927                        pb::witness_and_build_response::Status::BuildProgress(_) => {
928                            // TODO: should update progress here
929                        }
930                        pb::witness_and_build_response::Status::Complete(c) => {
931                            return c.transaction
932                                .ok_or_else(|| {
933                                    anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
934                                })?
935                                .try_into();
936                        }
937                    },
938                    None => {
939                        // No status is unexpected behavior
940                        return Err(anyhow::anyhow!(
941                            "empty WitnessAndBuildResponse message"
942                        ));
943                    }
944                }
945            }
946
947            Err(anyhow::anyhow!("should have received complete status or error"))
948        }
949            .boxed()
950    }
951
952    fn unclaimed_swaps(
953        &mut self,
954    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
955        let mut self2 = self.clone();
956        async move {
957            let swaps_response = ViewServiceClient::unclaimed_swaps(
958                &mut self2,
959                tonic::Request::new(pb::UnclaimedSwapsRequest {}),
960            );
961            let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
962
963            pb_swaps
964                .into_iter()
965                .map(|swap_rsp| {
966                    let swap_record = swap_rsp
967                        .swap
968                        .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
969
970                    match swap_record {
971                        Ok(swap) => swap.try_into(),
972                        Err(e) => Err(e),
973                    }
974                })
975                .collect()
976        }
977        .boxed()
978    }
979
980    fn auctions(
981        &mut self,
982        account_filter: Option<AddressIndex>,
983        include_inactive: bool,
984        query_latest_state: bool,
985    ) -> Pin<
986        Box<
987            dyn Future<
988                    Output = Result<
989                        Vec<(
990                            AuctionId,
991                            SpendableNoteRecord,
992                            u64,
993                            Option<Any>,
994                            Vec<Position>,
995                        )>,
996                    >,
997                > + Send
998                + 'static,
999        >,
1000    > {
1001        let mut client = self.clone();
1002        async move {
1003            let request = tonic::Request::new(pb::AuctionsRequest {
1004                account_filter: account_filter.map(Into::into),
1005                include_inactive,
1006                query_latest_state,
1007                auction_ids_filter: Vec::new(), // TODO: Support `auction_ids_filter`
1008            });
1009
1010            let auctions: Vec<pb::AuctionsResponse> =
1011                ViewServiceClient::auctions(&mut client, request)
1012                    .await?
1013                    .into_inner()
1014                    .try_collect()
1015                    .await?;
1016
1017            let resp: Vec<(
1018                AuctionId,
1019                SpendableNoteRecord,
1020                u64,
1021                Option<Any>,
1022                Vec<Position>,
1023            )> = auctions
1024                .into_iter()
1025                .map(|auction_rsp| {
1026                    let pb_id = auction_rsp
1027                        .id
1028                        .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1029                    let auction_id: AuctionId = pb_id.try_into()?;
1030                    let snr: SpendableNoteRecord = auction_rsp
1031                        .note_record
1032                        .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1033                        .try_into()?;
1034
1035                    let local_seq = auction_rsp.local_seq;
1036
1037                    let auction = auction_rsp.auction;
1038                    let lps: Vec<Position> = auction_rsp
1039                        .positions
1040                        .into_iter()
1041                        .map(TryInto::try_into)
1042                        .collect::<Result<Vec<_>>>()?;
1043
1044                    Ok::<
1045                        (
1046                            AuctionId,
1047                            SpendableNoteRecord,
1048                            u64, /* the local sequence number */
1049                            Option<Any>, /* the auction state if it was requested */
1050                            Vec<Position>, /* associated liquidity positions if we queried the latest state */
1051                        ),
1052                        anyhow::Error,
1053                    >((auction_id, snr, local_seq, auction, lps))
1054                })
1055                .filter_map(|res| res.ok()) // TODO: scrap this later.
1056                .collect();
1057
1058            Ok(resp)
1059        }
1060        .boxed()
1061    }
1062}