1use std::{collections::BTreeMap, future::Future, pin::Pin};
2
3use anyhow::Result;
4use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
5use pbjson_types::Any;
6use penumbra_sdk_auction::auction::AuctionId;
7use tonic::{codegen::Bytes, Streaming};
8use tracing::instrument;
9
10use penumbra_sdk_app::params::AppParameters;
11use penumbra_sdk_asset::{
12 asset::{self, Id, Metadata},
13 ValueView,
14};
15use penumbra_sdk_dex::{
16 lp::position::{self, Position},
17 TradingPair,
18};
19use penumbra_sdk_fee::GasPrices;
20use penumbra_sdk_keys::{keys::AddressIndex, Address};
21use penumbra_sdk_num::Amount;
22use penumbra_sdk_proto::view::v1::{
23 self as pb, view_service_client::ViewServiceClient, BalancesResponse,
24 BroadcastTransactionResponse, WitnessRequest,
25};
26use penumbra_sdk_sct::Nullifier;
27use penumbra_sdk_shielded_pool::{fmd, note};
28use penumbra_sdk_stake::IdentityKey;
29use penumbra_sdk_transaction::{
30 txhash::TransactionId, AuthorizationData, Transaction, TransactionPlan, WitnessData,
31};
32
33use crate::{SpendableNoteRecord, StatusStreamResponse, SwapRecord, TransactionInfo};
34
35pub(crate) type BroadcastStatusStream = Pin<
36 Box<dyn Future<Output = Result<Streaming<BroadcastTransactionResponse>, anyhow::Error>> + Send>,
37>;
38
39#[allow(clippy::type_complexity)]
52pub trait ViewClient {
53 fn auctions(
55 &mut self,
56 account_filter: Option<AddressIndex>,
57 include_inactive: bool,
58 query_latest_state: bool,
59 ) -> Pin<
60 Box<
61 dyn Future<
62 Output = Result<
63 Vec<(
64 AuctionId,
65 SpendableNoteRecord,
66 u64,
67 Option<Any>,
68 Vec<Position>,
69 )>,
70 >,
71 > + Send
72 + 'static,
73 >,
74 >;
75
76 fn status(
78 &mut self,
79 ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>>;
80
81 fn status_stream(
83 &mut self,
84 ) -> Pin<
85 Box<
86 dyn Future<
87 Output = Result<
88 Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
89 >,
90 > + Send
91 + 'static,
92 >,
93 >;
94
95 fn app_params(
97 &mut self,
98 ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>>;
99
100 fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>>;
102
103 fn fmd_parameters(
105 &mut self,
106 ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>>;
107
108 fn notes(
110 &mut self,
111 request: pb::NotesRequest,
112 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
113
114 fn notes_for_voting(
116 &mut self,
117 request: pb::NotesForVotingRequest,
118 ) -> Pin<
119 Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
120 >;
121
122 fn balances(
124 &mut self,
125 address_index: AddressIndex,
126 asset_id: Option<asset::Id>,
127 ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>>;
128
129 fn note_by_commitment(
131 &mut self,
132 note_commitment: note::StateCommitment,
133 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
134
135 fn swap_by_commitment(
137 &mut self,
138 swap_commitment: penumbra_sdk_tct::StateCommitment,
139 ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>>;
140
141 fn nullifier_status(
143 &mut self,
144 nullifier: Nullifier,
145 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>>;
146
147 fn await_nullifier(
150 &mut self,
151 nullifier: Nullifier,
152 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
153
154 fn await_note_by_commitment(
158 &mut self,
159 note_commitment: note::StateCommitment,
160 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>>;
161
162 fn witness(
169 &mut self,
170 plan: &TransactionPlan,
171 ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>>;
172
173 fn witness_and_build(
175 &mut self,
176 plan: TransactionPlan,
177 auth_data: AuthorizationData,
178 ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>>;
179
180 fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>>;
182
183 fn owned_position_ids(
185 &mut self,
186 position_state: Option<position::State>,
187 trading_pair: Option<TradingPair>,
188 subaccount: Option<AddressIndex>,
189 ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>>;
190
191 fn transaction_info_by_hash(
193 &mut self,
194 id: TransactionId,
195 ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>>;
196
197 fn transaction_info(
199 &mut self,
200 start_height: Option<u64>,
201 end_height: Option<u64>,
202 ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>>;
203
204 fn broadcast_transaction(
205 &mut self,
206 transaction: Transaction,
207 await_detection: bool,
208 ) -> BroadcastStatusStream;
209
210 #[instrument(skip(self))]
212 fn unspent_notes_by_address_and_asset(
213 &mut self,
214 ) -> Pin<
215 Box<
216 dyn Future<
217 Output = Result<
218 BTreeMap<AddressIndex, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>,
219 >,
220 > + Send
221 + 'static,
222 >,
223 > {
224 let notes = self.notes(pb::NotesRequest {
225 include_spent: false,
226 ..Default::default()
227 });
228 async move {
229 let notes = notes.await?;
230 tracing::trace!(?notes);
231
232 let mut notes_by_address_and_asset = BTreeMap::new();
233
234 for note_record in notes {
235 notes_by_address_and_asset
236 .entry(note_record.address_index)
237 .or_insert_with(BTreeMap::new)
238 .entry(note_record.note.asset_id())
239 .or_insert_with(Vec::new)
240 .push(note_record);
241 }
242 tracing::trace!(?notes_by_address_and_asset);
243
244 Ok(notes_by_address_and_asset)
245 }
246 .boxed()
247 }
248
249 #[instrument(skip(self))]
251 fn unspent_notes_by_account_and_asset(
252 &mut self,
253 ) -> Pin<
254 Box<
255 dyn Future<
256 Output = Result<BTreeMap<u32, BTreeMap<asset::Id, Vec<SpendableNoteRecord>>>>,
257 > + Send
258 + 'static,
259 >,
260 > {
261 let notes = self.notes(pb::NotesRequest {
262 include_spent: false,
263 ..Default::default()
264 });
265 async move {
266 let notes = notes.await?;
267 tracing::trace!(?notes);
268
269 let mut notes_by_account_and_asset = BTreeMap::new();
270
271 for note_record in notes {
272 notes_by_account_and_asset
273 .entry(note_record.address_index.account)
274 .or_insert_with(BTreeMap::new)
275 .entry(note_record.note.asset_id())
276 .or_insert_with(Vec::new)
277 .push(note_record);
278 }
279 tracing::trace!(?notes_by_account_and_asset);
280
281 Ok(notes_by_account_and_asset)
282 }
283 .boxed()
284 }
285
286 #[instrument(skip(self))]
288 fn unspent_notes_by_asset_and_address(
289 &mut self,
290 ) -> Pin<
291 Box<
292 dyn Future<
293 Output = Result<
294 BTreeMap<asset::Id, BTreeMap<AddressIndex, Vec<SpendableNoteRecord>>>,
295 >,
296 > + Send
297 + 'static,
298 >,
299 > {
300 let notes = self.notes(pb::NotesRequest {
301 include_spent: false,
302 ..Default::default()
303 });
304
305 async move {
306 let notes = notes.await?;
307 tracing::trace!(?notes);
308
309 let mut notes_by_asset_and_address = BTreeMap::new();
310
311 for note_record in notes {
312 notes_by_asset_and_address
313 .entry(note_record.note.asset_id())
314 .or_insert_with(BTreeMap::new)
315 .entry(note_record.address_index)
316 .or_insert_with(Vec::new)
317 .push(note_record);
318 }
319 tracing::trace!(?notes_by_asset_and_address);
320
321 Ok(notes_by_asset_and_address)
322 }
323 .boxed()
324 }
325
326 fn address_by_index(
327 &mut self,
328 address_index: AddressIndex,
329 ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>>;
330
331 fn index_by_address(
334 &mut self,
335 address: Address,
336 ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>>;
337
338 fn unclaimed_swaps(
340 &mut self,
341 ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>>;
342
343 fn lqt_voting_notes(
345 &mut self,
346 epoch: u64,
347 filter: Option<AddressIndex>,
348 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>>;
349}
350
351impl<T> ViewClient for ViewServiceClient<T>
358where
359 T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + 'static,
360 T::ResponseBody: tonic::codegen::Body<Data = Bytes> + Send + 'static,
361 T::Error: Into<tonic::codegen::StdError>,
362 T::Future: Send + 'static,
363 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
364{
365 fn status(
366 &mut self,
367 ) -> Pin<Box<dyn Future<Output = Result<pb::StatusResponse>> + Send + 'static>> {
368 let mut self2 = self.clone();
369 async move {
370 let status = self2.status(tonic::Request::new(pb::StatusRequest {}));
371 let status = status.await?.into_inner();
372 Ok(status)
373 }
374 .boxed()
375 }
376
377 fn status_stream(
378 &mut self,
379 ) -> Pin<
380 Box<
381 dyn Future<
382 Output = Result<
383 Pin<Box<dyn Stream<Item = Result<StatusStreamResponse>> + Send + 'static>>,
384 >,
385 > + Send
386 + 'static,
387 >,
388 > {
389 let mut self2 = self.clone();
390 async move {
391 let stream = self2.status_stream(tonic::Request::new(pb::StatusStreamRequest {}));
392 let stream = stream.await?.into_inner();
393
394 Ok(stream
395 .map_err(|e| anyhow::anyhow!("view service error: {}", e))
396 .and_then(|msg| async move { StatusStreamResponse::try_from(msg) })
397 .boxed())
398 }
399 .boxed()
400 }
401
402 fn app_params(
403 &mut self,
404 ) -> Pin<Box<dyn Future<Output = Result<AppParameters>> + Send + 'static>> {
405 let mut self2 = self.clone();
406 async move {
407 let rsp = ViewServiceClient::app_parameters(
410 &mut self2,
411 tonic::Request::new(pb::AppParametersRequest {}),
412 );
413 rsp.await?.into_inner().try_into()
414 }
415 .boxed()
416 }
417
418 fn gas_prices(&mut self) -> Pin<Box<dyn Future<Output = Result<GasPrices>> + Send + 'static>> {
419 let mut self2 = self.clone();
420 async move {
421 let rsp = ViewServiceClient::gas_prices(
424 &mut self2,
425 tonic::Request::new(pb::GasPricesRequest {}),
426 );
427 rsp.await?
428 .into_inner()
429 .gas_prices
430 .ok_or_else(|| anyhow::anyhow!("empty GasPricesResponse message"))?
431 .try_into()
432 }
433 .boxed()
434 }
435
436 fn fmd_parameters(
437 &mut self,
438 ) -> Pin<Box<dyn Future<Output = Result<fmd::Parameters>> + Send + 'static>> {
439 let mut self2 = self.clone();
440 async move {
441 let parameters = ViewServiceClient::fmd_parameters(
442 &mut self2,
443 tonic::Request::new(pb::FmdParametersRequest {}),
444 );
445 let parameters = parameters.await?.into_inner().parameters;
446
447 parameters
448 .ok_or_else(|| anyhow::anyhow!("empty FmdParametersRequest message"))?
449 .try_into()
450 }
451 .boxed()
452 }
453
454 fn notes(
455 &mut self,
456 request: pb::NotesRequest,
457 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
458 let mut self2 = self.clone();
459 async move {
460 let req = self2.notes(tonic::Request::new(request));
461 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
462
463 pb_notes
464 .into_iter()
465 .map(|note_rsp| {
466 let note_record = note_rsp
467 .note_record
468 .ok_or_else(|| anyhow::anyhow!("empty NotesResponse message"));
469
470 match note_record {
471 Ok(note) => note.try_into(),
472 Err(e) => Err(e),
473 }
474 })
475 .collect()
476 }
477 .boxed()
478 }
479
480 fn notes_for_voting(
481 &mut self,
482 request: pb::NotesForVotingRequest,
483 ) -> Pin<
484 Box<dyn Future<Output = Result<Vec<(SpendableNoteRecord, IdentityKey)>>> + Send + 'static>,
485 > {
486 let mut self2 = self.clone();
487 async move {
488 let req = self2.notes_for_voting(tonic::Request::new(request));
489 let pb_notes: Vec<_> = req.await?.into_inner().try_collect().await?;
490
491 pb_notes
492 .into_iter()
493 .map(|note_rsp| {
494 let note_record = note_rsp
495 .note_record
496 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
497 .try_into()?;
498
499 let identity_key = note_rsp
500 .identity_key
501 .ok_or_else(|| anyhow::anyhow!("empty NotesForVotingResponse message"))?
502 .try_into()?;
503
504 Ok((note_record, identity_key))
505 })
506 .collect()
507 }
508 .boxed()
509 }
510
511 fn note_by_commitment(
512 &mut self,
513 note_commitment: note::StateCommitment,
514 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
515 let mut self2 = self.clone();
516 async move {
517 let note_commitment_response = ViewServiceClient::note_by_commitment(
518 &mut self2,
519 tonic::Request::new(pb::NoteByCommitmentRequest {
520 note_commitment: Some(note_commitment.into()),
521 await_detection: false,
522 }),
523 );
524 let note_commitment_response = note_commitment_response.await?.into_inner();
525
526 note_commitment_response
527 .spendable_note
528 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentResponse message"))?
529 .try_into()
530 }
531 .boxed()
532 }
533
534 fn balances(
535 &mut self,
536 address_index: AddressIndex,
537 asset_id: Option<asset::Id>,
538 ) -> Pin<Box<dyn Future<Output = Result<Vec<(Id, Amount)>>> + Send + 'static>> {
539 let mut self2 = self.clone();
540 async move {
541 let req = ViewServiceClient::balances(
542 &mut self2,
543 tonic::Request::new(pb::BalancesRequest {
544 account_filter: Some(address_index.into()),
545 asset_id_filter: asset_id.map(Into::into),
546 }),
547 );
548
549 let balances: Vec<BalancesResponse> = req.await?.into_inner().try_collect().await?;
550
551 balances
552 .into_iter()
553 .map(|rsp| {
554 let pb_value_view = rsp
555 .balance_view
556 .ok_or_else(|| anyhow::anyhow!("empty balance view"))?;
557
558 let value_view: ValueView = pb_value_view.try_into()?;
559 let id = value_view.asset_id();
560 let amount = value_view.value().amount;
561 Ok((id, amount))
562 })
563 .collect()
564 }
565 .boxed()
566 }
567
568 fn swap_by_commitment(
569 &mut self,
570 swap_commitment: penumbra_sdk_tct::StateCommitment,
571 ) -> Pin<Box<dyn Future<Output = Result<SwapRecord>> + Send + 'static>> {
572 let mut self2 = self.clone();
573 async move {
574 let swap_commitment_response = ViewServiceClient::swap_by_commitment(
575 &mut self2,
576 tonic::Request::new(pb::SwapByCommitmentRequest {
577 swap_commitment: Some(swap_commitment.into()),
578 await_detection: false,
579 }),
580 );
581 let swap_commitment_response = swap_commitment_response.await?.into_inner();
582
583 swap_commitment_response
584 .swap
585 .ok_or_else(|| anyhow::anyhow!("empty SwapByCommitmentResponse message"))?
586 .try_into()
587 }
588 .boxed()
589 }
590
591 fn await_note_by_commitment(
595 &mut self,
596 note_commitment: note::StateCommitment,
597 ) -> Pin<Box<dyn Future<Output = Result<SpendableNoteRecord>> + Send + 'static>> {
598 let mut self2 = self.clone();
599 async move {
600 let spendable_note = ViewServiceClient::note_by_commitment(
601 &mut self2,
602 tonic::Request::new(pb::NoteByCommitmentRequest {
603 note_commitment: Some(note_commitment.into()),
604 await_detection: true,
605 }),
606 );
607 let spendable_note = spendable_note.await?.into_inner().spendable_note;
608
609 spendable_note
610 .ok_or_else(|| anyhow::anyhow!("empty NoteByCommitmentRequest message"))?
611 .try_into()
612 }
613 .boxed()
614 }
615
616 fn nullifier_status(
618 &mut self,
619 nullifier: Nullifier,
620 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + 'static>> {
621 let mut self2 = self.clone();
622 async move {
623 let rsp = ViewServiceClient::nullifier_status(
624 &mut self2,
625 tonic::Request::new(pb::NullifierStatusRequest {
626 nullifier: Some(nullifier.into()),
627 await_detection: false,
628 }),
629 );
630 Ok(rsp.await?.into_inner().spent)
631 }
632 .boxed()
633 }
634
635 fn await_nullifier(
638 &mut self,
639 nullifier: Nullifier,
640 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
641 let mut self2 = self.clone();
642 async move {
643 let rsp = ViewServiceClient::nullifier_status(
644 &mut self2,
645 tonic::Request::new(pb::NullifierStatusRequest {
646 nullifier: Some(nullifier.into()),
647 await_detection: true,
648 }),
649 );
650 rsp.await?;
651 Ok(())
652 }
653 .boxed()
654 }
655
656 fn witness(
657 &mut self,
658 plan: &TransactionPlan,
659 ) -> Pin<Box<dyn Future<Output = Result<WitnessData>> + Send + 'static>> {
660 let request = WitnessRequest {
661 transaction_plan: Some(plan.clone().into()),
662 };
663
664 let mut self2 = self.clone();
665 async move {
666 let rsp = self2.witness(tonic::Request::new(request));
667
668 let witness_data = rsp
669 .await?
670 .into_inner()
671 .witness_data
672 .ok_or_else(|| anyhow::anyhow!("empty WitnessResponse message"))?
673 .try_into()?;
674
675 Ok(witness_data)
676 }
677 .boxed()
678 }
679
680 fn assets(&mut self) -> Pin<Box<dyn Future<Output = Result<asset::Cache>> + Send + 'static>> {
681 let mut self2 = self.clone();
682 async move {
683 let rsp = ViewServiceClient::assets(
686 &mut self2,
687 tonic::Request::new(pb::AssetsRequest {
688 ..Default::default()
689 }),
690 );
691
692 let pb_assets: Vec<_> = rsp.await?.into_inner().try_collect().await?;
693
694 let assets = pb_assets
695 .into_iter()
696 .map(Metadata::try_from)
697 .collect::<anyhow::Result<Vec<Metadata>>>()?;
698
699 Ok(assets.into_iter().collect())
700 }
701 .boxed()
702 }
703
704 fn owned_position_ids(
705 &mut self,
706 position_state: Option<position::State>,
707 trading_pair: Option<TradingPair>,
708 subaccount: Option<AddressIndex>,
709 ) -> Pin<Box<dyn Future<Output = Result<Vec<position::Id>>> + Send + 'static>> {
710 let mut self2 = self.clone();
713 async move {
714 let rsp = ViewServiceClient::owned_position_ids(
717 &mut self2,
718 tonic::Request::new(pb::OwnedPositionIdsRequest {
719 trading_pair: trading_pair.map(Into::into),
720 position_state: position_state.map(Into::into),
721 subaccount: subaccount.map(Into::into),
722 }),
723 );
724
725 let pb_position_ids: Vec<_> = rsp.await?.into_inner().try_collect().await?;
726
727 let position_ids = pb_position_ids
728 .into_iter()
729 .map(|p| {
730 position::Id::try_from(p.position_id.ok_or_else(|| {
731 anyhow::anyhow!("empty OwnedPositionsIdsResponse message")
732 })?)
733 })
734 .collect::<anyhow::Result<Vec<position::Id>>>()?;
735
736 Ok(position_ids)
737 }
738 .boxed()
739 }
740
741 fn transaction_info_by_hash(
742 &mut self,
743 id: TransactionId,
744 ) -> Pin<Box<dyn Future<Output = Result<TransactionInfo>> + Send + 'static>> {
745 let mut self2 = self.clone();
746 async move {
747 let rsp = ViewServiceClient::transaction_info_by_hash(
748 &mut self2,
749 tonic::Request::new(pb::TransactionInfoByHashRequest {
750 id: Some(id.into()),
751 }),
752 )
753 .await?
754 .into_inner()
755 .tx_info
756 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoByHashResponse message"))?;
757
758 if rsp.height == 0 {
760 anyhow::bail!("missing height");
761 }
762
763 let tx_info = TransactionInfo {
764 height: rsp.height,
765 id: rsp
766 .id
767 .ok_or_else(|| anyhow::anyhow!("missing id"))?
768 .try_into()?,
769 transaction: rsp
770 .transaction
771 .ok_or_else(|| anyhow::anyhow!("missing transaction"))?
772 .try_into()?,
773 perspective: rsp
774 .perspective
775 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
776 .try_into()?,
777 view: rsp
778 .view
779 .ok_or_else(|| anyhow::anyhow!("missing view"))?
780 .try_into()?,
781 summary: rsp
782 .summary
783 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
784 .try_into()?,
785 };
786
787 Ok(tx_info)
788 }
789 .boxed()
790 }
791
792 fn transaction_info(
793 &mut self,
794 start_height: Option<u64>,
795 end_height: Option<u64>,
796 ) -> Pin<Box<dyn Future<Output = Result<Vec<TransactionInfo>>> + Send + 'static>> {
797 let mut self2 = self.clone();
798 async move {
799 let start_h = if let Some(h) = start_height { h } else { 0 };
801
802 let end_h = if let Some(h) = end_height { h } else { 0 };
803
804 let rsp = self2.transaction_info(tonic::Request::new(pb::TransactionInfoRequest {
805 start_height: start_h,
806 end_height: end_h,
807 }));
808 let pb_txs: Vec<_> = rsp.await?.into_inner().try_collect().await?;
809
810 pb_txs
811 .into_iter()
812 .map(|rsp| {
813 let tx_rsp = rsp
814 .tx_info
815 .ok_or_else(|| anyhow::anyhow!("empty TransactionInfoResponse message"))?;
816
817 if tx_rsp.height == 0 {
819 anyhow::bail!("missing height");
820 }
821
822 let tx_info = TransactionInfo {
823 height: tx_rsp.height,
824 transaction: tx_rsp
825 .transaction
826 .ok_or_else(|| {
827 anyhow::anyhow!("empty TransactionInfoResponse message")
828 })?
829 .try_into()?,
830 id: tx_rsp
831 .id
832 .ok_or_else(|| anyhow::anyhow!("missing id"))?
833 .try_into()?,
834 perspective: tx_rsp
835 .perspective
836 .ok_or_else(|| anyhow::anyhow!("missing perspective"))?
837 .try_into()?,
838 view: tx_rsp
839 .view
840 .ok_or_else(|| anyhow::anyhow!("missing view"))?
841 .try_into()?,
842 summary: tx_rsp
843 .summary
844 .ok_or_else(|| anyhow::anyhow!("missing summary"))?
845 .try_into()?,
846 };
847
848 Ok(tx_info)
849 })
850 .collect()
851 }
852 .boxed()
853 }
854
855 fn broadcast_transaction(
856 &mut self,
857 transaction: Transaction,
858 await_detection: bool,
859 ) -> BroadcastStatusStream {
860 let mut self2 = self.clone();
861 async move {
862 let rsp = ViewServiceClient::broadcast_transaction(
863 &mut self2,
864 tonic::Request::new(pb::BroadcastTransactionRequest {
865 transaction: Some(transaction.into()),
866 await_detection,
867 }),
868 )
869 .await?
870 .into_inner();
871
872 Ok(rsp)
873 }
874 .boxed()
875 }
876
877 fn address_by_index(
878 &mut self,
879 address_index: AddressIndex,
880 ) -> Pin<Box<dyn Future<Output = Result<Address>> + Send + 'static>> {
881 let mut self2 = self.clone();
882 async move {
883 let address = self2.address_by_index(tonic::Request::new(pb::AddressByIndexRequest {
884 address_index: Some(address_index.into()),
885 }));
886 let address = address
887 .await?
888 .into_inner()
889 .address
890 .ok_or_else(|| anyhow::anyhow!("No address available for this address index"))?
891 .try_into()?;
892 Ok(address)
893 }
894 .boxed()
895 }
896
897 fn index_by_address(
898 &mut self,
899 address: Address,
900 ) -> Pin<Box<dyn Future<Output = Result<Option<AddressIndex>>> + Send + 'static>> {
901 let mut self2 = self.clone();
902 async move {
903 let index = self2.index_by_address(tonic::Request::new(pb::IndexByAddressRequest {
904 address: Some(address.into()),
905 }));
906 let index = index
907 .await?
908 .into_inner()
909 .address_index
910 .map(|index| index.try_into())
911 .transpose()?;
912 Ok(index)
913 }
914 .boxed()
915 }
916
917 fn witness_and_build(
918 &mut self,
919 transaction_plan: TransactionPlan,
920 authorization_data: AuthorizationData,
921 ) -> Pin<Box<dyn Future<Output = Result<Transaction>> + Send + 'static>> {
922 let request = pb::WitnessAndBuildRequest {
923 transaction_plan: Some(transaction_plan.into()),
924 authorization_data: Some(authorization_data.into()),
925 };
926 let mut self2 = self.clone();
927 async move {
928 let mut rsp = self2
929 .witness_and_build(tonic::Request::new(request))
930 .await?
931 .into_inner();
932
933 while let Some(rsp) = rsp.try_next().await? {
934 match rsp.status {
935 Some(status) => match status {
936 pb::witness_and_build_response::Status::BuildProgress(_) => {
937 }
939 pb::witness_and_build_response::Status::Complete(c) => {
940 return c.transaction
941 .ok_or_else(|| {
942 anyhow::anyhow!("WitnessAndBuildResponse complete status message missing transaction")
943 })?
944 .try_into();
945 }
946 },
947 None => {
948 return Err(anyhow::anyhow!(
950 "empty WitnessAndBuildResponse message"
951 ));
952 }
953 }
954 }
955
956 Err(anyhow::anyhow!("should have received complete status or error"))
957 }
958 .boxed()
959 }
960
961 fn unclaimed_swaps(
962 &mut self,
963 ) -> Pin<Box<dyn Future<Output = Result<Vec<SwapRecord>>> + Send + 'static>> {
964 let mut self2 = self.clone();
965 async move {
966 let swaps_response = ViewServiceClient::unclaimed_swaps(
967 &mut self2,
968 tonic::Request::new(pb::UnclaimedSwapsRequest {}),
969 );
970 let pb_swaps: Vec<_> = swaps_response.await?.into_inner().try_collect().await?;
971
972 pb_swaps
973 .into_iter()
974 .map(|swap_rsp| {
975 let swap_record = swap_rsp
976 .swap
977 .ok_or_else(|| anyhow::anyhow!("empty UnclaimedSwapsResponse message"));
978
979 match swap_record {
980 Ok(swap) => swap.try_into(),
981 Err(e) => Err(e),
982 }
983 })
984 .collect()
985 }
986 .boxed()
987 }
988
989 fn auctions(
990 &mut self,
991 account_filter: Option<AddressIndex>,
992 include_inactive: bool,
993 query_latest_state: bool,
994 ) -> Pin<
995 Box<
996 dyn Future<
997 Output = Result<
998 Vec<(
999 AuctionId,
1000 SpendableNoteRecord,
1001 u64,
1002 Option<Any>,
1003 Vec<Position>,
1004 )>,
1005 >,
1006 > + Send
1007 + 'static,
1008 >,
1009 > {
1010 let mut client = self.clone();
1011 async move {
1012 let request = tonic::Request::new(pb::AuctionsRequest {
1013 account_filter: account_filter.map(Into::into),
1014 include_inactive,
1015 query_latest_state,
1016 auction_ids_filter: Vec::new(), });
1018
1019 let auctions: Vec<pb::AuctionsResponse> =
1020 ViewServiceClient::auctions(&mut client, request)
1021 .await?
1022 .into_inner()
1023 .try_collect()
1024 .await?;
1025
1026 let resp: Vec<(
1027 AuctionId,
1028 SpendableNoteRecord,
1029 u64,
1030 Option<Any>,
1031 Vec<Position>,
1032 )> = auctions
1033 .into_iter()
1034 .map(|auction_rsp| {
1035 let pb_id = auction_rsp
1036 .id
1037 .ok_or_else(|| anyhow::anyhow!("missing auction id"))?;
1038 let auction_id: AuctionId = pb_id.try_into()?;
1039 let snr: SpendableNoteRecord = auction_rsp
1040 .note_record
1041 .ok_or_else(|| anyhow::anyhow!("missing SNR from auction response"))?
1042 .try_into()?;
1043
1044 let local_seq = auction_rsp.local_seq;
1045
1046 let auction = auction_rsp.auction;
1047 let lps: Vec<Position> = auction_rsp
1048 .positions
1049 .into_iter()
1050 .map(TryInto::try_into)
1051 .collect::<Result<Vec<_>>>()?;
1052
1053 Ok::<
1054 (
1055 AuctionId,
1056 SpendableNoteRecord,
1057 u64, Option<Any>, Vec<Position>, ),
1061 anyhow::Error,
1062 >((auction_id, snr, local_seq, auction, lps))
1063 })
1064 .filter_map(|res| res.ok()) .collect();
1066
1067 Ok(resp)
1068 }
1069 .boxed()
1070 }
1071
1072 fn lqt_voting_notes(
1073 &mut self,
1074 epoch: u64,
1075 filter: Option<AddressIndex>,
1076 ) -> Pin<Box<dyn Future<Output = Result<Vec<SpendableNoteRecord>>> + Send + 'static>> {
1077 let mut client = self.clone();
1078 async move {
1079 let request = tonic::Request::new(pb::LqtVotingNotesRequest {
1080 epoch_index: epoch,
1081 account_filter: filter.map(|x| x.into()),
1082 });
1083 let response = client.lqt_voting_notes(request).await?;
1084 let pb_notes: Vec<pb::LqtVotingNotesResponse> =
1085 response.into_inner().try_collect().await?;
1086
1087 pb_notes
1088 .into_iter()
1089 .map(|note_rsp| {
1090 let note_record = note_rsp
1091 .note_record
1092 .ok_or_else(|| anyhow::anyhow!("empty LqtVotingNotesResponse message"))?
1093 .try_into()?;
1094 Ok(note_record)
1095 })
1096 .collect()
1097 }
1098 .boxed()
1099 }
1100}