1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4 async_trait,
5 index::{BlockEvents, EventBatch, EventBatchContext},
6 AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10 event::{
11 EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
12 EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13 },
14 lp::{position::Flows, Reserves},
15 DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18 event::{EventSwap, EventSwapClaim},
19 lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34 use super::DateTime;
35 use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36 use penumbra_sdk_dex::CandlestickData;
37 use std::fmt::Display;
38
39 fn geo_mean(a: f64, b: f64) -> f64 {
40 (a * b).sqrt()
41 }
42
43 #[derive(Debug, Clone, Copy)]
48 pub struct Candle {
49 pub open: f64,
50 pub close: f64,
51 pub low: f64,
52 pub high: f64,
53 pub direct_volume: f64,
54 pub swap_volume: f64,
55 }
56
57 impl Candle {
58 pub fn from_candlestick_data(data: &CandlestickData) -> Self {
59 Self {
63 open: data.open,
64 close: data.close,
65 low: data.low,
66 high: data.high,
67 direct_volume: data.direct_volume,
68 swap_volume: data.swap_volume,
69 }
70 }
71
72 pub fn merge(&mut self, that: &Self) {
73 self.close = that.close;
74 self.low = self.low.min(that.low);
75 self.high = self.high.max(that.high);
76 self.direct_volume += that.direct_volume;
77 self.swap_volume += that.swap_volume;
78 }
79
80 pub fn mix(&mut self, op: &Self) {
82 self.close /= geo_mean(self.close, op.close);
85 self.open /= geo_mean(self.open, op.open);
86 self.low = self.low.min(1.0 / op.low);
87 self.high = self.high.min(1.0 / op.high);
88 self.direct_volume += op.direct_volume / self.close;
90 self.swap_volume += op.swap_volume / self.close;
91 }
92
93 pub fn flip(&self) -> Self {
95 Self {
96 open: 1.0 / self.open,
97 close: 1.0 / self.close,
98 low: 1.0 / self.low,
99 high: 1.0 / self.high,
100 direct_volume: self.direct_volume * self.close,
101 swap_volume: self.swap_volume * self.close,
102 }
103 }
104 }
105
106 impl From<CandlestickData> for Candle {
107 fn from(value: CandlestickData) -> Self {
108 Self::from(&value)
109 }
110 }
111
112 impl From<&CandlestickData> for Candle {
113 fn from(value: &CandlestickData) -> Self {
114 Self::from_candlestick_data(value)
115 }
116 }
117
    /// The window sizes for which candles and summaries are tracked.
    ///
    /// Variants are declared from shortest to longest, which is also the order
    /// used by the derived `PartialOrd`/`Ord`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub enum Window {
        /// One minute.
        W1m,
        /// Fifteen minutes.
        W15m,
        /// One hour.
        W1h,
        /// Four hours.
        W4h,
        /// One day.
        W1d,
        /// One week.
        W1w,
        /// One month.
        W1mo,
    }
