1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12 asset::{self, Id, Metadata},
13 ValueView,
14};
15use penumbra_sdk_dex::{
16 lp::position::{self, Position},
17 TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23 self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24 BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30 txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
/// Boxed, sendable future returned by `ViewClient::broadcast_transaction`.
///
/// Despite the name, this is a *future*: on success it resolves to a tonic [`Streaming`] of
/// [`BroadcastTransactionResponse`] messages, and on failure to an [`anyhow::Error`].
pub(crate) type BroadcastStatusStream = Pin<
    Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
>;
38
39#[allow(clippy::type_complexity)]
52pub trait ViewClient {
/// Queries auction records from the view service.
///
/// The three arguments are forwarded as the corresponding fields of the `AuctionsRequest` by the
/// `ViewServiceClient` implementation in this file; their exact filtering semantics are defined
/// by the service, not here.
///
/// Each tuple in the result holds, in order: the auction id, the associated spendable note
/// record, the response's `local_seq` value, the optional `auction` payload, and the
/// positions reported for the auction.
fn auctions(
    &mut self,
    account_filter: Option<AddressIndex>,
    include_inactive: bool,
    query_latest_state: bool,
) -> Pin<
    Box<
        dyn Future<
                Output = Result<
                    Vec<(
                        AuctionId,
                        SpendableNoteRecord,
                        u64,
                        Option<Any>,
                        Vec<Position>,
                    )>,
                >,
            > + Send
            + 'static,
    >,
>;
75
/// Requests the current status from the view service.
///
/// The returned future is `'static`, so it does not borrow `self`.
fn status(
    &mut self,
) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
/// Opens a stream of status updates from the view service.
///
/// The outer future resolves once the stream has been established (or fails); every item of
/// the inner stream is itself a `Result`, so individual updates can fail independently.
fn status_stream(
    &mut self,
) -> Pin<
    Box<
        dyn Future<
                Output = Result<
                    Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
                >,
            > + Send
            + 'static,
    >,
>;
94
/// Requests the application parameters, returned as the domain type [`AppParameters`].
fn app_params(
    &mut self,
) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
/// Requests the gas prices, returned as the domain type [`GasPrices`].
fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
/// Requests the FMD parameters, returned as the domain type [`fmd::Parameters`].
fn fmd_parameters(
    &mut self,
) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
/// Queries spendable note records matching `request`.
///
/// The default `unspent_notes_by_*` methods of this trait are built on top of this call,
/// passing a request with `include_spent: false`.
fn notes(
    &mut self,
    request: pb::NotesRequest,
) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
/// Queries notes usable for voting, as selected by `request`.
///
/// Each returned note record is paired with the [`IdentityKey`] reported alongside it in the
/// response.
fn notes_for_voting(
    &mut self,
    request: pb::NotesForVotingRequest,
) -> Pin<
    Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
>;
121
/// Queries balances for `address_index`, optionally restricted to a single `asset_id`.
///
/// Returns one `(asset id, amount)` pair per balance entry reported by the service.
fn balances(
    &mut self,
    address_index: AddressIndex,
    asset_id: Option<asset::Id>,
) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
/// Looks up the spendable note record for `note_commitment`.
///
/// The `ViewServiceClient` implementation in this file sends the request with
/// `await_detection: false`; see `await_note_by_commitment` for the waiting variant.
fn note_by_commitment(
    &mut self,
    note_commitment: note::StateCommitment,
) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
/// Looks up the swap record for `swap_commitment`.
///
/// The `ViewServiceClient` implementation in this file sends the request with
/// `await_detection: false`.
fn swap_by_commitment(
    &mut self,
    swap_commitment: penumbra_sdk_tct::StateCommitment,
) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
/// Queries the status of `nullifier`.
///
/// The boolean is the `spent` flag of the service's response (see the `ViewServiceClient`
/// implementation in this file).
fn nullifier_status(
    &mut self,
    nullifier: Nullifier,
) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
/// Waits on the status of `nullifier`, yielding `()` once the service responds.
///
/// The `ViewServiceClient` implementation in this file issues the same request as
/// `nullifier_status` but with `await_detection: true`, and discards the response body.
fn await_nullifier(
    &mut self,
    nullifier: Nullifier,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
/// Looks up the spendable note record for `note_commitment`, waiting for detection.
///
/// The `ViewServiceClient` implementation in this file issues the same request as
/// `note_by_commitment` but with `await_detection: true`.
fn await_note_by_commitment(
    &mut self,
    note_commitment: note::StateCommitment,
) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
/// Requests the [`WitnessData`] for `plan`.
///
/// `plan` is only borrowed for the duration of the call: because the returned future is
/// `'static`, implementations must copy whatever they need from it before returning.
fn witness(
    &mut self,
    plan: &TransactionPlan,
) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
/// Asks the view service to witness `plan` and build the resulting [`Transaction`] using
/// `auth_data`.
fn witness_and_build(
    &mut self,
    plan: TransactionPlan,
    auth_data: AuthorizationData,
) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
/// Requests the assets known to the view service, collected into an [`asset::Cache`].
fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
/// Queries the ids of owned liquidity positions.
///
/// `position_state` and `trading_pair` are optional filters; `None` leaves the corresponding
/// request field unset.
fn owned_position_ids(
    &mut self,
    position_state: Option<position::State>,
    trading_pair: Option<TradingPair>,
) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
189
/// Looks up the [`TransactionInfo`] for the transaction identified by `id`.
fn transaction_info_by_hash(
    &mut self,
    id: TransactionId,
) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
195
/// Queries [`TransactionInfo`] records for a range of block heights.
///
/// In the `ViewServiceClient` implementation in this file a `None` bound is sent to the
/// service as `0`. NOTE(review): presumably the service treats `0` as "unbounded" — confirm.
fn transaction_info(
    &mut self,
    start_height: Option<u64>,
    end_height: Option<u64>,
) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
202
/// Submits `transaction` for broadcast through the view service.
///
/// `await_detection` is forwarded in the request. The returned future resolves to a stream
/// of `BroadcastTransactionResponse` messages (see [`BroadcastStatusStream`]).
fn broadcast_transaction(
    &mut self,
    transaction: Transaction,
    await_detection: bool,
) -> BroadcastStatusStream;
208
209 #[instrument(skip(self))]
211 fn unspent_notes_by_address_and_asset(
212 &mut self,
213 ) -> Pin<
214 Box<
215 dyn Future<
216 Output = Result<
217 BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
218 >,
219 > + Send
220 + 'static,
221 >,
222 > {
223 let notes = self.notes(pb::NotesRequest {
224 include_spent: false,
225 ..Default::default()
226 });
227 async move {
228 let notes = notes.await?;
229 tracing::trace!(?notes);
230
231 let mut notes_by_address_and_asset = BTreeMap::new();
232
233 for note_record in notes {
234 notes_by_address_and_asset
235 .entry(note_record.address_index)
236 .or_insert_with(BTreeMap::new)
237 .entry(note_record.note.asset_id())
238 .or_insert_with(Vec::new)
239 .push(note_record);
240 }
241 tracing::trace!(?notes_by_address_and_asset);
242
243 Ok(notes_by_address_and_asset)
244 }
245 .boxed()
246 }
247
248 #[instrument(skip(self))]
250 fn unspent_notes_by_account_and_asset(
251 &mut self,
252 ) -> Pin<
253 Box<
254 dyn Future<
255 Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
256 > + Send
257 + 'static,
258 >,
259 > {
260 let notes = self.notes(pb::NotesRequest {
261 include_spent: false,
262 ..Default::default()
263 });
264 async move {
265 let notes = notes.await?;
266 tracing::trace!(?notes);
267
268 let mut notes_by_account_and_asset = BTreeMap::new();
269
270 for note_record in notes {
271 notes_by_account_and_asset
272 .entry(note_record.address_index.account)
273 .or_insert_with(BTreeMap::new)
274 .entry(note_record.note.asset_id())
275 .or_insert_with(Vec::new)
276 .push(note_record);
277 }
278 tracing::trace!(?notes_by_account_and_asset);
279
280 Ok(notes_by_account_and_asset)
281 }
282 .boxed()
283 }
284
285 #[instrument(skip(self))]
287 fn unspent_notes_by_asset_and_address(
288 &mut self,
289 ) -> Pin<
290 Box<
291 dyn Future<
292 Output = Result<
293 BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
294 >,
295 > + Send
296 + 'static,
297 >,
298 > {
299 let notes = self.notes(pb::NotesRequest {
300 include_spent: false,
301 ..Default::default()
302 });
303
304 async move {
305 let notes = notes.await?;
306 tracing::trace!(?notes);
307
308 let mut notes_by_asset_and_address = BTreeMap::new();
309
310 for note_record in notes {
311 notes_by_asset_and_address
312 .entry(note_record.note.asset_id())
313 .or_insert_with(BTreeMap::new)
314 .entry(note_record.address_index)
315 .or_insert_with(Vec::new)
316 .push(note_record);
317 }
318 tracing::trace!(?notes_by_asset_and_address);
319
320 Ok(notes_by_asset_and_address)
321 }
322 .boxed()
323 }
324
/// Returns the [`Address`] corresponding to `address_index`.
fn address_by_index(
    &mut self,
    address_index: AddressIndex,
) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
329
/// Returns the [`AddressIndex`] of `address`, or `None` when the service's response carries
/// no index for it.
fn index_by_address(
    &mut self,
    address: Address,
) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
336
/// Queries the swap records that the view service reports as unclaimed.
fn unclaimed_swaps(
    &mut self,
) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
341
/// Queries the notes returned by the service's LQT voting-notes call for `epoch`.
///
/// `filter` optionally restricts the query to a single account; `None` leaves the request's
/// account filter unset.
fn lqt_voting_notes(
    &mut self,
    epoch: u64,
    filter: Option<AddressIndex>,
) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
348}
349
350impl<T> ViewClient for ViewServiceClient<T>
357where
358 T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
359 T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
360 T::Error: Into<tonic::codegen::StdError>,
361 T::Future: Send + 'static,
362 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
363{
364 fn status(
365 &mut self,
366 ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
367 let mut self2 = self.clone();
368 async move {
369 let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
370 let status = status.await?.into_inner();
371 Ok(status)
372 }
373 .boxed()
374 }
375
376 fn status_stream(
377 &mut self,
378 ) -> Pin<
379 Box<
380 dyn Future<
381 Output = Result<
382 Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
383 >,
384 > + Send
385 + 'static,
386 >,
387 > {
388 let mut self2 = self.clone();
389 async move {
390 let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
391 let stream = stream.await?.into_inner();
392
393 Ok(stream
394 .map_err(|e| anyhow::anyhow!("view service error: {}", e))
395 .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
396 .boxed())
397 }
398 .boxed()
399 }
400
401 fn app_params(
402 &mut self,
403 ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
404 let mut self2 = self.clone();
405 async move {
406 let rsp = ViewServiceClient::app_parameters(
409 &mut self2,
410 tonic::Request::new(pb::AppParametersRequest {}),
411 );
412 rsp.await?.into_inner().try_into()
413 }
414 .boxed()
415 }
416
417 fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
418 let mut self2 = self.clone();
419 async move {
420 let rsp = ViewServiceClient::gas_prices(
423 &mut self2,
424 tonic::Request::new(pb::GasPricesRequest {}),
425 );
426 rsp.await?
427 .into_inner()
428 .gas_prices
429 .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
430 .try_into()
431 }
432 .boxed()
433 }
434
435 fn fmd_parameters(
436 &mut self,
437 ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
438 let mut self2 = self.clone();
439 async move {
440 let parameters = ViewServiceClient::fmd_parameters(
441 &mut self2,
442 tonic::Request::new(pb::FmdParametersRequest {}),
443 );
444 let parameters = parameters.await?.into_inner().parameters;
445
446 parameters
447 .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
448 .try_into()
449 }
450 .boxed()
451 }
452
453 fn notes(
454 &mut self,
455 request: pb::NotesRequest,
456 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
457 let mut self2 = self.clone();
458 async move {
459 let req = self2.notes(tonic::Request::new(request));
460 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
461
462 pb_notes
463 .into_iter()
464 .map(|note_rsp| {
465 let note_record = note_rsp
466 .note_record
467 .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
468
469 match note_record {
470 Ok(note) => note.try_into(),
471 Err(e) => Err(e),
472 }
473 })
474 .collect()
475 }
476 .boxed()
477 }
478
479 fn notes_for_voting(
480 &mut self,
481 request: pb::NotesForVotingRequest,
482 ) -> Pin<
483 Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
484 > {
485 let mut self2 = self.clone();
486 async move {
487 let req = self2.notes_for_voting(tonic::Request::new(request));
488 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
489
490 pb_notes
491 .into_iter()
492 .map(|note_rsp| {
493 let note_record = note_rsp
494 .note_record
495 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
496 .try_into()?;
497
498 let identity_key = note_rsp
499 .identity_key
500 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
501 .try_into()?;
502
503 Ok((note_record, identity_key))
504 })
505 .collect()
506 }
507 .boxed()
508 }
509
510 fn note_by_commitment(
511 &mut self,
512 note_commitment: note::StateCommitment,
513 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
514 let mut self2 = self.clone();
515 async move {
516 let note_commitment_response = ViewServiceClient::note_by_commitment(
517 &mut self2,
518 tonic::Request::new(pb::NoteByCommitmentRequest {
519 note_commitment: Some(note_commitment.into()),
520 await_detection: false,
521 }),
522 );
523 let note_commitment_response = note_commitment_response.await?.into_inner();
524
525 note_commitment_response
526 .spendable_note
527 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
528 .try_into()
529 }
530 .boxed()
531 }
532
533 fn balances(
534 &mut self,
535 address_index: AddressIndex,
536 asset_id: Option<asset::Id>,
537 ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
538 let mut self2 = self.clone();
539 async move {
540 let req = ViewServiceClient::balances(
541 &mut self2,
542 tonic::Request::new(pb::BalancesRequest {
543 account_filter: Some(address_index.into()),
544 asset_id_filter: asset_id.map(Into::into),
545 }),
546 );
547
548 let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
549
550 balances
551 .into_iter()
552 .map(|rsp| {
553 let pb_value_view = rsp
554 .balance_view
555 .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
556
557 let value_view: ValueView = pb_value_view.try_into()?;
558 let id = value_view.asset_id();
559 let amount = value_view.value().amount;
560 Ok((id, amount))
561 })
562 .collect()
563 }
564 .boxed()
565 }
566
567 fn swap_by_commitment(
568 &mut self,
569 swap_commitment: penumbra_sdk_tct::StateCommitment,
570 ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
571 let mut self2 = self.clone();
572 async move {
573 let swap_commitment_response = ViewServiceClient::swap_by_commitment(
574 &mut self2,
575 tonic::Request::new(pb::SwapByCommitmentRequest {
576 swap_commitment: Some(swap_commitment.into()),
577 await_detection: false,
578 }),
579 );
580 let swap_commitment_response = swap_commitment_response.await?.into_inner();
581
582 swap_commitment_response
583 .swap
584 .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
585 .try_into()
586 }
587 .boxed()
588 }
589
590 fn await_note_by_commitment(
594 &mut self,
595 note_commitment: note::StateCommitment,
596 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
597 let mut self2 = self.clone();
598 async move {
599 let spendable_note = ViewServiceClient::note_by_commitment(
600 &mut self2,
601 tonic::Request::new(pb::NoteByCommitmentRequest {
602 note_commitment: Some(note_commitment.into()),
603 await_detection: true,
604 }),
605 );
606 let spendable_note = spendable_note.await?.into_inner().spendable_note;
607
608 spendable_note
609 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
610 .try_into()
611 }
612 .boxed()
613 }
614
615 fn nullifier_status(
617 &mut self,
618 nullifier: Nullifier,
619 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
620 let mut self2 = self.clone();
621 async move {
622 let rsp = ViewServiceClient::nullifier_status(
623 &mut self2,
624 tonic::Request::new(pb::NullifierStatusRequest {
625 nullifier: Some(nullifier.into()),
626 await_detection: false,
627 }),
628 );
629 Ok(rsp.await?.into_inner().spent)
630 }
631 .boxed()
632 }
633
634 fn await_nullifier(
637 &mut self,
638 nullifier: Nullifier,
639 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
640 let mut self2 = self.clone();
641 async move {
642 let rsp = ViewServiceClient::nullifier_status(
643 &mut self2,
644 tonic::Request::new(pb::NullifierStatusRequest {
645 nullifier: Some(nullifier.into()),
646 await_detection: true,
647 }),
648 );
649 rsp.await?;
650 Ok(())
651 }
652 .boxed()
653 }
654
655 fn witness(
656 &mut self,
657 plan: &TransactionPlan,
658 ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
659 let request = WitnessRequest {
660 transaction_plan: Some(plan.clone().into()),
661 };
662
663 let mut self2 = self.clone();
664 async move {
665 let rsp = self2.witness(tonic::Request::new(request));
666
667 let witness_data = rsp
668 .await?
669 .into_inner()
670 .witness_data
671 .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
672 .try_into()?;
673
674 Ok(witness_data)
675 }
676 .boxed()
677 }
678
679 fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
680 let mut self2 = self.clone();
681 async move {
682 let rsp = ViewServiceClient::assets(
685 &mut self2,
686 tonic::Request::new(pb::AssetsRequest {
687 ..Default::default()
688 }),
689 );
690
691 let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
692
693 let assets = pb_assets
694 .into_iter()
695 .map(Metadata::try_from)
696 .collect::<anyhow::Result<Vec<Metadata>>>()?;
697
698 Ok(assets.into_iter().collect())
699 }
700 .boxed()
701 }
702
703 fn owned_position_ids(
704 &mut self,
705 position_state: Option<position::State>,
706 trading_pair: Option<TradingPair>,
707 ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
708 let mut self2 = self.clone();
711 async move {
712 let rsp = ViewServiceClient::owned_position_ids(
715 &mut self2,
716 tonic::Request::new(pb::OwnedPositionIdsRequest {
717 trading_pair: trading_pair.map(TryInto::try_into).transpose()?,
718 position_state: position_state.map(TryInto::try_into).transpose()?,
719 subaccount: None,
720 }),
721 );
722
723 let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
724
725 let position_ids = pb_position_ids
726 .into_iter()
727 .map(|p| {
728 position::Id::try_from(p.position_id.ok_or_else(|| {
729 anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
730 })?)
731 })
732 .collect::<anyhow::Result<Vec<position::Id>>>()?;
733
734 Ok(position_ids)
735 }
736 .boxed()
737 }
738
739 fn transaction_info_by_hash(
740 &mut self,
741 id: TransactionId,
742 ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
743 let mut self2 = self.clone();
744 async move {
745 let rsp = ViewServiceClient::transaction_info_by_hash(
746 &mut self2,
747 tonic::Request::new(pb::TransactionInfoByHashRequest {
748 id: Some(id.into()),
749 }),
750 )
751 .await?
752 .into_inner()
753 .tx_info
754 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
755
756 if rsp.height == 0 {
758 anyhow::bail!("missing height");
759 }
760
761 let tx_info = TransactionInfo {
762 height: rsp.height,
763 id: rsp
764 .id
765 .ok_or_else(|| anyhow::anyhow!("missing id"))?
766 .try_into()?,
767 transaction: rsp
768 .transaction
769 .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
770 .try_into()?,
771 perspective: rsp
772 .perspective
773 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
774 .try_into()?,
775 view: rsp
776 .view
777 .ok_or_else(|| anyhow::anyhow!("missing view"))?
778 .try_into()?,
779 summary: rsp
780 .summary
781 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
782 .try_into()?,
783 };
784
785 Ok(tx_info)
786 }
787 .boxed()
788 }
789
790 fn transaction_info(
791 &mut self,
792 start_height: Option<u64>,
793 end_height: Option<u64>,
794 ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
795 let mut self2 = self.clone();
796 async move {
797 let start_h = if let Some(h) = start_height { h } else { 0 };
799
800 let end_h = if let Some(h) = end_height { h } else { 0 };
801
802 let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
803 start_height: start_h,
804 end_height: end_h,
805 }));
806 let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
807
808 pb_txs
809 .into_iter()
810 .map(|rsp| {
811 let tx_rsp = rsp
812 .tx_info
813 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
814
815 if tx_rsp.height == 0 {
817 anyhow::bail!("missing height");
818 }
819
820 let tx_info = TransactionInfo {
821 height: tx_rsp.height,
822 transaction: tx_rsp
823 .transaction
824 .ok_or_else(|| {
825 anyhow::anyhow!("empty TransactionInfoResponse message")
826 })?
827 .try_into()?,
828 id: tx_rsp
829 .id
830 .ok_or_else(|| anyhow::anyhow!("missing id"))?
831 .try_into()?,
832 perspective: tx_rsp
833 .perspective
834 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
835 .try_into()?,
836 view: tx_rsp
837 .view
838 .ok_or_else(|| anyhow::anyhow!("missing view"))?
839 .try_into()?,
840 summary: tx_rsp
841 .summary
842 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
843 .try_into()?,
844 };
845
846 Ok(tx_info)
847 })
848 .collect()
849 }
850 .boxed()
851 }
852
853 fn broadcast_transaction(
854 &mut self,
855 transaction: Transaction,
856 await_detection: bool,
857 ) -> BroadcastStatusStream {
858 let mut self2 = self.clone();
859 async move {
860 let rsp = ViewServiceClient::broadcast_transaction(
861 &mut self2,
862 tonic::Request::new(pb::BroadcastTransactionRequest {
863 transaction: Some(transaction.into()),
864 await_detection,
865 }),
866 )
867 .await?
868 .into_inner();
869
870 Ok(rsp)
871 }
872 .boxed()
873 }
874
875 fn address_by_index(
876 &mut self,
877 address_index: AddressIndex,
878 ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
879 let mut self2 = self.clone();
880 async move {
881 let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
882 address_index: Some(address_index.into()),
883 }));
884 let address = address
885 .await?
886 .into_inner()
887 .address
888 .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
889 .try_into()?;
890 Ok(address)
891 }
892 .boxed()
893 }
894
895 fn index_by_address(
896 &mut self,
897 address: Address,
898 ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
899 let mut self2 = self.clone();
900 async move {
901 let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
902 address: Some(address.into()),
903 }));
904 let index = index
905 .await?
906 .into_inner()
907 .address_index
908 .map(|index| index.try_into())
909 .transpose()?;
910 Ok(index)
911 }
912 .boxed()
913 }
914
915 fn witness_and_build(
916 &mut self,
917 transaction_plan: TransactionPlan,
918 authorization_data: AuthorizationData,
919 ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
920 let request = pb::WitnessAndBuildRequest {
921 transaction_plan: Some(transaction_plan.into()),
922 authorization_data: Some(authorization_data.into()),
923 };
924 let mut self2 = self.clone();
925 async move {
926 let mut rsp = self2
927 .witness_and_build(tonic::Request::new(request))
928 .await?
929 .into_inner();
930
931 while let Some(rsp) = rsp.try_next().await? {
932 match rsp.status {
933 Some(status) => match status {
934 pb::witness_and_build_response::Status::BuildProgress(_) => {
935 }
937 pb::witness_and_build_response::Status::Complete(c) => {
938 return c.transaction
939 .ok_or_else(|| {
940 anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
941 })?
942 .try_into();
943 }
944 },
945 None => {
946 return Err(anyhow::anyhow!(
948 "empty WitnessAndBuildResponse message"
949 ));
950 }
951 }
952 }
953
954 Err(anyhow::anyhow!("should have received complete status or error"))
955 }
956 .boxed()
957 }
958
959 fn unclaimed_swaps(
960 &mut self,
961 ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
962 let mut self2 = self.clone();
963 async move {
964 let swaps_response = ViewServiceClient::unclaimed_swaps(
965 &mut self2,
966 tonic::Request::new(pb::UnclaimedSwapsRequest {}),
967 );
968 let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
969
970 pb_swaps
971 .into_iter()
972 .map(|swap_rsp| {
973 let swap_record = swap_rsp
974 .swap
975 .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
976
977 match swap_record {
978 Ok(swap) => swap.try_into(),
979 Err(e) => Err(e),
980 }
981 })
982 .collect()
983 }
984 .boxed()
985 }
986
987 fn auctions(
988 &mut self,
989 account_filter: Option<AddressIndex>,
990 include_inactive: bool,
991 query_latest_state: bool,
992 ) -> Pin<
993 Box<
994 dyn Future<
995 Output = Result<
996 Vec<(
997 AuctionId,
998 SpendableNoteRecord,
999 u64,
1000 Option<Any>,
1001 Vec<Position>,
1002 )>,
1003 >,
1004 > + Send
1005 + 'static,
1006 >,
1007 > {
1008 let mut client = self.clone();
1009 async move {
1010 let request = tonic::Request::new(pb::AuctionsRequest {
1011 account_filter: account_filter.map(Into::into),
1012 include_inactive,
1013 query_latest_state,
1014 auction_ids_filter: Vec::new(), });
1016
1017 let auctions: Vec<pb::AuctionsResponse> =
1018 ViewServiceClient::auctions(&mut client, request)
1019 .await?
1020 .into_inner()
1021 .try_collect()
1022 .await?;
1023
1024 let resp: Vec<(
1025 AuctionId,
1026 SpendableNoteRecord,
1027 u64,
1028 Option<Any>,
1029 Vec<Position>,
1030 )> = auctions
1031 .into_iter()
1032 .map(|auction_rsp| {
1033 let pb_id = auction_rsp
1034 .id
1035 .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1036 let auction_id: AuctionId = pb_id.try_into()?;
1037 let snr: SpendableNoteRecord = auction_rsp
1038 .note_record
1039 .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1040 .try_into()?;
1041
1042 let local_seq = auction_rsp.local_seq;
1043
1044 let auction = auction_rsp.auction;
1045 let lps: Vec<Position> = auction_rsp
1046 .positions
1047 .into_iter()
1048 .map(TryInto::try_into)
1049 .collect::<Result<Vec<_>>>()?;
1050
1051 Ok::<
1052 (
1053 AuctionId,
1054 SpendableNoteRecord,
1055 u64, Option<Any>, Vec<Position>, ),
1059 anyhow::Error,
1060 >((auction_id, snr, local_seq, auction, lps))
1061 })
1062 .filter_map(|res| res.ok()) .collect();
1064
1065 Ok(resp)
1066 }
1067 .boxed()
1068 }
1069
1070 fn lqt_voting_notes(
1071 &mut self,
1072 epoch: u64,
1073 filter: Option<AddressIndex>,
1074 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
1075 let mut client = self.clone();
1076 async move {
1077 let request = tonic::Request::new(pb::LqtVotingNotesRequest {
1078 epoch_index: epoch,
1079 account_filter: filter.map(|x| x.into()),
1080 });
1081 let response = client.lqt_voting_notes(request).await?;
1082 let pb_notes: Vec<pb::LqtVotingNotesResponse> =
1083 response.into_inner().try_collect().await?;
1084
1085 pb_notes
1086 .into_iter()
1087 .map(|note_rsp| {
1088 let note_record = note_rsp
1089 .note_record
1090 .ok_or_else(|| anyhow::anyhow!("empty LqtVotingNotesResponse message"))?
1091 .try_into()?;
1092 Ok(note_record)
1093 })
1094 .collect()
1095 }
1096 .boxed()
1097 }
1098}