1use anyhow::anyhow;
2use cometindex::{
3 async_trait,
4 index::{BlockEvents, EventBatch, EventBatchContext},
5 AppView, PgTransaction,
6};
7use penumbra_sdk_asset::asset;
8use penumbra_sdk_dex::{
9 event::{
10 EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
11 EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
12 },
13 lp::Reserves,
14 DirectedTradingPair, SwapExecution, TradingPair,
15};
16use penumbra_sdk_dex::{
17 event::{EventSwap, EventSwapClaim},
18 lp::position::{Id as PositionId, Position},
19};
20use penumbra_sdk_num::Amount;
21use penumbra_sdk_proto::event::EventDomainType;
22use penumbra_sdk_proto::DomainType;
23use penumbra_sdk_sct::event::EventBlockRoot;
24use penumbra_sdk_transaction::Transaction;
25use sqlx::types::BigDecimal;
26use sqlx::Row;
27use std::collections::{BTreeMap, HashMap, HashSet};
28
29type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
30
31mod candle {
32 use super::DateTime;
33 use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
34 use penumbra_sdk_dex::CandlestickData;
35 use std::fmt::Display;
36
37 fn geo_mean(a: f64, b: f64) -> f64 {
38 (a * b).sqrt()
39 }
40
41 #[derive(Debug, Clone, Copy)]
46 pub struct Candle {
47 pub open: f64,
48 pub close: f64,
49 pub low: f64,
50 pub high: f64,
51 pub direct_volume: f64,
52 pub swap_volume: f64,
53 }
54
55 impl Candle {
56 pub fn from_candlestick_data(data: &CandlestickData) -> Self {
57 Self {
61 open: data.open,
62 close: data.close,
63 low: data.low,
64 high: data.high,
65 direct_volume: data.direct_volume,
66 swap_volume: data.swap_volume,
67 }
68 }
69
70 pub fn merge(&mut self, that: &Self) {
71 self.close = that.close;
72 self.low = self.low.min(that.low);
73 self.high = self.high.max(that.high);
74 self.direct_volume += that.direct_volume;
75 self.swap_volume += that.swap_volume;
76 }
77
78 pub fn mix(&mut self, op: &Self) {
80 self.close /= geo_mean(self.close, op.close);
83 self.open /= geo_mean(self.open, op.open);
84 self.low = self.low.min(1.0 / op.low);
85 self.high = self.high.min(1.0 / op.high);
86 self.direct_volume += op.direct_volume / self.close;
88 self.swap_volume += op.swap_volume / self.close;
89 }
90
91 pub fn flip(&self) -> Self {
93 Self {
94 open: 1.0 / self.open,
95 close: 1.0 / self.close,
96 low: 1.0 / self.low,
97 high: 1.0 / self.high,
98 direct_volume: self.direct_volume * self.close,
99 swap_volume: self.swap_volume * self.close,
100 }
101 }
102 }
103
104 impl From<CandlestickData> for Candle {
105 fn from(value: CandlestickData) -> Self {
106 Self::from(&value)
107 }
108 }
109
110 impl From<&CandlestickData> for Candle {
111 fn from(value: &CandlestickData) -> Self {
112 Self::from_candlestick_data(value)
113 }
114 }
115
116 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
117 pub enum Window {
118 W1m,
119 W15m,
120 W1h,
121 W4h,
122 W1d,
123 W1w,
124 W1mo,
125 }
126
127 impl Window {
128 pub fn all() -> impl Iterator<Item = Self> {
129 [
130 Window::W1m,
131 Window::W15m,
132 Window::W1h,
133 Window::W4h,
134 Window::W1d,
135 Window::W1w,
136 Window::W1mo,
137 ]
138 .into_iter()
139 }
140
141 pub fn anchor(&self, time: DateTime) -> DateTime {
148 let (y, mo, d, h, m) = (
149 time.year(),
150 time.month(),
151 time.day(),
152 time.hour(),
153 time.minute(),
154 );
155 let out = match self {
156 Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
157 Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
158 Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
159 Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
160 Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
161 Window::W1w => Utc
162 .with_ymd_and_hms(y, mo, d, 0, 0, 0)
163 .single()
164 .and_then(|x| {
165 x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
166 }),
167 Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
168 };
169 out.unwrap()
170 }
171
172 pub fn subtract_from(&self, time: DateTime) -> DateTime {
173 let delta = match self {
174 Window::W1m => TimeDelta::minutes(1),
175 Window::W15m => TimeDelta::minutes(15),
176 Window::W1h => TimeDelta::hours(1),
177 Window::W4h => TimeDelta::hours(4),
178 Window::W1d => TimeDelta::days(1),
179 Window::W1w => TimeDelta::weeks(1),
180 Window::W1mo => TimeDelta::days(30),
181 };
182 time - delta
183 }
184 }
185
186 impl Display for Window {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 use Window::*;
189 let str = match self {
190 W1m => "1m",
191 W15m => "15m",
192 W1h => "1h",
193 W4h => "4h",
194 W1d => "1d",
195 W1w => "1w",
196 W1mo => "1mo",
197 };
198 write!(f, "{}", str)
199 }
200 }
201
202 #[derive(Debug)]
203 pub struct WindowedCandle {
204 start: DateTime,
205 window: Window,
206 candle: Candle,
207 }
208
209 impl WindowedCandle {
210 pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
211 Self {
212 start: window.anchor(now),
213 window,
214 candle,
215 }
216 }
217
218 pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
223 let start = self.window.anchor(now);
224 if start > self.start {
226 let old_start = std::mem::replace(&mut self.start, start);
227 let old_candle = std::mem::replace(&mut self.candle, candle);
228 Some((old_start, old_candle))
229 } else {
230 self.candle.merge(&candle);
231 None
232 }
233 }
234
235 pub fn window(&self) -> Window {
236 self.window
237 }
238
239 pub fn flush(self) -> (DateTime, Candle) {
240 (self.start, self.candle)
241 }
242 }
243}
244pub use candle::{Candle, Window, WindowedCandle};
245
246mod price_chart {
247 use super::*;
248
249 #[derive(Debug)]
251 pub struct Context {
252 asset_start: asset::Id,
253 asset_end: asset::Id,
254 window: Window,
255 state: Option<WindowedCandle>,
256 }
257
258 impl Context {
259 pub async fn load(
260 dbtx: &mut PgTransaction<'_>,
261 asset_start: asset::Id,
262 asset_end: asset::Id,
263 window: Window,
264 ) -> anyhow::Result<Self> {
265 let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
266 "
267 SELECT open, close, high, low, direct_volume, swap_volume, start_time
268 FROM dex_ex_price_charts
269 WHERE asset_start = $1
270 AND asset_end = $2
271 AND the_window = $3
272 ORDER BY start_time DESC
273 LIMIT 1
274 ",
275 )
276 .bind(asset_start.to_bytes())
277 .bind(asset_end.to_bytes())
278 .bind(window.to_string())
279 .fetch_optional(dbtx.as_mut())
280 .await?;
281 let state = row.map(
282 |(open, close, low, high, direct_volume, swap_volume, start)| {
283 let candle = Candle {
284 open,
285 close,
286 low,
287 high,
288 direct_volume,
289 swap_volume,
290 };
291 WindowedCandle::new(start, window, candle)
292 },
293 );
294 Ok(Self {
295 asset_start,
296 asset_end,
297 window,
298 state,
299 })
300 }
301
302 async fn write_candle(
303 &self,
304 dbtx: &mut PgTransaction<'_>,
305 start: DateTime,
306 candle: Candle,
307 ) -> anyhow::Result<()> {
308 sqlx::query(
309 "
310 INSERT INTO dex_ex_price_charts(
311 id, asset_start, asset_end, the_window,
312 start_time, open, close, high, low, direct_volume, swap_volume
313 )
314 VALUES(
315 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
316 )
317 ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
318 open = EXCLUDED.open,
319 close = EXCLUDED.close,
320 high = EXCLUDED.high,
321 low = EXCLUDED.low,
322 direct_volume = EXCLUDED.direct_volume,
323 swap_volume = EXCLUDED.swap_volume
324 ",
325 )
326 .bind(self.asset_start.to_bytes())
327 .bind(self.asset_end.to_bytes())
328 .bind(self.window.to_string())
329 .bind(start)
330 .bind(candle.open)
331 .bind(candle.close)
332 .bind(candle.high)
333 .bind(candle.low)
334 .bind(candle.direct_volume)
335 .bind(candle.swap_volume)
336 .execute(dbtx.as_mut())
337 .await?;
338 Ok(())
339 }
340
341 pub async fn update(
342 &mut self,
343 dbtx: &mut PgTransaction<'_>,
344 now: DateTime,
345 candle: Candle,
346 ) -> anyhow::Result<()> {
347 let state = match self.state.as_mut() {
348 Some(x) => x,
349 None => {
350 self.state = Some(WindowedCandle::new(now, self.window, candle));
351 self.state.as_mut().unwrap()
352 }
353 };
354 if let Some((start, old_candle)) = state.with_candle(now, candle) {
355 self.write_candle(dbtx, start, old_candle).await?;
356 };
357 Ok(())
358 }
359
360 pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
361 let state = std::mem::replace(&mut self.state, None);
362 if let Some(state) = state {
363 let (start, candle) = state.flush();
364 self.write_candle(dbtx, start, candle).await?;
365 }
366 Ok(())
367 }
368 }
369}
370
371use price_chart::Context as PriceChartContext;
372
373mod summary {
374 use cometindex::PgTransaction;
375 use penumbra_sdk_asset::asset;
376
377 use super::{Candle, DateTime, PairMetrics, Window};
378
379 pub struct Context {
380 start: asset::Id,
381 end: asset::Id,
382 price: f64,
383 liquidity: f64,
384 start_price_indexing_denom: f64,
385 }
386
387 impl Context {
388 pub async fn load(
389 dbtx: &mut PgTransaction<'_>,
390 start: asset::Id,
391 end: asset::Id,
392 ) -> anyhow::Result<Self> {
393 let row: Option<(f64, f64, f64)> = sqlx::query_as(
394 "
395 SELECT price, liquidity, start_price_indexing_denom
396 FROM dex_ex_pairs_block_snapshot
397 WHERE asset_start = $1
398 AND asset_end = $2
399 ORDER BY id DESC
400 LIMIT 1
401 ",
402 )
403 .bind(start.to_bytes())
404 .bind(end.to_bytes())
405 .fetch_optional(dbtx.as_mut())
406 .await?;
407 let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
408 Ok(Self {
409 start,
410 end,
411 price,
412 liquidity,
413 start_price_indexing_denom,
414 })
415 }
416
417 pub async fn update(
418 &mut self,
419 dbtx: &mut PgTransaction<'_>,
420 now: DateTime,
421 candle: Option<Candle>,
422 metrics: PairMetrics,
423 start_price_indexing_denom: Option<f64>,
424 ) -> anyhow::Result<()> {
425 if let Some(candle) = candle {
426 self.price = candle.close;
427 }
428 if let Some(price) = start_price_indexing_denom {
429 self.start_price_indexing_denom = price;
430 }
431 self.liquidity += metrics.liquidity_change;
432
433 sqlx::query(
434 "
435 INSERT INTO dex_ex_pairs_block_snapshot VALUES (
436 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
437 )
438 ",
439 )
440 .bind(now)
441 .bind(self.start.to_bytes())
442 .bind(self.end.to_bytes())
443 .bind(self.price)
444 .bind(self.liquidity)
445 .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
446 .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
447 .bind(self.start_price_indexing_denom)
448 .bind(metrics.trades)
449 .execute(dbtx.as_mut())
450 .await?;
451 Ok(())
452 }
453 }
454
455 pub async fn update_summaries(
456 dbtx: &mut PgTransaction<'_>,
457 now: DateTime,
458 window: Window,
459 ) -> anyhow::Result<()> {
460 let then = window.subtract_from(now);
461 let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
462 "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
463 )
464 .fetch_all(dbtx.as_mut())
465 .await?;
466 for (start, end) in pairs {
467 sqlx::query(
468 "
469 WITH
470 snapshots AS (
471 SELECT *
472 FROM dex_ex_pairs_block_snapshot
473 WHERE asset_start = $1
474 AND asset_end = $2
475 ),
476 previous AS (
477 SELECT price AS price_then, liquidity AS liquidity_then
478 FROM snapshots
479 WHERE time <= $4
480 ORDER BY time DESC
481 LIMIT 1
482 ),
483 previous_or_default AS (
484 SELECT
485 COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
486 COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
487 ),
488 now AS (
489 SELECT price, liquidity
490 FROM snapshots
491 WHERE time <= $3
492 ORDER BY time DESC
493 LIMIT 1
494 ),
495 sums AS (
496 SELECT
497 COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
498 COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
499 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
500 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
501 COALESCE(SUM(trades), 0.0) AS trades_over_window,
502 COALESCE(MIN(price), 0.0) AS low,
503 COALESCE(MAX(price), 0.0) AS high
504 FROM snapshots
505 WHERE time <= $3
506 AND time >= $4
507 )
508 INSERT INTO dex_ex_pairs_summary
509 SELECT
510 $1, $2, $5,
511 price, price_then,
512 low, high,
513 liquidity, liquidity_then,
514 direct_volume_over_window,
515 swap_volume_over_window,
516 direct_volume_indexing_denom_over_window,
517 swap_volume_indexing_denom_over_window,
518 trades_over_window
519 FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
520 ON CONFLICT (asset_start, asset_end, the_window)
521 DO UPDATE SET
522 price = EXCLUDED.price,
523 price_then = EXCLUDED.price_then,
524 liquidity = EXCLUDED.liquidity,
525 liquidity_then = EXCLUDED.liquidity_then,
526 direct_volume_over_window = EXCLUDED.direct_volume_over_window,
527 swap_volume_over_window = EXCLUDED.swap_volume_over_window,
528 direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
529 swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
530 trades_over_window = EXCLUDED.trades_over_window
531 ",
532 )
533 .bind(start)
534 .bind(end)
535 .bind(now)
536 .bind(then)
537 .bind(window.to_string())
538 .execute(dbtx.as_mut())
539 .await?;
540 }
541 Ok(())
542 }
543
544 pub async fn update_aggregate_summary(
545 dbtx: &mut PgTransaction<'_>,
546 window: Window,
547 denom: asset::Id,
548 min_liquidity: f64,
549 ) -> anyhow::Result<()> {
550 sqlx::query(
552 "
553 WITH
554 eligible_denoms AS (
555 SELECT asset_start as asset, price
556 FROM dex_ex_pairs_summary
557 WHERE asset_end = $1
558 AND liquidity >= $2
559 UNION VALUES ($1, 1.0)
560 ),
561 converted_pairs_summary AS (
562 SELECT
563 asset_start, asset_end,
564 (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
565 liquidity * ed_end.price AS liquidity,
566 direct_volume_indexing_denom_over_window AS dv,
567 swap_volume_indexing_denom_over_window AS sv,
568 trades_over_window as trades
569 FROM dex_ex_pairs_summary
570 JOIN eligible_denoms AS ed_end
571 ON ed_end.asset = asset_end
572 JOIN eligible_denoms AS ed_start
573 ON ed_start.asset = asset_start
574 WHERE the_window = $3
575 ),
576 sums AS (
577 SELECT
578 SUM(dv) AS direct_volume,
579 SUM(sv) AS swap_volume,
580 SUM(liquidity) AS liquidity,
581 SUM(trades) AS trades,
582 (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs
583 FROM converted_pairs_summary
584 ),
585 largest_sv AS (
586 SELECT
587 asset_start AS largest_sv_trading_pair_start,
588 asset_end AS largest_sv_trading_pair_end,
589 sv AS largest_sv_trading_pair_volume
590 FROM converted_pairs_summary
591 ORDER BY sv DESC
592 LIMIT 1
593 ),
594 largest_dv AS (
595 SELECT
596 asset_start AS largest_dv_trading_pair_start,
597 asset_end AS largest_dv_trading_pair_end,
598 dv AS largest_dv_trading_pair_volume
599 FROM converted_pairs_summary
600 ORDER BY dv DESC
601 LIMIT 1
602 ),
603 top_price_mover AS (
604 SELECT
605 asset_start AS top_price_mover_start,
606 asset_end AS top_price_mover_end,
607 price_change AS top_price_mover_change_percent
608 FROM converted_pairs_summary
609 ORDER BY price_change DESC
610 LIMIT 1
611 )
612 INSERT INTO dex_ex_aggregate_summary
613 SELECT
614 $3,
615 direct_volume, swap_volume, liquidity, trades, active_pairs,
616 largest_sv_trading_pair_start,
617 largest_sv_trading_pair_end,
618 largest_sv_trading_pair_volume,
619 largest_dv_trading_pair_start,
620 largest_dv_trading_pair_end,
621 largest_dv_trading_pair_volume,
622 top_price_mover_start,
623 top_price_mover_end,
624 top_price_mover_change_percent
625 FROM
626 sums
627 JOIN largest_sv ON TRUE
628 JOIN largest_dv ON TRUE
629 JOIN top_price_mover ON TRUE
630 ON CONFLICT (the_window) DO UPDATE SET
631 direct_volume = EXCLUDED.direct_volume,
632 swap_volume = EXCLUDED.swap_volume,
633 liquidity = EXCLUDED.liquidity,
634 trades = EXCLUDED.trades,
635 active_pairs = EXCLUDED.active_pairs,
636 largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
637 largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
638 largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
639 largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
640 largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
641 largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
642 top_price_mover_start = EXCLUDED.top_price_mover_start,
643 top_price_mover_end = EXCLUDED.top_price_mover_end,
644 top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
645 ")
646 .bind(denom.to_bytes())
647 .bind(min_liquidity)
648 .bind(window.to_string())
649 .execute(dbtx.as_mut())
650 .await?;
651 Ok(())
652 }
653}
654
655mod metadata {
656 use super::*;
657
658 pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
659 sqlx::query(
660 "
661 INSERT INTO dex_ex_metadata
662 VALUES (1, $1)
663 ON CONFLICT (id) DO UPDATE
664 SET id = EXCLUDED.id,
665 quote_asset_id = EXCLUDED.quote_asset_id
666 ",
667 )
668 .bind(quote_asset.to_bytes())
669 .execute(dbtx.as_mut())
670 .await?;
671 Ok(())
672 }
673}
674
/// Per-block counters accumulated for one directed trading pair.
#[derive(Debug, Default, Clone, Copy)]
struct PairMetrics {
    /// Number of trades seen; incremented by `1.0` per recorded trade.
    trades: f64,
    /// Sum of the reserve differences recorded for the pair.
    liquidity_change: f64,
}
680
/// Summary of one direction of a batch swap; serialized to JSON when the
/// block summary is recorded.
#[derive(Debug, Clone, serde::Serialize)]
struct BatchSwapSummary {
    /// Asset of the swap execution's input.
    asset_start: asset::Id,
    /// Asset of the swap execution's output.
    asset_end: asset::Id,
    /// Input amount of the swap execution.
    input: Amount,
    /// Output amount of the swap execution.
    output: Amount,
    /// Number of swap events on the pair with a non-zero input in this direction.
    num_swaps: i32,
    /// `output / input`, computed in floating point.
    price_float: f64,
}
690
/// Everything extracted from the events of a single block.
#[derive(Debug)]
struct Events {
    /// Block time, taken from the block root event; `None` if no such event was seen.
    time: Option<DateTime>,
    /// Height of the block.
    height: i32,
    /// Candle per directed pair; each candle is stored in both directions.
    candles: HashMap<DirectedTradingPair, Candle>,
    /// Trade and liquidity counters per directed pair.
    metrics: HashMap<DirectedTradingPair, PairMetrics>,
    /// Positions by id: those opened in this block, plus any loaded from the database.
    positions: BTreeMap<PositionId, Position>,
    /// Position-open events, in block order.
    position_opens: Vec<EventPositionOpen>,
    /// Position-execution events, in block order.
    position_executions: Vec<EventPositionExecution>,
    /// Position-close events, in block order.
    position_closes: Vec<EventPositionClose>,
    /// Position-withdraw events, in block order.
    position_withdrawals: Vec<EventPositionWithdraw>,
    /// Batch-swap events, in block order.
    batch_swaps: Vec<EventBatchSwap>,
    /// Swap events grouped by trading pair.
    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
    /// Swap-claim events grouped by trading pair.
    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
    /// Hash of the transaction that opened each position, when the event had one.
    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that queued the close of each position, when the event had one.
    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that withdrew each position, when the event had one.
    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
}
712
713impl Events {
714 fn new() -> Self {
715 Self {
716 time: None,
717 height: 0,
718 candles: HashMap::new(),
719 metrics: HashMap::new(),
720 positions: BTreeMap::new(),
721 position_opens: Vec::new(),
722 position_executions: Vec::new(),
723 position_closes: Vec::new(),
724 position_withdrawals: Vec::new(),
725 batch_swaps: Vec::new(),
726 swaps: BTreeMap::new(),
727 swap_claims: BTreeMap::new(),
728 position_open_txs: BTreeMap::new(),
729 position_close_txs: BTreeMap::new(),
730 position_withdrawal_txs: BTreeMap::new(),
731 }
732 }
733
734 fn with_time(&mut self, time: DateTime) {
735 self.time = Some(time)
736 }
737
738 fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) {
739 let flip = pair.flip();
742 let new_candle = match self.candles.get(&flip).cloned() {
743 None => candle,
744 Some(flipped) => {
745 let mut out = candle;
746 out.mix(&flipped);
747 out
748 }
749 };
750
751 self.candles.insert(pair, new_candle);
752 self.candles.insert(flip, new_candle.flip());
753 }
754
755 fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
756 if !self.metrics.contains_key(pair) {
757 self.metrics.insert(*pair, PairMetrics::default());
758 }
759 self.metrics.get_mut(pair).unwrap()
761 }
762
763 fn with_trade(&mut self, pair: &DirectedTradingPair) {
764 self.metric(pair).trades += 1.0;
765 }
766
767 fn with_reserve_change(
768 &mut self,
769 pair: &TradingPair,
770 old_reserves: Option<Reserves>,
771 new_reserves: Reserves,
772 removed: bool,
773 ) {
774 let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
775 (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
776 (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
777 (_, Some(old), new) => (
778 (new.r1.value() as f64) - (old.r1.value() as f64),
779 (new.r2.value() as f64) - (old.r2.value() as f64),
780 ),
781 };
782 for (d_pair, diff) in [
783 (
784 DirectedTradingPair {
785 start: pair.asset_1(),
786 end: pair.asset_2(),
787 },
788 diff_2,
789 ),
790 (
791 DirectedTradingPair {
792 start: pair.asset_2(),
793 end: pair.asset_1(),
794 },
795 diff_1,
796 ),
797 ] {
798 self.metric(&d_pair).liquidity_change += diff;
799 }
800 }
801
802 pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
803 let mut out = Self::new();
804 out.height = block.height() as i32;
805
806 for event in block.events() {
807 if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
808 let candle = Candle::from_candlestick_data(&e.stick);
809 out.with_candle(e.pair, candle);
810 } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
811 let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
812 "creating timestamp should succeed; timestamp: {}",
813 e.timestamp_seconds
814 ))?;
815 out.with_time(time);
816 } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
817 out.with_reserve_change(
818 &e.trading_pair,
819 None,
820 Reserves {
821 r1: e.reserves_1,
822 r2: e.reserves_2,
823 },
824 false,
825 );
826 if let Some(tx_hash) = event.tx_hash() {
827 out.position_open_txs.insert(e.position_id, tx_hash);
828 }
829 out.positions.insert(e.position_id, e.position.clone());
833 out.position_opens.push(e);
834 } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
835 out.with_reserve_change(
838 &e.trading_pair,
839 None,
840 Reserves {
841 r1: e.reserves_1,
842 r2: e.reserves_2,
843 },
844 true,
845 );
846 if let Some(tx_hash) = event.tx_hash() {
847 out.position_withdrawal_txs.insert(e.position_id, tx_hash);
848 }
849 out.position_withdrawals.push(e);
850 } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
851 out.with_reserve_change(
852 &e.trading_pair,
853 Some(Reserves {
854 r1: e.prev_reserves_1,
855 r2: e.prev_reserves_2,
856 }),
857 Reserves {
858 r1: e.reserves_1,
859 r2: e.reserves_2,
860 },
861 false,
862 );
863 if e.reserves_1 > e.prev_reserves_1 {
864 out.with_trade(&DirectedTradingPair {
866 start: e.trading_pair.asset_1(),
867 end: e.trading_pair.asset_2(),
868 });
869 } else if e.reserves_2 > e.prev_reserves_2 {
870 out.with_trade(&DirectedTradingPair {
871 start: e.trading_pair.asset_2(),
872 end: e.trading_pair.asset_1(),
873 });
874 }
875 out.position_executions.push(e);
876 } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
877 out.position_closes.push(e);
878 } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
879 if let Some(tx_hash) = event.tx_hash() {
882 out.position_close_txs.insert(e.position_id, tx_hash);
883 }
884 } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
885 out.batch_swaps.push(e);
886 } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
887 out.swaps
888 .entry(e.trading_pair)
889 .or_insert_with(Vec::new)
890 .push(e);
891 } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
892 out.swap_claims
893 .entry(e.trading_pair)
894 .or_insert_with(Vec::new)
895 .push(e);
896 }
897 }
898 Ok(out)
899 }
900
901 async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
902 let missing_positions: Vec<_> = self
904 .position_executions
905 .iter()
906 .map(|e| e.position_id)
907 .filter(|id| !self.positions.contains_key(id))
908 .collect();
909
910 if missing_positions.is_empty() {
911 return Ok(());
912 }
913
914 let rows = sqlx::query(
916 "SELECT position_raw
917 FROM dex_ex_position_state
918 WHERE position_id = ANY($1)",
919 )
920 .bind(
921 &missing_positions
922 .iter()
923 .map(|id| id.0.as_ref())
924 .collect::<Vec<_>>(),
925 )
926 .fetch_all(dbtx.as_mut())
927 .await?;
928
929 for row in rows {
931 let position_raw: Vec<u8> = row.get("position_raw");
932 let position = Position::decode(position_raw.as_slice())?;
933 self.positions.insert(position.id(), position);
934 }
935
936 Ok(())
937 }
938
939 pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
941 self.candles
942 .get(&DirectedTradingPair::new(asset, indexing_denom))
943 .map(|x| x.close)
944 }
945}
946
/// The DEX explorer indexing component.
#[derive(Debug)]
pub struct Component {
    // NOTE(review): presumably the indexing/quote denomination passed to the
    // summary queries — confirm against the code that reads this field.
    denom: asset::Id,
    // NOTE(review): presumably the liquidity threshold used by the aggregate
    // summary — confirm against the code that reads this field.
    min_liquidity: f64,
}
952
953impl Component {
954 pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
955 Self {
956 denom,
957 min_liquidity,
958 }
959 }
960
961 async fn record_position_open(
962 &self,
963 dbtx: &mut PgTransaction<'_>,
964 time: DateTime,
965 height: i32,
966 tx_hash: Option<[u8; 32]>,
967 event: &EventPositionOpen,
968 ) -> anyhow::Result<()> {
969 let effective_price_1_to_2: f64 = event
971 .position
972 .phi
973 .orient_start(event.trading_pair.asset_1())
974 .expect("position trading pair matches")
975 .effective_price()
976 .into();
977
978 let effective_price_2_to_1: f64 = event
979 .position
980 .phi
981 .orient_start(event.trading_pair.asset_2())
982 .expect("position trading pair matches")
983 .effective_price()
984 .into();
985
986 let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
988 "INSERT INTO dex_ex_position_reserves (
989 position_id,
990 height,
991 time,
992 reserves_1,
993 reserves_2
994 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
995 )
996 .bind(event.position_id.0)
997 .bind(height)
998 .bind(time)
999 .bind(BigDecimal::from(event.reserves_1.value()))
1000 .bind(BigDecimal::from(event.reserves_2.value()))
1001 .fetch_one(dbtx.as_mut())
1002 .await?;
1003
1004 sqlx::query(
1006 "INSERT INTO dex_ex_position_state (
1007 position_id,
1008 asset_1,
1009 asset_2,
1010 p,
1011 q,
1012 close_on_fill,
1013 fee_bps,
1014 effective_price_1_to_2,
1015 effective_price_2_to_1,
1016 position_raw,
1017 opening_time,
1018 opening_height,
1019 opening_tx,
1020 opening_reserves_rowid
1021 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1022 )
1023 .bind(event.position_id.0)
1024 .bind(event.trading_pair.asset_1().to_bytes())
1025 .bind(event.trading_pair.asset_2().to_bytes())
1026 .bind(BigDecimal::from(event.position.phi.component.p.value()))
1027 .bind(BigDecimal::from(event.position.phi.component.q.value()))
1028 .bind(event.position.close_on_fill)
1029 .bind(event.trading_fee as i32)
1030 .bind(effective_price_1_to_2)
1031 .bind(effective_price_2_to_1)
1032 .bind(event.position.encode_to_vec())
1033 .bind(time)
1034 .bind(height)
1035 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1036 .bind(opening_reserves_rowid)
1037 .execute(dbtx.as_mut())
1038 .await?;
1039
1040 Ok(())
1041 }
1042
1043 async fn record_swap_execution_traces(
1044 &self,
1045 dbtx: &mut PgTransaction<'_>,
1046 time: DateTime,
1047 height: i32,
1048 swap_execution: &SwapExecution,
1049 ) -> anyhow::Result<()> {
1050 let SwapExecution {
1051 traces,
1052 input: se_input,
1053 output: se_output,
1054 } = swap_execution;
1055
1056 let asset_start = se_input.asset_id;
1057 let asset_end = se_output.asset_id;
1058 let batch_input = se_input.amount;
1059 let batch_output = se_output.amount;
1060
1061 for trace in traces.iter() {
1062 let Some(input_value) = trace.first() else {
1063 continue;
1064 };
1065 let Some(output_value) = trace.last() else {
1066 continue;
1067 };
1068
1069 let input = input_value.amount;
1070 let output = output_value.amount;
1071
1072 let price_float = (output.value() as f64) / (input.value() as f64);
1073 let amount_hops = trace
1074 .iter()
1075 .map(|x| BigDecimal::from(x.amount.value()))
1076 .collect::<Vec<_>>();
1077 let position_id_hops: Vec<[u8; 32]> = vec![];
1078 let asset_hops = trace
1079 .iter()
1080 .map(|x| x.asset_id.to_bytes())
1081 .collect::<Vec<_>>();
1082
1083 sqlx::query(
1084 "INSERT INTO dex_ex_batch_swap_traces (
1085 height,
1086 time,
1087 input,
1088 output,
1089 batch_input,
1090 batch_output,
1091 price_float,
1092 asset_start,
1093 asset_end,
1094 asset_hops,
1095 amount_hops,
1096 position_id_hops
1097 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1098 )
1099 .bind(height)
1100 .bind(time)
1101 .bind(BigDecimal::from(input.value()))
1102 .bind(BigDecimal::from(output.value()))
1103 .bind(BigDecimal::from(batch_input.value()))
1104 .bind(BigDecimal::from(batch_output.value()))
1105 .bind(price_float)
1106 .bind(asset_start.to_bytes())
1107 .bind(asset_end.to_bytes())
1108 .bind(asset_hops)
1109 .bind(amount_hops)
1110 .bind(position_id_hops)
1111 .execute(dbtx.as_mut())
1112 .await?;
1113 }
1114
1115 Ok(())
1116 }
1117
1118 async fn record_block_summary(
1119 &self,
1120 dbtx: &mut PgTransaction<'_>,
1121 time: DateTime,
1122 height: i32,
1123 events: &Events,
1124 ) -> anyhow::Result<()> {
1125 let num_opened_lps = events.position_opens.len() as i32;
1126 let num_closed_lps = events.position_closes.len() as i32;
1127 let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1128 let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1129 let num_swap_claims = events
1130 .swap_claims
1131 .iter()
1132 .map(|(_, v)| v.len())
1133 .sum::<usize>() as i32;
1134 let num_txs = events.batch_swaps.len() as i32;
1135
1136 let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1137
1138 for event in &events.batch_swaps {
1139 let trading_pair = event.batch_swap_output_data.trading_pair;
1140
1141 if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1142 let asset_start = swap_1_2.input.asset_id;
1143 let asset_end = swap_1_2.output.asset_id;
1144 let input = swap_1_2.input.amount;
1145 let output = swap_1_2.output.amount;
1146 let price_float = (output.value() as f64) / (input.value() as f64);
1147
1148 let empty_vec = vec![];
1149 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1150 let filtered_swaps: Vec<_> = swaps_for_pair
1151 .iter()
1152 .filter(|swap| swap.delta_1_i != Amount::zero())
1153 .collect::<Vec<_>>();
1154 let num_swaps = filtered_swaps.len() as i32;
1155
1156 batch_swap_summaries.push(BatchSwapSummary {
1157 asset_start,
1158 asset_end,
1159 input,
1160 output,
1161 num_swaps,
1162 price_float,
1163 });
1164 }
1165
1166 if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1167 let asset_start = swap_2_1.input.asset_id;
1168 let asset_end = swap_2_1.output.asset_id;
1169 let input = swap_2_1.input.amount;
1170 let output = swap_2_1.output.amount;
1171 let price_float = (output.value() as f64) / (input.value() as f64);
1172
1173 let empty_vec = vec![];
1174 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1175 let filtered_swaps: Vec<_> = swaps_for_pair
1176 .iter()
1177 .filter(|swap| swap.delta_2_i != Amount::zero())
1178 .collect::<Vec<_>>();
1179 let num_swaps = filtered_swaps.len() as i32;
1180
1181 batch_swap_summaries.push(BatchSwapSummary {
1182 asset_start,
1183 asset_end,
1184 input,
1185 output,
1186 num_swaps,
1187 price_float,
1188 });
1189 }
1190 }
1191
1192 sqlx::query(
1193 "INSERT INTO dex_ex_block_summary (
1194 height,
1195 time,
1196 batch_swaps,
1197 num_open_lps,
1198 num_closed_lps,
1199 num_withdrawn_lps,
1200 num_swaps,
1201 num_swap_claims,
1202 num_txs
1203 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1204 )
1205 .bind(height)
1206 .bind(time)
1207 .bind(serde_json::to_value(&batch_swap_summaries)?)
1208 .bind(num_opened_lps)
1209 .bind(num_closed_lps)
1210 .bind(num_withdrawn_lps)
1211 .bind(num_swaps)
1212 .bind(num_swap_claims)
1213 .bind(num_txs)
1214 .execute(dbtx.as_mut())
1215 .await?;
1216
1217 Ok(())
1218 }
1219
1220 async fn record_batch_swap_traces(
1221 &self,
1222 dbtx: &mut PgTransaction<'_>,
1223 time: DateTime,
1224 height: i32,
1225 event: &EventBatchSwap,
1226 ) -> anyhow::Result<()> {
1227 let EventBatchSwap {
1228 batch_swap_output_data: _,
1229 swap_execution_1_for_2,
1230 swap_execution_2_for_1,
1231 } = event;
1232
1233 if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1234 self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1235 .await?;
1236 }
1237
1238 if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1239 self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1240 .await?;
1241 }
1242
1243 Ok(())
1244 }
1245
1246 async fn record_position_execution(
1247 &self,
1248 dbtx: &mut PgTransaction<'_>,
1249 time: DateTime,
1250 height: i32,
1251 event: &EventPositionExecution,
1252 positions: &BTreeMap<PositionId, Position>,
1253 ) -> anyhow::Result<()> {
1254 let position = positions
1256 .get(&event.position_id)
1257 .expect("position must exist for execution");
1258
1259 let (delta_1, delta_2, lambda_1, lambda_2) = if event.reserves_1 > event.prev_reserves_1 {
1261 let delta_1 = event.reserves_1 - event.prev_reserves_1;
1263 let lambda_2 = event.prev_reserves_2 - event.reserves_2;
1264 (delta_1, Amount::zero(), Amount::zero(), lambda_2)
1265 } else {
1266 let delta_2 = event.reserves_2 - event.prev_reserves_2;
1268 let lambda_1 = event.prev_reserves_1 - event.reserves_1;
1269 (Amount::zero(), delta_2, lambda_1, Amount::zero())
1270 };
1271
1272 let fee_bps = position.phi.component.fee as u128;
1274 let fee_1 = (delta_1.value() * fee_bps) / 10_000u128;
1275 let fee_2 = (delta_2.value() * fee_bps) / 10_000u128;
1276
1277 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1279 "INSERT INTO dex_ex_position_reserves (
1280 position_id,
1281 height,
1282 time,
1283 reserves_1,
1284 reserves_2
1285 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1286 )
1287 .bind(event.position_id.0)
1288 .bind(height)
1289 .bind(time)
1290 .bind(BigDecimal::from(event.reserves_1.value()))
1291 .bind(BigDecimal::from(event.reserves_2.value()))
1292 .fetch_one(dbtx.as_mut())
1293 .await?;
1294
1295 sqlx::query(
1297 "INSERT INTO dex_ex_position_executions (
1298 position_id,
1299 height,
1300 time,
1301 reserves_rowid,
1302 delta_1,
1303 delta_2,
1304 lambda_1,
1305 lambda_2,
1306 fee_1,
1307 fee_2,
1308 context_asset_start,
1309 context_asset_end
1310 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1311 )
1312 .bind(event.position_id.0)
1313 .bind(height)
1314 .bind(time)
1315 .bind(reserves_rowid)
1316 .bind(BigDecimal::from(delta_1.value()))
1317 .bind(BigDecimal::from(delta_2.value()))
1318 .bind(BigDecimal::from(lambda_1.value()))
1319 .bind(BigDecimal::from(lambda_2.value()))
1320 .bind(BigDecimal::from(fee_1))
1321 .bind(BigDecimal::from(fee_2))
1322 .bind(event.context.start.to_bytes())
1323 .bind(event.context.end.to_bytes())
1324 .execute(dbtx.as_mut())
1325 .await?;
1326
1327 Ok(())
1328 }
1329
1330 async fn record_position_close(
1331 &self,
1332 dbtx: &mut PgTransaction<'_>,
1333 time: DateTime,
1334 height: i32,
1335 tx_hash: Option<[u8; 32]>,
1336 event: &EventPositionClose,
1337 ) -> anyhow::Result<()> {
1338 sqlx::query(
1339 "UPDATE dex_ex_position_state
1340 SET closing_time = $1,
1341 closing_height = $2,
1342 closing_tx = $3
1343 WHERE position_id = $4",
1344 )
1345 .bind(time)
1346 .bind(height)
1347 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1348 .bind(event.position_id.0)
1349 .execute(dbtx.as_mut())
1350 .await?;
1351
1352 Ok(())
1353 }
1354
1355 async fn record_position_withdraw(
1356 &self,
1357 dbtx: &mut PgTransaction<'_>,
1358 time: DateTime,
1359 height: i32,
1360 tx_hash: Option<[u8; 32]>,
1361 event: &EventPositionWithdraw,
1362 ) -> anyhow::Result<()> {
1363 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1365 "INSERT INTO dex_ex_position_reserves (
1366 position_id,
1367 height,
1368 time,
1369 reserves_1,
1370 reserves_2
1371 ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", )
1373 .bind(event.position_id.0)
1374 .bind(height)
1375 .bind(time)
1376 .bind(BigDecimal::from(0)) .fetch_one(dbtx.as_mut())
1378 .await?;
1379
1380 sqlx::query(
1381 "INSERT INTO dex_ex_position_withdrawals (
1382 position_id,
1383 height,
1384 time,
1385 withdrawal_tx,
1386 sequence,
1387 reserves_1,
1388 reserves_2,
1389 reserves_rowid
1390 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1391 )
1392 .bind(event.position_id.0)
1393 .bind(height)
1394 .bind(time)
1395 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1396 .bind(event.sequence as i32)
1397 .bind(BigDecimal::from(event.reserves_1.value()))
1398 .bind(BigDecimal::from(event.reserves_2.value()))
1399 .bind(reserves_rowid)
1400 .execute(dbtx.as_mut())
1401 .await?;
1402
1403 Ok(())
1404 }
1405
1406 async fn record_transaction(
1407 &self,
1408 dbtx: &mut PgTransaction<'_>,
1409 time: DateTime,
1410 height: u64,
1411 transaction_id: [u8; 32],
1412 transaction: Transaction,
1413 ) -> anyhow::Result<()> {
1414 if transaction.transaction_body.actions.is_empty() {
1415 return Ok(());
1416 }
1417 sqlx::query(
1418 "INSERT INTO dex_ex_transactions (
1419 transaction_id,
1420 transaction,
1421 height,
1422 time
1423 ) VALUES ($1, $2, $3, $4)
1424 ",
1425 )
1426 .bind(transaction_id)
1427 .bind(transaction.encode_to_vec())
1428 .bind(i32::try_from(height)?)
1429 .bind(time)
1430 .execute(dbtx.as_mut())
1431 .await?;
1432
1433 Ok(())
1434 }
1435
1436 async fn record_all_transactions(
1437 &self,
1438 dbtx: &mut PgTransaction<'_>,
1439 time: DateTime,
1440 block: &BlockEvents,
1441 ) -> anyhow::Result<()> {
1442 for (tx_id, tx_bytes) in block.transactions() {
1443 let tx = Transaction::try_from(tx_bytes)?;
1444 let height = block.height();
1445 self.record_transaction(dbtx, time, height, tx_id, tx)
1446 .await?;
1447 }
1448 Ok(())
1449 }
1450}
1451
1452#[async_trait]
1453impl AppView for Component {
1454 async fn init_chain(
1455 &self,
1456 dbtx: &mut PgTransaction,
1457 _: &serde_json::Value,
1458 ) -> Result<(), anyhow::Error> {
1459 for statement in include_str!("schema.sql").split(";") {
1460 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1461 }
1462 Ok(())
1463 }
1464
1465 fn name(&self) -> String {
1466 "dex_ex".to_string()
1467 }
1468
1469 async fn index_batch(
1470 &self,
1471 dbtx: &mut PgTransaction,
1472 batch: EventBatch,
1473 ctx: EventBatchContext,
1474 ) -> Result<(), anyhow::Error> {
1475 metadata::set(dbtx, self.denom).await?;
1476 let mut charts = HashMap::new();
1477 let mut snapshots = HashMap::new();
1478 let mut last_time = None;
1479 for block in batch.events_by_block() {
1480 let mut events = Events::extract(&block)?;
1481 let time = events
1482 .time
1483 .expect(&format!("no block root event at height {}", block.height()));
1484 last_time = Some(time);
1485
1486 self.record_all_transactions(dbtx, time, block).await?;
1487
1488 events.load_positions(dbtx).await?;
1490
1491 self.record_block_summary(dbtx, time, block.height() as i32, &events)
1493 .await?;
1494
1495 for event in &events.batch_swaps {
1497 self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1498 .await?;
1499 }
1500
1501 for event in &events.position_opens {
1503 let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1504 self.record_position_open(dbtx, time, events.height, tx_hash, event)
1505 .await?;
1506 }
1507
1508 for event in &events.position_executions {
1510 self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1511 .await?;
1512 }
1513
1514 for event in &events.position_closes {
1516 let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1517 self.record_position_close(dbtx, time, events.height, tx_hash, event)
1518 .await?;
1519 }
1520
1521 for event in &events.position_withdrawals {
1523 let tx_hash = events
1524 .position_withdrawal_txs
1525 .get(&event.position_id)
1526 .copied();
1527 self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1528 .await?;
1529 }
1530
1531 for (pair, candle) in &events.candles {
1532 for window in Window::all() {
1533 let key = (pair.start, pair.end, window);
1534 if !charts.contains_key(&key) {
1535 let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1536 charts.insert(key, ctx);
1537 }
1538 charts
1539 .get_mut(&key)
1540 .unwrap() .update(dbtx, time, *candle)
1542 .await?;
1543 }
1544 }
1545
1546 let block_pairs = events
1547 .candles
1548 .keys()
1549 .chain(events.metrics.keys())
1550 .copied()
1551 .collect::<HashSet<_>>();
1552 for pair in block_pairs {
1553 if !snapshots.contains_key(&pair) {
1554 let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1555 snapshots.insert(pair, ctx);
1556 }
1557 snapshots
1559 .get_mut(&pair)
1560 .unwrap()
1561 .update(
1562 dbtx,
1563 time,
1564 events.candles.get(&pair).copied(),
1565 events.metrics.get(&pair).copied().unwrap_or_default(),
1566 events.price_for(self.denom, pair.start),
1567 )
1568 .await?;
1569 }
1570 }
1571
1572 if ctx.is_last() {
1573 if let Some(now) = last_time {
1574 for window in Window::all() {
1575 summary::update_summaries(dbtx, now, window).await?;
1576 summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1577 .await?;
1578 }
1579 }
1580 }
1581 for chart in charts.into_values() {
1582 chart.unload(dbtx).await?;
1583 }
1584 Ok(())
1585 }
1586}