1use std::future;
2use std::{pin::Pin, sync::Arc};
3
4use anyhow::{bail, ensure, Result};
5use async_stream::try_stream;
6use async_trait::async_trait;
7use cnidarium::{EscapedByteSlice, StateRead, StateWrite};
8use futures::Stream;
9use futures::StreamExt;
10use penumbra_sdk_asset::{asset, Balance};
11use penumbra_sdk_proto::DomainType;
12use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
13use tap::Tap;
14use tracing::instrument;
15
16use crate::component::{
17 dex::InternalDexWrite,
18 dex::StateReadExt as _,
19 position_manager::{
20 base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
21 price_index::PositionByPriceIndex,
22 },
23};
24use crate::lp::Reserves;
25use crate::{
26 component::position_manager::counter::PositionCounter,
27 component::ValueCircuitBreaker,
28 lp::position::{self, Position},
29 state_key::engine,
30 DirectedTradingPair,
31};
32use crate::{event, state_key};
33
34use super::chandelier::Chandelier;
35
36const DYNAMIC_ASSET_LIMIT: usize = 10;
37
38mod base_liquidity_index;
39pub(crate) mod counter;
40pub(crate) mod inventory_index;
41pub(crate) mod price_index;
42
43#[async_trait]
44pub trait PositionRead: StateRead {
45 fn all_positions(
47 &self,
48 ) -> Pin<Box<dyn Stream<Item = Result<position::Position>> + Send + 'static>> {
49 let prefix = state_key::all_positions();
50 self.prefix(prefix)
51 .map(|entry| match entry {
52 Ok((_, metadata)) => {
53 tracing::debug!(?metadata, "found position");
54 Ok(metadata)
55 }
56 Err(e) => Err(e),
57 })
58 .boxed()
59 }
60
61 fn positions_by_price(
63 &self,
64 pair: &DirectedTradingPair,
65 ) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
66 {
67 let prefix = engine::price_index::prefix(pair);
68 tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
69 self.nonverifiable_prefix(&prefix)
70 .map(|entry| match entry {
71 Ok((k, lp)) => {
72 let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
73 Ok((position::Id(raw_id), lp))
74 }
75 Err(e) => Err(e),
76 })
77 .boxed()
78 }
79
80 async fn position_by_id(&self, id: &position::Id) -> Result<Option<position::Position>> {
81 self.get(&state_key::position_by_id(id)).await
82 }
83
84 async fn check_position_by_id(&self, id: &position::Id) -> bool {
85 self.get_raw(&state_key::position_by_id(id))
86 .await
87 .expect("no deserialization errors")
88 .is_some()
89 }
90
91 async fn best_position(
92 &self,
93 pair: &DirectedTradingPair,
94 ) -> Result<Option<(position::Id, position::Position)>> {
95 let mut positions_by_price = self.positions_by_price(pair);
96 positions_by_price.next().await.transpose()
97 }
98
99 fn pending_position_closures(&self) -> im::Vector<position::Id> {
101 self.object_get(state_key::pending_position_closures())
102 .unwrap_or_default()
103 }
104
105 fn candidate_set(
110 &self,
111 from: asset::Id,
112 fixed_candidates: Arc<Vec<asset::Id>>,
113 ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
114 let fc = fixed_candidates.clone();
116 let mut dynamic_candidates = self
117 .ordered_routable_assets(&from)
118 .filter(move |c| {
119 future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
120 })
121 .take(DYNAMIC_ASSET_LIMIT);
122 try_stream! {
123 for candidate in fixed_candidates.iter() {
125 yield candidate.clone();
126 }
127
128 while let Some(candidate) = dynamic_candidates
130 .next().await {
131 yield candidate.expect("failed to fetch candidate");
132 }
133 }
134 .boxed()
135 }
136
137 fn ordered_routable_assets(
139 &self,
140 start: &asset::Id,
141 ) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
142 let prefix = engine::routable_assets::starting_from(start);
143 tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
144 self.nonverifiable_prefix_raw(&prefix)
145 .map(|entry| match entry {
146 Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
147 Err(e) => Err(e),
148 })
149 .boxed()
150 }
151
152 fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
154 self.object_get(state_key::recently_accessed_assets())
155 .unwrap_or_default()
156 }
157}
158impl<T: StateRead + ?Sized> PositionRead for T {}
159
160#[async_trait]
162pub trait PositionManager: StateWrite + PositionRead {
163 #[instrument(level = "debug", skip(self))]
171 async fn close_position_by_id(&mut self, id: &position::Id) -> Result<()> {
172 tracing::debug!(?id, "closing position, first fetch it");
173 let prev_state = self
174 .position_by_id(id)
175 .await
176 .expect("fetching position should not fail")
177 .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
178 .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
179
180 anyhow::ensure!(
181 matches!(
182 prev_state.state,
183 position::State::Opened | position::State::Closed,
184 ),
185 "attempted to close a position with state {:?}, expected Opened or Closed",
186 prev_state.state
187 );
188
189 if prev_state.state == position::State::Closed {
193 tracing::debug!(
194 ?id,
195 "position is already closed so we can skip state updates"
196 );
197 return Ok(());
198 }
199
200 let new_state = {
201 let mut new_state = prev_state.clone();
202 new_state.state = position::State::Closed;
203 new_state
204 };
205
206 self.update_position(id, Some(prev_state), new_state)
207 .await?;
208 self.record_proto(event::EventPositionClose { position_id: *id }.to_proto());
209
210 Ok(())
211 }
212
213 async fn queue_close_position(&mut self, id: position::Id) -> Result<()> {
215 tracing::debug!(
216 ?id,
217 "checking current position state before queueing for closure"
218 );
219 let current_state = self
220 .position_by_id(&id)
221 .await
222 .expect("fetching position should not fail")
223 .ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?
224 .tap(|lp| tracing::trace!(prev_state = ?lp, "retrieved previous lp state"));
225
226 if current_state.state == position::State::Opened {
227 tracing::debug!(
228 ?current_state.state,
229 "queueing opened position for closure"
230 );
231 let mut to_close = self.pending_position_closures();
232 to_close.push_back(id);
233 self.object_put(state_key::pending_position_closures(), to_close);
234
235 self.record_proto(event::EventQueuePositionClose { position_id: id }.to_proto());
237 } else {
238 tracing::debug!(
239 ?current_state.state,
240 "skipping queueing for closure of non-opened position"
241 );
242 }
243
244 Ok(())
245 }
246
247 #[instrument(skip_all)]
249 async fn close_queued_positions(&mut self) -> Result<()> {
250 let to_close = self.pending_position_closures();
251 for id in to_close {
252 tracing::trace!(position_to_close = ?id, "processing LP queue");
253 self.close_position_by_id(&id).await?;
254 }
255 self.object_delete(state_key::pending_position_closures());
256 Ok(())
257 }
258
259 #[tracing::instrument(level = "debug", skip_all)]
273 async fn open_position(&mut self, position: position::Position) -> Result<()> {
274 let id = position.id();
275 tracing::debug!(?id, "attempting to open a position");
276
277 if position.state != position::State::Opened {
279 anyhow::bail!("attempted to open a position with a state besides `Opened`");
280 }
281
282 if let Some(existing_lp) = self.position_by_id(&id).await? {
284 anyhow::bail!(
285 "attempted to open a position with ID {id:?}, which already exists with state {existing_lp:?}",
286 );
287 }
288
289 self.dex_vcb_credit(position.reserves_1()).await?;
291 self.dex_vcb_credit(position.reserves_2()).await?;
292
293 let routing_params = self.routing_params().await?;
296 self.add_recently_accessed_asset(
297 position.phi.pair.asset_1(),
298 routing_params.fixed_candidates.clone(),
299 );
300 self.add_recently_accessed_asset(
301 position.phi.pair.asset_2(),
302 routing_params.fixed_candidates,
303 );
304 self.mark_trading_pair_as_active(position.phi.pair);
307
308 self.record_proto(event::EventPositionOpen::from(position.clone()).to_proto());
310 self.update_position(&id, None, position).await?;
311
312 Ok(())
313 }
314
315 #[tracing::instrument(level = "debug", skip(self, new_state))]
338 async fn position_execution(
339 &mut self,
340 mut new_state: Position,
341 context: DirectedTradingPair,
342 ) -> Result<Position> {
343 let position_id = new_state.id();
344 tracing::debug!(?position_id, "attempting to execute position");
345 let prev_state = self
346 .position_by_id(&position_id)
347 .await?
348 .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", new_state.id()))?;
349
350 if prev_state == new_state {
358 anyhow::ensure!(
359 matches!(&prev_state.state, position::State::Opened | position::State::Closed),
360 "attempted to do a no-op execution against a position with state {:?}, expected Opened or Closed",
361 prev_state.state
362 );
363 return Ok(new_state);
364 }
365
366 anyhow::ensure!(
367 matches!(&prev_state.state, position::State::Opened),
368 "attempted to execute against a position with state {:?}, expected Opened",
369 prev_state.state
370 );
371 anyhow::ensure!(
372 matches!(&new_state.state, position::State::Opened),
373 "supplied post-execution state {:?}, expected Opened",
374 prev_state.state
375 );
376
377 self.record_proto(
380 event::EventPositionExecution::in_context(&prev_state, &new_state, context).to_proto(),
381 );
382
383 if new_state.close_on_fill {
386 if new_state.reserves.r1 == 0u64.into() || new_state.reserves.r2 == 0u64.into() {
387 tracing::debug!(
388 ?position_id,
389 r1 = ?new_state.reserves.r1,
390 r2 = ?new_state.reserves.r2,
391 "marking position as closed due to close-on-fill"
392 );
393
394 new_state.state = position::State::Closed;
395 self.record_proto(event::EventPositionClose { position_id }.to_proto());
396 }
397 }
398
399 self.record_position_execution(&prev_state, &new_state)
402 .await
403 .map_err(|e| tracing::warn!(?e, "failed to record position execution"))
404 .ok();
405
406 self.update_position(&position_id, Some(prev_state), new_state)
407 .await
408 }
409
410 #[tracing::instrument(level = "debug", skip(self))]
414 async fn withdraw_position(
415 &mut self,
416 position_id: position::Id,
417 sequence: u64,
418 ) -> Result<Balance> {
419 let prev_state = self
420 .position_by_id(&position_id)
421 .await?
422 .ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", position_id))?;
423
424 if sequence == 0 {
432 if prev_state.state != position::State::Closed {
433 anyhow::bail!(
434 "attempted to withdraw position {} with state {}, expected Closed",
435 position_id,
436 prev_state.state
437 );
438 }
439 } else {
440 if let position::State::Withdrawn {
441 sequence: current_sequence,
442 } = prev_state.state
443 {
444 if current_sequence + 1 != sequence {
445 anyhow::bail!(
446 "attempted to withdraw position {} with sequence {}, expected {}",
447 position_id,
448 sequence,
449 current_sequence + 1
450 );
451 }
452 } else {
453 anyhow::bail!(
454 "attempted to withdraw position {} with state {}, expected Withdrawn",
455 position_id,
456 prev_state.state
457 );
458 }
459 }
460
461 self.record_proto(
464 event::EventPositionWithdraw::in_context(position_id, &prev_state).to_proto(),
465 );
466
467 let reserves = prev_state.reserves.balance(&prev_state.phi.pair);
469
470 self.dex_vcb_debit(prev_state.reserves_1()).await?;
472 self.dex_vcb_debit(prev_state.reserves_2()).await?;
473
474 let new_state = {
478 let mut new_state = prev_state.clone();
479 new_state.state = position::State::Withdrawn { sequence };
481 new_state.reserves = Reserves::zero();
482 new_state
483 };
484
485 self.update_position(&position_id, Some(prev_state), new_state)
486 .await?;
487
488 Ok(reserves)
489 }
490}
491
492impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
493
494#[async_trait]
495trait Inner: StateWrite {
496 #[instrument(level = "debug", skip_all)]
501 async fn update_position(
502 &mut self,
503 id: &position::Id,
504 prev_state: Option<Position>,
505 new_state: Position,
506 ) -> Result<Position> {
507 tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
508 tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");
509
510 Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;
512
513 self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
515 self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
516 .await?;
517 self.update_trading_pair_position_counter(&prev_state, &new_state)
518 .await?;
519 self.update_position_by_price_index(&id, &prev_state, &new_state)?;
520
521 self.put(state_key::position_by_id(&id), new_state.clone());
522 Ok(new_state)
523 }
524
525 fn guard_invalid_transitions(
526 prev_state: &Option<Position>,
527 new_state: &Position,
528 id: &position::Id,
529 ) -> Result<()> {
530 use position::State::*;
531
532 if let Some(prev_lp) = prev_state {
533 tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition");
534 match (prev_lp.state, new_state.state) {
535 (Opened, Opened) => {}
536 (Opened, Closed) => {}
537 (Closed, Closed) => { }
538 (Closed, Withdrawn { sequence }) => {
539 ensure!(
540 sequence == 0,
541 "withdrawn positions must have their sequence start at zero (found: {})",
542 sequence
543 );
544 }
545 (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => {
546 let expected_seq = old_seq.saturating_add(1);
547 ensure!(
548 new_seq == expected_seq,
549 "withdrawn must increase 1-by-1 (old: {}, new: {}, expected: {})",
550 old_seq,
551 new_seq,
552 expected_seq
553 );
554 }
555 _ => bail!("invalid transition"),
556 }
557 } else {
558 ensure!(
559 matches!(new_state.state, Opened),
560 "fresh positions MUST start in the `Opened` state (found: {:?})",
561 new_state.state
562 );
563 }
564
565 Ok(())
566 }
567}
568impl<T: StateWrite + ?Sized> Inner for T {}