1use anyhow::anyhow;
2use cometindex::{
3 async_trait,
4 index::{EventBatch, EventBatchContext, Version},
5 sqlx, AppView, ContextualizedEvent, PgTransaction,
6};
7use penumbra_sdk_app::{event::EventAppParametersChange, genesis::Content, params::AppParameters};
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_dex::event::EventLqtPositionVolume;
10use penumbra_sdk_dex::lp::position;
11use penumbra_sdk_distributions::{event::EventLqtPoolSizeIncrease, DistributionsParameters};
12use penumbra_sdk_funding::{
13 event::{EventLqtDelegatorReward, EventLqtPositionReward, EventLqtVote},
14 FundingParameters,
15};
16use penumbra_sdk_keys::Address;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_sct::{event::EventEpochRoot, params::SctParameters};
20use sqlx::types::BigDecimal;
21
22use crate::parsing::parse_content;
23
24struct Parameters {
25 funding: FundingParameters,
26 sct: SctParameters,
27 distribution: DistributionsParameters,
28}
29
30impl From<Content> for Parameters {
31 fn from(value: Content) -> Self {
32 Self {
33 funding: value.funding_content.funding_params,
34 sct: value.sct_content.sct_params,
35 distribution: value.distributions_content.distributions_params,
36 }
37 }
38}
39
40impl From<AppParameters> for Parameters {
41 fn from(value: AppParameters) -> Self {
42 Self {
43 funding: value.funding_params,
44 sct: value.sct_params,
45 distribution: value.distributions_params,
46 }
47 }
48}
49
50mod _params {
51 use super::*;
52
53 pub async fn set_initial(
55 dbtx: &mut PgTransaction<'_>,
56 params: Parameters,
57 ) -> anyhow::Result<()> {
58 set_epoch(dbtx, 0, params).await
59 }
60
61 #[allow(dead_code)]
63 pub async fn set_epoch(
65 dbtx: &mut PgTransaction<'_>,
66 epoch: u64,
67 params: Parameters,
68 ) -> anyhow::Result<()> {
69 let gauge_threshold = params.funding.liquidity_tournament.gauge_threshold;
70 let delegator_share = params.funding.liquidity_tournament.delegator_share;
71 let epoch_duration = params.sct.epoch_duration;
72 let rewards_per_block = params.distribution.liquidity_tournament_incentive_per_block;
73 sqlx::query(
74 "
75 INSERT INTO lqt._params
76 VALUES ($1, $2::NUMERIC / 100, $3::NUMERIC / 100, $4, $5)
77 ON CONFLICT (epoch)
78 DO UPDATE SET
79 delegator_share = EXCLUDED.delegator_share,
80 gauge_threshold = EXCLUDED.gauge_threshold,
81 epoch_duration = EXCLUDED.epoch_duration,
82 rewards_per_block = EXCLUDED.rewards_per_block
83 ",
84 )
85 .bind(i64::try_from(epoch)?)
86 .bind(i64::try_from(delegator_share.to_percent())?)
87 .bind(i64::try_from(gauge_threshold.to_percent())?)
88 .bind(i64::try_from(epoch_duration)?)
89 .bind(i64::try_from(rewards_per_block)?)
90 .execute(dbtx.as_mut())
91 .await?;
92 Ok(())
93 }
94}
95
96mod _meta {
97 use super::*;
98
99 pub async fn update_with_batch(
100 dbtx: &mut PgTransaction<'_>,
101 batch: &EventBatch,
102 block_time_s: f64,
103 ) -> anyhow::Result<()> {
104 let current_height = batch
105 .events_by_block()
106 .last()
107 .map(|x| x.height())
108 .ok_or(anyhow!("expected there to be at least one block in events"))?;
109 sqlx::query(
110 "
111 INSERT INTO lqt._meta VALUES (0, $1, $2)
112 ON CONFLICT (rowid)
113 DO UPDATE SET
114 current_height = EXCLUDED.current_height,
115 block_time_s = EXCLUDED.block_time_s
116 ",
117 )
118 .bind(i64::try_from(current_height)?)
119 .bind(block_time_s)
120 .execute(dbtx.as_mut())
121 .await?;
122 Ok(())
123 }
124}
125
126mod _epoch_info {
127 use super::*;
128
129 pub async fn start_epoch(
130 dbtx: &mut PgTransaction<'_>,
131 epoch: u64,
132 height: u64,
133 ) -> anyhow::Result<()> {
134 sqlx::query(
135 "INSERT INTO lqt._epoch_info VALUES ($1, $2, $2, NULL, 0) ON CONFLICT DO NOTHING",
136 )
137 .bind(i64::try_from(epoch)?)
138 .bind(i64::try_from(height)?)
139 .execute(dbtx.as_mut())
140 .await?;
141 Ok(())
142 }
143
144 pub async fn end_epoch(
145 dbtx: &mut PgTransaction<'_>,
146 epoch: u64,
147 height: u64,
148 ) -> anyhow::Result<()> {
149 sqlx::query(
150 "UPDATE lqt._epoch_info SET end_block = $2, updated_block = $2 WHERE epoch = $1",
151 )
152 .bind(i64::try_from(epoch)?)
153 .bind(i64::try_from(height)?)
154 .execute(dbtx.as_mut())
155 .await?;
156 Ok(())
157 }
158
159 pub async fn set_rewards_for_epoch(
160 dbtx: &mut PgTransaction<'_>,
161 epoch: u64,
162 height: u64,
163 amount: Amount,
164 ) -> anyhow::Result<()> {
165 sqlx::query("UPDATE lqt._epoch_info SET available_rewards = $2::NUMERIC, updated_block = $3 WHERE epoch = $1")
166 .bind(i64::try_from(epoch)?)
167 .bind(BigDecimal::from(amount.value()))
168 .bind(i64::try_from(height)?)
169 .execute(dbtx.as_mut())
170 .await?;
171 Ok(())
172 }
173
174 pub async fn current(dbtx: &mut PgTransaction<'_>) -> anyhow::Result<u64> {
175 let out: i32 =
176 sqlx::query_scalar("SELECT epoch FROM lqt._epoch_info ORDER BY epoch DESC LIMIT 1")
177 .fetch_one(dbtx.as_mut())
178 .await?;
179 Ok(u64::try_from(out)?)
180 }
181}
182
183mod _finished_epochs {
184 use super::*;
185
186 pub async fn declare_finished(dbtx: &mut PgTransaction<'_>, epoch: u64) -> anyhow::Result<()> {
190 sqlx::query("INSERT INTO lqt._finished_epochs VALUES ($1) ON CONFLICT DO NOTHING")
191 .bind(i64::try_from(epoch)?)
192 .execute(dbtx.as_mut())
193 .await?;
194 Ok(())
195 }
196}
197
198mod _delegator_rewards {
199 use super::*;
200
201 pub async fn add(
203 dbtx: &mut PgTransaction<'_>,
204 epoch: u64,
205 addr: Address,
206 amount: Amount,
207 ) -> anyhow::Result<()> {
208 sqlx::query(
209 "
210 INSERT INTO lqt._delegator_rewards
211 VALUES ($1, $2, $3)
212 ON CONFLICT (epoch, address)
213 DO UPDATE SET
214 amount = lqt._delegator_rewards.amount + EXCLUDED.amount
215 ",
216 )
217 .bind(i64::try_from(epoch)?)
218 .bind(addr.to_vec())
219 .bind(BigDecimal::from(amount.value()))
220 .execute(dbtx.as_mut())
221 .await?;
222 Ok(())
223 }
224}
225
226mod _lp_rewards {
227 use super::*;
228
229 pub async fn add_reward(
233 dbtx: &mut PgTransaction<'_>,
234 epoch: u64,
235 position_id: position::Id,
236 amount: Amount,
237 ) -> anyhow::Result<()> {
238 sqlx::query(
239 "
240 UPDATE lqt._lp_rewards
241 SET amount = amount + $3
242 WHERE epoch = $1 AND position_id = $2
243 ",
244 )
245 .bind(i64::try_from(epoch)?)
246 .bind(position_id.0)
247 .bind(BigDecimal::from(amount.value()))
248 .execute(dbtx.as_mut())
249 .await?;
250 Ok(())
251 }
252
253 pub async fn add_flows(
255 dbtx: &mut PgTransaction<'_>,
256 epoch: u64,
257 position_id: position::Id,
258 other_asset: asset::Id,
259 um_volume: Amount,
260 asset_volume: Amount,
261 um_fees: Amount,
262 asset_fees: Amount,
263 points: Amount,
264 ) -> anyhow::Result<()> {
265 sqlx::query(
266 "
267 INSERT INTO lqt._lp_rewards
268 VALUES ($1, $2, $3, 0, 1, $4, $5, $6, $7, $8)
269 ON CONFLICT (epoch, position_id)
270 DO UPDATE SET
271 asset_id = EXCLUDED.asset_id,
272 executions = lqt._lp_rewards.executions + 1,
273 um_volume = lqt._lp_rewards.um_volume + EXCLUDED.um_volume,
274 asset_volume = lqt._lp_rewards.asset_volume + EXCLUDED.asset_volume,
275 um_fees = lqt._lp_rewards.um_fees + EXCLUDED.um_fees,
276 asset_fees = lqt._lp_rewards.asset_fees + EXCLUDED.asset_fees,
277 points = lqt._lp_rewards.points + EXCLUDED.points
278 ",
279 )
280 .bind(i64::try_from(epoch)?)
281 .bind(position_id.0)
282 .bind(other_asset.to_bytes())
283 .bind(BigDecimal::from(um_volume.value()))
284 .bind(BigDecimal::from(asset_volume.value()))
285 .bind(BigDecimal::from(um_fees.value()))
286 .bind(BigDecimal::from(asset_fees.value()))
287 .bind(BigDecimal::from(points.value()))
288 .execute(dbtx.as_mut())
289 .await?;
290 Ok(())
291 }
292}
293
294mod _votes {
295 use super::*;
296
297 pub async fn add(
299 dbtx: &mut PgTransaction<'_>,
300 epoch: u64,
301 power: Amount,
302 asset_id: asset::Id,
303 addr: Address,
304 ) -> anyhow::Result<()> {
305 sqlx::query(
306 "
307 INSERT INTO lqt._votes
308 VALUES (DEFAULT, $1, $2, $3, $4)
309 ",
310 )
311 .bind(i64::try_from(epoch)?)
312 .bind(BigDecimal::from(power.value()))
313 .bind(asset_id.to_bytes())
314 .bind(addr.to_vec())
315 .execute(dbtx.as_mut())
316 .await?;
317 Ok(())
318 }
319}
320
321#[derive(Debug)]
322pub struct Lqt {
323 block_time_s: f64,
324}
325
326impl Lqt {
327 pub fn new(block_time_s: f64) -> Self {
328 Self { block_time_s }
329 }
330
331 async fn index_event(
332 &self,
333 dbtx: &mut PgTransaction<'_>,
334 event: ContextualizedEvent<'_>,
335 ) -> anyhow::Result<()> {
336 if let Ok(e) = EventLqtVote::try_from_event(&event.event) {
337 _votes::add(
338 dbtx,
339 e.epoch_index,
340 e.voting_power,
341 e.incentivized_asset_id,
342 e.rewards_recipient,
343 )
344 .await?;
345 } else if let Ok(e) = EventLqtDelegatorReward::try_from_event(&event.event) {
346 _delegator_rewards::add(dbtx, e.epoch_index, e.address, e.reward_amount).await?;
347 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
348 _lp_rewards::add_reward(dbtx, e.epoch_index, e.position_id, e.reward_amount).await?;
349 } else if let Ok(e) = EventLqtPositionVolume::try_from_event(&event.event) {
350 _lp_rewards::add_flows(
351 dbtx,
352 e.epoch_index,
353 e.position_id,
354 e.asset_id,
355 e.staking_token_in,
356 e.asset_in,
357 e.staking_fees,
358 e.asset_fees,
359 e.volume,
360 )
361 .await?;
362 } else if let Ok(e) = EventEpochRoot::try_from_event(&event.event) {
363 _finished_epochs::declare_finished(dbtx, e.index).await?;
364 _epoch_info::end_epoch(dbtx, e.index, event.block_height).await?;
365 _epoch_info::start_epoch(dbtx, e.index + 1, event.block_height + 1).await?;
366 } else if let Ok(e) = EventLqtPoolSizeIncrease::try_from_event(&event.event) {
367 _epoch_info::set_rewards_for_epoch(
368 dbtx,
369 e.epoch_index,
370 event.block_height,
371 e.new_total,
372 )
373 .await?;
374 } else if let Ok(e) = EventAppParametersChange::try_from_event(&event.event) {
375 let current = _epoch_info::current(dbtx).await?;
376 _params::set_epoch(dbtx, current, e.new_parameters.into()).await?;
377 }
378 Ok(())
379 }
380}
381
382#[async_trait]
383impl AppView for Lqt {
384 async fn init_chain(
385 &self,
386 dbtx: &mut PgTransaction,
387 app_state: &serde_json::Value,
388 ) -> Result<(), anyhow::Error> {
389 let content = parse_content(app_state.clone())?;
390 _params::set_initial(dbtx, content.into()).await?;
391 Ok(())
392 }
393
394 fn name(&self) -> String {
395 "lqt".to_string()
396 }
397
398 fn version(&self) -> Version {
399 let hash: [u8; 32] = blake2b_simd::Params::default()
400 .personal(b"option_hash")
401 .hash_length(32)
402 .to_state()
403 .update(&self.block_time_s.to_le_bytes())
404 .finalize()
405 .as_bytes()
406 .try_into()
407 .expect("Impossible 000-003: expected 32 byte hash");
408
409 Version::with_major(4).with_option_hash(hash)
410 }
411
412 async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
413 for statement in include_str!("reset.sql").split(";") {
414 sqlx::query(statement).execute(dbtx.as_mut()).await?;
415 }
416 Ok(())
417 }
418
419 async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
420 for statement in include_str!("schema.sql").split(";") {
421 sqlx::query(statement).execute(dbtx.as_mut()).await?;
422 }
423 _epoch_info::start_epoch(dbtx, 1, 1).await?;
424 Ok(())
425 }
426
427 async fn index_batch(
428 &self,
429 dbtx: &mut PgTransaction,
430 batch: EventBatch,
431 _ctx: EventBatchContext,
432 ) -> Result<(), anyhow::Error> {
433 _meta::update_with_batch(dbtx, &batch, self.block_time_s).await?;
434 for event in batch.events() {
435 self.index_event(dbtx, event).await?;
436 }
437 Ok(())
438 }
439}