1use anyhow::{Context, Result};
2use comfy_table::{presets, Table};
3use futures::{Stream, StreamExt, TryStreamExt};
4use std::pin::Pin;
5
6use penumbra_sdk_asset::{asset, asset::Metadata, Value};
7use penumbra_sdk_dex::{
8 lp::position::{self, Position},
9 BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair,
10};
11use penumbra_sdk_proto::core::component::{
12 dex::v1::{
13 query_service_client::QueryServiceClient as DexQueryServiceClient,
14 simulation_service_client::SimulationServiceClient, ArbExecutionRequest,
15 BatchSwapOutputDataRequest, LiquidityPositionByIdRequest, LiquidityPositionsByPriceRequest,
16 LiquidityPositionsRequest, SimulateTradeRequest, SwapExecutionRequest,
17 },
18 shielded_pool::v1::{
19 query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
20 AssetMetadataByIdRequest,
21 },
22};
23use penumbra_sdk_view::ViewClient;
24use tonic::transport::Channel;
25
26use crate::{
27 command::utils::{self, render_positions},
28 App,
29};
30
31#[derive(Debug, clap::Subcommand)]
32pub enum DexCmd {
33 BatchOutputs {
35 #[clap(long)]
37 height: u64,
38 #[clap(value_name = "asset_1:asset_2")]
41 trading_pair: TradingPair,
42 },
43 SwapExecution {
45 #[clap(long)]
47 height: u64,
48 #[clap(value_name = "asset_1:asset_2")]
51 trading_pair: DirectedTradingPair,
52 },
53 #[clap(visible_alias = "arb")]
55 ArbExecution {
56 #[clap(long)]
58 height: u64,
59 },
60 #[clap(display_order(900))]
62 AllPositions {
63 #[clap(long)]
65 include_closed: bool,
66 },
67 Position {
69 id: position::Id,
71 #[clap(long)]
73 raw: bool,
74 },
75 Positions {
77 #[clap(value_name = "asset_1:asset_2")]
80 trading_pair: DirectedTradingPair,
81 #[clap(long)]
83 limit: Option<u64>,
84 },
85 Simulate {
87 input: String,
89 #[clap(long, display_order = 100)]
91 into: String,
92 },
93}
94
95impl DexCmd {
96 pub async fn get_batch_outputs(
97 &self,
98 app: &mut App,
99 height: &u64,
100 trading_pair: &TradingPair,
101 ) -> Result<BatchSwapOutputData> {
102 let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
103 client
104 .batch_swap_output_data(BatchSwapOutputDataRequest {
105 height: *height,
106 trading_pair: Some((*trading_pair).into()),
107 })
108 .await?
109 .into_inner()
110 .try_into()
111 .context("cannot parse batch swap output data")
112 }
113
114 pub async fn get_swap_execution(
115 &self,
116 app: &mut App,
117 height: &u64,
118 trading_pair: &DirectedTradingPair,
119 ) -> Result<SwapExecution> {
120 let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
121
122 client
123 .swap_execution(SwapExecutionRequest {
124 height: *height,
125 trading_pair: Some((*trading_pair).into()),
126 ..Default::default()
127 })
128 .await?
129 .into_inner()
130 .swap_execution
131 .ok_or_else(|| anyhow::anyhow!("proto response missing swap execution"))?
132 .try_into()
133 .context("cannot parse batch swap output data")
134 }
135
136 pub async fn get_arb_execution(&self, app: &mut App, height: &u64) -> Result<SwapExecution> {
137 let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
138 client
139 .arb_execution(ArbExecutionRequest {
140 height: *height,
141 ..Default::default()
142 })
143 .await?
144 .into_inner()
145 .swap_execution
146 .ok_or_else(|| anyhow::anyhow!("proto response missing arb execution"))?
147 .try_into()
148 .context("cannot parse batch swap output data")
149 }
150
151 pub async fn get_simulated_execution(
152 &self,
153 app: &mut App,
154 input: Value,
155 output: asset::Id,
156 ) -> Result<SwapExecution> {
157 use penumbra_sdk_proto::core::component::dex::v1::simulate_trade_request::{
158 routing::Setting, Routing,
159 };
160 let mut client = SimulationServiceClient::new(app.pd_channel().await?);
161 client
162 .simulate_trade(SimulateTradeRequest {
163 input: Some(input.into()),
164 output: Some(output.into()),
165 routing: Some(Routing {
166 setting: Some(Setting::Default(Default::default())),
167 }),
168 })
169 .await?
170 .into_inner()
171 .output
172 .ok_or_else(|| anyhow::anyhow!("proto response missing swap execution"))?
173 .try_into()
174 .context("cannot parse simulation response")
175 }
176
177 pub async fn get_all_liquidity_positions(
178 &self,
179 mut client: DexQueryServiceClient<Channel>,
180 include_closed: bool,
181 ) -> Result<Pin<Box<dyn Stream<Item = Result<Position>> + Send + 'static>>> {
182 let stream = client.liquidity_positions(LiquidityPositionsRequest { include_closed });
183 let stream = stream.await?.into_inner();
184
185 Ok(stream
186 .map_err(|e| anyhow::anyhow!("error fetching liquidity positions: {}", e))
187 .and_then(|msg| async move {
188 msg.data
189 .ok_or_else(|| anyhow::anyhow!("missing liquidity position in response data"))
190 .map(Position::try_from)?
191 })
192 .boxed())
193 }
194
195 pub async fn get_liquidity_positions_by_price(
196 &self,
197 mut client: DexQueryServiceClient<Channel>,
198 pair: DirectedTradingPair,
199 limit: Option<u64>,
200 ) -> Result<Pin<Box<dyn Stream<Item = Result<Position>> + Send + 'static>>> {
201 let stream = client.liquidity_positions_by_price(LiquidityPositionsByPriceRequest {
202 trading_pair: Some(pair.into()),
203 limit: limit.unwrap_or_default(),
204 ..Default::default()
205 });
206 let stream = stream.await?.into_inner();
207
208 Ok(stream
209 .map_err(|e| anyhow::anyhow!("error fetching liquidity positions: {}", e))
210 .and_then(|msg| async move {
211 msg.data
212 .ok_or_else(|| anyhow::anyhow!("missing liquidity position in response data"))
213 .map(Position::try_from)?
214 })
215 .boxed())
216 }
217
218 pub async fn print_swap_execution(
219 &self,
220 app: &mut App,
221 swap_execution: &SwapExecution,
222 ) -> Result<()> {
223 let cache = app.view().assets().await?;
224
225 println!(
226 "{} => {} via:",
227 swap_execution.input.format(&cache),
228 swap_execution.output.format(&cache),
229 );
230
231 let max_trace_len = swap_execution
234 .traces
235 .iter()
236 .map(|trace| trace.len())
237 .max()
238 .unwrap_or(0);
239
240 let column_count = 1 + max_trace_len + 1;
242
243 let mut table = Table::new();
244 table.load_preset(presets::NOTHING);
245 let mut headers = vec![""; column_count];
246 headers[1] = "Trace";
247 headers[column_count - 1] = "Subprice";
248 table.set_header(headers);
249
250 let price_string = |input: Value, output: Value| -> String {
251 use penumbra_sdk_dex::lp::SellOrder;
252 format!(
253 "{}/{}",
254 SellOrder {
255 offered: output,
256 desired: input,
257 fee: 0,
258 }
259 .price_str(&cache)
260 .expect("assets are known"),
261 Value {
264 asset_id: output.asset_id,
265 amount: cache
266 .get(&output.asset_id)
267 .expect("asset ID should exist in the cache")
268 .default_unit()
269 .unit_amount(),
270 }
271 .format(&cache)
272 )
273 };
274
275 for trace in &swap_execution.traces {
276 let mut row = vec![String::new(); column_count];
277 for i in 0..(trace.len() - 1) {
279 row[1 + i] = format!("{} =>", trace[i].format(&cache));
280 }
281 row[column_count - 2] = trace
283 .last()
284 .context("trace should have elements")?
285 .format(&cache)
286 .to_string();
287 row[column_count - 1] = price_string(
289 *trace.first().context("trace should have elements")?,
290 *trace.last().context("trace should have elements")?,
291 );
292
293 table.add_row(row);
294 }
295
296 println!("{}", table);
297
298 Ok(())
299 }
300
301 pub async fn print_batch_outputs(
302 &self,
303 app: &mut App,
304 height: &u64,
305 trading_pair: &TradingPair,
306 ) -> Result<()> {
307 let mut client = ShieldedPoolQueryServiceClient::new(app.pd_channel().await?);
308
309 let outputs = self.get_batch_outputs(app, height, trading_pair).await?;
310
311 let asset_1: Metadata = client
312 .asset_metadata_by_id(AssetMetadataByIdRequest {
313 asset_id: Some(trading_pair.asset_1().into()),
314 })
315 .await?
316 .into_inner()
317 .denom_metadata
318 .context("denom metadata for asset 1 not found")?
319 .try_into()?;
320 let asset_2: Metadata = client
321 .asset_metadata_by_id(AssetMetadataByIdRequest {
322 asset_id: Some(trading_pair.asset_2().into()),
323 })
324 .await?
325 .into_inner()
326 .denom_metadata
327 .context("denom metadata for asset 2 not found")?
328 .try_into()?;
329
330 let unit_1 = asset_1.default_unit();
331 let unit_2 = asset_2.default_unit();
332
333 let consumed_1 = outputs.delta_1 - outputs.unfilled_1;
334 let consumed_2 = outputs.delta_2 - outputs.unfilled_2;
335
336 println!("Batch Swap Outputs for height {}:", outputs.height);
337 println!(
338 "Trade {} => {}",
339 unit_1.format_value(outputs.delta_1),
340 unit_2
341 );
342 println!(
343 "\tOutput: {} for {}",
344 unit_2.format_value(outputs.lambda_2),
345 unit_1.format_value(consumed_1)
346 );
347 println!(
348 "\tUnfilled Input: {}",
349 unit_1.format_value(outputs.unfilled_1)
350 );
351 println!(
352 "Trade {} => {}",
353 unit_2.format_value(outputs.delta_2),
354 unit_1
355 );
356 println!(
357 "\tOutput: {} for {}",
358 unit_1.format_value(outputs.lambda_1),
359 unit_2.format_value(consumed_2)
360 );
361 println!(
362 "\tUnfilled Input: {}",
363 unit_2.format_value(outputs.unfilled_2)
364 );
365
366 Ok(())
367 }
368
369 pub async fn exec(&self, app: &mut App) -> Result<()> {
370 match self {
371 DexCmd::BatchOutputs {
372 height,
373 trading_pair,
374 } => {
375 self.print_batch_outputs(app, height, trading_pair).await?;
376 }
377 DexCmd::SwapExecution {
378 height,
379 trading_pair,
380 } => {
381 let swap_execution = self.get_swap_execution(app, height, trading_pair).await?;
382
383 self.print_swap_execution(app, &swap_execution).await?;
384 }
385 DexCmd::ArbExecution { height } => {
386 let swap_execution = self.get_arb_execution(app, height).await?;
387
388 self.print_swap_execution(app, &swap_execution).await?;
389 }
390 DexCmd::Simulate { input, into } => {
391 let input = input.parse::<Value>()?;
392 let into = asset::REGISTRY.parse_unit(into.as_str()).base();
393
394 let swap_execution = self.get_simulated_execution(app, input, into.id()).await?;
395 self.print_swap_execution(app, &swap_execution).await?;
396 }
397 DexCmd::AllPositions { include_closed } => {
398 let client = DexQueryServiceClient::new(app.pd_channel().await?);
399
400 let positions_stream = self
401 .get_all_liquidity_positions(client.clone(), *include_closed)
402 .await?;
403
404 let asset_cache = app.view().assets().await?;
405
406 let positions = positions_stream.try_collect::<Vec<_>>().await?;
407
408 println!("{}", utils::render_positions(&asset_cache, &positions));
409 }
410 DexCmd::Positions {
411 trading_pair,
412 limit,
413 } => {
414 let client = DexQueryServiceClient::new(app.pd_channel().await?);
415 let positions = self
416 .get_liquidity_positions_by_price(client, *trading_pair, *limit)
417 .await?
418 .try_collect::<Vec<_>>()
419 .await?;
420 let asset_cache = app.view().assets().await?;
421 println!("{}", render_positions(&asset_cache, &positions));
422 }
423 DexCmd::Position { id, raw } => {
424 let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
425 let position: Position = client
426 .liquidity_position_by_id(LiquidityPositionByIdRequest {
427 position_id: Some((*id).into()),
428 ..Default::default()
429 })
430 .await?
431 .into_inner()
432 .data
433 .ok_or_else(|| anyhow::anyhow!("position not found"))?
434 .try_into()?;
435
436 if *raw {
437 println!("{}", serde_json::to_string_pretty(&position)?);
438 } else {
439 let asset_cache = app.view().assets().await?;
440 let mut table = Table::new();
441 table.load_preset(presets::NOTHING);
442 table.add_row(vec!["ID".to_string(), id.to_string()]);
443 table.add_row(vec!["State".to_string(), position.state.to_string()]);
444 table.add_row(vec![
445 "Reserves 1".to_string(),
446 Value {
447 asset_id: position.phi.pair.asset_1(),
448 amount: position.reserves.r1,
449 }
450 .format(&asset_cache),
451 ]);
452 table.add_row(vec![
453 "Reserves 2".to_string(),
454 Value {
455 asset_id: position.phi.pair.asset_2(),
456 amount: position.reserves.r2,
457 }
458 .format(&asset_cache),
459 ]);
460 table.add_row(vec![
461 "Fee".to_string(),
462 format!("{}bps", position.phi.component.fee),
463 ]);
464 table.add_row(vec![
465 "p".to_string(),
466 position.phi.component.p.value().to_string(),
467 ]);
468 table.add_row(vec![
469 "q".to_string(),
470 position.phi.component.q.value().to_string(),
471 ]);
472 table.add_row(vec!["Nonce".to_string(), hex::encode(position.nonce)]);
473 println!("{}", table);
474 }
475 }
476 };
477
478 Ok(())
479 }
480}