penumbra_sdk_dex/component/
position_manager.rs

1use std::future;
2use std::{pin::Pin, sync::Arc};
3
4use anyhow::{bail, ensure, Result};
5use async_stream::try_stream;
6use async_trait::async_trait;
7use cnidarium::{EscapedByteSlice, StateRead, StateWrite};
8use futures::Stream;
9use futures::StreamExt;
10use penumbra_sdk_asset::{asset, Balance};
11use penumbra_sdk_proto::DomainType;
12use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
13use tap::Tap;
14use tracing::instrument;
15
16use crate::component::{
17    dex::InternalDexWrite,
18    dex::StateReadExt as _,
19    position_manager::{
20        base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
21        price_index::PositionByPriceIndex,
22    },
23};
24use crate::lp::Reserves;
25use crate::{
26    component::position_manager::counter::PositionCounter,
27    component::ValueCircuitBreaker,
28    lp::position::{self, Position},
29    state_key::engine,
30    DirectedTradingPair,
31};
32use crate::{event, state_key};
33
34use super::chandelier::Chandelier;
35
36const DYNAMIC_ASSET_LIMIT: usize = 10;
37
38mod base_liquidity_index;
39pub(crate) mod counter;
40pub(crate) mod inventory_index;
41pub(crate) mod price_index;
42
43#[async_trait]
44pub trait PositionRead: StateRead {
45    /// Return a stream of all [`position::Metadata`] available.
46    fn all_positions(
47        &self,
48    ) -> Pin<Box<dyn Stream<Item = Result<position::Position>> + Send + 'static>> {
49        let prefix = state_key::all_positions();
50        self.prefix(prefix)
51            .map(|entry| match entry {
52                Ok((_, metadata)) => {
53                    tracing::debug!(?metadata, "found position");
54                    Ok(metadata)
55                }
56                Err(e) => Err(e),
57            })
58            .boxed()
59    }
60
61    /// Returns a stream of [`position::Id`] ordered by effective price.
62    fn positions_by_price(
63        &self,
64        pair: &DirectedTradingPair,
65    ) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
66    {
67        let prefix = engine::price_index::prefix(pair);
68        tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
69        self.nonverifiable_prefix(&prefix)
70            .map(|entry| match entry {
71                Ok((k, lp)) => {
72                    let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
73                    Ok((position::Id(raw_id), lp))
74                }
75                Err(e) => Err(e),
76            })
77            .boxed()
78    }
79
80    async fn position_by_id(&self, id: &position::Id) -> Result<Option<position::Position>> {
81        self.get(&state_key::position_by_id(id)).await
82    }
83
84    async fn check_position_by_id(&self, id: &position::Id) -> bool {
85        self.get_raw(&state_key::position_by_id(id))
86            .await
87            .expect("no deserialization errors")
88            .is_some()
89    }
90
91    async fn best_position(
92        &self,
93        pair: &DirectedTradingPair,
94    ) -> Result<Option<(position::Id, position::Position)>> {
95        let mut positions_by_price = self.positions_by_price(pair);
96        positions_by_price.next().await.transpose()
97    }
98
99    /// Fetch the list of pending position closures.
100    fn pending_position_closures(&self) -> im::Vector<position::Id> {
101        self.object_get(state_key::pending_position_closures())
102            .unwrap_or_default()
103    }
104
105    /// Returns the list of candidate assets to route through for a trade from `from`.
106    /// Combines a list of fixed candidates with a list of liquidity-based candidates.
107    /// This ensures that the fixed candidates are always considered, minimizing
108    /// the risk of attacks on routing.
109    fn candidate_set(
110        &self,
111        from: asset::Id,
112        fixed_candidates: Arc<Vec<asset::Id>>,
113    ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
114        // Clone the fixed candidates Arc so it can be moved into the stream filter's future.
115        let fc = fixed_candidates.clone();
116        let mut dynamic_candidates = self
117            .ordered_routable_assets(&from)
118            .filter(move |c| {
119                future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
120            })
121            .take(DYNAMIC_ASSET_LIMIT);
122        try_stream! {
123            // First stream the fixed candidates, so those can be processed while the dynamic candidates are fetched.
124            for candidate in fixed_candidates.iter() {
125                yield candidate.clone();
126            }
127
128            // Yield the liquidity-based candidates. Note that this _may_ include some assets already included in the fixed set.
129            while let Some(candidate) = dynamic_candidates
130                .next().await {
131                    yield candidate.expect("failed to fetch candidate");
132            }
133        }
134        .boxed()
135    }
136
137    /// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity.
138    fn ordered_routable_assets(
139        &self,
140        start: &asset::Id,
141    ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
142        let prefix = engine::routable_assets::starting_from(start);
143        tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
144        self.nonverifiable_prefix_raw(&prefix)
145            .map(|entry| match entry {
146                Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
147                Err(e) => Err(e),
148            })
149            .boxed()
150    }
151
152    /// Fetch the list of assets interacted with during this block.
153    fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
154        self.object_get(state_key::recently_accessed_assets())
155            .unwrap_or_default()
156    }
157}
158impl<T: StateRead + ?Sized> PositionRead for T {}
159
160/// Manages liquidity positions within the chain state.
161#[async_trait]
162pub trait PositionManager: StateWrite + PositionRead {
163    /// Close a position by id, removing it from the state.
164    ///
165    /// If the position is already closed, this is a no-op.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the position does not exist.
170    #[instrument(level = "debug", skip(self))]
171    async fn close_position_by_id(&mut self, id: &position::Id) -> Result<()> {
172        tracing::debug!(?id, "closing position, first fetch it");
173        let prev_state = self
174            .position_by_id(id)
175            .await
176            .expect("fetching position should not fail")
177            .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
178            .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
179
180        anyhow::ensure!(
181            matches!(
182                prev_state.state,
183                position::State::Opened | position::State::Closed,
184            ),
185            "attempted to close a position with state {:?}, expected Opened or Closed",
186            prev_state.state
187        );
188
189        // Optimization: skip state update if the position is already closed.
190        // This can happen if the position was queued for closure and premptively
191        // closed by the DEX engine during execution (e.g. auto-closing).
192        if prev_state.state == position::State::Closed {
193            tracing::debug!(
194                ?id,
195                "position is already closed so we can skip state updates"
196            );
197            return Ok(());
198        }
199
200        let new_state = {
201            let mut new_state = prev_state.clone();
202            new_state.state = position::State::Closed;
203            new_state
204        };
205
206        self.update_position(id, Some(prev_state), new_state)
207            .await?;
208        self.record_proto(event::EventPositionClose { position_id: *id }.to_proto());
209
210        Ok(())
211    }
212
213    /// Queues a position to be closed at the end of the block, after batch execution.
214    async fn queue_close_position(&mut self, id: position::Id) -> Result<()> {
215        tracing::debug!(
216            ?id,
217            "checking current position state before queueing for closure"
218        );
219        let current_state = self
220            .position_by_id(&id)
221            .await
222            .expect("fetching position should not fail")
223            .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
224            .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
225
226        if current_state.state == position::State::Opened {
227            tracing::debug!(
228                ?current_state.state,
229                "queueing opened position for closure"
230            );
231            let mut to_close = self.pending_position_closures();
232            to_close.push_back(id);
233            self.object_put(state_key::pending_position_closures(), to_close);
234
235            // queue position close you will...
236            self.record_proto(event::EventQueuePositionClose { position_id: id }.to_proto());
237        } else {
238            tracing::debug!(
239                ?current_state.state,
240                "skipping queueing for closure of non-opened position"
241            );
242        }
243
244        Ok(())
245    }
246
247    /// Close all positions that have been queued for closure.
248    #[instrument(skip_all)]
249    async fn close_queued_positions(&mut self) -> Result<()> {
250        let to_close = self.pending_position_closures();
251        for id in to_close {
252            tracing::trace!(position_to_close = ?id, "processing LP queue");
253            self.close_position_by_id(&id).await?;
254        }
255        self.object_delete(state_key::pending_position_closures());
256        Ok(())
257    }
258
259    /// Opens a new position, updating all necessary indexes and checking for
260    /// its nonexistence prior to being opened.
261    ///
262    /// # Errors
263    /// This method returns an error if the position is malformed
264    /// e.g. it is set to a state other than `Opened`
265    ///  or, it specifies a position identifier already used by another position.
266    ///
267    /// An error can also occur if a DEX engine invariant is breached
268    /// e.g. overflowing the position counter (`u16::MAX`)
269    ///  or, overflowing the value circuit breaker (`u128::MAX`)
270    ///
271    /// In any of those cases, we do not want to allow a new position to be opened.
272    #[tracing::instrument(level = "debug", skip_all)]
273    async fn open_position(&mut self, position: position::Position) -> Result<()> {
274        let id = position.id();
275        tracing::debug!(?id, "attempting to open a position");
276
277        // Double-check that the position is in the `Opened` state
278        if position.state != position::State::Opened {
279            anyhow::bail!("attempted to open a position with a state besides `Opened`");
280        }
281
282        // Validate that the position ID doesn't collide
283        if let Some(existing_lp) = self.position_by_id(&id).await? {
284            anyhow::bail!(
285                "attempted to open a position with ID {id:?}, which already exists with state {existing_lp:?}",
286            );
287        }
288
289        // Credit the DEX for the inflows from this position.
290        self.dex_vcb_credit(position.reserves_1()).await?;
291        self.dex_vcb_credit(position.reserves_2()).await?;
292
293        // Add the asset IDs from the new position's trading pair
294        // to the candidate set for this block.
295        let routing_params = self.routing_params().await?;
296        self.add_recently_accessed_asset(
297            position.phi.pair.asset_1(),
298            routing_params.fixed_candidates.clone(),
299        );
300        self.add_recently_accessed_asset(
301            position.phi.pair.asset_2(),
302            routing_params.fixed_candidates,
303        );
304        // Mark the trading pair as active so that we can inspect it
305        // at the end of the block and garbage collect excess LPs.
306        self.mark_trading_pair_as_active(position.phi.pair);
307
308        // Finally, record the new position state.
309        self.record_proto(event::EventPositionOpen::from(position.clone()).to_proto());
310        self.update_position(&id, None, position).await?;
311
312        Ok(())
313    }
314
315    /// Record execution against an opened position.
316    ///
317    /// IMPORTANT: This method can mutate its input state.
318    ///
319    /// We return the position that was ultimately written to the state,
320    /// it could differ from the initial input e.g. if the position is
321    /// auto-closing.
322    ///
323    /// # Context parameter
324    ///
325    /// The `context` parameter records the global context of the path in which
326    /// the position execution happened. This may be completely different than
327    /// the trading pair of the position itself, and is used to link the
328    /// micro-scale execution (processed by this method) with the macro-scale
329    /// context (a swap or arbitrage).
330    ///
331    /// # Auto-closing positions
332    ///
333    /// Some positions are `close_on_fill` i.e. they are programmed to close after
334    /// execution exhausts either side of their reserves. This method returns the
335    /// position that was written to the chain state, making it possible for callers
336    /// to inspect any change that has occured during execution handling.
337    #[tracing::instrument(level = "debug", skip(self, new_state))]
338    async fn position_execution(
339        &mut self,
340        mut new_state: Position,
341        context: DirectedTradingPair,
342    ) -> Result<Position> {
343        let position_id = new_state.id();
344        tracing::debug!(?position_id, "attempting to execute position");
345        let prev_state = self
346            .position_by_id(&position_id)
347            .await?
348            .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", new_state.id()))?;
349
350        // Optimization: it's possible that the position's reserves haven't
351        // changed, and that we're about to do a no-op update. This can happen
352        // when saving a frontier, for instance, since the FillRoute code saves
353        // the entire frontier when it finishes.
354        //
355        // If so, skip the write, but more importantly, skip emitting an event,
356        // so tooling doesn't get confused about a no-op execution.
357        if prev_state == new_state {
358            anyhow::ensure!(
359            matches!(&prev_state.state, position::State::Opened | position::State::Closed),
360            "attempted to do a no-op execution against a position with state {:?}, expected Opened or Closed",
361            prev_state.state
362        );
363            return Ok(new_state);
364        }
365
366        anyhow::ensure!(
367            matches!(&prev_state.state, position::State::Opened),
368            "attempted to execute against a position with state {:?}, expected Opened",
369            prev_state.state
370        );
371        anyhow::ensure!(
372            matches!(&new_state.state, position::State::Opened),
373            "supplied post-execution state {:?}, expected Opened",
374            prev_state.state
375        );
376
377        // We have already short-circuited no-op execution updates, so we can emit an execution
378        // event and not worry about duplicates.
379        self.record_proto(
380            event::EventPositionExecution::in_context(&prev_state, &new_state, context).to_proto(),
381        );
382
383        // Handle "close-on-fill": automatically flip the position state to "closed" if
384        // either of the reserves are zero.
385        if new_state.close_on_fill {
386            if new_state.reserves.r1 == 0u64.into() || new_state.reserves.r2 == 0u64.into() {
387                tracing::debug!(
388                    ?position_id,
389                    r1 = ?new_state.reserves.r1,
390                    r2 = ?new_state.reserves.r2,
391                    "marking position as closed due to close-on-fill"
392                );
393
394                new_state.state = position::State::Closed;
395                self.record_proto(event::EventPositionClose { position_id }.to_proto());
396            }
397        }
398
399        // Update the candlestick tracking
400        // We use `.ok` here to avoid halting the chain if there's an error recording
401        self.record_position_execution(&prev_state, &new_state)
402            .await
403            .map_err(|e| tracing::warn!(?e, "failed to record position execution"))
404            .ok();
405
406        self.update_position(&position_id, Some(prev_state), new_state)
407            .await
408    }
409
410    /// Withdraw from a closed position, incrementing its sequence number.
411    ///
412    /// Updates the position's reserves and rewards to zero and returns the withdrawn balance.
413    #[tracing::instrument(level = "debug", skip(self))]
414    async fn withdraw_position(
415        &mut self,
416        position_id: position::Id,
417        sequence: u64,
418    ) -> Result<Balance> {
419        let prev_state = self
420            .position_by_id(&position_id)
421            .await?
422            .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", position_id))?;
423
424        // Next, check that the withdrawal is consistent with the position state.
425        // This should be redundant with the value balance mechanism (clients should
426        // only be able to get the required input LPNFTs if the state transitions are
427        // consistent), but we check it here for defense in depth.
428        //
429        // This is just a check that sequence == current_sequence + 1, with extra logic
430        // so that we treat "closed" as "sequence -1".
431        if sequence == 0 {
432            if prev_state.state != position::State::Closed {
433                anyhow::bail!(
434                    "attempted to withdraw position {} with state {}, expected Closed",
435                    position_id,
436                    prev_state.state
437                );
438            }
439        } else {
440            if let position::State::Withdrawn {
441                sequence: current_sequence,
442            } = prev_state.state
443            {
444                if current_sequence + 1 != sequence {
445                    anyhow::bail!(
446                        "attempted to withdraw position {} with sequence {}, expected {}",
447                        position_id,
448                        sequence,
449                        current_sequence + 1
450                    );
451                }
452            } else {
453                anyhow::bail!(
454                    "attempted to withdraw position {} with state {}, expected Withdrawn",
455                    position_id,
456                    prev_state.state
457                );
458            }
459        }
460
461        // Record an event prior to updating the position state, so we have access to
462        // the current reserves.
463        self.record_proto(
464            event::EventPositionWithdraw::in_context(position_id, &prev_state).to_proto(),
465        );
466
467        // Grab a copy of the final reserves of the position to return to the caller.
468        let reserves = prev_state.reserves.balance(&prev_state.phi.pair);
469
470        // Debit the DEX for the outflows from this position.
471        self.dex_vcb_debit(prev_state.reserves_1()).await?;
472        self.dex_vcb_debit(prev_state.reserves_2()).await?;
473
474        // Finally, update the position. This has two steps:
475        // - update the state with the correct sequence number;
476        // - zero out the reserves, to prevent double-withdrawals.
477        let new_state = {
478            let mut new_state = prev_state.clone();
479            // We just checked that the supplied sequence number is incremented by 1 from prev.
480            new_state.state = position::State::Withdrawn { sequence };
481            new_state.reserves = Reserves::zero();
482            new_state
483        };
484
485        self.update_position(&position_id, Some(prev_state), new_state)
486            .await?;
487
488        Ok(reserves)
489    }
490}
491
492impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
493
494#[async_trait]
495trait Inner: StateWrite {
496    /// Writes a position to the state, updating all necessary indexes.
497    ///
498    /// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
499    /// All other position changes exposed by the `PositionManager` should run through here.
500    #[instrument(level = "debug", skip_all)]
501    async fn update_position(
502        &mut self,
503        id: &position::Id,
504        prev_state: Option<Position>,
505        new_state: Position,
506    ) -> Result<Position> {
507        tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
508        tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");
509
510        // Assert `update_position` state transitions invariants:
511        Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;
512
513        // Update the DEX engine indices:
514        self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
515        self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
516            .await?;
517        self.update_trading_pair_position_counter(&prev_state, &new_state)
518            .await?;
519        self.update_position_by_price_index(&id, &prev_state, &new_state)?;
520
521        self.put(state_key::position_by_id(&id), new_state.clone());
522        Ok(new_state)
523    }
524
525    fn guard_invalid_transitions(
526        prev_state: &Option<Position>,
527        new_state: &Position,
528        id: &position::Id,
529    ) -> Result<()> {
530        use position::State::*;
531
532        if let Some(prev_lp) = prev_state {
533            tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition");
534            match (prev_lp.state, new_state.state) {
535                (Opened, Opened) => {}
536                (Opened, Closed) => {}
537                (Closed, Closed) => { /* no-op but allowed */ }
538                (Closed, Withdrawn { sequence }) => {
539                    ensure!(
540                        sequence == 0,
541                        "withdrawn positions must have their sequence start at zero (found: {})",
542                        sequence
543                    );
544                }
545                (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => {
546                    let expected_seq = old_seq.saturating_add(1);
547                    ensure!(
548                        new_seq == expected_seq,
549                        "withdrawn must increase 1-by-1 (old: {}, new: {}, expected: {})",
550                        old_seq,
551                        new_seq,
552                        expected_seq
553                    );
554                }
555                _ => bail!("invalid transition"),
556            }
557        } else {
558            ensure!(
559                matches!(new_state.state, Opened),
560                "fresh positions MUST start in the `Opened` state (found: {:?})",
561                new_state.state
562            );
563        }
564
565        Ok(())
566    }
567}
568impl<T: StateWrite + ?Sized> Inner for T {}