penumbra_sdk_dex/component/
rpc.rs

1use std::{pin::Pin, sync::Arc};
2
3use anyhow::Result;
4use async_stream::try_stream;
5use futures::{StreamExt, TryStreamExt};
6use tokio::sync::mpsc;
7use tonic::Status;
8use tracing::instrument;
9
10use cnidarium::{StateDelta, Storage};
11use penumbra_sdk_asset::{asset, Value};
12use penumbra_sdk_proto::{
13    core::component::dex::v1::{
14        query_service_server::QueryService,
15        simulate_trade_request::{
16            routing::{self, Setting},
17            Routing,
18        },
19        simulation_service_server::SimulationService,
20        ArbExecutionRequest, ArbExecutionResponse, ArbExecutionsRequest, ArbExecutionsResponse,
21        BatchSwapOutputDataRequest, BatchSwapOutputDataResponse, CandlestickDataRequest,
22        CandlestickDataResponse, CandlestickDataStreamRequest, CandlestickDataStreamResponse,
23        LiquidityPositionByIdRequest, LiquidityPositionByIdResponse, LiquidityPositionsByIdRequest,
24        LiquidityPositionsByIdResponse, LiquidityPositionsByPriceRequest,
25        LiquidityPositionsByPriceResponse, LiquidityPositionsRequest, LiquidityPositionsResponse,
26        SimulateTradeRequest, SimulateTradeResponse, SpreadRequest, SpreadResponse,
27        SwapExecutionRequest, SwapExecutionResponse, SwapExecutionsRequest, SwapExecutionsResponse,
28    },
29    DomainType, StateReadProto,
30};
31
32use super::ExecutionCircuitBreaker;
33use crate::{
34    component::metrics,
35    lp::position::{self, Position},
36    state_key, CandlestickData, DirectedTradingPair, SwapExecution, TradingPair,
37};
38
39use super::{chandelier::CandlestickRead, router::RouteAndFill, PositionRead, StateReadExt};
40
41pub mod stub;
42
43// TODO: Hide this and only expose a Router?
44pub struct Server {
45    storage: Storage,
46}
47
48impl Server {
49    pub fn new(storage: Storage) -> Self {
50        Self { storage }
51    }
52}
53
54#[tonic::async_trait]
55impl QueryService for Server {
56    type LiquidityPositionsStream = Pin<
57        Box<dyn futures::Stream<Item = Result<LiquidityPositionsResponse, tonic::Status>> + Send>,
58    >;
59    type LiquidityPositionsByPriceStream = Pin<
60        Box<
61            dyn futures::Stream<Item = Result<LiquidityPositionsByPriceResponse, tonic::Status>>
62                + Send,
63        >,
64    >;
65    type LiquidityPositionsByIdStream = Pin<
66        Box<
67            dyn futures::Stream<Item = Result<LiquidityPositionsByIdResponse, tonic::Status>>
68                + Send,
69        >,
70    >;
71    type ArbExecutionsStream =
72        Pin<Box<dyn futures::Stream<Item = Result<ArbExecutionsResponse, tonic::Status>> + Send>>;
73    type SwapExecutionsStream =
74        Pin<Box<dyn futures::Stream<Item = Result<SwapExecutionsResponse, tonic::Status>> + Send>>;
75    type CandlestickDataStreamStream = Pin<
76        Box<
77            dyn futures::Stream<Item = Result<CandlestickDataStreamResponse, tonic::Status>> + Send,
78        >,
79    >;
80
81    #[instrument(skip(self, request))]
82    async fn arb_execution(
83        &self,
84        request: tonic::Request<ArbExecutionRequest>,
85    ) -> Result<tonic::Response<ArbExecutionResponse>, Status> {
86        let state = self.storage.latest_snapshot();
87        let request_inner = request.into_inner();
88        let height = request_inner.height;
89
90        let arb_execution = state
91            .arb_execution(height)
92            .await
93            .map_err(|e| tonic::Status::internal(e.to_string()))?;
94
95        match arb_execution {
96            Some(arb_execution) => Ok(tonic::Response::new(ArbExecutionResponse {
97                swap_execution: Some(arb_execution.into()),
98                height,
99            })),
100            None => Err(Status::not_found("arb execution data not found")),
101        }
102    }
103
104    #[instrument(skip(self, request))]
105    async fn arb_executions(
106        &self,
107        request: tonic::Request<ArbExecutionsRequest>,
108    ) -> Result<tonic::Response<Self::ArbExecutionsStream>, Status> {
109        let state = self.storage.latest_snapshot();
110        let request_inner = request.into_inner();
111        let start_height = request_inner.start_height;
112        let end_height = request_inner.end_height;
113
114        let s = state.prefix(state_key::arb_executions());
115        Ok(tonic::Response::new(
116            s.filter_map(
117                move |i: anyhow::Result<(String, SwapExecution)>| async move {
118                    if i.is_err() {
119                        return Some(Err(tonic::Status::unavailable(format!(
120                            "error getting prefix value from storage: {}",
121                            i.expect_err("i is_err")
122                        ))));
123                    }
124
125                    let (key, arb_execution) = i.expect("i is Ok");
126                    let height = key
127                        .split('/')
128                        .last()
129                        .expect("arb execution key has height as last part")
130                        .parse()
131                        .expect("height is a number");
132
133                    // TODO: would be great to start iteration at start_height
134                    // and stop at end_height rather than touching _every_
135                    // key, but the current storage implementation doesn't make this
136                    // easy.
137                    if height < start_height || height > end_height {
138                        None
139                    } else {
140                        Some(Ok(ArbExecutionsResponse {
141                            swap_execution: Some(arb_execution.into()),
142                            height,
143                        }))
144                    }
145                },
146            )
147            // TODO: how do we instrument a Stream
148            //.instrument(Span::current())
149            .boxed(),
150        ))
151    }
152
153    #[instrument(skip(self, request))]
154    /// Get the batch swap data associated with a given trading pair and height.
155    async fn batch_swap_output_data(
156        &self,
157        request: tonic::Request<BatchSwapOutputDataRequest>,
158    ) -> Result<tonic::Response<BatchSwapOutputDataResponse>, Status> {
159        let state = self.storage.latest_snapshot();
160
161        let request_inner = request.into_inner();
162        let height = request_inner.height;
163        let trading_pair = request_inner
164            .trading_pair
165            .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
166            .try_into()
167            .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
168
169        let output_data = state
170            .output_data(height, trading_pair)
171            .await
172            .map_err(|e| tonic::Status::internal(e.to_string()))?;
173
174        match output_data {
175            Some(data) => Ok(tonic::Response::new(BatchSwapOutputDataResponse {
176                data: Some(data.into()),
177            })),
178            None => Err(Status::not_found("batch swap output data not found")),
179        }
180    }
181
182    #[instrument(skip(self, request))]
183    /// Get the batch swap data associated with a given trading pair and height.
184    async fn swap_execution(
185        &self,
186        request: tonic::Request<SwapExecutionRequest>,
187    ) -> Result<tonic::Response<SwapExecutionResponse>, Status> {
188        let state = self.storage.latest_snapshot();
189        let request_inner = request.into_inner();
190        let height = request_inner.height;
191        let trading_pair = request_inner
192            .trading_pair
193            .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
194            .try_into()
195            .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
196
197        let swap_execution = state
198            .swap_execution(height, trading_pair)
199            .await
200            .map_err(|e| tonic::Status::internal(e.to_string()))?;
201
202        match swap_execution {
203            Some(swap_execution) => Ok(tonic::Response::new(SwapExecutionResponse {
204                swap_execution: Some(swap_execution.into()),
205            })),
206            None => Err(Status::not_found("batch swap output data not found")),
207        }
208    }
209
210    #[instrument(skip(self, request))]
211    async fn candlestick_data(
212        &self,
213        request: tonic::Request<CandlestickDataRequest>,
214    ) -> Result<tonic::Response<CandlestickDataResponse>, Status> {
215        let state = self.storage.latest_snapshot();
216        // Limit the number of candlesticks returned to 20,000 (approximately 1 day)
217        // to prevent the server from being overwhelmed by a single request.
218        let limit = std::cmp::min(request.get_ref().limit, 20_000u64);
219        let start_height = match request.get_ref().start_height {
220            0 => {
221                // If no start height is provided, go `limit` blocks back from now.
222                let current_height = state.version();
223                current_height.saturating_sub(limit)
224            }
225            start_height => start_height,
226        };
227
228        let pair: DirectedTradingPair = request
229            .get_ref()
230            .pair
231            .clone()
232            .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
233            .try_into()
234            .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
235
236        let candlesticks = state
237            .candlesticks(&pair, start_height, limit as usize)
238            .await
239            .map_err(|e| tonic::Status::internal(e.to_string()))?;
240
241        Ok(tonic::Response::new(CandlestickDataResponse {
242            data: candlesticks.into_iter().map(Into::into).collect(),
243        }))
244    }
245
246    async fn candlestick_data_stream(
247        &self,
248        request: tonic::Request<CandlestickDataStreamRequest>,
249    ) -> Result<tonic::Response<Self::CandlestickDataStreamStream>, Status> {
250        let pair: DirectedTradingPair = request
251            .get_ref()
252            .pair
253            .clone()
254            .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
255            .try_into()
256            .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
257
258        let (tx_candle, rx_candle) = mpsc::channel::<CandlestickData>(1);
259        let storage = self.storage.clone();
260        tokio::spawn(async move {
261            // could add metrics here
262            // let _guard = CandlestickDataStreamConnectionCounter::new();
263            let mut rx_state_snapshot = storage.subscribe();
264            loop {
265                rx_state_snapshot
266                    .changed()
267                    .await
268                    .expect("channel should be open");
269                let snapshot = rx_state_snapshot.borrow().clone();
270                let height = snapshot.version();
271                match snapshot.get_candlestick(&pair, height).await? {
272                    Some(candle) => tx_candle.send(candle).await?,
273                    None => {
274                        // If there's no candlestick data, might as well check that
275                        // tx_candle is still open in case the client has disconnected.
276                        if tx_candle.is_closed() {
277                            break;
278                        }
279                    }
280                }
281            }
282            Ok::<_, anyhow::Error>(())
283        });
284
285        Ok(tonic::Response::new(
286            tokio_stream::wrappers::ReceiverStream::new(rx_candle)
287                .map(|candle| {
288                    Ok(CandlestickDataStreamResponse {
289                        data: Some(candle.into()),
290                    })
291                })
292                .boxed(),
293        ))
294    }
295
296    #[instrument(skip(self, request))]
297    async fn swap_executions(
298        &self,
299        request: tonic::Request<SwapExecutionsRequest>,
300    ) -> Result<tonic::Response<Self::SwapExecutionsStream>, Status> {
301        let state = self.storage.latest_snapshot();
302
303        let request_inner = request.into_inner();
304        let start_height = request_inner.start_height;
305        let end_height = request_inner.end_height;
306        let trading_pair = request_inner.trading_pair;
307
308        // Convert to domain type ahead of time if necessary
309        let trading_pair: Option<DirectedTradingPair> =
310            trading_pair.map(|trading_pair| trading_pair.try_into().expect("invalid trading pair"));
311
312        let s = state.nonverifiable_prefix(&state_key::swap_executions().as_bytes());
313        Ok(tonic::Response::new(
314            s.filter_map(move |i: anyhow::Result<(Vec<u8>, SwapExecution)>| {
315                async move {
316                    if i.is_err() {
317                        return Some(Err(tonic::Status::unavailable(format!(
318                            "error getting prefix value from storage: {}",
319                            i.expect_err("i is_err")
320                        ))));
321                    }
322
323                    let (key, swap_execution) = i.expect("i is Ok");
324
325                    let key = std::str::from_utf8(&key)
326                        .expect("state key for swap executions should be valid utf-8 string");
327
328                    let parts = key.split('/').collect::<Vec<_>>();
329                    let height = parts[2].parse().expect("height is not a number");
330                    let asset_1: asset::Id =
331                        parts[3].parse().expect("asset id formatted improperly");
332                    let asset_2: asset::Id =
333                        parts[4].parse().expect("asset id formatted improperly");
334
335                    let swap_trading_pair = DirectedTradingPair::new(asset_1, asset_2);
336
337                    if let Some(trading_pair) = trading_pair {
338                        // filter by trading pair
339                        if swap_trading_pair != trading_pair {
340                            return None;
341                        }
342                    }
343
344                    // TODO: would be great to start iteration at start_height
345                    // and stop at end_height rather than touching _every_
346                    // key, but the current storage implementation doesn't make this
347                    // easy.
348                    if height < start_height || height > end_height {
349                        None
350                    } else {
351                        Some(Ok(SwapExecutionsResponse {
352                            swap_execution: Some(swap_execution.into()),
353                            height,
354                            trading_pair: Some(swap_trading_pair.into()),
355                        }))
356                    }
357                }
358            })
359            // TODO: how do we instrument a Stream
360            //.instrument(Span::current())
361            .boxed(),
362        ))
363    }
364
365    async fn spread(
366        &self,
367        request: tonic::Request<SpreadRequest>,
368    ) -> Result<tonic::Response<SpreadResponse>, Status> {
369        let state = self.storage.latest_snapshot();
370        let request = request.into_inner();
371
372        let pair: TradingPair = request
373            .trading_pair
374            .ok_or_else(|| tonic::Status::invalid_argument("missing trading pair"))?
375            .try_into()
376            .map_err(|e| {
377                tonic::Status::invalid_argument(format!("error parsing trading pair: {:#}", e))
378            })?;
379
380        let pair12 = DirectedTradingPair {
381            start: pair.asset_1(),
382            end: pair.asset_2(),
383        };
384        let pair21 = DirectedTradingPair {
385            start: pair.asset_2(),
386            end: pair.asset_1(),
387        };
388        let best_1_to_2_position = state
389            .best_position(&pair12)
390            .await
391            .map_err(|e| {
392                tonic::Status::internal(format!(
393                    "error finding best position for {:?}: {:#}",
394                    pair12, e
395                ))
396            })?
397            .map(|(_, p)| p);
398        let best_2_to_1_position = state
399            .best_position(&pair12)
400            .await
401            .map_err(|e| {
402                tonic::Status::internal(format!(
403                    "error finding best position for {:?}: {:#}",
404                    pair21, e
405                ))
406            })?
407            .map(|(_, p)| p);
408
409        let approx_effective_price_1_to_2 = best_1_to_2_position
410            .as_ref()
411            .map(|p| {
412                p.phi
413                    .orient_start(pair.asset_1())
414                    .expect("position has one end = asset 1")
415                    .effective_price()
416                    .into()
417            })
418            .unwrap_or_default();
419
420        let approx_effective_price_2_to_1 = best_2_to_1_position
421            .as_ref()
422            .map(|p| {
423                p.phi
424                    .orient_start(pair.asset_2())
425                    .expect("position has one end = asset 2")
426                    .effective_price()
427                    .into()
428            })
429            .unwrap_or_default();
430
431        Ok(tonic::Response::new(SpreadResponse {
432            best_1_to_2_position: best_1_to_2_position.map(Into::into),
433            best_2_to_1_position: best_2_to_1_position.map(Into::into),
434            approx_effective_price_1_to_2,
435            approx_effective_price_2_to_1,
436        }))
437    }
438
439    #[instrument(skip(self, request))]
440    async fn liquidity_positions_by_price(
441        &self,
442        request: tonic::Request<LiquidityPositionsByPriceRequest>,
443    ) -> Result<tonic::Response<Self::LiquidityPositionsByPriceStream>, Status> {
444        let state = self.storage.latest_snapshot();
445        let request = request.into_inner();
446
447        let pair: DirectedTradingPair = request
448            .trading_pair
449            .ok_or_else(|| tonic::Status::invalid_argument("missing directed trading pair"))?
450            .try_into()
451            .map_err(|e| {
452                tonic::Status::invalid_argument(format!(
453                    "error parsing directed trading pair: {:#}",
454                    e
455                ))
456            })?;
457
458        let limit = if request.limit != 0 {
459            request.limit as usize
460        } else {
461            usize::MAX
462        };
463
464        let s = state
465            .positions_by_price(&pair)
466            .take(limit)
467            .map_ok(|(id, position)| LiquidityPositionsByPriceResponse {
468                data: Some(position.into()),
469                id: Some(id.into()),
470            })
471            .map_err(|e: anyhow::Error| {
472                tonic::Status::internal(format!("error retrieving positions: {:#}", e))
473            });
474        // TODO: how do we instrument a Stream
475        Ok(tonic::Response::new(s.boxed()))
476    }
477
478    #[instrument(skip(self, request))]
479    async fn liquidity_positions(
480        &self,
481        request: tonic::Request<LiquidityPositionsRequest>,
482    ) -> Result<tonic::Response<Self::LiquidityPositionsStream>, Status> {
483        let state = self.storage.latest_snapshot();
484
485        let include_closed = request.get_ref().include_closed;
486        let s = state.all_positions();
487        Ok(tonic::Response::new(
488            s.filter(move |item| {
489                use crate::lp::position::State;
490                let keep = match item {
491                    Ok(position) => {
492                        if position.state == State::Opened {
493                            true
494                        } else {
495                            include_closed
496                        }
497                    }
498                    Err(_) => false,
499                };
500                futures::future::ready(keep)
501            })
502            .map_ok(|i: Position| LiquidityPositionsResponse {
503                data: Some(i.into()),
504            })
505            .map_err(|e: anyhow::Error| {
506                tonic::Status::unavailable(format!("error getting prefix value from storage: {e}"))
507            })
508            // TODO: how do we instrument a Stream
509            //.instrument(Span::current())
510            .boxed(),
511        ))
512    }
513
514    #[instrument(skip(self, request))]
515    async fn liquidity_position_by_id(
516        &self,
517        request: tonic::Request<LiquidityPositionByIdRequest>,
518    ) -> Result<tonic::Response<LiquidityPositionByIdResponse>, Status> {
519        let state = self.storage.latest_snapshot();
520
521        let position_id: position::Id = request
522            .into_inner()
523            .position_id
524            .ok_or_else(|| Status::invalid_argument("empty message"))?
525            .try_into()
526            .map_err(|e: anyhow::Error| {
527                tonic::Status::invalid_argument(format!("error converting position_id: {e}"))
528            })?;
529
530        let position = state
531            .position_by_id(&position_id)
532            .await
533            .map_err(|e: anyhow::Error| {
534                tonic::Status::unavailable(format!("error fetching position from storage: {e}"))
535            })?
536            .ok_or_else(|| Status::not_found("position not found"))?;
537
538        Ok(tonic::Response::new(LiquidityPositionByIdResponse {
539            data: Some(position.into()),
540        }))
541    }
542
543    #[instrument(skip(self, request))]
544    async fn liquidity_positions_by_id(
545        &self,
546        request: tonic::Request<LiquidityPositionsByIdRequest>,
547    ) -> Result<tonic::Response<Self::LiquidityPositionsByIdStream>, Status> {
548        let state = self.storage.latest_snapshot();
549
550        let position_ids: Vec<position::Id> = request
551            .into_inner()
552            .position_id
553            .into_iter()
554            .map(TryInto::try_into)
555            .collect::<anyhow::Result<Vec<_>>>()
556            .map_err(|e: anyhow::Error| {
557                tonic::Status::invalid_argument(format!("error converting position_id: {e}"))
558            })?;
559
560        let s = try_stream! {
561            for position_id in position_ids {
562                let position = state
563                    .position_by_id(&position_id)
564                    .await
565                    .map_err(|e: anyhow::Error| {
566                        tonic::Status::unavailable(format!("error fetching position from storage: {e}"))
567                    })?
568                    .ok_or_else(|| Status::not_found("position not found"))?;
569
570                yield position.to_proto();
571            }
572        };
573        Ok(tonic::Response::new(
574            s.map_ok(
575                |p: penumbra_sdk_proto::core::component::dex::v1::Position| {
576                    LiquidityPositionsByIdResponse { data: Some(p) }
577                },
578            )
579            .map_err(|e: anyhow::Error| {
580                tonic::Status::unavailable(format!(
581                    "error getting position value from storage: {e}"
582                ))
583            })
584            // TODO: how do we instrument a Stream
585            //.instrument(Span::current())
586            .boxed(),
587        ))
588    }
589}
590
591#[tonic::async_trait]
592impl SimulationService for Server {
593    async fn simulate_trade(
594        &self,
595        request: tonic::Request<SimulateTradeRequest>,
596    ) -> Result<tonic::Response<SimulateTradeResponse>, Status> {
597        let request = request.into_inner();
598        let routing_stategy = match request.routing {
599            None => Routing {
600                setting: Some(Setting::Default(routing::Default {})),
601            },
602            Some(routing) => routing,
603        };
604
605        let routing_strategy = match routing_stategy.setting {
606            None => Setting::Default(routing::Default {}),
607            Some(setting) => setting,
608        };
609
610        let input: Value = request
611            .input
612            .ok_or_else(|| tonic::Status::invalid_argument("missing input parameter"))?
613            .try_into()
614            .map_err(|e| {
615                tonic::Status::invalid_argument(format!("error parsing input: {:#}", e))
616            })?;
617
618        let output_id = request
619            .output
620            .ok_or_else(|| tonic::Status::invalid_argument("missing output id parameter"))?
621            .try_into()
622            .map_err(|e| {
623                tonic::Status::invalid_argument(format!("error parsing output id: {:#}", e))
624            })?;
625
626        let start_time = std::time::Instant::now();
627        let state = self.storage.latest_snapshot();
628
629        let mut routing_params = state
630            .routing_params()
631            .await
632            .expect("dex routing params are set");
633        match routing_strategy {
634            Setting::SingleHop(_) => {
635                routing_params.max_hops = 1;
636            }
637            Setting::Default(_) => {
638                // no-op, use the default
639            }
640        }
641
642        let execution_budget = state
643            .get_dex_params()
644            .await
645            .expect("dex parameters are set")
646            .max_execution_budget;
647
648        let mut state_tx = Arc::new(StateDelta::new(state));
649        let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
650
651        let swap_execution = match state_tx
652            .route_and_fill(
653                input.asset_id,
654                output_id,
655                input.amount,
656                routing_params,
657                execution_circuit_breaker,
658            )
659            .await
660            .map_err(|e| tonic::Status::internal(format!("error simulating trade: {:#}", e)))?
661        {
662            Some(swap_execution) => swap_execution,
663            None => SwapExecution {
664                traces: vec![],
665                input: Value {
666                    amount: 0u64.into(),
667                    asset_id: input.asset_id,
668                },
669                output: Value {
670                    amount: 0u64.into(),
671                    asset_id: output_id,
672                },
673            },
674        };
675
676        let unfilled = Value {
677            amount: input
678                .amount
679                .checked_sub(&swap_execution.input.amount)
680                .ok_or_else(|| {
681                    tonic::Status::failed_precondition(
682                        "swap execution input amount is larger than request input amount"
683                            .to_string(),
684                    )
685                })?,
686            asset_id: input.asset_id,
687        };
688
689        let rsp = tonic::Response::new(SimulateTradeResponse {
690            unfilled: Some(unfilled.into()),
691            output: Some(swap_execution.into()),
692        });
693
694        let duration = start_time.elapsed();
695
696        metrics::histogram!(metrics::DEX_RPC_SIMULATE_TRADE_DURATION).record(duration);
697
698        Ok(rsp)
699    }
700}