128
129 impl Window {
130 pub fn all() -> impl Iterator<Item = Self> {
131 [
132 Window::W1m,
133 Window::W15m,
134 Window::W1h,
135 Window::W4h,
136 Window::W1d,
137 Window::W1w,
138 Window::W1mo,
139 ]
140 .into_iter()
141 }
142
143 pub fn anchor(&self, time: DateTime) -> DateTime {
150 let (y, mo, d, h, m) = (
151 time.year(),
152 time.month(),
153 time.day(),
154 time.hour(),
155 time.minute(),
156 );
157 let out = match self {
158 Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
159 Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
160 Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
161 Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
162 Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
163 Window::W1w => Utc
164 .with_ymd_and_hms(y, mo, d, 0, 0, 0)
165 .single()
166 .and_then(|x| {
167 x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
168 }),
169 Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
170 };
171 out.unwrap()
172 }
173
174 pub fn subtract_from(&self, time: DateTime) -> DateTime {
175 let delta = match self {
176 Window::W1m => TimeDelta::minutes(1),
177 Window::W15m => TimeDelta::minutes(15),
178 Window::W1h => TimeDelta::hours(1),
179 Window::W4h => TimeDelta::hours(4),
180 Window::W1d => TimeDelta::days(1),
181 Window::W1w => TimeDelta::weeks(1),
182 Window::W1mo => TimeDelta::days(30),
183 };
184 time - delta
185 }
186 }
187
188 impl Display for Window {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 use Window::*;
191 let str = match self {
192 W1m => "1m",
193 W15m => "15m",
194 W1h => "1h",
195 W4h => "4h",
196 W1d => "1d",
197 W1w => "1w",
198 W1mo => "1mo",
199 };
200 write!(f, "{}", str)
201 }
202 }
203
204 #[derive(Debug)]
205 pub struct WindowedCandle {
206 start: DateTime,
207 window: Window,
208 candle: Candle,
209 }
210
211 impl WindowedCandle {
212 pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
213 Self {
214 start: window.anchor(now),
215 window,
216 candle,
217 }
218 }
219
220 pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
225 let start = self.window.anchor(now);
226 if start > self.start {
228 let old_start = std::mem::replace(&mut self.start, start);
229 let old_candle = std::mem::replace(&mut self.candle, candle);
230 Some((old_start, old_candle))
231 } else {
232 self.candle.merge(&candle);
233 None
234 }
235 }
236
237 pub fn window(&self) -> Window {
238 self.window
239 }
240
241 pub fn flush(self) -> (DateTime, Candle) {
242 (self.start, self.candle)
243 }
244 }
245}
246pub use candle::{Candle, Window, WindowedCandle};
247
248mod price_chart {
249 use super::*;
250
251 #[derive(Debug)]
253 pub struct Context {
254 asset_start: asset::Id,
255 asset_end: asset::Id,
256 window: Window,
257 state: Option<WindowedCandle>,
258 }
259
260 impl Context {
261 pub async fn load(
262 dbtx: &mut PgTransaction<'_>,
263 asset_start: asset::Id,
264 asset_end: asset::Id,
265 window: Window,
266 ) -> anyhow::Result<Self> {
267 let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
268 "
269 SELECT open, close, high, low, direct_volume, swap_volume, start_time
270 FROM dex_ex_price_charts
271 WHERE asset_start = $1
272 AND asset_end = $2
273 AND the_window = $3
274 ORDER BY start_time DESC
275 LIMIT 1
276 ",
277 )
278 .bind(asset_start.to_bytes())
279 .bind(asset_end.to_bytes())
280 .bind(window.to_string())
281 .fetch_optional(dbtx.as_mut())
282 .await?;
283 let state = row.map(
284 |(open, close, low, high, direct_volume, swap_volume, start)| {
285 let candle = Candle {
286 open,
287 close,
288 low,
289 high,
290 direct_volume,
291 swap_volume,
292 };
293 WindowedCandle::new(start, window, candle)
294 },
295 );
296 Ok(Self {
297 asset_start,
298 asset_end,
299 window,
300 state,
301 })
302 }
303
304 async fn write_candle(
305 &self,
306 dbtx: &mut PgTransaction<'_>,
307 start: DateTime,
308 candle: Candle,
309 ) -> anyhow::Result<()> {
310 sqlx::query(
311 "
312 INSERT INTO dex_ex_price_charts(
313 id, asset_start, asset_end, the_window,
314 start_time, open, close, high, low, direct_volume, swap_volume
315 )
316 VALUES(
317 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
318 )
319 ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
320 open = EXCLUDED.open,
321 close = EXCLUDED.close,
322 high = EXCLUDED.high,
323 low = EXCLUDED.low,
324 direct_volume = EXCLUDED.direct_volume,
325 swap_volume = EXCLUDED.swap_volume
326 ",
327 )
328 .bind(self.asset_start.to_bytes())
329 .bind(self.asset_end.to_bytes())
330 .bind(self.window.to_string())
331 .bind(start)
332 .bind(candle.open)
333 .bind(candle.close)
334 .bind(candle.high)
335 .bind(candle.low)
336 .bind(candle.direct_volume)
337 .bind(candle.swap_volume)
338 .execute(dbtx.as_mut())
339 .await?;
340 Ok(())
341 }
342
343 pub async fn update(
344 &mut self,
345 dbtx: &mut PgTransaction<'_>,
346 now: DateTime,
347 candle: Candle,
348 ) -> anyhow::Result<()> {
349 let state = match self.state.as_mut() {
350 Some(x) => x,
351 None => {
352 self.state = Some(WindowedCandle::new(now, self.window, candle));
353 self.state.as_mut().unwrap()
354 }
355 };
356 if let Some((start, old_candle)) = state.with_candle(now, candle) {
357 self.write_candle(dbtx, start, old_candle).await?;
358 };
359 Ok(())
360 }
361
362 pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
363 let state = std::mem::replace(&mut self.state, None);
364 if let Some(state) = state {
365 let (start, candle) = state.flush();
366 self.write_candle(dbtx, start, candle).await?;
367 }
368 Ok(())
369 }
370 }
371}
372
373use price_chart::Context as PriceChartContext;
374
375mod summary {
376 use cometindex::PgTransaction;
377 use penumbra_sdk_asset::asset;
378
379 use super::{Candle, DateTime, PairMetrics, Window};
380
381 pub struct Context {
382 start: asset::Id,
383 end: asset::Id,
384 price: f64,
385 liquidity: f64,
386 start_price_indexing_denom: f64,
387 }
388
389 impl Context {
390 pub async fn load(
391 dbtx: &mut PgTransaction<'_>,
392 start: asset::Id,
393 end: asset::Id,
394 ) -> anyhow::Result<Self> {
395 let row: Option<(f64, f64, f64)> = sqlx::query_as(
396 "
397 SELECT price, liquidity, start_price_indexing_denom
398 FROM dex_ex_pairs_block_snapshot
399 WHERE asset_start = $1
400 AND asset_end = $2
401 ORDER BY id DESC
402 LIMIT 1
403 ",
404 )
405 .bind(start.to_bytes())
406 .bind(end.to_bytes())
407 .fetch_optional(dbtx.as_mut())
408 .await?;
409 let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
410 Ok(Self {
411 start,
412 end,
413 price,
414 liquidity,
415 start_price_indexing_denom,
416 })
417 }
418
419 pub async fn update(
420 &mut self,
421 dbtx: &mut PgTransaction<'_>,
422 now: DateTime,
423 candle: Option<Candle>,
424 metrics: PairMetrics,
425 start_price_indexing_denom: Option<f64>,
426 ) -> anyhow::Result<()> {
427 if let Some(candle) = candle {
428 self.price = candle.close;
429 }
430 if let Some(price) = start_price_indexing_denom {
431 self.start_price_indexing_denom = price;
432 }
433 self.liquidity += metrics.liquidity_change;
434
435 sqlx::query(
436 "
437 INSERT INTO dex_ex_pairs_block_snapshot VALUES (
438 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
439 )
440 ",
441 )
442 .bind(now)
443 .bind(self.start.to_bytes())
444 .bind(self.end.to_bytes())
445 .bind(self.price)
446 .bind(self.liquidity)
447 .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
448 .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
449 .bind(self.start_price_indexing_denom)
450 .bind(metrics.trades)
451 .execute(dbtx.as_mut())
452 .await?;
453 Ok(())
454 }
455 }
456
    /// Recomputes the `dex_ex_pairs_summary` row of every pair for `window`.
    ///
    /// The pairs are the distinct `(asset_start, asset_end)` combinations found
    /// in `dex_ex_pairs_block_snapshot`; one upsert statement is executed per
    /// pair. For each pair the statement combines:
    /// - the latest snapshot at or before `now` (current price and liquidity),
    /// - the latest snapshot at or before `window.subtract_from(now)` (the
    ///   "then" values, defaulting to 0.0 when there is no such snapshot),
    /// - sums, minimum and maximum over the snapshots between those two times.
    ///
    /// NOTE(review): the `DO UPDATE SET` list does not assign the `low`/`high`
    /// values selected by the INSERT, so on conflict those two values appear
    /// to keep whatever was first inserted — confirm against the table schema
    /// whether that is intended.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any query fails.
    pub async fn update_summaries(
        dbtx: &mut PgTransaction<'_>,
        now: DateTime,
        window: Window,
    ) -> anyhow::Result<()> {
        // Start of the look-back period for this window.
        let then = window.subtract_from(now);
        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
        )
        .fetch_all(dbtx.as_mut())
        .await?;
        for (start, end) in pairs {
            // Parameters: $1 start asset, $2 end asset, $3 now, $4 then, $5 window label.
            sqlx::query(
                "
                WITH
                snapshots AS (
                    SELECT *
                    FROM dex_ex_pairs_block_snapshot
                    WHERE asset_start = $1
                    AND asset_end = $2
                ),
                previous AS (
                    SELECT price AS price_then, liquidity AS liquidity_then
                    FROM snapshots
                    WHERE time <= $4
                    ORDER BY time DESC
                    LIMIT 1
                ),
                previous_or_default AS (
                    SELECT
                        COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
                        COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
                ),
                now AS (
                    SELECT price, liquidity
                    FROM snapshots
                    WHERE time <= $3
                    ORDER BY time DESC
                    LIMIT 1
                ),
                sums AS (
                    SELECT
                        COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
                        COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
                        COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
                        COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
                        COALESCE(SUM(trades), 0.0) AS trades_over_window,
                        COALESCE(MIN(price), 0.0) AS low,
                        COALESCE(MAX(price), 0.0) AS high
                    FROM snapshots
                    WHERE time <= $3
                    AND time >= $4
                )
                INSERT INTO dex_ex_pairs_summary
                SELECT
                    $1, $2, $5,
                    price, price_then,
                    low, high,
                    liquidity, liquidity_then,
                    direct_volume_over_window,
                    swap_volume_over_window,
                    direct_volume_indexing_denom_over_window,
                    swap_volume_indexing_denom_over_window,
                    trades_over_window
                FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
                ON CONFLICT (asset_start, asset_end, the_window)
                DO UPDATE SET
                    price = EXCLUDED.price,
                    price_then = EXCLUDED.price_then,
                    liquidity = EXCLUDED.liquidity,
                    liquidity_then = EXCLUDED.liquidity_then,
                    direct_volume_over_window = EXCLUDED.direct_volume_over_window,
                    swap_volume_over_window = EXCLUDED.swap_volume_over_window,
                    direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
                    swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
                    trades_over_window = EXCLUDED.trades_over_window
                ",
            )
            .bind(start)
            .bind(end)
            .bind(now)
            .bind(then)
            .bind(window.to_string())
            .execute(dbtx.as_mut())
            .await?;
        }
        Ok(())
    }
