pcli/command/query/
dex.rs

1use anyhow::{Context, Result};
2use comfy_table::{presets, Table};
3use futures::{Stream, StreamExt, TryStreamExt};
4use std::pin::Pin;
5
6use penumbra_sdk_asset::{asset, asset::Metadata, Value};
7use penumbra_sdk_dex::{
8    lp::position::{self, Position},
9    BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair,
10};
11use penumbra_sdk_proto::core::component::{
12    dex::v1::{
13        query_service_client::QueryServiceClient as DexQueryServiceClient,
14        simulation_service_client::SimulationServiceClient, ArbExecutionRequest,
15        BatchSwapOutputDataRequest, LiquidityPositionByIdRequest, LiquidityPositionsByPriceRequest,
16        LiquidityPositionsRequest, SimulateTradeRequest, SwapExecutionRequest,
17    },
18    shielded_pool::v1::{
19        query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
20        AssetMetadataByIdRequest,
21    },
22};
23use penumbra_sdk_view::ViewClient;
24use tonic::transport::Channel;
25
26use crate::{
27    command::utils::{self, render_positions},
28    App,
29};
30
31#[derive(Debug, clap::Subcommand)]
32pub enum DexCmd {
33    /// Display information about a specific trading pair & height's batch swap.
34    BatchOutputs {
35        /// The height to query for batch outputs.
36        #[clap(long)]
37        height: u64,
38        /// The trading pair to query for batch outputs.
39        /// Pairs must be specified with a colon separating them, e.g. "penumbra:test_usd".
40        #[clap(value_name = "asset_1:asset_2")]
41        trading_pair: TradingPair,
42    },
43    /// Display information about a specific trading pair & height's swap execution.
44    SwapExecution {
45        /// The height to query for the swap execution.
46        #[clap(long)]
47        height: u64,
48        /// The trading pair to query for the swap execution.
49        /// Pairs must be specified with a colon separating them, e.g. "penumbra:test_usd".
50        #[clap(value_name = "asset_1:asset_2")]
51        trading_pair: DirectedTradingPair,
52    },
53    /// Display information about an arb execution at a specific height.
54    #[clap(visible_alias = "arb")]
55    ArbExecution {
56        /// The height to query for the swap execution.
57        #[clap(long)]
58        height: u64,
59    },
60    /// Display information about all liquidity positions known to the chain.
61    #[clap(display_order(900))]
62    AllPositions {
63        /// Display closed and withdrawn liquidity positions as well as open ones.
64        #[clap(long)]
65        include_closed: bool,
66    },
67    /// Display a single position's state given a position ID.
68    Position {
69        /// The ID of the position to query.
70        id: position::Id,
71        /// If set, output raw JSON instead of a table.
72        #[clap(long)]
73        raw: bool,
74    },
75    /// Display open liquidity for a given pair, sorted by effective price.
76    Positions {
77        /// The trading pair to query, with ordering determining direction of trade (1=>2).
78        /// Pairs must be specified with a colon separating them, e.g. "penumbra:test_usd".
79        #[clap(value_name = "asset_1:asset_2")]
80        trading_pair: DirectedTradingPair,
81        /// A limit on the number of positions to display.
82        #[clap(long)]
83        limit: Option<u64>,
84    },
85    /// Simulates execution of a trade against the current DEX state.
86    Simulate {
87        /// The input amount to swap, written as a typed value 1.87penumbra, 12cubes, etc.
88        input: String,
89        /// The denomination to swap the input into, e.g. `gm`
90        #[clap(long, display_order = 100)]
91        into: String,
92    },
93}
94
95impl DexCmd {
96    pub async fn get_batch_outputs(
97        &self,
98        app: &mut App,
99        height: &u64,
100        trading_pair: &TradingPair,
101    ) -> Result<BatchSwapOutputData> {
102        let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
103        client
104            .batch_swap_output_data(BatchSwapOutputDataRequest {
105                height: *height,
106                trading_pair: Some((*trading_pair).into()),
107            })
108            .await?
109            .into_inner()
110            .try_into()
111            .context("cannot parse batch swap output data")
112    }
113
114    pub async fn get_swap_execution(
115        &self,
116        app: &mut App,
117        height: &u64,
118        trading_pair: &DirectedTradingPair,
119    ) -> Result<SwapExecution> {
120        let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
121
122        client
123            .swap_execution(SwapExecutionRequest {
124                height: *height,
125                trading_pair: Some((*trading_pair).into()),
126                ..Default::default()
127            })
128            .await?
129            .into_inner()
130            .swap_execution
131            .ok_or_else(|| anyhow::anyhow!("proto response missing swap execution"))?
132            .try_into()
133            .context("cannot parse batch swap output data")
134    }
135
136    pub async fn get_arb_execution(&self, app: &mut App, height: &u64) -> Result<SwapExecution> {
137        let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
138        client
139            .arb_execution(ArbExecutionRequest {
140                height: *height,
141                ..Default::default()
142            })
143            .await?
144            .into_inner()
145            .swap_execution
146            .ok_or_else(|| anyhow::anyhow!("proto response missing arb execution"))?
147            .try_into()
148            .context("cannot parse batch swap output data")
149    }
150
151    pub async fn get_simulated_execution(
152        &self,
153        app: &mut App,
154        input: Value,
155        output: asset::Id,
156    ) -> Result<SwapExecution> {
157        use penumbra_sdk_proto::core::component::dex::v1::simulate_trade_request::{
158            routing::Setting, Routing,
159        };
160        let mut client = SimulationServiceClient::new(app.pd_channel().await?);
161        client
162            .simulate_trade(SimulateTradeRequest {
163                input: Some(input.into()),
164                output: Some(output.into()),
165                routing: Some(Routing {
166                    setting: Some(Setting::Default(Default::default())),
167                }),
168            })
169            .await?
170            .into_inner()
171            .output
172            .ok_or_else(|| anyhow::anyhow!("proto response missing swap execution"))?
173            .try_into()
174            .context("cannot parse simulation response")
175    }
176
177    pub async fn get_all_liquidity_positions(
178        &self,
179        mut client: DexQueryServiceClient<Channel>,
180        include_closed: bool,
181    ) -> Result<Pin<Box<dyn Stream<Item = Result<Position>> + Send + 'static>>> {
182        let stream = client.liquidity_positions(LiquidityPositionsRequest { include_closed });
183        let stream = stream.await?.into_inner();
184
185        Ok(stream
186            .map_err(|e| anyhow::anyhow!("error fetching liquidity positions: {}", e))
187            .and_then(|msg| async move {
188                msg.data
189                    .ok_or_else(|| anyhow::anyhow!("missing liquidity position in response data"))
190                    .map(Position::try_from)?
191            })
192            .boxed())
193    }
194
195    pub async fn get_liquidity_positions_by_price(
196        &self,
197        mut client: DexQueryServiceClient<Channel>,
198        pair: DirectedTradingPair,
199        limit: Option<u64>,
200    ) -> Result<Pin<Box<dyn Stream<Item = Result<Position>> + Send + 'static>>> {
201        let stream = client.liquidity_positions_by_price(LiquidityPositionsByPriceRequest {
202            trading_pair: Some(pair.into()),
203            limit: limit.unwrap_or_default(),
204            ..Default::default()
205        });
206        let stream = stream.await?.into_inner();
207
208        Ok(stream
209            .map_err(|e| anyhow::anyhow!("error fetching liquidity positions: {}", e))
210            .and_then(|msg| async move {
211                msg.data
212                    .ok_or_else(|| anyhow::anyhow!("missing liquidity position in response data"))
213                    .map(Position::try_from)?
214            })
215            .boxed())
216    }
217
218    pub async fn print_swap_execution(
219        &self,
220        app: &mut App,
221        swap_execution: &SwapExecution,
222    ) -> Result<()> {
223        let cache = app.view().assets().await?;
224
225        println!(
226            "{} => {} via:",
227            swap_execution.input.format(&cache),
228            swap_execution.output.format(&cache),
229        );
230
231        // Try to make a nice table of execution traces. To do this, first find
232        // the max length of any subtrace:
233        let max_trace_len = swap_execution
234            .traces
235            .iter()
236            .map(|trace| trace.len())
237            .max()
238            .unwrap_or(0);
239
240        // Spacer | trace hops | trace price
241        let column_count = 1 + max_trace_len + 1;
242
243        let mut table = Table::new();
244        table.load_preset(presets::NOTHING);
245        let mut headers = vec![""; column_count];
246        headers[1] = "Trace";
247        headers[column_count - 1] = "Subprice";
248        table.set_header(headers);
249
250        let price_string = |input: Value, output: Value| -> String {
251            use penumbra_sdk_dex::lp::SellOrder;
252            format!(
253                "{}/{}",
254                SellOrder {
255                    offered: output,
256                    desired: input,
257                    fee: 0,
258                }
259                .price_str(&cache)
260                .expect("assets are known"),
261                // kind of hacky, this is assuming coincidency between price_str calcs
262                // and this code
263                Value {
264                    asset_id: output.asset_id,
265                    amount: cache
266                        .get(&output.asset_id)
267                        .expect("asset ID should exist in the cache")
268                        .default_unit()
269                        .unit_amount(),
270                }
271                .format(&cache)
272            )
273        };
274
275        for trace in &swap_execution.traces {
276            let mut row = vec![String::new(); column_count];
277            // Put all but the last element of the trace in the columns, left-to-right
278            for i in 0..(trace.len() - 1) {
279                row[1 + i] = format!("{} =>", trace[i].format(&cache));
280            }
281            // Right-align the last element of the trace, in case subtraces have different lengths
282            row[column_count - 2] = trace
283                .last()
284                .context("trace should have elements")?
285                .format(&cache)
286                .to_string();
287            // Print the price in the last column.
288            row[column_count - 1] = price_string(
289                *trace.first().context("trace should have elements")?,
290                *trace.last().context("trace should have elements")?,
291            );
292
293            table.add_row(row);
294        }
295
296        println!("{}", table);
297
298        Ok(())
299    }
300
301    pub async fn print_batch_outputs(
302        &self,
303        app: &mut App,
304        height: &u64,
305        trading_pair: &TradingPair,
306    ) -> Result<()> {
307        let mut client = ShieldedPoolQueryServiceClient::new(app.pd_channel().await?);
308
309        let outputs = self.get_batch_outputs(app, height, trading_pair).await?;
310
311        let asset_1: Metadata = client
312            .asset_metadata_by_id(AssetMetadataByIdRequest {
313                asset_id: Some(trading_pair.asset_1().into()),
314            })
315            .await?
316            .into_inner()
317            .denom_metadata
318            .context("denom metadata for asset 1 not found")?
319            .try_into()?;
320        let asset_2: Metadata = client
321            .asset_metadata_by_id(AssetMetadataByIdRequest {
322                asset_id: Some(trading_pair.asset_2().into()),
323            })
324            .await?
325            .into_inner()
326            .denom_metadata
327            .context("denom metadata for asset 2 not found")?
328            .try_into()?;
329
330        let unit_1 = asset_1.default_unit();
331        let unit_2 = asset_2.default_unit();
332
333        let consumed_1 = outputs.delta_1 - outputs.unfilled_1;
334        let consumed_2 = outputs.delta_2 - outputs.unfilled_2;
335
336        println!("Batch Swap Outputs for height {}:", outputs.height);
337        println!(
338            "Trade {} => {}",
339            unit_1.format_value(outputs.delta_1),
340            unit_2
341        );
342        println!(
343            "\tOutput:         {} for {}",
344            unit_2.format_value(outputs.lambda_2),
345            unit_1.format_value(consumed_1)
346        );
347        println!(
348            "\tUnfilled Input: {}",
349            unit_1.format_value(outputs.unfilled_1)
350        );
351        println!(
352            "Trade {} => {}",
353            unit_2.format_value(outputs.delta_2),
354            unit_1
355        );
356        println!(
357            "\tOutput:         {} for {}",
358            unit_1.format_value(outputs.lambda_1),
359            unit_2.format_value(consumed_2)
360        );
361        println!(
362            "\tUnfilled Input: {}",
363            unit_2.format_value(outputs.unfilled_2)
364        );
365
366        Ok(())
367    }
368
369    pub async fn exec(&self, app: &mut App) -> Result<()> {
370        match self {
371            DexCmd::BatchOutputs {
372                height,
373                trading_pair,
374            } => {
375                self.print_batch_outputs(app, height, trading_pair).await?;
376            }
377            DexCmd::SwapExecution {
378                height,
379                trading_pair,
380            } => {
381                let swap_execution = self.get_swap_execution(app, height, trading_pair).await?;
382
383                self.print_swap_execution(app, &swap_execution).await?;
384            }
385            DexCmd::ArbExecution { height } => {
386                let swap_execution = self.get_arb_execution(app, height).await?;
387
388                self.print_swap_execution(app, &swap_execution).await?;
389            }
390            DexCmd::Simulate { input, into } => {
391                let input = input.parse::<Value>()?;
392                let into = asset::REGISTRY.parse_unit(into.as_str()).base();
393
394                let swap_execution = self.get_simulated_execution(app, input, into.id()).await?;
395                self.print_swap_execution(app, &swap_execution).await?;
396            }
397            DexCmd::AllPositions { include_closed } => {
398                let client = DexQueryServiceClient::new(app.pd_channel().await?);
399
400                let positions_stream = self
401                    .get_all_liquidity_positions(client.clone(), *include_closed)
402                    .await?;
403
404                let asset_cache = app.view().assets().await?;
405
406                let positions = positions_stream.try_collect::<Vec<_>>().await?;
407
408                println!("{}", utils::render_positions(&asset_cache, &positions));
409            }
410            DexCmd::Positions {
411                trading_pair,
412                limit,
413            } => {
414                let client = DexQueryServiceClient::new(app.pd_channel().await?);
415                let positions = self
416                    .get_liquidity_positions_by_price(client, *trading_pair, *limit)
417                    .await?
418                    .try_collect::<Vec<_>>()
419                    .await?;
420                let asset_cache = app.view().assets().await?;
421                println!("{}", render_positions(&asset_cache, &positions));
422            }
423            DexCmd::Position { id, raw } => {
424                let mut client = DexQueryServiceClient::new(app.pd_channel().await?);
425                let position: Position = client
426                    .liquidity_position_by_id(LiquidityPositionByIdRequest {
427                        position_id: Some((*id).into()),
428                        ..Default::default()
429                    })
430                    .await?
431                    .into_inner()
432                    .data
433                    .ok_or_else(|| anyhow::anyhow!("position not found"))?
434                    .try_into()?;
435
436                if *raw {
437                    println!("{}", serde_json::to_string_pretty(&position)?);
438                } else {
439                    let asset_cache = app.view().assets().await?;
440                    let mut table = Table::new();
441                    table.load_preset(presets::NOTHING);
442                    table.add_row(vec!["ID".to_string(), id.to_string()]);
443                    table.add_row(vec!["State".to_string(), position.state.to_string()]);
444                    table.add_row(vec![
445                        "Reserves 1".to_string(),
446                        Value {
447                            asset_id: position.phi.pair.asset_1(),
448                            amount: position.reserves.r1,
449                        }
450                        .format(&asset_cache),
451                    ]);
452                    table.add_row(vec![
453                        "Reserves 2".to_string(),
454                        Value {
455                            asset_id: position.phi.pair.asset_2(),
456                            amount: position.reserves.r2,
457                        }
458                        .format(&asset_cache),
459                    ]);
460                    table.add_row(vec![
461                        "Fee".to_string(),
462                        format!("{}bps", position.phi.component.fee),
463                    ]);
464                    table.add_row(vec![
465                        "p".to_string(),
466                        position.phi.component.p.value().to_string(),
467                    ]);
468                    table.add_row(vec![
469                        "q".to_string(),
470                        position.phi.component.q.value().to_string(),
471                    ]);
472                    table.add_row(vec!["Nonce".to_string(), hex::encode(position.nonce)]);
473                    println!("{}", table);
474                }
475            }
476        };
477
478        Ok(())
479    }
480}