1use std::future;
2use std::{pin::Pin, sync::Arc};
3
4use anyhow::{bail, ensure, Result};
5use async_stream::try_stream;
6use async_trait::async_trait;
7use cnidarium::{EscapedByteSlice, StateRead, StateWrite};
8use futures::Stream;
9use futures::StreamExt;
10use penumbra_sdk_asset::{asset, Balance, Value, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_num::Amount;
12use penumbra_sdk_proto::DomainType;
13use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
14use tap::Tap;
15use tracing::instrument;
16
17use crate::component::{
18 dex::InternalDexWrite,
19 dex::StateReadExt as _,
20 position_manager::{
21 base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
22 price_index::PositionByPriceIndex, volume_tracker::PositionVolumeTracker,
23 },
24};
25use crate::lp::Reserves;
26use crate::{
27 component::position_manager::counter::PositionCounter,
28 component::ValueCircuitBreaker,
29 lp::position::{self, Position},
30 state_key::engine,
31 DirectedTradingPair,
32};
33use crate::{event, state_key};
34
35use super::chandelier::Chandelier;
36
37const DYNAMIC_ASSET_LIMIT: usize = 10;
38
39mod base_liquidity_index;
40pub(crate) mod counter;
41pub(crate) mod inventory_index;
42pub(crate) mod price_index;
43pub(crate) mod volume_tracker;
44
45#[async_trait]
46pub trait PositionRead: StateRead {
47 fn all_positions(
49 &self,
50 ) -> Pin<Box<dyn Stream<Item = Result<position::Position>> + Send + 'static>> {
51 let prefix = state_key::all_positions();
52 self.prefix(prefix)
53 .map(|entry| match entry {
54 Ok((_, metadata)) => {
55 tracing::debug!(?metadata, "found position");
56 Ok(metadata)
57 }
58 Err(e) => Err(e),
59 })
60 .boxed()
61 }
62
63 fn positions_by_price(
65 &self,
66 pair: &DirectedTradingPair,
67 ) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
68 {
69 let prefix = engine::price_index::prefix(pair);
70 tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
71 self.nonverifiable_prefix(&prefix)
72 .map(|entry| match entry {
73 Ok((k, lp)) => {
74 let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
75 Ok((position::Id(raw_id), lp))
76 }
77 Err(e) => Err(e),
78 })
79 .boxed()
80 }
81
82 async fn position_by_id(&self, id: &position::Id) -> Result<Option<position::Position>> {
83 self.get(&state_key::position_by_id(id)).await
84 }
85
86 async fn check_position_by_id(&self, id: &position::Id) -> bool {
87 self.get_raw(&state_key::position_by_id(id))
88 .await
89 .expect("no deserialization errors")
90 .is_some()
91 }
92
93 async fn best_position(
94 &self,
95 pair: &DirectedTradingPair,
96 ) -> Result<Option<(position::Id, position::Position)>> {
97 let mut positions_by_price = self.positions_by_price(pair);
98 positions_by_price.next().await.transpose()
99 }
100
101 fn pending_position_closures(&self) -> im::Vector<position::Id> {
103 self.object_get(state_key::pending_position_closures())
104 .unwrap_or_default()
105 }
106
107 fn candidate_set(
112 &self,
113 from: asset::Id,
114 fixed_candidates: Arc<Vec<asset::Id>>,
115 ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
116 let fc = fixed_candidates.clone();
118 let mut dynamic_candidates = self
119 .ordered_routable_assets(&from)
120 .filter(move |c| {
121 future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
122 })
123 .take(DYNAMIC_ASSET_LIMIT);
124 try_stream! {
125 for candidate in fixed_candidates.iter() {
127 yield candidate.clone();
128 }
129
130 while let Some(candidate) = dynamic_candidates
132 .next().await {
133 yield candidate.expect("failed to fetch candidate");
134 }
135 }
136 .boxed()
137 }
138
139 fn ordered_routable_assets(
141 &self,
142 start: &asset::Id,
143 ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
144 let prefix = engine::routable_assets::starting_from(start);
145 tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
146 self.nonverifiable_prefix_raw(&prefix)
147 .map(|entry| match entry {
148 Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
149 Err(e) => Err(e),
150 })
151 .boxed()
152 }
153
154 fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
156 self.object_get(state_key::recently_accessed_assets())
157 .unwrap_or_default()
158 }
159}
160impl<T: StateRead + ?Sized> PositionRead for T {}
161
162#[async_trait]
164pub trait PositionManager: StateWrite + PositionRead {
165 #[instrument(level = "debug", skip(self))]
173 async fn close_position_by_id(&mut self, id: &position::Id) -> Result<()> {
174 tracing::debug!(?id, "closing position, first fetch it");
175 let prev_state = self
176 .position_by_id(id)
177 .await
178 .expect("fetching position should not fail")
179 .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
180 .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
181
182 anyhow::ensure!(
183 matches!(
184 prev_state.state,
185 position::State::Opened | position::State::Closed,
186 ),
187 "attempted to close a position with state {:?}, expected Opened or Closed",
188 prev_state.state
189 );
190
191 if prev_state.state == position::State::Closed {
195 tracing::debug!(
196 ?id,
197 "position is already closed so we can skip state updates"
198 );
199 return Ok(());
200 }
201
202 let new_state = {
203 let mut new_state = prev_state.clone();
204 new_state.state = position::State::Closed;
205 new_state
206 };
207
208 self.update_position(id, Some(prev_state), new_state)
209 .await?;
210 self.record_proto(event::EventPositionClose { position_id: *id }.to_proto());
211
212 Ok(())
213 }
214
215 async fn queue_close_position(&mut self, id: position::Id) -> Result<()> {
217 tracing::debug!(
218 ?id,
219 "checking current position state before queueing for closure"
220 );
221 let current_state = self
222 .position_by_id(&id)
223 .await
224 .expect("fetching position should not fail")
225 .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
226 .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
227
228 if current_state.state == position::State::Opened {
229 tracing::debug!(
230 ?current_state.state,
231 "queueing opened position for closure"
232 );
233 let mut to_close = self.pending_position_closures();
234 to_close.push_back(id);
235 self.object_put(state_key::pending_position_closures(), to_close);
236
237 self.record_proto(event::EventQueuePositionClose { position_id: id }.to_proto());
239 } else {
240 tracing::debug!(
241 ?current_state.state,
242 "skipping queueing for closure of non-opened position"
243 );
244 }
245
246 Ok(())
247 }
248
249 #[instrument(skip_all)]
251 async fn close_queued_positions(&mut self) -> Result<()> {
252 let to_close = self.pending_position_closures();
253 for id in to_close {
254 tracing::trace!(position_to_close = ?id, "processing LP queue");
255 self.close_position_by_id(&id).await?;
256 }
257 self.object_delete(state_key::pending_position_closures());
258 Ok(())
259 }
260
261 #[tracing::instrument(level = "debug", skip_all)]
275 async fn open_position(&mut self, position: position::Position) -> Result<()> {
276 let id = position.id();
277 tracing::debug!(?id, "attempting to open a position");
278
279 if position.state != position::State::Opened {
281 anyhow::bail!("attempted to open a position with a state besides `Opened`");
282 }
283
284 if let Some(existing_lp) = self.position_by_id(&id).await? {
286 anyhow::bail!(
287 "attempted to open a position with ID {id:?}, which already exists with state {existing_lp:?}",
288 );
289 }
290
291 self.dex_vcb_credit(position.reserves_1()).await?;
293 self.dex_vcb_credit(position.reserves_2()).await?;
294
295 let routing_params = self.routing_params().await?;
298 self.add_recently_accessed_asset(
299 position.phi.pair.asset_1(),
300 routing_params.fixed_candidates.clone(),
301 );
302 self.add_recently_accessed_asset(
303 position.phi.pair.asset_2(),
304 routing_params.fixed_candidates,
305 );
306 self.mark_trading_pair_as_active(position.phi.pair);
309
310 self.record_proto(event::EventPositionOpen::from(position.clone()).to_proto());
312 self.update_position(&id, None, position).await?;
313
314 Ok(())
315 }
316
317 #[tracing::instrument(level = "debug", skip(self, new_state))]
340 async fn position_execution(
341 &mut self,
342 mut new_state: Position,
343 context: DirectedTradingPair,
344 ) -> Result<Position> {
345 let position_id = new_state.id();
346 tracing::debug!(?position_id, "attempting to execute position");
347 let prev_state = self
348 .position_by_id(&position_id)
349 .await?
350 .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", new_state.id()))?;
351
352 if prev_state == new_state {
360 anyhow::ensure!(
361 matches!(&prev_state.state, position::State::Opened | position::State::Closed),
362 "attempted to do a no-op execution against a position with state {:?}, expected Opened or Closed",
363 prev_state.state
364 );
365 return Ok(new_state);
366 }
367
368 anyhow::ensure!(
369 matches!(&prev_state.state, position::State::Opened),
370 "attempted to execute against a position with state {:?}, expected Opened",
371 prev_state.state
372 );
373 anyhow::ensure!(
374 matches!(&new_state.state, position::State::Opened),
375 "supplied post-execution state {:?}, expected Opened",
376 prev_state.state
377 );
378
379 self.record_proto(
382 event::EventPositionExecution::in_context(&prev_state, &new_state, context).to_proto(),
383 );
384
385 if new_state.close_on_fill {
388 if new_state.reserves.r1 == 0u64.into() || new_state.reserves.r2 == 0u64.into() {
389 tracing::debug!(
390 ?position_id,
391 r1 = ?new_state.reserves.r1,
392 r2 = ?new_state.reserves.r2,
393 "marking position as closed due to close-on-fill"
394 );
395
396 new_state.state = position::State::Closed;
397 self.record_proto(event::EventPositionClose { position_id }.to_proto());
398 }
399 }
400
401 self.record_position_execution(&prev_state, &new_state)
404 .await
405 .map_err(|e| tracing::warn!(?e, "failed to record position execution"))
406 .ok();
407
408 self.update_position(&position_id, Some(prev_state), new_state)
409 .await
410 }
411
412 #[tracing::instrument(level = "debug", skip(self))]
416 async fn withdraw_position(
417 &mut self,
418 position_id: position::Id,
419 sequence: u64,
420 ) -> Result<Balance> {
421 let prev_state = self
422 .position_by_id(&position_id)
423 .await?
424 .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", position_id))?;
425
426 if sequence == 0 {
434 if prev_state.state != position::State::Closed {
435 anyhow::bail!(
436 "attempted to withdraw position {} with state {}, expected Closed",
437 position_id,
438 prev_state.state
439 );
440 }
441 } else {
442 if let position::State::Withdrawn {
443 sequence: current_sequence,
444 } = prev_state.state
445 {
446 if current_sequence + 1 != sequence {
448 anyhow::bail!(
449 "attempted to withdraw position {} with sequence {}, expected {}",
450 position_id,
451 sequence,
452 current_sequence + 1
453 );
454 }
455 } else {
456 anyhow::bail!(
457 "attempted to withdraw position {} with state {}, expected Withdrawn",
458 position_id,
459 prev_state.state
460 );
461 }
462 }
463
464 self.record_proto(
467 event::EventPositionWithdraw::in_context(position_id, &prev_state).to_proto(),
468 );
469
470 let reserves = prev_state.reserves.balance(&prev_state.phi.pair);
472
473 self.dex_vcb_debit(prev_state.reserves_1()).await?;
475 self.dex_vcb_debit(prev_state.reserves_2()).await?;
476
477 let new_state = {
481 let mut new_state = prev_state.clone();
482 new_state.state = position::State::Withdrawn { sequence };
484 new_state.reserves = Reserves::zero();
485 new_state
486 };
487
488 self.update_position(&position_id, Some(prev_state), new_state)
489 .await?;
490
491 Ok(reserves)
492 }
493
494 #[tracing::instrument(level = "debug", skip(self))]
496 async fn reward_position(
497 &mut self,
498 position_id: position::Id,
499 reward: Amount,
500 ) -> anyhow::Result<()> {
501 let prev_state = self
502 .position_by_id(&position_id)
503 .await?
504 .ok_or_else(|| anyhow::anyhow!("rewarding unknown position {}", position_id))?;
505 let new_state = {
508 let mut new_state = prev_state.clone();
509 let pair = prev_state.phi.pair;
510 let to_increment = if pair.asset_1() == *STAKING_TOKEN_ASSET_ID {
511 &mut new_state.reserves.r1
512 } else if pair.asset_2() == *STAKING_TOKEN_ASSET_ID {
513 &mut new_state.reserves.r2
514 } else {
515 tracing::error!("pair {} does not contain staking asset", pair);
516 return Ok(());
517 };
518 *to_increment = to_increment.checked_add(&reward).expect(&format!(
519 "failed to add reward {} to reserves {}",
520 reward, *to_increment
521 ));
522
523 new_state
527 };
528 self.update_position(&position_id, Some(prev_state), new_state)
529 .await?;
530 self.dex_vcb_credit(Value {
533 asset_id: *STAKING_TOKEN_ASSET_ID,
534 amount: reward,
535 })
536 .await?;
537 Ok(())
538 }
539}
540
541impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
542
543#[async_trait]
544trait Inner: StateWrite {
545 #[instrument(level = "debug", skip_all)]
550 async fn update_position(
551 &mut self,
552 id: &position::Id,
553 prev_state: Option<Position>,
554 new_state: Position,
555 ) -> Result<Position> {
556 tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
557 tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");
558
559 Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;
561
562 self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
564 self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
565 .await?;
566 self.update_trading_pair_position_counter(&prev_state, &new_state)
567 .await?;
568 self.update_position_by_price_index(&id, &prev_state, &new_state)?;
569 self.update_volume_index(&id, &prev_state, &new_state).await;
570
571 self.put(state_key::position_by_id(&id), new_state.clone());
572 Ok(new_state)
573 }
574
575 fn guard_invalid_transitions(
576 prev_state: &Option<Position>,
577 new_state: &Position,
578 id: &position::Id,
579 ) -> Result<()> {
580 use position::State::*;
581
582 if let Some(prev_lp) = prev_state {
583 tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition");
584 match (prev_lp.state, new_state.state) {
585 (Opened, Opened) => {}
586 (Opened, Closed) => {}
587 (Closed, Closed) => { }
588 (Closed, Withdrawn { sequence }) => {
589 ensure!(
590 sequence == 0,
591 "withdrawn positions must have their sequence start at zero (found: {})",
592 sequence
593 );
594 }
595 (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => {
596 tracing::debug!(?old_seq, ?new_seq, "updating withdrawn position");
597 ensure!(
602 new_seq == old_seq + 1 || new_seq == old_seq,
603 "if the sequence number increase, it must increase by exactly one"
604 );
605 }
606 _ => bail!("invalid transition"),
607 }
608 } else {
609 ensure!(
610 matches!(new_state.state, Opened),
611 "fresh positions MUST start in the `Opened` state (found: {:?})",
612 new_state.state
613 );
614 }
615
616 Ok(())
617 }
618}
619impl<T: StateWrite + ?Sized> Inner for T {}