penumbra_sdk_view/
client.rs

1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12    asset::{self, Id, Metadata},
13    ValueView,
14};
15use penumbra_sdk_dex::{
16    lp::position::{self, Position},
17    TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23    self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24    BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30    txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
/// Boxed, `Send` future returned by [`ViewClient::broadcast_transaction`].
///
/// On success it resolves to the tonic [`Streaming`] of
/// [`BroadcastTransactionResponse`] messages; on failure it resolves to an
/// [`anyhow::Error`].
pub(crate) type BroadcastStatusStream = Pin<
    Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
>;
38
39/// The view protocol is used by a view client, who wants to do some
40/// transaction-related actions, to request data from a view service, which is
41/// responsible for synchronizing and scanning the public chain state with one
42/// or more full viewing keys.
43///
44/// This trait is a wrapper around the proto-generated [`ViewServiceClient`]
45/// that serves two goals:
46///
47/// 1. It can use domain types rather than proto-generated types, avoiding conversions;
48/// 2. It's easier to write as a trait bound than the `CustodyProtocolClient`,
49///   which requires complex bounds on its inner type to
50///   enforce that it is a tower `Service`.
51#[allow(clippy::type_complexity)]
52pub trait ViewClient {
53    /// Query the auction state
54    fn auctions(
55        &mut self,
56        account_filter: Option<AddressIndex>,
57        include_inactive: bool,
58        query_latest_state: bool,
59    ) -> Pin<
60        Box<
61            dyn Future<
62                    Output = Result<
63                        Vec<(
64                            AuctionId,
65                            SpendableNoteRecord,
66                            u64,
67                            Option<Any>,
68                            Vec<Position>,
69                        )>,
70                    >,
71                > + Send
72                + 'static,
73        >,
74    >;
75
76    /// Get the current status of chain sync.
77    fn status(
78        &mut self,
79    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
81    /// Stream status updates on chain sync until it completes.
82    fn status_stream(
83        &mut self,
84    ) -> Pin<
85        Box<
86            dyn Future<
87                    Output = Result<
88                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
89                    >,
90                > + Send
91                + 'static,
92        >,
93    >;
94
95    /// Get a copy of the app parameters.
96    fn app_params(
97        &mut self,
98    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
100    /// Get a copy of the gas prices.
101    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
103    /// Get a copy of the FMD parameters.
104    fn fmd_parameters(
105        &mut self,
106    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
108    /// Queries for notes.
109    fn notes(
110        &mut self,
111        request: pb::NotesRequest,
112    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
114    /// Queries for notes for voting.
115    fn notes_for_voting(
116        &mut self,
117        request: pb::NotesForVotingRequest,
118    ) -> Pin<
119        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
120    >;
121
122    /// Queries for account balance by address
123    fn balances(
124        &mut self,
125        address_index: AddressIndex,
126        asset_id: Option<asset::Id>,
127    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
129    /// Queries for a specific note by commitment, returning immediately if it is not found.
130    fn note_by_commitment(
131        &mut self,
132        note_commitment: note::StateCommitment,
133    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
135    /// Queries for a specific swap by commitment, returning immediately if it is not found.
136    fn swap_by_commitment(
137        &mut self,
138        swap_commitment: penumbra_sdk_tct::StateCommitment,
139    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
141    /// Queries for a specific nullifier's status, returning immediately if it is not found.
142    fn nullifier_status(
143        &mut self,
144        nullifier: Nullifier,
145    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
147    /// Waits for a specific nullifier to be detected, returning immediately if it is already
148    /// present, but waiting otherwise.
149    fn await_nullifier(
150        &mut self,
151        nullifier: Nullifier,
152    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
154    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
155    ///
156    /// This is useful for waiting for a note to be detected by the view service.
157    fn await_note_by_commitment(
158        &mut self,
159        note_commitment: note::StateCommitment,
160    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
162    /// Returns authentication paths for the given note commitments.
163    ///
164    /// This method takes a batch of input commitments, rather than just one, so
165    /// that the client can get a consistent set of authentication paths to a
166    /// common root.  (Otherwise, if a client made multiple requests, the wallet
167    /// service could have advanced the state commitment tree state between queries).
168    fn witness(
169        &mut self,
170        plan: &TransactionPlan,
171    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
173    /// Returns a transaction built from the provided TransactionPlan and AuthorizationData
174    fn witness_and_build(
175        &mut self,
176        plan: TransactionPlan,
177        auth_data: AuthorizationData,
178    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
180    /// Queries for all known assets.
181    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
183    /// Queries for liquidity positions owned by the full viewing key.
184    fn owned_position_ids(
185        &mut self,
186        position_state: Option<position::State>,
187        trading_pair: Option<TradingPair>,
188        subaccount: Option<AddressIndex>,
189    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
190
191    /// Generates a full perspective for a selected transaction using a full viewing key
192    fn transaction_info_by_hash(
193        &mut self,
194        id: TransactionId,
195    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
196
197    /// Queries for transactions in a range of block heights
198    fn transaction_info(
199        &mut self,
200        start_height: Option<u64>,
201        end_height: Option<u64>,
202    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
203
204    fn broadcast_transaction(
205        &mut self,
206        transaction: Transaction,
207        await_detection: bool,
208    ) -> BroadcastStatusStream;
209
210    /// Return unspent notes, grouped by address index and then by asset id.
211    #[instrument(skip(self))]
212    fn unspent_notes_by_address_and_asset(
213        &mut self,
214    ) -> Pin<
215        Box<
216            dyn Future<
217                    Output = Result<
218                        BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
219                    >,
220                > + Send
221                + 'static,
222        >,
223    > {
224        let notes = self.notes(pb::NotesRequest {
225            include_spent: false,
226            ..Default::default()
227        });
228        async move {
229            let notes = notes.await?;
230            tracing::trace!(?notes);
231
232            let mut notes_by_address_and_asset = BTreeMap::new();
233
234            for note_record in notes {
235                notes_by_address_and_asset
236                    .entry(note_record.address_index)
237                    .or_insert_with(BTreeMap::new)
238                    .entry(note_record.note.asset_id())
239                    .or_insert_with(Vec::new)
240                    .push(note_record);
241            }
242            tracing::trace!(?notes_by_address_and_asset);
243
244            Ok(notes_by_address_and_asset)
245        }
246        .boxed()
247    }
248
249    /// Return unspent notes, grouped by account ID (combining ephemeral addresses for the account) and then by asset id.
250    #[instrument(skip(self))]
251    fn unspent_notes_by_account_and_asset(
252        &mut self,
253    ) -> Pin<
254        Box<
255            dyn Future<
256                    Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
257                > + Send
258                + 'static,
259        >,
260    > {
261        let notes = self.notes(pb::NotesRequest {
262            include_spent: false,
263            ..Default::default()
264        });
265        async move {
266            let notes = notes.await?;
267            tracing::trace!(?notes);
268
269            let mut notes_by_account_and_asset = BTreeMap::new();
270
271            for note_record in notes {
272                notes_by_account_and_asset
273                    .entry(note_record.address_index.account)
274                    .or_insert_with(BTreeMap::new)
275                    .entry(note_record.note.asset_id())
276                    .or_insert_with(Vec::new)
277                    .push(note_record);
278            }
279            tracing::trace!(?notes_by_account_and_asset);
280
281            Ok(notes_by_account_and_asset)
282        }
283        .boxed()
284    }
285
286    /// Return unspent notes, grouped by denom and then by address index.
287    #[instrument(skip(self))]
288    fn unspent_notes_by_asset_and_address(
289        &mut self,
290    ) -> Pin<
291        Box<
292            dyn Future<
293                    Output = Result<
294                        BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
295                    >,
296                > + Send
297                + 'static,
298        >,
299    > {
300        let notes = self.notes(pb::NotesRequest {
301            include_spent: false,
302            ..Default::default()
303        });
304
305        async move {
306            let notes = notes.await?;
307            tracing::trace!(?notes);
308
309            let mut notes_by_asset_and_address = BTreeMap::new();
310
311            for note_record in notes {
312                notes_by_asset_and_address
313                    .entry(note_record.note.asset_id())
314                    .or_insert_with(BTreeMap::new)
315                    .entry(note_record.address_index)
316                    .or_insert_with(Vec::new)
317                    .push(note_record);
318            }
319            tracing::trace!(?notes_by_asset_and_address);
320
321            Ok(notes_by_asset_and_address)
322        }
323        .boxed()
324    }
325
326    fn address_by_index(
327        &mut self,
328        address_index: AddressIndex,
329    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
330
331    /// Queries for the index of a provided address, returning `None` if not
332    /// controlled by the view service's FVK.
333    fn index_by_address(
334        &mut self,
335        address: Address,
336    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
337
338    /// Queries for unclaimed Swaps.
339    fn unclaimed_swaps(
340        &mut self,
341    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
342
343    /// Get all of the notes that can be used for voting
344    fn lqt_voting_notes(
345        &mut self,
346        epoch: u64,
347        filter: Option<AddressIndex>,
348    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
349}
350
// Every method below clones the client and moves the clone into a boxed
// `async move` block, so the returned future owns its own client handle and
// can be `Send + 'static` without borrowing `self`.
//
// NOTE(review): the comment previously here talked about telling `async_trait`
// not to add a `Send` bound because `CustodyProtocolClient` isn't `Sync` and its
// `authorize` method takes `&mut self`. This impl uses neither `async_trait`
// nor a custody client, and all of its futures are `Send`, so that text looks
// like it was copied from the custody client — confirm before relying on it.
357impl<T> ViewClient for ViewServiceClient<T>
358where
359    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
360    T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
361    T::Error: Into<tonic::codegen::StdError>,
362    T::Future: Send + 'static,
363    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
364{
365    fn status(
366        &mut self,
367    ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
368        let mut self2 = self.clone();
369        async move {
370            let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
371            let status = status.await?.into_inner();
372            Ok(status)
373        }
374        .boxed()
375    }
376
377    fn status_stream(
378        &mut self,
379    ) -> Pin<
380        Box<
381            dyn Future<
382                    Output = Result<
383                        Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
384                    >,
385                > + Send
386                + 'static,
387        >,
388    > {
389        let mut self2 = self.clone();
390        async move {
391            let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
392            let stream = stream.await?.into_inner();
393
394            Ok(stream
395                .map_err(|e| anyhow::anyhow!("view service error: {}", e))
396                .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
397                .boxed())
398        }
399        .boxed()
400    }
401
402    fn app_params(
403        &mut self,
404    ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
405        let mut self2 = self.clone();
406        async move {
407            // We have to manually invoke the method on the type, because it has the
408            // same name as the one we're implementing.
409            let rsp = ViewServiceClient::app_parameters(
410                &mut self2,
411                tonic::Request::new(pb::AppParametersRequest {}),
412            );
413            rsp.await?.into_inner().try_into()
414        }
415        .boxed()
416    }
417
418    fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
419        let mut self2 = self.clone();
420        async move {
421            // We have to manually invoke the method on the type, because it has the
422            // same name as the one we're implementing.
423            let rsp = ViewServiceClient::gas_prices(
424                &mut self2,
425                tonic::Request::new(pb::GasPricesRequest {}),
426            );
427            rsp.await?
428                .into_inner()
429                .gas_prices
430                .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
431                .try_into()
432        }
433        .boxed()
434    }
435
436    fn fmd_parameters(
437        &mut self,
438    ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
439        let mut self2 = self.clone();
440        async move {
441            let parameters = ViewServiceClient::fmd_parameters(
442                &mut self2,
443                tonic::Request::new(pb::FmdParametersRequest {}),
444            );
445            let parameters = parameters.await?.into_inner().parameters;
446
447            parameters
448                .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
449                .try_into()
450        }
451        .boxed()
452    }
453
454    fn notes(
455        &mut self,
456        request: pb::NotesRequest,
457    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
458        let mut self2 = self.clone();
459        async move {
460            let req = self2.notes(tonic::Request::new(request));
461            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
462
463            pb_notes
464                .into_iter()
465                .map(|note_rsp| {
466                    let note_record = note_rsp
467                        .note_record
468                        .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
469
470                    match note_record {
471                        Ok(note) => note.try_into(),
472                        Err(e) => Err(e),
473                    }
474                })
475                .collect()
476        }
477        .boxed()
478    }
479
480    fn notes_for_voting(
481        &mut self,
482        request: pb::NotesForVotingRequest,
483    ) -> Pin<
484        Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
485    > {
486        let mut self2 = self.clone();
487        async move {
488            let req = self2.notes_for_voting(tonic::Request::new(request));
489            let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
490
491            pb_notes
492                .into_iter()
493                .map(|note_rsp| {
494                    let note_record = note_rsp
495                        .note_record
496                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
497                        .try_into()?;
498
499                    let identity_key = note_rsp
500                        .identity_key
501                        .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
502                        .try_into()?;
503
504                    Ok((note_record, identity_key))
505                })
506                .collect()
507        }
508        .boxed()
509    }
510
511    fn note_by_commitment(
512        &mut self,
513        note_commitment: note::StateCommitment,
514    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
515        let mut self2 = self.clone();
516        async move {
517            let note_commitment_response = ViewServiceClient::note_by_commitment(
518                &mut self2,
519                tonic::Request::new(pb::NoteByCommitmentRequest {
520                    note_commitment: Some(note_commitment.into()),
521                    await_detection: false,
522                }),
523            );
524            let note_commitment_response = note_commitment_response.await?.into_inner();
525
526            note_commitment_response
527                .spendable_note
528                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
529                .try_into()
530        }
531        .boxed()
532    }
533
534    fn balances(
535        &mut self,
536        address_index: AddressIndex,
537        asset_id: Option<asset::Id>,
538    ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
539        let mut self2 = self.clone();
540        async move {
541            let req = ViewServiceClient::balances(
542                &mut self2,
543                tonic::Request::new(pb::BalancesRequest {
544                    account_filter: Some(address_index.into()),
545                    asset_id_filter: asset_id.map(Into::into),
546                }),
547            );
548
549            let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
550
551            balances
552                .into_iter()
553                .map(|rsp| {
554                    let pb_value_view = rsp
555                        .balance_view
556                        .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
557
558                    let value_view: ValueView = pb_value_view.try_into()?;
559                    let id = value_view.asset_id();
560                    let amount = value_view.value().amount;
561                    Ok((id, amount))
562                })
563                .collect()
564        }
565        .boxed()
566    }
567
568    fn swap_by_commitment(
569        &mut self,
570        swap_commitment: penumbra_sdk_tct::StateCommitment,
571    ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
572        let mut self2 = self.clone();
573        async move {
574            let swap_commitment_response = ViewServiceClient::swap_by_commitment(
575                &mut self2,
576                tonic::Request::new(pb::SwapByCommitmentRequest {
577                    swap_commitment: Some(swap_commitment.into()),
578                    await_detection: false,
579                }),
580            );
581            let swap_commitment_response = swap_commitment_response.await?.into_inner();
582
583            swap_commitment_response
584                .swap
585                .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
586                .try_into()
587        }
588        .boxed()
589    }
590
591    /// Queries for a specific note by commitment, waiting until the note is detected if it is not found.
592    ///
593    /// This is useful for waiting for a note to be detected by the view service.
594    fn await_note_by_commitment(
595        &mut self,
596        note_commitment: note::StateCommitment,
597    ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
598        let mut self2 = self.clone();
599        async move {
600            let spendable_note = ViewServiceClient::note_by_commitment(
601                &mut self2,
602                tonic::Request::new(pb::NoteByCommitmentRequest {
603                    note_commitment: Some(note_commitment.into()),
604                    await_detection: true,
605                }),
606            );
607            let spendable_note = spendable_note.await?.into_inner().spendable_note;
608
609            spendable_note
610                .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
611                .try_into()
612        }
613        .boxed()
614    }
615
616    /// Queries for a specific nullifier's status, returning immediately if it is not found.
617    fn nullifier_status(
618        &mut self,
619        nullifier: Nullifier,
620    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
621        let mut self2 = self.clone();
622        async move {
623            let rsp = ViewServiceClient::nullifier_status(
624                &mut self2,
625                tonic::Request::new(pb::NullifierStatusRequest {
626                    nullifier: Some(nullifier.into()),
627                    await_detection: false,
628                }),
629            );
630            Ok(rsp.await?.into_inner().spent)
631        }
632        .boxed()
633    }
634
635    /// Waits for a specific nullifier to be detected, returning immediately if it is already
636    /// present, but waiting otherwise.
637    fn await_nullifier(
638        &mut self,
639        nullifier: Nullifier,
640    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
641        let mut self2 = self.clone();
642        async move {
643            let rsp = ViewServiceClient::nullifier_status(
644                &mut self2,
645                tonic::Request::new(pb::NullifierStatusRequest {
646                    nullifier: Some(nullifier.into()),
647                    await_detection: true,
648                }),
649            );
650            rsp.await?;
651            Ok(())
652        }
653        .boxed()
654    }
655
656    fn witness(
657        &mut self,
658        plan: &TransactionPlan,
659    ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
660        let request = WitnessRequest {
661            transaction_plan: Some(plan.clone().into()),
662        };
663
664        let mut self2 = self.clone();
665        async move {
666            let rsp = self2.witness(tonic::Request::new(request));
667
668            let witness_data = rsp
669                .await?
670                .into_inner()
671                .witness_data
672                .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
673                .try_into()?;
674
675            Ok(witness_data)
676        }
677        .boxed()
678    }
679
680    fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
681        let mut self2 = self.clone();
682        async move {
683            // We have to manually invoke the method on the type, because it has the
684            // same name as the one we're implementing.
685            let rsp = ViewServiceClient::assets(
686                &mut self2,
687                tonic::Request::new(pb::AssetsRequest {
688                    ..Default::default()
689                }),
690            );
691
692            let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
693
694            let assets = pb_assets
695                .into_iter()
696                .map(Metadata::try_from)
697                .collect::<anyhow::Result<Vec<Metadata>>>()?;
698
699            Ok(assets.into_iter().collect())
700        }
701        .boxed()
702    }
703
704    fn owned_position_ids(
705        &mut self,
706        position_state: Option<position::State>,
707        trading_pair: Option<TradingPair>,
708        subaccount: Option<AddressIndex>,
709    ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
710        // should the return be streamed here? none of the other viewclient responses are, probably fine for now
711        // but might be an issue eventually
712        let mut self2 = self.clone();
713        async move {
714            // We have to manually invoke the method on the type, because it has the
715            // same name as the one we're implementing.
716            let rsp = ViewServiceClient::owned_position_ids(
717                &mut self2,
718                tonic::Request::new(pb::OwnedPositionIdsRequest {
719                    trading_pair: trading_pair.map(Into::into),
720                    position_state: position_state.map(Into::into),
721                    subaccount: subaccount.map(Into::into),
722                }),
723            );
724
725            let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
726
727            let position_ids = pb_position_ids
728                .into_iter()
729                .map(|p| {
730                    position::Id::try_from(p.position_id.ok_or_else(|| {
731                        anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
732                    })?)
733                })
734                .collect::<anyhow::Result<Vec<position::Id>>>()?;
735
736            Ok(position_ids)
737        }
738        .boxed()
739    }
740
741    fn transaction_info_by_hash(
742        &mut self,
743        id: TransactionId,
744    ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
745        let mut self2 = self.clone();
746        async move {
747            let rsp = ViewServiceClient::transaction_info_by_hash(
748                &mut self2,
749                tonic::Request::new(pb::TransactionInfoByHashRequest {
750                    id: Some(id.into()),
751                }),
752            )
753            .await?
754            .into_inner()
755            .tx_info
756            .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
757
758            // Check some assumptions about response structure
759            if rsp.height == 0 {
760                anyhow::bail!("missing height");
761            }
762
763            let tx_info = TransactionInfo {
764                height: rsp.height,
765                id: rsp
766                    .id
767                    .ok_or_else(|| anyhow::anyhow!("missing id"))?
768                    .try_into()?,
769                transaction: rsp
770                    .transaction
771                    .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
772                    .try_into()?,
773                perspective: rsp
774                    .perspective
775                    .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
776                    .try_into()?,
777                view: rsp
778                    .view
779                    .ok_or_else(|| anyhow::anyhow!("missing view"))?
780                    .try_into()?,
781                summary: rsp
782                    .summary
783                    .ok_or_else(|| anyhow::anyhow!("missing summary"))?
784                    .try_into()?,
785            };
786
787            Ok(tx_info)
788        }
789        .boxed()
790    }
791
792    fn transaction_info(
793        &mut self,
794        start_height: Option<u64>,
795        end_height: Option<u64>,
796    ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
797        let mut self2 = self.clone();
798        async move {
799            // Unpack optional block heights
800            let start_h = if let Some(h) = start_height { h } else { 0 };
801
802            let end_h = if let Some(h) = end_height { h } else { 0 };
803
804            let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
805                start_height: start_h,
806                end_height: end_h,
807            }));
808            let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
809
810            pb_txs
811                .into_iter()
812                .map(|rsp| {
813                    let tx_rsp = rsp
814                        .tx_info
815                        .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
816
817                    // Confirm height is populated
818                    if tx_rsp.height == 0 {
819                        anyhow::bail!("missing height");
820                    }
821
822                    let tx_info = TransactionInfo {
823                        height: tx_rsp.height,
824                        transaction: tx_rsp
825                            .transaction
826                            .ok_or_else(|| {
827                                anyhow::anyhow!("empty TransactionInfoResponse message")
828                            })?
829                            .try_into()?,
830                        id: tx_rsp
831                            .id
832                            .ok_or_else(|| anyhow::anyhow!("missing id"))?
833                            .try_into()?,
834                        perspective: tx_rsp
835                            .perspective
836                            .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
837                            .try_into()?,
838                        view: tx_rsp
839                            .view
840                            .ok_or_else(|| anyhow::anyhow!("missing view"))?
841                            .try_into()?,
842                        summary: tx_rsp
843                            .summary
844                            .ok_or_else(|| anyhow::anyhow!("missing summary"))?
845                            .try_into()?,
846                    };
847
848                    Ok(tx_info)
849                })
850                .collect()
851        }
852        .boxed()
853    }
854
855    fn broadcast_transaction(
856        &mut self,
857        transaction: Transaction,
858        await_detection: bool,
859    ) -> BroadcastStatusStream {
860        let mut self2 = self.clone();
861        async move {
862            let rsp = ViewServiceClient::broadcast_transaction(
863                &mut self2,
864                tonic::Request::new(pb::BroadcastTransactionRequest {
865                    transaction: Some(transaction.into()),
866                    await_detection,
867                }),
868            )
869            .await?
870            .into_inner();
871
872            Ok(rsp)
873        }
874        .boxed()
875    }
876
877    fn address_by_index(
878        &mut self,
879        address_index: AddressIndex,
880    ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
881        let mut self2 = self.clone();
882        async move {
883            let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
884                address_index: Some(address_index.into()),
885            }));
886            let address = address
887                .await?
888                .into_inner()
889                .address
890                .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
891                .try_into()?;
892            Ok(address)
893        }
894        .boxed()
895    }
896
897    fn index_by_address(
898        &mut self,
899        address: Address,
900    ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
901        let mut self2 = self.clone();
902        async move {
903            let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
904                address: Some(address.into()),
905            }));
906            let index = index
907                .await?
908                .into_inner()
909                .address_index
910                .map(|index| index.try_into())
911                .transpose()?;
912            Ok(index)
913        }
914        .boxed()
915    }
916
917    fn witness_and_build(
918        &mut self,
919        transaction_plan: TransactionPlan,
920        authorization_data: AuthorizationData,
921    ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
922        let request = pb::WitnessAndBuildRequest {
923            transaction_plan: Some(transaction_plan.into()),
924            authorization_data: Some(authorization_data.into()),
925        };
926        let mut self2 = self.clone();
927        async move {
928            let mut rsp = self2
929                .witness_and_build(tonic::Request::new(request))
930                .await?
931                .into_inner();
932
933            while let Some(rsp) = rsp.try_next().await? {
934                match rsp.status {
935                    Some(status) => match status {
936                        pb::witness_and_build_response::Status::BuildProgress(_) => {
937                            // TODO: should update progress here
938                        }
939                        pb::witness_and_build_response::Status::Complete(c) => {
940                            return c.transaction
941                                .ok_or_else(|| {
942                                    anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
943                                })?
944                                .try_into();
945                        }
946                    },
947                    None => {
948                        // No status is unexpected behavior
949                        return Err(anyhow::anyhow!(
950                            "empty WitnessAndBuildResponse message"
951                        ));
952                    }
953                }
954            }
955
956            Err(anyhow::anyhow!("should have received complete status or error"))
957        }
958            .boxed()
959    }
960
961    fn unclaimed_swaps(
962        &mut self,
963    ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
964        let mut self2 = self.clone();
965        async move {
966            let swaps_response = ViewServiceClient::unclaimed_swaps(
967                &mut self2,
968                tonic::Request::new(pb::UnclaimedSwapsRequest {}),
969            );
970            let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
971
972            pb_swaps
973                .into_iter()
974                .map(|swap_rsp| {
975                    let swap_record = swap_rsp
976                        .swap
977                        .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
978
979                    match swap_record {
980                        Ok(swap) => swap.try_into(),
981                        Err(e) => Err(e),
982                    }
983                })
984                .collect()
985        }
986        .boxed()
987    }
988
989    fn auctions(
990        &mut self,
991        account_filter: Option<AddressIndex>,
992        include_inactive: bool,
993        query_latest_state: bool,
994    ) -> Pin<
995        Box<
996            dyn Future<
997                    Output = Result<
998                        Vec<(
999                            AuctionId,
1000                            SpendableNoteRecord,
1001                            u64,
1002                            Option<Any>,
1003                            Vec<Position>,
1004                        )>,
1005                    >,
1006                > + Send
1007                + 'static,
1008        >,
1009    > {
1010        let mut client = self.clone();
1011        async move {
1012            let request = tonic::Request::new(pb::AuctionsRequest {
1013                account_filter: account_filter.map(Into::into),
1014                include_inactive,
1015                query_latest_state,
1016                auction_ids_filter: Vec::new(), // TODO: Support `auction_ids_filter`
1017            });
1018
1019            let auctions: Vec<pb::AuctionsResponse> =
1020                ViewServiceClient::auctions(&mut client, request)
1021                    .await?
1022                    .into_inner()
1023                    .try_collect()
1024                    .await?;
1025
1026            let resp: Vec<(
1027                AuctionId,
1028                SpendableNoteRecord,
1029                u64,
1030                Option<Any>,
1031                Vec<Position>,
1032            )> = auctions
1033                .into_iter()
1034                .map(|auction_rsp| {
1035                    let pb_id = auction_rsp
1036                        .id
1037                        .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1038                    let auction_id: AuctionId = pb_id.try_into()?;
1039                    let snr: SpendableNoteRecord = auction_rsp
1040                        .note_record
1041                        .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1042                        .try_into()?;
1043
1044                    let local_seq = auction_rsp.local_seq;
1045
1046                    let auction = auction_rsp.auction;
1047                    let lps: Vec<Position> = auction_rsp
1048                        .positions
1049                        .into_iter()
1050                        .map(TryInto::try_into)
1051                        .collect::<Result<Vec<_>>>()?;
1052
1053                    Ok::<
1054                        (
1055                            AuctionId,
1056                            SpendableNoteRecord,
1057                            u64, /* the local sequence number */
1058                            Option<Any>, /* the auction state if it was requested */
1059                            Vec<Position>, /* associated liquidity positions if we queried the latest state */
1060                        ),
1061                        anyhow::Error,
1062                    >((auction_id, snr, local_seq, auction, lps))
1063                })
1064                .filter_map(|res| res.ok()) // TODO: scrap this later.
1065                .collect();
1066
1067            Ok(resp)
1068        }
1069        .boxed()
1070    }
1071
1072    fn lqt_voting_notes(
1073        &mut self,
1074        epoch: u64,
1075        filter: Option<AddressIndex>,
1076    ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
1077        let mut client = self.clone();
1078        async move {
1079            let request = tonic::Request::new(pb::LqtVotingNotesRequest {
1080                epoch_index: epoch,
1081                account_filter: filter.map(|x| x.into()),
1082            });
1083            let response = client.lqt_voting_notes(request).await?;
1084            let pb_notes: Vec<pb::LqtVotingNotesResponse> =
1085                response.into_inner().try_collect().await?;
1086
1087            pb_notes
1088                .into_iter()
1089                .map(|note_rsp| {
1090                    let note_record = note_rsp
1091                        .note_record
1092                        .ok_or_else(|| anyhow::anyhow!("empty LqtVotingNotesResponse message"))?
1093                        .try_into()?;
1094                    Ok(note_record)
1095                })
1096                .collect()
1097        }
1098        .boxed()
1099    }
1100}