1use std::{pin::Pin, sync::Arc};
2
3use anyhow::Result;
4use async_stream::try_stream;
5use futures::{StreamExt, TryStreamExt};
6use tokio::sync::mpsc;
7use tonic::Status;
8use tracing::instrument;
9
10use cnidarium::{StateDelta, Storage};
11use penumbra_sdk_asset::{asset, Value};
12use penumbra_sdk_proto::{
13 core::component::dex::v1::{
14 query_service_server::QueryService,
15 simulate_trade_request::{
16 routing::{self, Setting},
17 Routing,
18 },
19 simulation_service_server::SimulationService,
20 ArbExecutionRequest, ArbExecutionResponse, ArbExecutionsRequest, ArbExecutionsResponse,
21 BatchSwapOutputDataRequest, BatchSwapOutputDataResponse, CandlestickDataRequest,
22 CandlestickDataResponse, CandlestickDataStreamRequest, CandlestickDataStreamResponse,
23 LiquidityPositionByIdRequest, LiquidityPositionByIdResponse, LiquidityPositionsByIdRequest,
24 LiquidityPositionsByIdResponse, LiquidityPositionsByPriceRequest,
25 LiquidityPositionsByPriceResponse, LiquidityPositionsRequest, LiquidityPositionsResponse,
26 SimulateTradeRequest, SimulateTradeResponse, SpreadRequest, SpreadResponse,
27 SwapExecutionRequest, SwapExecutionResponse, SwapExecutionsRequest, SwapExecutionsResponse,
28 },
29 DomainType, StateReadProto,
30};
31
32use super::ExecutionCircuitBreaker;
33use crate::{
34 component::metrics,
35 lp::position::{self, Position},
36 state_key, CandlestickData, DirectedTradingPair, SwapExecution, TradingPair,
37};
38
39use super::{chandelier::CandlestickRead, router::RouteAndFill, PositionRead, StateReadExt};
40
41pub mod stub;
42
43pub struct Server {
45 storage: Storage,
46}
47
48impl Server {
49 pub fn new(storage: Storage) -> Self {
50 Self { storage }
51 }
52}
53
54#[tonic::async_trait]
55impl QueryService for Server {
56 type LiquidityPositionsStream = Pin<
57 Box<dyn futures::Stream<Item = Result<LiquidityPositionsResponse, tonic::Status>> + Send>,
58 >;
59 type LiquidityPositionsByPriceStream = Pin<
60 Box<
61 dyn futures::Stream<Item = Result<LiquidityPositionsByPriceResponse, tonic::Status>>
62 + Send,
63 >,
64 >;
65 type LiquidityPositionsByIdStream = Pin<
66 Box<
67 dyn futures::Stream<Item = Result<LiquidityPositionsByIdResponse, tonic::Status>>
68 + Send,
69 >,
70 >;
71 type ArbExecutionsStream =
72 Pin<Box<dyn futures::Stream<Item = Result<ArbExecutionsResponse, tonic::Status>> + Send>>;
73 type SwapExecutionsStream =
74 Pin<Box<dyn futures::Stream<Item = Result<SwapExecutionsResponse, tonic::Status>> + Send>>;
75 type CandlestickDataStreamStream = Pin<
76 Box<
77 dyn futures::Stream<Item = Result<CandlestickDataStreamResponse, tonic::Status>> + Send,
78 >,
79 >;
80
81 #[instrument(skip(self, request))]
82 async fn arb_execution(
83 &self,
84 request: tonic::Request<ArbExecutionRequest>,
85 ) -> Result<tonic::Response<ArbExecutionResponse>, Status> {
86 let state = self.storage.latest_snapshot();
87 let request_inner = request.into_inner();
88 let height = request_inner.height;
89
90 let arb_execution = state
91 .arb_execution(height)
92 .await
93 .map_err(|e| tonic::Status::internal(e.to_string()))?;
94
95 match arb_execution {
96 Some(arb_execution) => Ok(tonic::Response::new(ArbExecutionResponse {
97 swap_execution: Some(arb_execution.into()),
98 height,
99 })),
100 None => Err(Status::not_found("arb execution data not found")),
101 }
102 }
103
104 #[instrument(skip(self, request))]
105 async fn arb_executions(
106 &self,
107 request: tonic::Request<ArbExecutionsRequest>,
108 ) -> Result<tonic::Response<Self::ArbExecutionsStream>, Status> {
109 let state = self.storage.latest_snapshot();
110 let request_inner = request.into_inner();
111 let start_height = request_inner.start_height;
112 let end_height = request_inner.end_height;
113
114 let s = state.prefix(state_key::arb_executions());
115 Ok(tonic::Response::new(
116 s.filter_map(
117 move |i: anyhow::Result<(String, SwapExecution)>| async move {
118 if i.is_err() {
119 return Some(Err(tonic::Status::unavailable(format!(
120 "error getting prefix value from storage: {}",
121 i.expect_err("i is_err")
122 ))));
123 }
124
125 let (key, arb_execution) = i.expect("i is Ok");
126 let height = key
127 .split('/')
128 .last()
129 .expect("arb execution key has height as last part")
130 .parse()
131 .expect("height is a number");
132
133 if height < start_height || height > end_height {
138 None
139 } else {
140 Some(Ok(ArbExecutionsResponse {
141 swap_execution: Some(arb_execution.into()),
142 height,
143 }))
144 }
145 },
146 )
147 .boxed(),
150 ))
151 }
152
153 #[instrument(skip(self, request))]
154 async fn batch_swap_output_data(
156 &self,
157 request: tonic::Request<BatchSwapOutputDataRequest>,
158 ) -> Result<tonic::Response<BatchSwapOutputDataResponse>, Status> {
159 let state = self.storage.latest_snapshot();
160
161 let request_inner = request.into_inner();
162 let height = request_inner.height;
163 let trading_pair = request_inner
164 .trading_pair
165 .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
166 .try_into()
167 .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
168
169 let output_data = state
170 .output_data(height, trading_pair)
171 .await
172 .map_err(|e| tonic::Status::internal(e.to_string()))?;
173
174 match output_data {
175 Some(data) => Ok(tonic::Response::new(BatchSwapOutputDataResponse {
176 data: Some(data.into()),
177 })),
178 None => Err(Status::not_found("batch swap output data not found")),
179 }
180 }
181
182 #[instrument(skip(self, request))]
183 async fn swap_execution(
185 &self,
186 request: tonic::Request<SwapExecutionRequest>,
187 ) -> Result<tonic::Response<SwapExecutionResponse>, Status> {
188 let state = self.storage.latest_snapshot();
189 let request_inner = request.into_inner();
190 let height = request_inner.height;
191 let trading_pair = request_inner
192 .trading_pair
193 .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
194 .try_into()
195 .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
196
197 let swap_execution = state
198 .swap_execution(height, trading_pair)
199 .await
200 .map_err(|e| tonic::Status::internal(e.to_string()))?;
201
202 match swap_execution {
203 Some(swap_execution) => Ok(tonic::Response::new(SwapExecutionResponse {
204 swap_execution: Some(swap_execution.into()),
205 })),
206 None => Err(Status::not_found("batch swap output data not found")),
207 }
208 }
209
210 #[instrument(skip(self, request))]
211 async fn candlestick_data(
212 &self,
213 request: tonic::Request<CandlestickDataRequest>,
214 ) -> Result<tonic::Response<CandlestickDataResponse>, Status> {
215 let state = self.storage.latest_snapshot();
216 let limit = std::cmp::min(request.get_ref().limit, 20_000u64);
219 let start_height = match request.get_ref().start_height {
220 0 => {
221 let current_height = state.version();
223 current_height.saturating_sub(limit)
224 }
225 start_height => start_height,
226 };
227
228 let pair: DirectedTradingPair = request
229 .get_ref()
230 .pair
231 .clone()
232 .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
233 .try_into()
234 .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
235
236 let candlesticks = state
237 .candlesticks(&pair, start_height, limit as usize)
238 .await
239 .map_err(|e| tonic::Status::internal(e.to_string()))?;
240
241 Ok(tonic::Response::new(CandlestickDataResponse {
242 data: candlesticks.into_iter().map(Into::into).collect(),
243 }))
244 }
245
246 async fn candlestick_data_stream(
247 &self,
248 request: tonic::Request<CandlestickDataStreamRequest>,
249 ) -> Result<tonic::Response<Self::CandlestickDataStreamStream>, Status> {
250 let pair: DirectedTradingPair = request
251 .get_ref()
252 .pair
253 .clone()
254 .ok_or_else(|| Status::invalid_argument("missing trading_pair"))?
255 .try_into()
256 .map_err(|_| Status::invalid_argument("invalid trading_pair"))?;
257
258 let (tx_candle, rx_candle) = mpsc::channel::<CandlestickData>(1);
259 let storage = self.storage.clone();
260 tokio::spawn(async move {
261 let mut rx_state_snapshot = storage.subscribe();
264 loop {
265 rx_state_snapshot
266 .changed()
267 .await
268 .expect("channel should be open");
269 let snapshot = rx_state_snapshot.borrow().clone();
270 let height = snapshot.version();
271 match snapshot.get_candlestick(&pair, height).await? {
272 Some(candle) => tx_candle.send(candle).await?,
273 None => {
274 if tx_candle.is_closed() {
277 break;
278 }
279 }
280 }
281 }
282 Ok::<_, anyhow::Error>(())
283 });
284
285 Ok(tonic::Response::new(
286 tokio_stream::wrappers::ReceiverStream::new(rx_candle)
287 .map(|candle| {
288 Ok(CandlestickDataStreamResponse {
289 data: Some(candle.into()),
290 })
291 })
292 .boxed(),
293 ))
294 }
295
296 #[instrument(skip(self, request))]
297 async fn swap_executions(
298 &self,
299 request: tonic::Request<SwapExecutionsRequest>,
300 ) -> Result<tonic::Response<Self::SwapExecutionsStream>, Status> {
301 let state = self.storage.latest_snapshot();
302
303 let request_inner = request.into_inner();
304 let start_height = request_inner.start_height;
305 let end_height = request_inner.end_height;
306 let trading_pair = request_inner.trading_pair;
307
308 let trading_pair: Option<DirectedTradingPair> =
310 trading_pair.map(|trading_pair| trading_pair.try_into().expect("invalid trading pair"));
311
312 let s = state.nonverifiable_prefix(&state_key::swap_executions().as_bytes());
313 Ok(tonic::Response::new(
314 s.filter_map(move |i: anyhow::Result<(Vec<u8>, SwapExecution)>| {
315 async move {
316 if i.is_err() {
317 return Some(Err(tonic::Status::unavailable(format!(
318 "error getting prefix value from storage: {}",
319 i.expect_err("i is_err")
320 ))));
321 }
322
323 let (key, swap_execution) = i.expect("i is Ok");
324
325 let key = std::str::from_utf8(&key)
326 .expect("state key for swap executions should be valid utf-8 string");
327
328 let parts = key.split('/').collect::<Vec<_>>();
329 let height = parts[2].parse().expect("height is not a number");
330 let asset_1: asset::Id =
331 parts[3].parse().expect("asset id formatted improperly");
332 let asset_2: asset::Id =
333 parts[4].parse().expect("asset id formatted improperly");
334
335 let swap_trading_pair = DirectedTradingPair::new(asset_1, asset_2);
336
337 if let Some(trading_pair) = trading_pair {
338 if swap_trading_pair != trading_pair {
340 return None;
341 }
342 }
343
344 if height < start_height || height > end_height {
349 None
350 } else {
351 Some(Ok(SwapExecutionsResponse {
352 swap_execution: Some(swap_execution.into()),
353 height,
354 trading_pair: Some(swap_trading_pair.into()),
355 }))
356 }
357 }
358 })
359 .boxed(),
362 ))
363 }
364
365 async fn spread(
366 &self,
367 request: tonic::Request<SpreadRequest>,
368 ) -> Result<tonic::Response<SpreadResponse>, Status> {
369 let state = self.storage.latest_snapshot();
370 let request = request.into_inner();
371
372 let pair: TradingPair = request
373 .trading_pair
374 .ok_or_else(|| tonic::Status::invalid_argument("missing trading pair"))?
375 .try_into()
376 .map_err(|e| {
377 tonic::Status::invalid_argument(format!("error parsing trading pair: {:#}", e))
378 })?;
379
380 let pair12 = DirectedTradingPair {
381 start: pair.asset_1(),
382 end: pair.asset_2(),
383 };
384 let pair21 = DirectedTradingPair {
385 start: pair.asset_2(),
386 end: pair.asset_1(),
387 };
388 let best_1_to_2_position = state
389 .best_position(&pair12)
390 .await
391 .map_err(|e| {
392 tonic::Status::internal(format!(
393 "error finding best position for {:?}: {:#}",
394 pair12, e
395 ))
396 })?
397 .map(|(_, p)| p);
398 let best_2_to_1_position = state
399 .best_position(&pair12)
400 .await
401 .map_err(|e| {
402 tonic::Status::internal(format!(
403 "error finding best position for {:?}: {:#}",
404 pair21, e
405 ))
406 })?
407 .map(|(_, p)| p);
408
409 let approx_effective_price_1_to_2 = best_1_to_2_position
410 .as_ref()
411 .map(|p| {
412 p.phi
413 .orient_start(pair.asset_1())
414 .expect("position has one end = asset 1")
415 .effective_price()
416 .into()
417 })
418 .unwrap_or_default();
419
420 let approx_effective_price_2_to_1 = best_2_to_1_position
421 .as_ref()
422 .map(|p| {
423 p.phi
424 .orient_start(pair.asset_2())
425 .expect("position has one end = asset 2")
426 .effective_price()
427 .into()
428 })
429 .unwrap_or_default();
430
431 Ok(tonic::Response::new(SpreadResponse {
432 best_1_to_2_position: best_1_to_2_position.map(Into::into),
433 best_2_to_1_position: best_2_to_1_position.map(Into::into),
434 approx_effective_price_1_to_2,
435 approx_effective_price_2_to_1,
436 }))
437 }
438
439 #[instrument(skip(self, request))]
440 async fn liquidity_positions_by_price(
441 &self,
442 request: tonic::Request<LiquidityPositionsByPriceRequest>,
443 ) -> Result<tonic::Response<Self::LiquidityPositionsByPriceStream>, Status> {
444 let state = self.storage.latest_snapshot();
445 let request = request.into_inner();
446
447 let pair: DirectedTradingPair = request
448 .trading_pair
449 .ok_or_else(|| tonic::Status::invalid_argument("missing directed trading pair"))?
450 .try_into()
451 .map_err(|e| {
452 tonic::Status::invalid_argument(format!(
453 "error parsing directed trading pair: {:#}",
454 e
455 ))
456 })?;
457
458 let limit = if request.limit != 0 {
459 request.limit as usize
460 } else {
461 usize::MAX
462 };
463
464 let s = state
465 .positions_by_price(&pair)
466 .take(limit)
467 .map_ok(|(id, position)| LiquidityPositionsByPriceResponse {
468 data: Some(position.into()),
469 id: Some(id.into()),
470 })
471 .map_err(|e: anyhow::Error| {
472 tonic::Status::internal(format!("error retrieving positions: {:#}", e))
473 });
474 Ok(tonic::Response::new(s.boxed()))
476 }
477
478 #[instrument(skip(self, request))]
479 async fn liquidity_positions(
480 &self,
481 request: tonic::Request<LiquidityPositionsRequest>,
482 ) -> Result<tonic::Response<Self::LiquidityPositionsStream>, Status> {
483 let state = self.storage.latest_snapshot();
484
485 let include_closed = request.get_ref().include_closed;
486 let s = state.all_positions();
487 Ok(tonic::Response::new(
488 s.filter(move |item| {
489 use crate::lp::position::State;
490 let keep = match item {
491 Ok(position) => {
492 if position.state == State::Opened {
493 true
494 } else {
495 include_closed
496 }
497 }
498 Err(_) => false,
499 };
500 futures::future::ready(keep)
501 })
502 .map_ok(|i: Position| LiquidityPositionsResponse {
503 data: Some(i.into()),
504 })
505 .map_err(|e: anyhow::Error| {
506 tonic::Status::unavailable(format!("error getting prefix value from storage: {e}"))
507 })
508 .boxed(),
511 ))
512 }
513
514 #[instrument(skip(self, request))]
515 async fn liquidity_position_by_id(
516 &self,
517 request: tonic::Request<LiquidityPositionByIdRequest>,
518 ) -> Result<tonic::Response<LiquidityPositionByIdResponse>, Status> {
519 let state = self.storage.latest_snapshot();
520
521 let position_id: position::Id = request
522 .into_inner()
523 .position_id
524 .ok_or_else(|| Status::invalid_argument("empty message"))?
525 .try_into()
526 .map_err(|e: anyhow::Error| {
527 tonic::Status::invalid_argument(format!("error converting position_id: {e}"))
528 })?;
529
530 let position = state
531 .position_by_id(&position_id)
532 .await
533 .map_err(|e: anyhow::Error| {
534 tonic::Status::unavailable(format!("error fetching position from storage: {e}"))
535 })?
536 .ok_or_else(|| Status::not_found("position not found"))?;
537
538 Ok(tonic::Response::new(LiquidityPositionByIdResponse {
539 data: Some(position.into()),
540 }))
541 }
542
543 #[instrument(skip(self, request))]
544 async fn liquidity_positions_by_id(
545 &self,
546 request: tonic::Request<LiquidityPositionsByIdRequest>,
547 ) -> Result<tonic::Response<Self::LiquidityPositionsByIdStream>, Status> {
548 let state = self.storage.latest_snapshot();
549
550 let position_ids: Vec<position::Id> = request
551 .into_inner()
552 .position_id
553 .into_iter()
554 .map(TryInto::try_into)
555 .collect::<anyhow::Result<Vec<_>>>()
556 .map_err(|e: anyhow::Error| {
557 tonic::Status::invalid_argument(format!("error converting position_id: {e}"))
558 })?;
559
560 let s = try_stream! {
561 for position_id in position_ids {
562 let position = state
563 .position_by_id(&position_id)
564 .await
565 .map_err(|e: anyhow::Error| {
566 tonic::Status::unavailable(format!("error fetching position from storage: {e}"))
567 })?
568 .ok_or_else(|| Status::not_found("position not found"))?;
569
570 yield position.to_proto();
571 }
572 };
573 Ok(tonic::Response::new(
574 s.map_ok(
575 |p: penumbra_sdk_proto::core::component::dex::v1::Position| {
576 LiquidityPositionsByIdResponse { data: Some(p) }
577 },
578 )
579 .map_err(|e: anyhow::Error| {
580 tonic::Status::unavailable(format!(
581 "error getting position value from storage: {e}"
582 ))
583 })
584 .boxed(),
587 ))
588 }
589}
590
591#[tonic::async_trait]
592impl SimulationService for Server {
593 async fn simulate_trade(
594 &self,
595 request: tonic::Request<SimulateTradeRequest>,
596 ) -> Result<tonic::Response<SimulateTradeResponse>, Status> {
597 let request = request.into_inner();
598 let routing_stategy = match request.routing {
599 None => Routing {
600 setting: Some(Setting::Default(routing::Default {})),
601 },
602 Some(routing) => routing,
603 };
604
605 let routing_strategy = match routing_stategy.setting {
606 None => Setting::Default(routing::Default {}),
607 Some(setting) => setting,
608 };
609
610 let input: Value = request
611 .input
612 .ok_or_else(|| tonic::Status::invalid_argument("missing input parameter"))?
613 .try_into()
614 .map_err(|e| {
615 tonic::Status::invalid_argument(format!("error parsing input: {:#}", e))
616 })?;
617
618 let output_id = request
619 .output
620 .ok_or_else(|| tonic::Status::invalid_argument("missing output id parameter"))?
621 .try_into()
622 .map_err(|e| {
623 tonic::Status::invalid_argument(format!("error parsing output id: {:#}", e))
624 })?;
625
626 let start_time = std::time::Instant::now();
627 let state = self.storage.latest_snapshot();
628
629 let mut routing_params = state
630 .routing_params()
631 .await
632 .expect("dex routing params are set");
633 match routing_strategy {
634 Setting::SingleHop(_) => {
635 routing_params.max_hops = 1;
636 }
637 Setting::Default(_) => {
638 }
640 }
641
642 let execution_budget = state
643 .get_dex_params()
644 .await
645 .expect("dex parameters are set")
646 .max_execution_budget;
647
648 let mut state_tx = Arc::new(StateDelta::new(state));
649 let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
650
651 let swap_execution = match state_tx
652 .route_and_fill(
653 input.asset_id,
654 output_id,
655 input.amount,
656 routing_params,
657 execution_circuit_breaker,
658 )
659 .await
660 .map_err(|e| tonic::Status::internal(format!("error simulating trade: {:#}", e)))?
661 {
662 Some(swap_execution) => swap_execution,
663 None => SwapExecution {
664 traces: vec![],
665 input: Value {
666 amount: 0u64.into(),
667 asset_id: input.asset_id,
668 },
669 output: Value {
670 amount: 0u64.into(),
671 asset_id: output_id,
672 },
673 },
674 };
675
676 let unfilled = Value {
677 amount: input
678 .amount
679 .checked_sub(&swap_execution.input.amount)
680 .ok_or_else(|| {
681 tonic::Status::failed_precondition(
682 "swap execution input amount is larger than request input amount"
683 .to_string(),
684 )
685 })?,
686 asset_id: input.asset_id,
687 };
688
689 let rsp = tonic::Response::new(SimulateTradeResponse {
690 unfilled: Some(unfilled.into()),
691 output: Some(swap_execution.into()),
692 });
693
694 let duration = start_time.elapsed();
695
696 metrics::histogram!(metrics::DEX_RPC_SIMULATE_TRADE_DURATION).record(duration);
697
698 Ok(rsp)
699 }
700}