1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12 asset::{self, Id, Metadata},
13 ValueView,
14};
15use penumbra_sdk_dex::{
16 lp::position::{self, Position},
17 TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23 self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24 BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30 txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
35pub(crate) type BroadcastStatusStream = Pin<
36 Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
37>;
38
39#[allow(clippy::type_complexity)]
52pub trait ViewClient {
53 fn auctions(
55 &mut self,
56 account_filter: Option<AddressIndex>,
57 include_inactive: bool,
58 query_latest_state: bool,
59 ) -> Pin<
60 Box<
61 dyn Future<
62 Output = Result<
63 Vec<(
64 AuctionId,
65 SpendableNoteRecord,
66 u64,
67 Option<Any>,
68 Vec<Position>,
69 )>,
70 >,
71 > + Send
72 + 'static,
73 >,
74 >;
75
76 fn status(
78 &mut self,
79 ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
81 fn status_stream(
83 &mut self,
84 ) -> Pin<
85 Box<
86 dyn Future<
87 Output = Result<
88 Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
89 >,
90 > + Send
91 + 'static,
92 >,
93 >;
94
95 fn app_params(
97 &mut self,
98 ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
100 fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
103 fn fmd_parameters(
105 &mut self,
106 ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
108 fn notes(
110 &mut self,
111 request: pb::NotesRequest,
112 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
114 fn notes_for_voting(
116 &mut self,
117 request: pb::NotesForVotingRequest,
118 ) -> Pin<
119 Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
120 >;
121
122 fn balances(
124 &mut self,
125 address_index: AddressIndex,
126 asset_id: Option<asset::Id>,
127 ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
129 fn note_by_commitment(
131 &mut self,
132 note_commitment: note::StateCommitment,
133 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
135 fn swap_by_commitment(
137 &mut self,
138 swap_commitment: penumbra_sdk_tct::StateCommitment,
139 ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
141 fn nullifier_status(
143 &mut self,
144 nullifier: Nullifier,
145 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
147 fn await_nullifier(
150 &mut self,
151 nullifier: Nullifier,
152 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
154 fn await_note_by_commitment(
158 &mut self,
159 note_commitment: note::StateCommitment,
160 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
162 fn witness(
169 &mut self,
170 plan: &TransactionPlan,
171 ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
173 fn witness_and_build(
175 &mut self,
176 plan: TransactionPlan,
177 auth_data: AuthorizationData,
178 ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
180 fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
183 fn owned_position_ids(
185 &mut self,
186 position_state: Option<position::State>,
187 trading_pair: Option<TradingPair>,
188 ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
189
190 fn transaction_info_by_hash(
192 &mut self,
193 id: TransactionId,
194 ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
195
196 fn transaction_info(
198 &mut self,
199 start_height: Option<u64>,
200 end_height: Option<u64>,
201 ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
202
203 fn broadcast_transaction(
204 &mut self,
205 transaction: Transaction,
206 await_detection: bool,
207 ) -> BroadcastStatusStream;
208
209 #[instrument(skip(self))]
211 fn unspent_notes_by_address_and_asset(
212 &mut self,
213 ) -> Pin<
214 Box<
215 dyn Future<
216 Output = Result<
217 BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
218 >,
219 > + Send
220 + 'static,
221 >,
222 > {
223 let notes = self.notes(pb::NotesRequest {
224 include_spent: false,
225 ..Default::default()
226 });
227 async move {
228 let notes = notes.await?;
229 tracing::trace!(?notes);
230
231 let mut notes_by_address_and_asset = BTreeMap::new();
232
233 for note_record in notes {
234 notes_by_address_and_asset
235 .entry(note_record.address_index)
236 .or_insert_with(BTreeMap::new)
237 .entry(note_record.note.asset_id())
238 .or_insert_with(Vec::new)
239 .push(note_record);
240 }
241 tracing::trace!(?notes_by_address_and_asset);
242
243 Ok(notes_by_address_and_asset)
244 }
245 .boxed()
246 }
247
248 #[instrument(skip(self))]
250 fn unspent_notes_by_account_and_asset(
251 &mut self,
252 ) -> Pin<
253 Box<
254 dyn Future<
255 Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
256 > + Send
257 + 'static,
258 >,
259 > {
260 let notes = self.notes(pb::NotesRequest {
261 include_spent: false,
262 ..Default::default()
263 });
264 async move {
265 let notes = notes.await?;
266 tracing::trace!(?notes);
267
268 let mut notes_by_account_and_asset = BTreeMap::new();
269
270 for note_record in notes {
271 notes_by_account_and_asset
272 .entry(note_record.address_index.account)
273 .or_insert_with(BTreeMap::new)
274 .entry(note_record.note.asset_id())
275 .or_insert_with(Vec::new)
276 .push(note_record);
277 }
278 tracing::trace!(?notes_by_account_and_asset);
279
280 Ok(notes_by_account_and_asset)
281 }
282 .boxed()
283 }
284
285 #[instrument(skip(self))]
287 fn unspent_notes_by_asset_and_address(
288 &mut self,
289 ) -> Pin<
290 Box<
291 dyn Future<
292 Output = Result<
293 BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
294 >,
295 > + Send
296 + 'static,
297 >,
298 > {
299 let notes = self.notes(pb::NotesRequest {
300 include_spent: false,
301 ..Default::default()
302 });
303
304 async move {
305 let notes = notes.await?;
306 tracing::trace!(?notes);
307
308 let mut notes_by_asset_and_address = BTreeMap::new();
309
310 for note_record in notes {
311 notes_by_asset_and_address
312 .entry(note_record.note.asset_id())
313 .or_insert_with(BTreeMap::new)
314 .entry(note_record.address_index)
315 .or_insert_with(Vec::new)
316 .push(note_record);
317 }
318 tracing::trace!(?notes_by_asset_and_address);
319
320 Ok(notes_by_asset_and_address)
321 }
322 .boxed()
323 }
324
325 fn address_by_index(
326 &mut self,
327 address_index: AddressIndex,
328 ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
329
330 fn index_by_address(
333 &mut self,
334 address: Address,
335 ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
336
337 fn unclaimed_swaps(
339 &mut self,
340 ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
341}
342
343impl<T> ViewClient for ViewServiceClient<T>
350where
351 T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
352 T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
353 T::Error: Into<tonic::codegen::StdError>,
354 T::Future: Send + 'static,
355 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
356{
357 fn status(
358 &mut self,
359 ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
360 let mut self2 = self.clone();
361 async move {
362 let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
363 let status = status.await?.into_inner();
364 Ok(status)
365 }
366 .boxed()
367 }
368
369 fn status_stream(
370 &mut self,
371 ) -> Pin<
372 Box<
373 dyn Future<
374 Output = Result<
375 Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
376 >,
377 > + Send
378 + 'static,
379 >,
380 > {
381 let mut self2 = self.clone();
382 async move {
383 let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
384 let stream = stream.await?.into_inner();
385
386 Ok(stream
387 .map_err(|e| anyhow::anyhow!("view service error: {}", e))
388 .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
389 .boxed())
390 }
391 .boxed()
392 }
393
394 fn app_params(
395 &mut self,
396 ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
397 let mut self2 = self.clone();
398 async move {
399 let rsp = ViewServiceClient::app_parameters(
402 &mut self2,
403 tonic::Request::new(pb::AppParametersRequest {}),
404 );
405 rsp.await?.into_inner().try_into()
406 }
407 .boxed()
408 }
409
410 fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
411 let mut self2 = self.clone();
412 async move {
413 let rsp = ViewServiceClient::gas_prices(
416 &mut self2,
417 tonic::Request::new(pb::GasPricesRequest {}),
418 );
419 rsp.await?
420 .into_inner()
421 .gas_prices
422 .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
423 .try_into()
424 }
425 .boxed()
426 }
427
428 fn fmd_parameters(
429 &mut self,
430 ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
431 let mut self2 = self.clone();
432 async move {
433 let parameters = ViewServiceClient::fmd_parameters(
434 &mut self2,
435 tonic::Request::new(pb::FmdParametersRequest {}),
436 );
437 let parameters = parameters.await?.into_inner().parameters;
438
439 parameters
440 .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
441 .try_into()
442 }
443 .boxed()
444 }
445
446 fn notes(
447 &mut self,
448 request: pb::NotesRequest,
449 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
450 let mut self2 = self.clone();
451 async move {
452 let req = self2.notes(tonic::Request::new(request));
453 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
454
455 pb_notes
456 .into_iter()
457 .map(|note_rsp| {
458 let note_record = note_rsp
459 .note_record
460 .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
461
462 match note_record {
463 Ok(note) => note.try_into(),
464 Err(e) => Err(e),
465 }
466 })
467 .collect()
468 }
469 .boxed()
470 }
471
472 fn notes_for_voting(
473 &mut self,
474 request: pb::NotesForVotingRequest,
475 ) -> Pin<
476 Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
477 > {
478 let mut self2 = self.clone();
479 async move {
480 let req = self2.notes_for_voting(tonic::Request::new(request));
481 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
482
483 pb_notes
484 .into_iter()
485 .map(|note_rsp| {
486 let note_record = note_rsp
487 .note_record
488 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
489 .try_into()?;
490
491 let identity_key = note_rsp
492 .identity_key
493 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
494 .try_into()?;
495
496 Ok((note_record, identity_key))
497 })
498 .collect()
499 }
500 .boxed()
501 }
502
503 fn note_by_commitment(
504 &mut self,
505 note_commitment: note::StateCommitment,
506 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
507 let mut self2 = self.clone();
508 async move {
509 let note_commitment_response = ViewServiceClient::note_by_commitment(
510 &mut self2,
511 tonic::Request::new(pb::NoteByCommitmentRequest {
512 note_commitment: Some(note_commitment.into()),
513 await_detection: false,
514 }),
515 );
516 let note_commitment_response = note_commitment_response.await?.into_inner();
517
518 note_commitment_response
519 .spendable_note
520 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
521 .try_into()
522 }
523 .boxed()
524 }
525
526 fn balances(
527 &mut self,
528 address_index: AddressIndex,
529 asset_id: Option<asset::Id>,
530 ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
531 let mut self2 = self.clone();
532 async move {
533 let req = ViewServiceClient::balances(
534 &mut self2,
535 tonic::Request::new(pb::BalancesRequest {
536 account_filter: Some(address_index.into()),
537 asset_id_filter: asset_id.map(Into::into),
538 }),
539 );
540
541 let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
542
543 balances
544 .into_iter()
545 .map(|rsp| {
546 let pb_value_view = rsp
547 .balance_view
548 .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
549
550 let value_view: ValueView = pb_value_view.try_into()?;
551 let id = value_view.asset_id();
552 let amount = value_view.value().amount;
553 Ok((id, amount))
554 })
555 .collect()
556 }
557 .boxed()
558 }
559
560 fn swap_by_commitment(
561 &mut self,
562 swap_commitment: penumbra_sdk_tct::StateCommitment,
563 ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
564 let mut self2 = self.clone();
565 async move {
566 let swap_commitment_response = ViewServiceClient::swap_by_commitment(
567 &mut self2,
568 tonic::Request::new(pb::SwapByCommitmentRequest {
569 swap_commitment: Some(swap_commitment.into()),
570 await_detection: false,
571 }),
572 );
573 let swap_commitment_response = swap_commitment_response.await?.into_inner();
574
575 swap_commitment_response
576 .swap
577 .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
578 .try_into()
579 }
580 .boxed()
581 }
582
583 fn await_note_by_commitment(
587 &mut self,
588 note_commitment: note::StateCommitment,
589 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
590 let mut self2 = self.clone();
591 async move {
592 let spendable_note = ViewServiceClient::note_by_commitment(
593 &mut self2,
594 tonic::Request::new(pb::NoteByCommitmentRequest {
595 note_commitment: Some(note_commitment.into()),
596 await_detection: true,
597 }),
598 );
599 let spendable_note = spendable_note.await?.into_inner().spendable_note;
600
601 spendable_note
602 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
603 .try_into()
604 }
605 .boxed()
606 }
607
608 fn nullifier_status(
610 &mut self,
611 nullifier: Nullifier,
612 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
613 let mut self2 = self.clone();
614 async move {
615 let rsp = ViewServiceClient::nullifier_status(
616 &mut self2,
617 tonic::Request::new(pb::NullifierStatusRequest {
618 nullifier: Some(nullifier.into()),
619 await_detection: false,
620 }),
621 );
622 Ok(rsp.await?.into_inner().spent)
623 }
624 .boxed()
625 }
626
627 fn await_nullifier(
630 &mut self,
631 nullifier: Nullifier,
632 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
633 let mut self2 = self.clone();
634 async move {
635 let rsp = ViewServiceClient::nullifier_status(
636 &mut self2,
637 tonic::Request::new(pb::NullifierStatusRequest {
638 nullifier: Some(nullifier.into()),
639 await_detection: true,
640 }),
641 );
642 rsp.await?;
643 Ok(())
644 }
645 .boxed()
646 }
647
648 fn witness(
649 &mut self,
650 plan: &TransactionPlan,
651 ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
652 let request = WitnessRequest {
653 transaction_plan: Some(plan.clone().into()),
654 };
655
656 let mut self2 = self.clone();
657 async move {
658 let rsp = self2.witness(tonic::Request::new(request));
659
660 let witness_data = rsp
661 .await?
662 .into_inner()
663 .witness_data
664 .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
665 .try_into()?;
666
667 Ok(witness_data)
668 }
669 .boxed()
670 }
671
672 fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
673 let mut self2 = self.clone();
674 async move {
675 let rsp = ViewServiceClient::assets(
678 &mut self2,
679 tonic::Request::new(pb::AssetsRequest {
680 ..Default::default()
681 }),
682 );
683
684 let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
685
686 let assets = pb_assets
687 .into_iter()
688 .map(Metadata::try_from)
689 .collect::<anyhow::Result<Vec<Metadata>>>()?;
690
691 Ok(assets.into_iter().collect())
692 }
693 .boxed()
694 }
695
696 fn owned_position_ids(
697 &mut self,
698 position_state: Option<position::State>,
699 trading_pair: Option<TradingPair>,
700 ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
701 let mut self2 = self.clone();
704 async move {
705 let rsp = ViewServiceClient::owned_position_ids(
708 &mut self2,
709 tonic::Request::new(pb::OwnedPositionIdsRequest {
710 trading_pair: trading_pair.map(TryInto::try_into).transpose()?,
711 position_state: position_state.map(TryInto::try_into).transpose()?,
712 subaccount: None,
713 }),
714 );
715
716 let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
717
718 let position_ids = pb_position_ids
719 .into_iter()
720 .map(|p| {
721 position::Id::try_from(p.position_id.ok_or_else(|| {
722 anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
723 })?)
724 })
725 .collect::<anyhow::Result<Vec<position::Id>>>()?;
726
727 Ok(position_ids)
728 }
729 .boxed()
730 }
731
732 fn transaction_info_by_hash(
733 &mut self,
734 id: TransactionId,
735 ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
736 let mut self2 = self.clone();
737 async move {
738 let rsp = ViewServiceClient::transaction_info_by_hash(
739 &mut self2,
740 tonic::Request::new(pb::TransactionInfoByHashRequest {
741 id: Some(id.into()),
742 }),
743 )
744 .await?
745 .into_inner()
746 .tx_info
747 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
748
749 if rsp.height == 0 {
751 anyhow::bail!("missing height");
752 }
753
754 let tx_info = TransactionInfo {
755 height: rsp.height,
756 id: rsp
757 .id
758 .ok_or_else(|| anyhow::anyhow!("missing id"))?
759 .try_into()?,
760 transaction: rsp
761 .transaction
762 .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
763 .try_into()?,
764 perspective: rsp
765 .perspective
766 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
767 .try_into()?,
768 view: rsp
769 .view
770 .ok_or_else(|| anyhow::anyhow!("missing view"))?
771 .try_into()?,
772 summary: rsp
773 .summary
774 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
775 .try_into()?,
776 };
777
778 Ok(tx_info)
779 }
780 .boxed()
781 }
782
783 fn transaction_info(
784 &mut self,
785 start_height: Option<u64>,
786 end_height: Option<u64>,
787 ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
788 let mut self2 = self.clone();
789 async move {
790 let start_h = if let Some(h) = start_height { h } else { 0 };
792
793 let end_h = if let Some(h) = end_height { h } else { 0 };
794
795 let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
796 start_height: start_h,
797 end_height: end_h,
798 }));
799 let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
800
801 pb_txs
802 .into_iter()
803 .map(|rsp| {
804 let tx_rsp = rsp
805 .tx_info
806 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
807
808 if tx_rsp.height == 0 {
810 anyhow::bail!("missing height");
811 }
812
813 let tx_info = TransactionInfo {
814 height: tx_rsp.height,
815 transaction: tx_rsp
816 .transaction
817 .ok_or_else(|| {
818 anyhow::anyhow!("empty TransactionInfoResponse message")
819 })?
820 .try_into()?,
821 id: tx_rsp
822 .id
823 .ok_or_else(|| anyhow::anyhow!("missing id"))?
824 .try_into()?,
825 perspective: tx_rsp
826 .perspective
827 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
828 .try_into()?,
829 view: tx_rsp
830 .view
831 .ok_or_else(|| anyhow::anyhow!("missing view"))?
832 .try_into()?,
833 summary: tx_rsp
834 .summary
835 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
836 .try_into()?,
837 };
838
839 Ok(tx_info)
840 })
841 .collect()
842 }
843 .boxed()
844 }
845
846 fn broadcast_transaction(
847 &mut self,
848 transaction: Transaction,
849 await_detection: bool,
850 ) -> BroadcastStatusStream {
851 let mut self2 = self.clone();
852 async move {
853 let rsp = ViewServiceClient::broadcast_transaction(
854 &mut self2,
855 tonic::Request::new(pb::BroadcastTransactionRequest {
856 transaction: Some(transaction.into()),
857 await_detection,
858 }),
859 )
860 .await?
861 .into_inner();
862
863 Ok(rsp)
864 }
865 .boxed()
866 }
867
868 fn address_by_index(
869 &mut self,
870 address_index: AddressIndex,
871 ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
872 let mut self2 = self.clone();
873 async move {
874 let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
875 address_index: Some(address_index.into()),
876 }));
877 let address = address
878 .await?
879 .into_inner()
880 .address
881 .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
882 .try_into()?;
883 Ok(address)
884 }
885 .boxed()
886 }
887
888 fn index_by_address(
889 &mut self,
890 address: Address,
891 ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
892 let mut self2 = self.clone();
893 async move {
894 let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
895 address: Some(address.into()),
896 }));
897 let index = index
898 .await?
899 .into_inner()
900 .address_index
901 .map(|index| index.try_into())
902 .transpose()?;
903 Ok(index)
904 }
905 .boxed()
906 }
907
908 fn witness_and_build(
909 &mut self,
910 transaction_plan: TransactionPlan,
911 authorization_data: AuthorizationData,
912 ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
913 let request = pb::WitnessAndBuildRequest {
914 transaction_plan: Some(transaction_plan.into()),
915 authorization_data: Some(authorization_data.into()),
916 };
917 let mut self2 = self.clone();
918 async move {
919 let mut rsp = self2
920 .witness_and_build(tonic::Request::new(request))
921 .await?
922 .into_inner();
923
924 while let Some(rsp) = rsp.try_next().await? {
925 match rsp.status {
926 Some(status) => match status {
927 pb::witness_and_build_response::Status::BuildProgress(_) => {
928 }
930 pb::witness_and_build_response::Status::Complete(c) => {
931 return c.transaction
932 .ok_or_else(|| {
933 anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
934 })?
935 .try_into();
936 }
937 },
938 None => {
939 return Err(anyhow::anyhow!(
941 "empty WitnessAndBuildResponse message"
942 ));
943 }
944 }
945 }
946
947 Err(anyhow::anyhow!("should have received complete status or error"))
948 }
949 .boxed()
950 }
951
952 fn unclaimed_swaps(
953 &mut self,
954 ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
955 let mut self2 = self.clone();
956 async move {
957 let swaps_response = ViewServiceClient::unclaimed_swaps(
958 &mut self2,
959 tonic::Request::new(pb::UnclaimedSwapsRequest {}),
960 );
961 let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
962
963 pb_swaps
964 .into_iter()
965 .map(|swap_rsp| {
966 let swap_record = swap_rsp
967 .swap
968 .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
969
970 match swap_record {
971 Ok(swap) => swap.try_into(),
972 Err(e) => Err(e),
973 }
974 })
975 .collect()
976 }
977 .boxed()
978 }
979
980 fn auctions(
981 &mut self,
982 account_filter: Option<AddressIndex>,
983 include_inactive: bool,
984 query_latest_state: bool,
985 ) -> Pin<
986 Box<
987 dyn Future<
988 Output = Result<
989 Vec<(
990 AuctionId,
991 SpendableNoteRecord,
992 u64,
993 Option<Any>,
994 Vec<Position>,
995 )>,
996 >,
997 > + Send
998 + 'static,
999 >,
1000 > {
1001 let mut client = self.clone();
1002 async move {
1003 let request = tonic::Request::new(pb::AuctionsRequest {
1004 account_filter: account_filter.map(Into::into),
1005 include_inactive,
1006 query_latest_state,
1007 auction_ids_filter: Vec::new(), });
1009
1010 let auctions: Vec<pb::AuctionsResponse> =
1011 ViewServiceClient::auctions(&mut client, request)
1012 .await?
1013 .into_inner()
1014 .try_collect()
1015 .await?;
1016
1017 let resp: Vec<(
1018 AuctionId,
1019 SpendableNoteRecord,
1020 u64,
1021 Option<Any>,
1022 Vec<Position>,
1023 )> = auctions
1024 .into_iter()
1025 .map(|auction_rsp| {
1026 let pb_id = auction_rsp
1027 .id
1028 .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1029 let auction_id: AuctionId = pb_id.try_into()?;
1030 let snr: SpendableNoteRecord = auction_rsp
1031 .note_record
1032 .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1033 .try_into()?;
1034
1035 let local_seq = auction_rsp.local_seq;
1036
1037 let auction = auction_rsp.auction;
1038 let lps: Vec<Position> = auction_rsp
1039 .positions
1040 .into_iter()
1041 .map(TryInto::try_into)
1042 .collect::<Result<Vec<_>>>()?;
1043
1044 Ok::<
1045 (
1046 AuctionId,
1047 SpendableNoteRecord,
1048 u64, Option<Any>, Vec<Position>, ),
1052 anyhow::Error,
1053 >((auction_id, snr, local_seq, auction, lps))
1054 })
1055 .filter_map(|res| res.ok()) .collect();
1057
1058 Ok(resp)
1059 }
1060 .boxed()
1061 }
1062}