1pub struct Server {
2 storage: Storage,
3}
4
5impl Server {
6 pub fn new(storage: Storage) -> Self {
7 Self { storage }
8 }
9}
10use std::pin::Pin;
11
12use crate::proto::v1::{
13 key_value_response::Value as JMTValue, non_verifiable_key_value_response::Value as NVValue,
14 query_service_server::QueryService, watch_response as wr, KeyValueRequest, KeyValueResponse,
15 NonVerifiableKeyValueRequest, NonVerifiableKeyValueResponse, PrefixValueRequest,
16 PrefixValueResponse, WatchRequest, WatchResponse,
17};
18use crate::read::StateRead;
19use futures::{StreamExt, TryStreamExt};
20use regex::Regex;
21use tokio_stream::wrappers::ReceiverStream;
22use tonic::Status;
23use tracing::instrument;
24
25use crate::Storage;
26
27#[tonic::async_trait]
28impl QueryService for Server {
29 #[instrument(skip(self, request))]
30 async fn non_verifiable_key_value(
31 &self,
32 request: tonic::Request<NonVerifiableKeyValueRequest>,
33 ) -> Result<tonic::Response<NonVerifiableKeyValueResponse>, Status> {
34 let state = self.storage.latest_snapshot();
35 let request = request.into_inner();
36
37 if request.key.is_none() || request.key.as_ref().expect("key is Some").inner.is_empty() {
38 return Err(Status::invalid_argument("key is empty"));
39 }
40
41 let key = request.key.expect("key is Some").inner;
42 let some_value = state
43 .nonverifiable_get_raw(&key)
44 .await
45 .map_err(|e| tonic::Status::internal(e.to_string()))?;
46
47 Ok(tonic::Response::new(NonVerifiableKeyValueResponse {
48 value: some_value.map(|value| NVValue { value }),
49 }))
50 }
51
52 #[instrument(skip(self, request))]
53 async fn key_value(
54 &self,
55 request: tonic::Request<KeyValueRequest>,
56 ) -> Result<tonic::Response<KeyValueResponse>, Status> {
57 let state = self.storage.latest_snapshot();
58 let request = request.into_inner();
61 tracing::debug!(?request, "processing key_value request");
62
63 if request.key.is_empty() {
64 return Err(Status::invalid_argument("key is empty"));
65 }
66
67 let (some_value, proof) = {
68 let (v, p) = if request.proof {
70 let (v, p) = state
71 .get_with_proof(request.key.into_bytes())
72 .await
73 .map_err(|e| tonic::Status::internal(e.to_string()))?;
74 (v, Some(p))
75 } else {
76 (
77 state
78 .get_raw(&request.key)
79 .await
80 .map_err(|e| tonic::Status::internal(e.to_string()))?,
81 None,
82 )
83 };
84 (v, p)
85 };
86
87 Ok(tonic::Response::new(KeyValueResponse {
88 value: some_value.map(|value| JMTValue { value }),
89 proof: if request.proof {
90 Some(ibc_proto::ibc::core::commitment::v1::MerkleProof {
91 proofs: proof
92 .expect("proof should be present")
93 .proofs
94 .into_iter()
95 .map(|p| {
96 let mut encoded = Vec::new();
97 prost::Message::encode(&p, &mut encoded).expect("able to encode proof");
98 prost::Message::decode(&*encoded).expect("able to decode proof")
99 })
100 .collect(),
101 })
102 } else {
103 None
104 },
105 }))
106 }
107
108 type PrefixValueStream =
109 Pin<Box<dyn futures::Stream<Item = Result<PrefixValueResponse, tonic::Status>> + Send>>;
110
111 #[instrument(skip(self, request))]
112 async fn prefix_value(
113 &self,
114 request: tonic::Request<PrefixValueRequest>,
115 ) -> Result<tonic::Response<Self::PrefixValueStream>, Status> {
116 let state = self.storage.latest_snapshot();
117 let request = request.into_inner();
118 tracing::debug!(?request);
119
120 if request.prefix.is_empty() {
121 return Err(Status::invalid_argument("prefix is empty"));
122 }
123
124 Ok(tonic::Response::new(
125 state
126 .prefix_raw(&request.prefix)
127 .map_ok(|i: (String, Vec<u8>)| {
128 let (key, value) = i;
129 PrefixValueResponse { key, value }
130 })
131 .map_err(|e: anyhow::Error| {
132 tonic::Status::unavailable(format!(
133 "error getting prefix value from storage: {e}"
134 ))
135 })
136 .boxed(),
137 ))
138 }
139
140 type WatchStream = ReceiverStream<Result<WatchResponse, tonic::Status>>;
141
142 #[instrument(skip(self, request))]
143 async fn watch(
144 &self,
145 request: tonic::Request<WatchRequest>,
146 ) -> Result<tonic::Response<Self::WatchStream>, Status> {
147 let request = request.into_inner();
148 tracing::debug!(?request);
149
150 const MAX_REGEX_LEN: usize = 1024;
151
152 let key_regex = match request.key_regex.as_str() {
153 "" => None,
154 _ => Some(
155 regex::RegexBuilder::new(&request.key_regex)
156 .size_limit(MAX_REGEX_LEN)
157 .build()
158 .map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?,
159 ),
160 };
161
162 let nv_key_regex = match request.nv_key_regex.as_str() {
164 "" => None,
165 _ => Some(
166 regex::bytes::RegexBuilder::new(&request.nv_key_regex)
167 .size_limit(MAX_REGEX_LEN)
168 .unicode(false)
169 .build()
170 .map_err(|e| {
171 Status::invalid_argument(format!("invalid nv_key_regex: {}", e))
172 })?,
173 ),
174 };
175
176 let (tx, rx) = tokio::sync::mpsc::channel::<Result<WatchResponse, tonic::Status>>(10);
177
178 tokio::spawn(watch_changes(
179 self.storage.clone(),
180 key_regex,
181 nv_key_regex,
182 tx,
183 ));
184
185 Ok(tonic::Response::new(ReceiverStream::new(rx)))
186 }
187}
188
189async fn watch_changes(
190 storage: Storage,
191 key_regex: Option<regex::Regex>,
192 nv_key_regex: Option<regex::bytes::Regex>,
193 tx: tokio::sync::mpsc::Sender<Result<WatchResponse, tonic::Status>>,
194) -> anyhow::Result<()> {
195 let mut changes_rx = storage.subscribe_changes();
196 while !tx.is_closed() {
197 if let Err(e) = changes_rx.changed().await {
199 tx.send(Err(tonic::Status::internal(e.to_string()))).await?;
200 }
201 let (version, changes) = changes_rx.borrow_and_update().clone();
202
203 if key_regex.is_some() || nv_key_regex.is_none() {
204 for (key, value) in changes.unwritten_changes().iter() {
205 if key_regex
206 .as_ref()
207 .unwrap_or(&Regex::new(r"").expect("empty regex ok"))
208 .is_match(key)
209 {
210 tx.send(Ok(WatchResponse {
211 version,
212 entry: Some(wr::Entry::Kv(wr::KeyValue {
213 key: key.clone(),
214 value: value.as_ref().cloned().unwrap_or_default(),
215 deleted: value.is_none(),
216 })),
217 }))
218 .await?;
219 }
220 }
221 }
222
223 if nv_key_regex.is_some() || key_regex.is_none() {
224 for (key, value) in changes.nonverifiable_changes().iter() {
225 if nv_key_regex
226 .as_ref()
227 .unwrap_or(®ex::bytes::Regex::new(r"").expect("empty regex ok"))
228 .is_match(key)
229 {
230 tx.send(Ok(WatchResponse {
231 version,
232 entry: Some(wr::Entry::NvKv(wr::NvKeyValue {
233 key: key.clone(),
234 value: value.as_ref().cloned().unwrap_or_default(),
235 deleted: value.is_none(),
236 })),
237 }))
238 .await?;
239 }
240 }
241 }
242 }
243 return Ok(());
244}