penumbra_sdk_dex/component/
position_manager.rs

1use std::future;
2use std::{pin::Pin, sync::Arc};
3
4use anyhow::{bail, ensure, Result};
5use async_stream::try_stream;
6use async_trait::async_trait;
7use cnidarium::{EscapedByteSlice, StateRead, StateWrite};
8use futures::Stream;
9use futures::StreamExt;
10use penumbra_sdk_asset::{asset, Balance, Value, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_num::Amount;
12use penumbra_sdk_proto::DomainType;
13use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
14use tap::Tap;
15use tracing::instrument;
16
17use crate::component::{
18    dex::InternalDexWrite,
19    dex::StateReadExt as _,
20    position_manager::{
21        base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
22        price_index::PositionByPriceIndex, volume_tracker::PositionVolumeTracker,
23    },
24};
25use crate::lp::Reserves;
26use crate::{
27    component::position_manager::counter::PositionCounter,
28    component::ValueCircuitBreaker,
29    lp::position::{self, Position},
30    state_key::engine,
31    DirectedTradingPair,
32};
33use crate::{event, state_key};
34
35use super::chandelier::Chandelier;
36
37const DYNAMIC_ASSET_LIMIT: usize = 10;
38
39mod base_liquidity_index;
40pub(crate) mod counter;
41pub(crate) mod inventory_index;
42pub(crate) mod price_index;
43pub(crate) mod volume_tracker;
44
45#[async_trait]
46pub trait PositionRead: StateRead {
47    /// Return a stream of all [`position::Metadata`] available.
48    fn all_positions(
49        &self,
50    ) -> Pin<Box<dyn Stream<Item = Result<position::Position>> + Send + 'static>> {
51        let prefix = state_key::all_positions();
52        self.prefix(prefix)
53            .map(|entry| match entry {
54                Ok((_, metadata)) => {
55                    tracing::debug!(?metadata, "found position");
56                    Ok(metadata)
57                }
58                Err(e) => Err(e),
59            })
60            .boxed()
61    }
62
63    /// Returns a stream of [`position::Id`] ordered by effective price.
64    fn positions_by_price(
65        &self,
66        pair: &DirectedTradingPair,
67    ) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
68    {
69        let prefix = engine::price_index::prefix(pair);
70        tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
71        self.nonverifiable_prefix(&prefix)
72            .map(|entry| match entry {
73                Ok((k, lp)) => {
74                    let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
75                    Ok((position::Id(raw_id), lp))
76                }
77                Err(e) => Err(e),
78            })
79            .boxed()
80    }
81
82    async fn position_by_id(&self, id: &position::Id) -> Result<Option<position::Position>> {
83        self.get(&state_key::position_by_id(id)).await
84    }
85
86    async fn check_position_by_id(&self, id: &position::Id) -> bool {
87        self.get_raw(&state_key::position_by_id(id))
88            .await
89            .expect("no deserialization errors")
90            .is_some()
91    }
92
93    async fn best_position(
94        &self,
95        pair: &DirectedTradingPair,
96    ) -> Result<Option<(position::Id, position::Position)>> {
97        let mut positions_by_price = self.positions_by_price(pair);
98        positions_by_price.next().await.transpose()
99    }
100
101    /// Fetch the list of pending position closures.
102    fn pending_position_closures(&self) -> im::Vector<position::Id> {
103        self.object_get(state_key::pending_position_closures())
104            .unwrap_or_default()
105    }
106
107    /// Returns the list of candidate assets to route through for a trade from `from`.
108    /// Combines a list of fixed candidates with a list of liquidity-based candidates.
109    /// This ensures that the fixed candidates are always considered, minimizing
110    /// the risk of attacks on routing.
111    fn candidate_set(
112        &self,
113        from: asset::Id,
114        fixed_candidates: Arc<Vec<asset::Id>>,
115    ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
116        // Clone the fixed candidates Arc so it can be moved into the stream filter's future.
117        let fc = fixed_candidates.clone();
118        let mut dynamic_candidates = self
119            .ordered_routable_assets(&from)
120            .filter(move |c| {
121                future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
122            })
123            .take(DYNAMIC_ASSET_LIMIT);
124        try_stream! {
125            // First stream the fixed candidates, so those can be processed while the dynamic candidates are fetched.
126            for candidate in fixed_candidates.iter() {
127                yield candidate.clone();
128            }
129
130            // Yield the liquidity-based candidates. Note that this _may_ include some assets already included in the fixed set.
131            while let Some(candidate) = dynamic_candidates
132                .next().await {
133                    yield candidate.expect("failed to fetch candidate");
134            }
135        }
136        .boxed()
137    }
138
139    /// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity.
140    fn ordered_routable_assets(
141        &self,
142        start: &asset::Id,
143    ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
144        let prefix = engine::routable_assets::starting_from(start);
145        tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
146        self.nonverifiable_prefix_raw(&prefix)
147            .map(|entry| match entry {
148                Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
149                Err(e) => Err(e),
150            })
151            .boxed()
152    }
153
154    /// Fetch the list of assets interacted with during this block.
155    fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
156        self.object_get(state_key::recently_accessed_assets())
157            .unwrap_or_default()
158    }
159}
160impl<T: StateRead + ?Sized> PositionRead for T {}
161
162/// Manages liquidity positions within the chain state.
163#[async_trait]
164pub trait PositionManager: StateWrite + PositionRead {
165    /// Close a position by id, removing it from the state.
166    ///
167    /// If the position is already closed, this is a no-op.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the position does not exist.
172    #[instrument(level = "debug", skip(self))]
173    async fn close_position_by_id(&mut self, id: &position::Id) -> Result<()> {
174        tracing::debug!(?id, "closing position, first fetch it");
175        let prev_state = self
176            .position_by_id(id)
177            .await
178            .expect("fetching position should not fail")
179            .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
180            .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
181
182        anyhow::ensure!(
183            matches!(
184                prev_state.state,
185                position::State::Opened | position::State::Closed,
186            ),
187            "attempted to close a position with state {:?}, expected Opened or Closed",
188            prev_state.state
189        );
190
191        // Optimization: skip state update if the position is already closed.
192        // This can happen if the position was queued for closure and preemptively
193        // closed by the DEX engine during execution (e.g. auto-closing).
194        if prev_state.state == position::State::Closed {
195            tracing::debug!(
196                ?id,
197                "position is already closed so we can skip state updates"
198            );
199            return Ok(());
200        }
201
202        let new_state = {
203            let mut new_state = prev_state.clone();
204            new_state.state = position::State::Closed;
205            new_state
206        };
207
208        self.update_position(id, Some(prev_state), new_state)
209            .await?;
210        self.record_proto(event::EventPositionClose { position_id: *id }.to_proto());
211
212        Ok(())
213    }
214
215    /// Queues a position to be closed at the end of the block, after batch execution.
216    async fn queue_close_position(&mut self, id: position::Id) -> Result<()> {
217        tracing::debug!(
218            ?id,
219            "checking current position state before queueing for closure"
220        );
221        let current_state = self
222            .position_by_id(&id)
223            .await
224            .expect("fetching position should not fail")
225            .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
226            .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
227
228        if current_state.state == position::State::Opened {
229            tracing::debug!(
230                ?current_state.state,
231                "queueing opened position for closure"
232            );
233            let mut to_close = self.pending_position_closures();
234            to_close.push_back(id);
235            self.object_put(state_key::pending_position_closures(), to_close);
236
237            // queue position close you will...
238            self.record_proto(event::EventQueuePositionClose { position_id: id }.to_proto());
239        } else {
240            tracing::debug!(
241                ?current_state.state,
242                "skipping queueing for closure of non-opened position"
243            );
244        }
245
246        Ok(())
247    }
248
249    /// Close all positions that have been queued for closure.
250    #[instrument(skip_all)]
251    async fn close_queued_positions(&mut self) -> Result<()> {
252        let to_close = self.pending_position_closures();
253        for id in to_close {
254            tracing::trace!(position_to_close = ?id, "processing LP queue");
255            self.close_position_by_id(&id).await?;
256        }
257        self.object_delete(state_key::pending_position_closures());
258        Ok(())
259    }
260
261    /// Opens a new position, updating all necessary indexes and checking for
262    /// its nonexistence prior to being opened.
263    ///
264    /// # Errors
265    /// This method returns an error if the position is malformed
266    /// e.g. it is set to a state other than `Opened`
267    ///  or, it specifies a position identifier already used by another position.
268    ///
269    /// An error can also occur if a DEX engine invariant is breached
270    /// e.g. overflowing the position counter (`u16::MAX`)
271    ///  or, overflowing the value circuit breaker (`u128::MAX`)
272    ///
273    /// In any of those cases, we do not want to allow a new position to be opened.
274    #[tracing::instrument(level = "debug", skip_all)]
275    async fn open_position(&mut self, position: position::Position) -> Result<()> {
276        let id = position.id();
277        tracing::debug!(?id, "attempting to open a position");
278
279        // Double-check that the position is in the `Opened` state
280        if position.state != position::State::Opened {
281            anyhow::bail!("attempted to open a position with a state besides `Opened`");
282        }
283
284        // Validate that the position ID doesn't collide
285        if let Some(existing_lp) = self.position_by_id(&id).await? {
286            anyhow::bail!(
287                "attempted to open a position with ID {id:?}, which already exists with state {existing_lp:?}",
288            );
289        }
290
291        // Credit the DEX for the inflows from this position.
292        self.dex_vcb_credit(position.reserves_1()).await?;
293        self.dex_vcb_credit(position.reserves_2()).await?;
294
295        // Add the asset IDs from the new position's trading pair
296        // to the candidate set for this block.
297        let routing_params = self.routing_params().await?;
298        self.add_recently_accessed_asset(
299            position.phi.pair.asset_1(),
300            routing_params.fixed_candidates.clone(),
301        );
302        self.add_recently_accessed_asset(
303            position.phi.pair.asset_2(),
304            routing_params.fixed_candidates,
305        );
306        // Mark the trading pair as active so that we can inspect it
307        // at the end of the block and garbage collect excess LPs.
308        self.mark_trading_pair_as_active(position.phi.pair);
309
310        // Finally, record the new position state.
311        self.record_proto(event::EventPositionOpen::from(position.clone()).to_proto());
312        self.update_position(&id, None, position).await?;
313
314        Ok(())
315    }
316
317    /// Record execution against an opened position.
318    ///
319    /// IMPORTANT: This method can mutate its input state.
320    ///
321    /// We return the position that was ultimately written to the state,
322    /// it could differ from the initial input e.g. if the position is
323    /// auto-closing.
324    ///
325    /// # Context parameter
326    ///
327    /// The `context` parameter records the global context of the path in which
328    /// the position execution happened. This may be completely different than
329    /// the trading pair of the position itself, and is used to link the
330    /// micro-scale execution (processed by this method) with the macro-scale
331    /// context (a swap or arbitrage).
332    ///
333    /// # Auto-closing positions
334    ///
335    /// Some positions are `close_on_fill` i.e. they are programmed to close after
336    /// execution exhausts either side of their reserves. This method returns the
337    /// position that was written to the chain state, making it possible for callers
338    /// to inspect any change that has occurred during execution handling.
339    #[tracing::instrument(level = "debug", skip(self, new_state))]
340    async fn position_execution(
341        &mut self,
342        mut new_state: Position,
343        context: DirectedTradingPair,
344    ) -> Result<Position> {
345        let position_id = new_state.id();
346        tracing::debug!(?position_id, "attempting to execute position");
347        let prev_state = self
348            .position_by_id(&position_id)
349            .await?
350            .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", new_state.id()))?;
351
352        // Optimization: it's possible that the position's reserves haven't
353        // changed, and that we're about to do a no-op update. This can happen
354        // when saving a frontier, for instance, since the FillRoute code saves
355        // the entire frontier when it finishes.
356        //
357        // If so, skip the write, but more importantly, skip emitting an event,
358        // so tooling doesn't get confused about a no-op execution.
359        if prev_state == new_state {
360            anyhow::ensure!(
361            matches!(&prev_state.state, position::State::Opened | position::State::Closed),
362            "attempted to do a no-op execution against a position with state {:?}, expected Opened or Closed",
363            prev_state.state
364        );
365            return Ok(new_state);
366        }
367
368        anyhow::ensure!(
369            matches!(&prev_state.state, position::State::Opened),
370            "attempted to execute against a position with state {:?}, expected Opened",
371            prev_state.state
372        );
373        anyhow::ensure!(
374            matches!(&new_state.state, position::State::Opened),
375            "supplied post-execution state {:?}, expected Opened",
376            prev_state.state
377        );
378
379        // We have already short-circuited no-op execution updates, so we can emit an execution
380        // event and not worry about duplicates.
381        self.record_proto(
382            event::EventPositionExecution::in_context(&prev_state, &new_state, context).to_proto(),
383        );
384
385        // Handle "close-on-fill": automatically flip the position state to "closed" if
386        // either of the reserves are zero.
387        if new_state.close_on_fill {
388            if new_state.reserves.r1 == 0u64.into() || new_state.reserves.r2 == 0u64.into() {
389                tracing::debug!(
390                    ?position_id,
391                    r1 = ?new_state.reserves.r1,
392                    r2 = ?new_state.reserves.r2,
393                    "marking position as closed due to close-on-fill"
394                );
395
396                new_state.state = position::State::Closed;
397                self.record_proto(event::EventPositionClose { position_id }.to_proto());
398            }
399        }
400
401        // Update the candlestick tracking
402        // We use `.ok` here to avoid halting the chain if there's an error recording
403        self.record_position_execution(&prev_state, &new_state)
404            .await
405            .map_err(|e| tracing::warn!(?e, "failed to record position execution"))
406            .ok();
407
408        self.update_position(&position_id, Some(prev_state), new_state)
409            .await
410    }
411
412    /// Withdraw from a closed position, incrementing its sequence number.
413    ///
414    /// Updates the position's reserves and rewards to zero and returns the withdrawn balance.
415    #[tracing::instrument(level = "debug", skip(self))]
416    async fn withdraw_position(
417        &mut self,
418        position_id: position::Id,
419        sequence: u64,
420    ) -> Result<Balance> {
421        let prev_state = self
422            .position_by_id(&position_id)
423            .await?
424            .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", position_id))?;
425
426        // Next, check that the withdrawal is consistent with the position state.
427        // This should be redundant with the value balance mechanism (clients should
428        // only be able to get the required input LPNFTs if the state transitions are
429        // consistent), but we check it here for defense in depth.
430        //
431        // This is just a check that sequence == current_sequence + 1, with extra logic
432        // so that we treat "closed" as "sequence -1".
433        if sequence == 0 {
434            if prev_state.state != position::State::Closed {
435                anyhow::bail!(
436                    "attempted to withdraw position {} with state {}, expected Closed",
437                    position_id,
438                    prev_state.state
439                );
440            }
441        } else {
442            if let position::State::Withdrawn {
443                sequence: current_sequence,
444            } = prev_state.state
445            {
446                // Defense-in-depth: Check that the sequence number is incremented by 1.
447                if current_sequence + 1 != sequence {
448                    anyhow::bail!(
449                        "attempted to withdraw position {} with sequence {}, expected {}",
450                        position_id,
451                        sequence,
452                        current_sequence + 1
453                    );
454                }
455            } else {
456                anyhow::bail!(
457                    "attempted to withdraw position {} with state {}, expected Withdrawn",
458                    position_id,
459                    prev_state.state
460                );
461            }
462        }
463
464        // Record an event prior to updating the position state, so we have access to
465        // the current reserves.
466        self.record_proto(
467            event::EventPositionWithdraw::in_context(position_id, &prev_state).to_proto(),
468        );
469
470        // Grab a copy of the final reserves of the position to return to the caller.
471        let reserves = prev_state.reserves.balance(&prev_state.phi.pair);
472
473        // Debit the DEX for the outflows from this position.
474        self.dex_vcb_debit(prev_state.reserves_1()).await?;
475        self.dex_vcb_debit(prev_state.reserves_2()).await?;
476
477        // Finally, update the position. This has two steps:
478        // - update the state with the correct sequence number;
479        // - zero out the reserves, to prevent double-withdrawals.
480        let new_state = {
481            let mut new_state = prev_state.clone();
482            // We just checked that the supplied sequence number is incremented by 1 from prev.
483            new_state.state = position::State::Withdrawn { sequence };
484            new_state.reserves = Reserves::zero();
485            new_state
486        };
487
488        self.update_position(&position_id, Some(prev_state), new_state)
489            .await?;
490
491        Ok(reserves)
492    }
493
494    /// This adds extra rewards in the form of staking tokens to the reserves of a position.
495    #[tracing::instrument(level = "debug", skip(self))]
496    async fn reward_position(
497        &mut self,
498        position_id: position::Id,
499        reward: Amount,
500    ) -> anyhow::Result<()> {
501        let prev_state = self
502            .position_by_id(&position_id)
503            .await?
504            .ok_or_else(|| anyhow::anyhow!("rewarding unknown position {}", position_id))?;
505        // The new state is the result of adding the staking token to the reserves,
506        // or doing nothing if for some reason this position does not have the staking token.
507        let new_state = {
508            let mut new_state = prev_state.clone();
509            let pair = prev_state.phi.pair;
510            let to_increment = if pair.asset_1() == *STAKING_TOKEN_ASSET_ID {
511                &mut new_state.reserves.r1
512            } else if pair.asset_2() == *STAKING_TOKEN_ASSET_ID {
513                &mut new_state.reserves.r2
514            } else {
515                tracing::error!("pair {} does not contain staking asset", pair);
516                return Ok(());
517            };
518            *to_increment = to_increment.checked_add(&reward).expect(&format!(
519                "failed to add reward {} to reserves {}",
520                reward, *to_increment
521            ));
522
523            // We are done, we only deposit rewards into the position's reserves.
524            // Even, if it is closed or withdrawn.
525
526            new_state
527        };
528        self.update_position(&position_id, Some(prev_state), new_state)
529            .await?;
530        // At this point, we can credit the VCB, because the update passed.
531        // This is a credit because the reward has moved value *into* the DEX.
532        self.dex_vcb_credit(Value {
533            asset_id: *STAKING_TOKEN_ASSET_ID,
534            amount: reward,
535        })
536        .await?;
537        Ok(())
538    }
539}
540
541impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
542
543#[async_trait]
544trait Inner: StateWrite {
545    /// Writes a position to the state, updating all necessary indexes.
546    ///
547    /// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
548    /// All other position changes exposed by the `PositionManager` should run through here.
549    #[instrument(level = "debug", skip_all)]
550    async fn update_position(
551        &mut self,
552        id: &position::Id,
553        prev_state: Option<Position>,
554        new_state: Position,
555    ) -> Result<Position> {
556        tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
557        tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");
558
559        // Assert `update_position` state transitions invariants:
560        Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;
561
562        // Update the DEX engine indices:
563        self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
564        self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
565            .await?;
566        self.update_trading_pair_position_counter(&prev_state, &new_state)
567            .await?;
568        self.update_position_by_price_index(&id, &prev_state, &new_state)?;
569        self.update_volume_index(&id, &prev_state, &new_state).await;
570
571        self.put(state_key::position_by_id(&id), new_state.clone());
572        Ok(new_state)
573    }
574
575    fn guard_invalid_transitions(
576        prev_state: &Option<Position>,
577        new_state: &Position,
578        id: &position::Id,
579    ) -> Result<()> {
580        use position::State::*;
581
582        if let Some(prev_lp) = prev_state {
583            tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition");
584            match (prev_lp.state, new_state.state) {
585                (Opened, Opened) => {}
586                (Opened, Closed) => {}
587                (Closed, Closed) => { /* no-op but allowed */ }
588                (Closed, Withdrawn { sequence }) => {
589                    ensure!(
590                        sequence == 0,
591                        "withdrawn positions must have their sequence start at zero (found: {})",
592                        sequence
593                    );
594                }
595                (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => {
596                    tracing::debug!(?old_seq, ?new_seq, "updating withdrawn position");
597                    // We allow the sequence number to be incremented by one, or to stay the same.
598                    // We want to allow the following scenario:
599                    // 1. User withdraws from a position (increasing the sequence number)
600                    // 2. A component deposits rewards into the position (keeping the sequence number the same)
601                    ensure!(
602                        new_seq == old_seq + 1 || new_seq == old_seq,
603                        "if the sequence number increase, it must increase by exactly one"
604                    );
605                }
606                _ => bail!("invalid transition"),
607            }
608        } else {
609            ensure!(
610                matches!(new_state.state, Opened),
611                "fresh positions MUST start in the `Opened` state (found: {:?})",
612                new_state.state
613            );
614        }
615
616        Ok(())
617    }
618}
619impl<T: StateWrite + ?Sized> Inner for T {}