545
    /// Recomputes the single `dex_ex_aggregate_summary` row for `window` from
    /// the per-pair rows in `dex_ex_pairs_summary`.
    ///
    /// Parameters bound to the query: `$1` is `denom` (as bytes), `$2` is
    /// `min_liquidity`, `$3` is the window label.
    ///
    /// An asset is "eligible" when it is `denom` itself (with price 1.0) or
    /// when it has a summary row ending in `denom` with liquidity of at least
    /// `min_liquidity`. Only pairs whose start and end assets are both eligible
    /// contribute. The row records the summed volumes, liquidity and trades,
    /// the number of pairs with non-zero volume, the pairs with the largest
    /// swap and direct volume, and the pair with the largest percentage price
    /// change.
    ///
    /// If no pair qualifies, the inner joins produce no row and nothing is
    /// inserted or updated.
    ///
    /// NOTE(review): `eligible_denoms` is not filtered by `the_window`, so it
    /// reads summary rows of every window — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn update_aggregate_summary(
        dbtx: &mut PgTransaction<'_>,
        window: Window,
        denom: asset::Id,
        min_liquidity: f64,
    ) -> anyhow::Result<()> {
        sqlx::query(
            "
            WITH
            eligible_denoms AS (
                SELECT asset_start as asset, price
                FROM dex_ex_pairs_summary
                WHERE asset_end = $1
                AND liquidity >= $2
                UNION VALUES ($1, 1.0)
            ),
            converted_pairs_summary AS (
                SELECT
                    asset_start, asset_end,
                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
                    liquidity * ed_end.price AS liquidity,
                    direct_volume_indexing_denom_over_window AS dv,
                    swap_volume_indexing_denom_over_window AS sv,
                    trades_over_window as trades
                FROM dex_ex_pairs_summary
                JOIN eligible_denoms AS ed_end
                ON ed_end.asset = asset_end
                JOIN eligible_denoms AS ed_start
                ON ed_start.asset = asset_start
                WHERE the_window = $3
            ),
            sums AS (
                SELECT
                    SUM(dv) AS direct_volume,
                    SUM(sv) AS swap_volume,
                    SUM(liquidity) AS liquidity,
                    SUM(trades) AS trades,
                    (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs
                FROM converted_pairs_summary
            ),
            largest_sv AS (
                SELECT
                    asset_start AS largest_sv_trading_pair_start,
                    asset_end AS largest_sv_trading_pair_end,
                    sv AS largest_sv_trading_pair_volume
                FROM converted_pairs_summary
                ORDER BY sv DESC
                LIMIT 1
            ),
            largest_dv AS (
                SELECT
                    asset_start AS largest_dv_trading_pair_start,
                    asset_end AS largest_dv_trading_pair_end,
                    dv AS largest_dv_trading_pair_volume
                FROM converted_pairs_summary
                ORDER BY dv DESC
                LIMIT 1
            ),
            top_price_mover AS (
                SELECT
                    asset_start AS top_price_mover_start,
                    asset_end AS top_price_mover_end,
                    price_change AS top_price_mover_change_percent
                FROM converted_pairs_summary
                ORDER BY price_change DESC
                LIMIT 1
            )
            INSERT INTO dex_ex_aggregate_summary
            SELECT
                $3,
                direct_volume, swap_volume, liquidity, trades, active_pairs,
                largest_sv_trading_pair_start,
                largest_sv_trading_pair_end,
                largest_sv_trading_pair_volume,
                largest_dv_trading_pair_start,
                largest_dv_trading_pair_end,
                largest_dv_trading_pair_volume,
                top_price_mover_start,
                top_price_mover_end,
                top_price_mover_change_percent
            FROM
                sums
                JOIN largest_sv ON TRUE
                JOIN largest_dv ON TRUE
                JOIN top_price_mover ON TRUE
            ON CONFLICT (the_window) DO UPDATE SET
                direct_volume = EXCLUDED.direct_volume,
                swap_volume = EXCLUDED.swap_volume,
                liquidity = EXCLUDED.liquidity,
                trades = EXCLUDED.trades,
                active_pairs = EXCLUDED.active_pairs,
                largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
                largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
                largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
                largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
                largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
                largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
                top_price_mover_start = EXCLUDED.top_price_mover_start,
                top_price_mover_end = EXCLUDED.top_price_mover_end,
                top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
            ")
        .bind(denom.to_bytes())
        .bind(min_liquidity)
        .bind(window.to_string())
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }
655}
656
657mod metadata {
658 use super::*;
659
660 pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
661 sqlx::query(
662 "
663 INSERT INTO dex_ex_metadata
664 VALUES (1, $1)
665 ON CONFLICT (id) DO UPDATE
666 SET id = EXCLUDED.id,
667 quote_asset_id = EXCLUDED.quote_asset_id
668 ",
669 )
670 .bind(quote_asset.to_bytes())
671 .execute(dbtx.as_mut())
672 .await?;
673 Ok(())
674 }
675}
676
/// Per-block counters accumulated for one directed trading pair.
#[derive(Debug, Default, Clone, Copy)]
struct PairMetrics {
    /// Number of trades counted for the pair (kept as a float).
    trades: f64,
    /// Net change in liquidity for the pair; may be negative.
    liquidity_change: f64,
}
682
/// Summary of one direction of a batch swap; serialized (via `serde`) into the
/// `batch_swaps` value of a block summary row.
#[derive(Debug, Clone, serde::Serialize)]
struct BatchSwapSummary {
    /// Asset that was given as input.
    asset_start: asset::Id,
    /// Asset that was received as output.
    asset_end: asset::Id,
    /// Total input amount of the swap execution.
    input: Amount,
    /// Total output amount of the swap execution.
    output: Amount,
    /// Number of individual swaps in the block with a non-zero input in this direction.
    num_swaps: i32,
    /// `output / input`, computed in floating point.
    price_float: f64,
}
692
693#[derive(Debug)]
694struct Events {
695 time: Option<DateTime>,
696 height: i32,
697 candles: HashMap<DirectedTradingPair, Candle>,
698 metrics: HashMap<DirectedTradingPair, PairMetrics>,
699 positions: BTreeMap<PositionId, Position>,
701 position_opens: Vec<EventPositionOpen>,
703 position_executions: Vec<EventPositionExecution>,
704 position_closes: Vec<EventPositionClose>,
705 position_withdrawals: Vec<EventPositionWithdraw>,
706 batch_swaps: Vec<EventBatchSwap>,
707 swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
708 swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
709 position_open_txs: BTreeMap<PositionId, [u8; 32]>,
711 position_close_txs: BTreeMap<PositionId, [u8; 32]>,
712 position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
713}
714
715impl Events {
716 fn new() -> Self {
717 Self {
718 time: None,
719 height: 0,
720 candles: HashMap::new(),
721 metrics: HashMap::new(),
722 positions: BTreeMap::new(),
723 position_opens: Vec::new(),
724 position_executions: Vec::new(),
725 position_closes: Vec::new(),
726 position_withdrawals: Vec::new(),
727 batch_swaps: Vec::new(),
728 swaps: BTreeMap::new(),
729 swap_claims: BTreeMap::new(),
730 position_open_txs: BTreeMap::new(),
731 position_close_txs: BTreeMap::new(),
732 position_withdrawal_txs: BTreeMap::new(),
733 }
734 }
735
736 fn with_time(&mut self, time: DateTime) {
737 self.time = Some(time)
738 }
739
740 fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) {
741 let flip = pair.flip();
744 let new_candle = match self.candles.get(&flip).cloned() {
745 None => candle,
746 Some(flipped) => {
747 let mut out = candle;
748 out.mix(&flipped);
749 out
750 }
751 };
752
753 self.candles.insert(pair, new_candle);
754 self.candles.insert(flip, new_candle.flip());
755 }
756
757 fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
758 if !self.metrics.contains_key(pair) {
759 self.metrics.insert(*pair, PairMetrics::default());
760 }
761 self.metrics.get_mut(pair).unwrap()
763 }
764
765 fn with_trade(&mut self, pair: &DirectedTradingPair) {
766 self.metric(pair).trades += 1.0;
767 }
768
769 fn with_reserve_change(
770 &mut self,
771 pair: &TradingPair,
772 old_reserves: Option<Reserves>,
773 new_reserves: Reserves,
774 removed: bool,
775 ) {
776 let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
777 (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
778 (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
779 (_, Some(old), new) => (
780 (new.r1.value() as f64) - (old.r1.value() as f64),
781 (new.r2.value() as f64) - (old.r2.value() as f64),
782 ),
783 };
784 for (d_pair, diff) in [
785 (
786 DirectedTradingPair {
787 start: pair.asset_1(),
788 end: pair.asset_2(),
789 },
790 diff_2,
791 ),
792 (
793 DirectedTradingPair {
794 start: pair.asset_2(),
795 end: pair.asset_1(),
796 },
797 diff_1,
798 ),
799 ] {
800 self.metric(&d_pair).liquidity_change += diff;
801 }
802 }
803
804 pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
805 let mut out = Self::new();
806 out.height = block.height() as i32;
807
808 for event in block.events() {
809 if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
810 let candle = Candle::from_candlestick_data(&e.stick);
811 out.with_candle(e.pair, candle);
812 } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
813 let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
814 "creating timestamp should succeed; timestamp: {}",
815 e.timestamp_seconds
816 ))?;
817 out.with_time(time);
818 } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
819 out.with_reserve_change(
820 &e.trading_pair,
821 None,
822 Reserves {
823 r1: e.reserves_1,
824 r2: e.reserves_2,
825 },
826 false,
827 );
828 if let Some(tx_hash) = event.tx_hash() {
829 out.position_open_txs.insert(e.position_id, tx_hash);
830 }
831 out.positions.insert(e.position_id, e.position.clone());
835 out.position_opens.push(e);
836 } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
837 out.with_reserve_change(
840 &e.trading_pair,
841 None,
842 Reserves {
843 r1: e.reserves_1,
844 r2: e.reserves_2,
845 },
846 true,
847 );
848 if let Some(tx_hash) = event.tx_hash() {
849 out.position_withdrawal_txs.insert(e.position_id, tx_hash);
850 }
851 out.position_withdrawals.push(e);
852 } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
853 out.with_reserve_change(
854 &e.trading_pair,
855 Some(Reserves {
856 r1: e.prev_reserves_1,
857 r2: e.prev_reserves_2,
858 }),
859 Reserves {
860 r1: e.reserves_1,
861 r2: e.reserves_2,
862 },
863 false,
864 );
865 if e.reserves_1 > e.prev_reserves_1 {
866 out.with_trade(&DirectedTradingPair {
868 start: e.trading_pair.asset_1(),
869 end: e.trading_pair.asset_2(),
870 });
871 } else if e.reserves_2 > e.prev_reserves_2 {
872 out.with_trade(&DirectedTradingPair {
873 start: e.trading_pair.asset_2(),
874 end: e.trading_pair.asset_1(),
875 });
876 }
877 out.position_executions.push(e);
878 } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
879 out.position_closes.push(e);
880 } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
881 if let Some(tx_hash) = event.tx_hash() {
884 out.position_close_txs.insert(e.position_id, tx_hash);
885 }
886 } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
887 out.batch_swaps.push(e);
888 } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
889 out.swaps
890 .entry(e.trading_pair)
891 .or_insert_with(Vec::new)
892 .push(e);
893 } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
894 out.swap_claims
895 .entry(e.trading_pair)
896 .or_insert_with(Vec::new)
897 .push(e);
898 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
899 let pair = DirectedTradingPair {
900 start: e.incentivized_asset_id,
901 end: *STAKING_TOKEN_ASSET_ID,
902 };
903 out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
904 }
905 }
906 Ok(out)
907 }
908
909 async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
910 let missing_positions: Vec<_> = self
912 .position_executions
913 .iter()
914 .map(|e| e.position_id)
915 .filter(|id| !self.positions.contains_key(id))
916 .collect();
917
918 if missing_positions.is_empty() {
919 return Ok(());
920 }
921
922 let rows = sqlx::query(
924 "SELECT position_raw
925 FROM dex_ex_position_state
926 WHERE position_id = ANY($1)",
927 )
928 .bind(
929 &missing_positions
930 .iter()
931 .map(|id| id.0.as_ref())
932 .collect::<Vec<_>>(),
933 )
934 .fetch_all(dbtx.as_mut())
935 .await?;
936
937 for row in rows {
939 let position_raw: Vec<u8> = row.get("position_raw");
940 let position = Position::decode(position_raw.as_slice())?;
941 self.positions.insert(position.id(), position);
942 }
943
944 Ok(())
945 }
946
947 pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
949 self.candles
950 .get(&DirectedTradingPair::new(asset, indexing_denom))
951 .map(|x| x.close)
952 }
953}
954
/// The DEX explorer component and its configuration.
#[derive(Debug)]
pub struct Component {
    /// Asset id supplied at construction (see `Component::new`).
    denom: asset::Id,
    /// Liquidity threshold supplied at construction (see `Component::new`).
    min_liquidity: f64,
}
960
961impl Component {
962 pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
963 Self {
964 denom,
965 min_liquidity,
966 }
967 }
968
969 async fn record_position_open(
970 &self,
971 dbtx: &mut PgTransaction<'_>,
972 time: DateTime,
973 height: i32,
974 tx_hash: Option<[u8; 32]>,
975 event: &EventPositionOpen,
976 ) -> anyhow::Result<()> {
977 let effective_price_1_to_2: f64 = event
979 .position
980 .phi
981 .orient_start(event.trading_pair.asset_1())
982 .expect("position trading pair matches")
983 .effective_price()
984 .into();
985
986 let effective_price_2_to_1: f64 = event
987 .position
988 .phi
989 .orient_start(event.trading_pair.asset_2())
990 .expect("position trading pair matches")
991 .effective_price()
992 .into();
993
994 let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
996 "INSERT INTO dex_ex_position_reserves (
997 position_id,
998 height,
999 time,
1000 reserves_1,
1001 reserves_2
1002 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1003 )
1004 .bind(event.position_id.0)
1005 .bind(height)
1006 .bind(time)
1007 .bind(BigDecimal::from(event.reserves_1.value()))
1008 .bind(BigDecimal::from(event.reserves_2.value()))
1009 .fetch_one(dbtx.as_mut())
1010 .await?;
1011
1012 sqlx::query(
1014 "INSERT INTO dex_ex_position_state (
1015 position_id,
1016 asset_1,
1017 asset_2,
1018 p,
1019 q,
1020 close_on_fill,
1021 fee_bps,
1022 effective_price_1_to_2,
1023 effective_price_2_to_1,
1024 position_raw,
1025 opening_time,
1026 opening_height,
1027 opening_tx,
1028 opening_reserves_rowid
1029 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1030 )
1031 .bind(event.position_id.0)
1032 .bind(event.trading_pair.asset_1().to_bytes())
1033 .bind(event.trading_pair.asset_2().to_bytes())
1034 .bind(BigDecimal::from(event.position.phi.component.p.value()))
1035 .bind(BigDecimal::from(event.position.phi.component.q.value()))
1036 .bind(event.position.close_on_fill)
1037 .bind(event.trading_fee as i32)
1038 .bind(effective_price_1_to_2)
1039 .bind(effective_price_2_to_1)
1040 .bind(event.position.encode_to_vec())
1041 .bind(time)
1042 .bind(height)
1043 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1044 .bind(opening_reserves_rowid)
1045 .execute(dbtx.as_mut())
1046 .await?;
1047
1048 Ok(())
1049 }
1050
1051 async fn record_swap_execution_traces(
1052 &self,
1053 dbtx: &mut PgTransaction<'_>,
1054 time: DateTime,
1055 height: i32,
1056 swap_execution: &SwapExecution,
1057 ) -> anyhow::Result<()> {
1058 let SwapExecution {
1059 traces,
1060 input: se_input,
1061 output: se_output,
1062 } = swap_execution;
1063
1064 let asset_start = se_input.asset_id;
1065 let asset_end = se_output.asset_id;
1066 let batch_input = se_input.amount;
1067 let batch_output = se_output.amount;
1068
1069 for trace in traces.iter() {
1070 let Some(input_value) = trace.first() else {
1071 continue;
1072 };
1073 let Some(output_value) = trace.last() else {
1074 continue;
1075 };
1076
1077 let input = input_value.amount;
1078 let output = output_value.amount;
1079
1080 let price_float = (output.value() as f64) / (input.value() as f64);
1081 let amount_hops = trace
1082 .iter()
1083 .map(|x| BigDecimal::from(x.amount.value()))
1084 .collect::<Vec<_>>();
1085 let position_id_hops: Vec<[u8; 32]> = vec![];
1086 let asset_hops = trace
1087 .iter()
1088 .map(|x| x.asset_id.to_bytes())
1089 .collect::<Vec<_>>();
1090
1091 sqlx::query(
1092 "INSERT INTO dex_ex_batch_swap_traces (
1093 height,
1094 time,
1095 input,
1096 output,
1097 batch_input,
1098 batch_output,
1099 price_float,
1100 asset_start,
1101 asset_end,
1102 asset_hops,
1103 amount_hops,
1104 position_id_hops
1105 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1106 )
1107 .bind(height)
1108 .bind(time)
1109 .bind(BigDecimal::from(input.value()))
1110 .bind(BigDecimal::from(output.value()))
1111 .bind(BigDecimal::from(batch_input.value()))
1112 .bind(BigDecimal::from(batch_output.value()))
1113 .bind(price_float)
1114 .bind(asset_start.to_bytes())
1115 .bind(asset_end.to_bytes())
1116 .bind(asset_hops)
1117 .bind(amount_hops)
1118 .bind(position_id_hops)
1119 .execute(dbtx.as_mut())
1120 .await?;
1121 }
1122
1123 Ok(())
1124 }
1125
1126 async fn record_block_summary(
1127 &self,
1128 dbtx: &mut PgTransaction<'_>,
1129 time: DateTime,
1130 height: i32,
1131 events: &Events,
1132 ) -> anyhow::Result<()> {
1133 let num_opened_lps = events.position_opens.len() as i32;
1134 let num_closed_lps = events.position_closes.len() as i32;
1135 let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1136 let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1137 let num_swap_claims = events
1138 .swap_claims
1139 .iter()
1140 .map(|(_, v)| v.len())
1141 .sum::<usize>() as i32;
1142 let num_txs = events.batch_swaps.len() as i32;
1143
1144 let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1145
1146 for event in &events.batch_swaps {
1147 let trading_pair = event.batch_swap_output_data.trading_pair;
1148
1149 if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1150 let asset_start = swap_1_2.input.asset_id;
1151 let asset_end = swap_1_2.output.asset_id;
1152 let input = swap_1_2.input.amount;
1153 let output = swap_1_2.output.amount;
1154 let price_float = (output.value() as f64) / (input.value() as f64);
1155
1156 let empty_vec = vec![];
1157 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1158 let filtered_swaps: Vec<_> = swaps_for_pair
1159 .iter()
1160 .filter(|swap| swap.delta_1_i != Amount::zero())
1161 .collect::<Vec<_>>();
1162 let num_swaps = filtered_swaps.len() as i32;
1163
1164 batch_swap_summaries.push(BatchSwapSummary {
1165 asset_start,
1166 asset_end,
1167 input,
1168 output,
1169 num_swaps,
1170 price_float,
1171 });
1172 }
1173
1174 if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1175 let asset_start = swap_2_1.input.asset_id;
1176 let asset_end = swap_2_1.output.asset_id;
1177 let input = swap_2_1.input.amount;
1178 let output = swap_2_1.output.amount;
1179 let price_float = (output.value() as f64) / (input.value() as f64);
1180
1181 let empty_vec = vec![];
1182 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1183 let filtered_swaps: Vec<_> = swaps_for_pair
1184 .iter()
1185 .filter(|swap| swap.delta_2_i != Amount::zero())
1186 .collect::<Vec<_>>();
1187 let num_swaps = filtered_swaps.len() as i32;
1188
1189 batch_swap_summaries.push(BatchSwapSummary {
1190 asset_start,
1191 asset_end,
1192 input,
1193 output,
1194 num_swaps,
1195 price_float,
1196 });
1197 }
1198 }
1199
1200 sqlx::query(
1201 "INSERT INTO dex_ex_block_summary (
1202 height,
1203 time,
1204 batch_swaps,
1205 num_open_lps,
1206 num_closed_lps,
1207 num_withdrawn_lps,
1208 num_swaps,
1209 num_swap_claims,
1210 num_txs
1211 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1212 )
1213 .bind(height)
1214 .bind(time)
1215 .bind(serde_json::to_value(&batch_swap_summaries)?)
1216 .bind(num_opened_lps)
1217 .bind(num_closed_lps)
1218 .bind(num_withdrawn_lps)
1219 .bind(num_swaps)
1220 .bind(num_swap_claims)
1221 .bind(num_txs)
1222 .execute(dbtx.as_mut())
1223 .await?;
1224
1225 Ok(())
1226 }
1227
1228 async fn record_batch_swap_traces(
1229 &self,
1230 dbtx: &mut PgTransaction<'_>,
1231 time: DateTime,
1232 height: i32,
1233 event: &EventBatchSwap,
1234 ) -> anyhow::Result<()> {
1235 let EventBatchSwap {
1236 batch_swap_output_data: _,
1237 swap_execution_1_for_2,
1238 swap_execution_2_for_1,
1239 } = event;
1240
1241 if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1242 self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1243 .await?;
1244 }
1245
1246 if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1247 self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1248 .await?;
1249 }
1250
1251 Ok(())
1252 }
1253
1254 async fn record_position_execution(
1255 &self,
1256 dbtx: &mut PgTransaction<'_>,
1257 time: DateTime,
1258 height: i32,
1259 event: &EventPositionExecution,
1260 positions: &BTreeMap<PositionId, Position>,
1261 ) -> anyhow::Result<()> {
1262 let position = positions
1264 .get(&event.position_id)
1265 .expect("position must exist for execution");
1266 let current = Reserves {
1267 r1: event.reserves_1,
1268 r2: event.reserves_2,
1269 };
1270 let prev = Reserves {
1271 r1: event.prev_reserves_1,
1272 r2: event.prev_reserves_2,
1273 };
1274 let flows = Flows::from_phi_and_reserves(&position.phi, ¤t, &prev);
1275
1276 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1278 "INSERT INTO dex_ex_position_reserves (
1279 position_id,
1280 height,
1281 time,
1282 reserves_1,
1283 reserves_2
1284 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1285 )
1286 .bind(event.position_id.0)
1287 .bind(height)
1288 .bind(time)
1289 .bind(BigDecimal::from(event.reserves_1.value()))
1290 .bind(BigDecimal::from(event.reserves_2.value()))
1291 .fetch_one(dbtx.as_mut())
1292 .await?;
1293
1294 sqlx::query(
1296 "INSERT INTO dex_ex_position_executions (
1297 position_id,
1298 height,
1299 time,
1300 reserves_rowid,
1301 delta_1,
1302 delta_2,
1303 lambda_1,
1304 lambda_2,
1305 fee_1,
1306 fee_2,
1307 context_asset_start,
1308 context_asset_end
1309 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1310 )
1311 .bind(event.position_id.0)
1312 .bind(height)
1313 .bind(time)
1314 .bind(reserves_rowid)
1315 .bind(BigDecimal::from(flows.delta_1().value()))
1316 .bind(BigDecimal::from(flows.delta_2().value()))
1317 .bind(BigDecimal::from(flows.lambda_1().value()))
1318 .bind(BigDecimal::from(flows.lambda_2().value()))
1319 .bind(BigDecimal::from(flows.fee_1().value()))
1320 .bind(BigDecimal::from(flows.fee_2().value()))
1321 .bind(event.context.start.to_bytes())
1322 .bind(event.context.end.to_bytes())
1323 .execute(dbtx.as_mut())
1324 .await?;
1325
1326 Ok(())
1327 }
1328
1329 async fn record_position_close(
1330 &self,
1331 dbtx: &mut PgTransaction<'_>,
1332 time: DateTime,
1333 height: i32,
1334 tx_hash: Option<[u8; 32]>,
1335 event: &EventPositionClose,
1336 ) -> anyhow::Result<()> {
1337 sqlx::query(
1338 "UPDATE dex_ex_position_state
1339 SET closing_time = $1,
1340 closing_height = $2,
1341 closing_tx = $3
1342 WHERE position_id = $4",
1343 )
1344 .bind(time)
1345 .bind(height)
1346 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1347 .bind(event.position_id.0)
1348 .execute(dbtx.as_mut())
1349 .await?;
1350
1351 Ok(())
1352 }
1353
1354 async fn record_position_withdraw(
1355 &self,
1356 dbtx: &mut PgTransaction<'_>,
1357 time: DateTime,
1358 height: i32,
1359 tx_hash: Option<[u8; 32]>,
1360 event: &EventPositionWithdraw,
1361 ) -> anyhow::Result<()> {
1362 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1364 "INSERT INTO dex_ex_position_reserves (
1365 position_id,
1366 height,
1367 time,
1368 reserves_1,
1369 reserves_2
1370 ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", )
1372 .bind(event.position_id.0)
1373 .bind(height)
1374 .bind(time)
1375 .bind(BigDecimal::from(0)) .fetch_one(dbtx.as_mut())
1377 .await?;
1378
1379 sqlx::query(
1380 "INSERT INTO dex_ex_position_withdrawals (
1381 position_id,
1382 height,
1383 time,
1384 withdrawal_tx,
1385 sequence,
1386 reserves_1,
1387 reserves_2,
1388 reserves_rowid
1389 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1390 )
1391 .bind(event.position_id.0)
1392 .bind(height)
1393 .bind(time)
1394 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1395 .bind(event.sequence as i32)
1396 .bind(BigDecimal::from(event.reserves_1.value()))
1397 .bind(BigDecimal::from(event.reserves_2.value()))
1398 .bind(reserves_rowid)
1399 .execute(dbtx.as_mut())
1400 .await?;
1401
1402 Ok(())
1403 }
1404
1405 async fn record_transaction(
1406 &self,
1407 dbtx: &mut PgTransaction<'_>,
1408 time: DateTime,
1409 height: u64,
1410 transaction_id: [u8; 32],
1411 transaction: Transaction,
1412 ) -> anyhow::Result<()> {
1413 if transaction.transaction_body.actions.is_empty() {
1414 return Ok(());
1415 }
1416 sqlx::query(
1417 "INSERT INTO dex_ex_transactions (
1418 transaction_id,
1419 transaction,
1420 height,
1421 time
1422 ) VALUES ($1, $2, $3, $4)
1423 ",
1424 )
1425 .bind(transaction_id)
1426 .bind(transaction.encode_to_vec())
1427 .bind(i32::try_from(height)?)
1428 .bind(time)
1429 .execute(dbtx.as_mut())
1430 .await?;
1431
1432 Ok(())
1433 }
1434
1435 async fn record_all_transactions(
1436 &self,
1437 dbtx: &mut PgTransaction<'_>,
1438 time: DateTime,
1439 block: &BlockEvents,
1440 ) -> anyhow::Result<()> {
1441 for (tx_id, tx_bytes) in block.transactions() {
1442 let tx = Transaction::try_from(tx_bytes)?;
1443 let height = block.height();
1444 self.record_transaction(dbtx, time, height, tx_id, tx)
1445 .await?;
1446 }
1447 Ok(())
1448 }
1449}
1450
1451#[async_trait]
1452impl AppView for Component {
1453 async fn init_chain(
1454 &self,
1455 dbtx: &mut PgTransaction,
1456 _: &serde_json::Value,
1457 ) -> Result<(), anyhow::Error> {
1458 for statement in include_str!("schema.sql").split(";") {
1459 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1460 }
1461 Ok(())
1462 }
1463
1464 fn name(&self) -> String {
1465 "dex_ex".to_string()
1466 }
1467
1468 async fn index_batch(
1469 &self,
1470 dbtx: &mut PgTransaction,
1471 batch: EventBatch,
1472 ctx: EventBatchContext,
1473 ) -> Result<(), anyhow::Error> {
1474 metadata::set(dbtx, self.denom).await?;
1475 let mut charts = HashMap::new();
1476 let mut snapshots = HashMap::new();
1477 let mut last_time = None;
1478 for block in batch.events_by_block() {
1479 let mut events = Events::extract(&block)?;
1480 let time = events
1481 .time
1482 .expect(&format!("no block root event at height {}", block.height()));
1483 last_time = Some(time);
1484
1485 self.record_all_transactions(dbtx, time, block).await?;
1486
1487 events.load_positions(dbtx).await?;
1489
1490 self.record_block_summary(dbtx, time, block.height() as i32, &events)
1492 .await?;
1493
1494 for event in &events.batch_swaps {
1496 self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1497 .await?;
1498 }
1499
1500 for event in &events.position_opens {
1502 let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1503 self.record_position_open(dbtx, time, events.height, tx_hash, event)
1504 .await?;
1505 }
1506
1507 for event in &events.position_executions {
1509 self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1510 .await?;
1511 }
1512
1513 for event in &events.position_closes {
1515 let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1516 self.record_position_close(dbtx, time, events.height, tx_hash, event)
1517 .await?;
1518 }
1519
1520 for event in &events.position_withdrawals {
1522 let tx_hash = events
1523 .position_withdrawal_txs
1524 .get(&event.position_id)
1525 .copied();
1526 self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1527 .await?;
1528 }
1529
1530 for (pair, candle) in &events.candles {
1531 for window in Window::all() {
1532 let key = (pair.start, pair.end, window);
1533 if !charts.contains_key(&key) {
1534 let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1535 charts.insert(key, ctx);
1536 }
1537 charts
1538 .get_mut(&key)
1539 .unwrap() .update(dbtx, time, *candle)
1541 .await?;
1542 }
1543 }
1544
1545 let block_pairs = events
1546 .candles
1547 .keys()
1548 .chain(events.metrics.keys())
1549 .copied()
1550 .collect::<HashSet<_>>();
1551 for pair in block_pairs {
1552 if !snapshots.contains_key(&pair) {
1553 let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1554 snapshots.insert(pair, ctx);
1555 }
1556 snapshots
1558 .get_mut(&pair)
1559 .unwrap()
1560 .update(
1561 dbtx,
1562 time,
1563 events.candles.get(&pair).copied(),
1564 events.metrics.get(&pair).copied().unwrap_or_default(),
1565 events.price_for(self.denom, pair.start),
1566 )
1567 .await?;
1568 }
1569 }
1570
1571 if ctx.is_last() {
1572 if let Some(now) = last_time {
1573 for window in Window::all() {
1574 summary::update_summaries(dbtx, now, window).await?;
1575 summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1576 .await?;
1577 }
1578 }
1579 }
1580 for chart in charts.into_values() {
1581 chart.unload(dbtx).await?;
1582 }
1583 Ok(())
1584 }
1585}