cnidarium/
rpc.rs

1pub struct Server {
2    storage: Storage,
3}
4
5impl Server {
6    pub fn new(storage: Storage) -> Self {
7        Self { storage }
8    }
9}
10use std::pin::Pin;
11
12use crate::proto::v1::{
13    key_value_response::Value as JMTValue, non_verifiable_key_value_response::Value as NVValue,
14    query_service_server::QueryService, watch_response as wr, KeyValueRequest, KeyValueResponse,
15    NonVerifiableKeyValueRequest, NonVerifiableKeyValueResponse, PrefixValueRequest,
16    PrefixValueResponse, WatchRequest, WatchResponse,
17};
18use crate::read::StateRead;
19use futures::{StreamExt, TryStreamExt};
20use regex::Regex;
21use tokio_stream::wrappers::ReceiverStream;
22use tonic::Status;
23use tracing::instrument;
24
25use crate::Storage;
26
27#[tonic::async_trait]
28impl QueryService for Server {
29    #[instrument(skip(self, request))]
30    async fn non_verifiable_key_value(
31        &self,
32        request: tonic::Request<NonVerifiableKeyValueRequest>,
33    ) -> Result<tonic::Response<NonVerifiableKeyValueResponse>, Status> {
34        let state = self.storage.latest_snapshot();
35        let request = request.into_inner();
36
37        if request.key.is_none() || request.key.as_ref().expect("key is Some").inner.is_empty() {
38            return Err(Status::invalid_argument("key is empty"));
39        }
40
41        let key = request.key.expect("key is Some").inner;
42        let some_value = state
43            .nonverifiable_get_raw(&key)
44            .await
45            .map_err(|e| tonic::Status::internal(e.to_string()))?;
46
47        Ok(tonic::Response::new(NonVerifiableKeyValueResponse {
48            value: some_value.map(|value| NVValue { value }),
49        }))
50    }
51
52    #[instrument(skip(self, request))]
53    async fn key_value(
54        &self,
55        request: tonic::Request<KeyValueRequest>,
56    ) -> Result<tonic::Response<KeyValueResponse>, Status> {
57        let state = self.storage.latest_snapshot();
58        // We map the error here to avoid including `tonic` as a dependency
59        // in the `chain` crate, to support its compilation to wasm.
60        let request = request.into_inner();
61        tracing::debug!(?request, "processing key_value request");
62
63        if request.key.is_empty() {
64            return Err(Status::invalid_argument("key is empty"));
65        }
66
67        let (some_value, proof) = {
68            // Don't generate the proof if the request doesn't ask for it.
69            let (v, p) = if request.proof {
70                let (v, p) = state
71                    .get_with_proof(request.key.into_bytes())
72                    .await
73                    .map_err(|e| tonic::Status::internal(e.to_string()))?;
74                (v, Some(p))
75            } else {
76                (
77                    state
78                        .get_raw(&request.key)
79                        .await
80                        .map_err(|e| tonic::Status::internal(e.to_string()))?,
81                    None,
82                )
83            };
84            (v, p)
85        };
86
87        Ok(tonic::Response::new(KeyValueResponse {
88            value: some_value.map(|value| JMTValue { value }),
89            proof: if request.proof {
90                Some(ibc_proto::ibc::core::commitment::v1::MerkleProof {
91                    proofs: proof
92                        .expect("proof should be present")
93                        .proofs
94                        .into_iter()
95                        .map(|p| {
96                            let mut encoded = Vec::new();
97                            prost::Message::encode(&p, &mut encoded).expect("able to encode proof");
98                            prost::Message::decode(&*encoded).expect("able to decode proof")
99                        })
100                        .collect(),
101                })
102            } else {
103                None
104            },
105        }))
106    }
107
108    type PrefixValueStream =
109        Pin<Box<dyn futures::Stream<Item = Result<PrefixValueResponse, tonic::Status>> + Send>>;
110
111    #[instrument(skip(self, request))]
112    async fn prefix_value(
113        &self,
114        request: tonic::Request<PrefixValueRequest>,
115    ) -> Result<tonic::Response<Self::PrefixValueStream>, Status> {
116        let state = self.storage.latest_snapshot();
117        let request = request.into_inner();
118        tracing::debug!(?request);
119
120        if request.prefix.is_empty() {
121            return Err(Status::invalid_argument("prefix is empty"));
122        }
123
124        Ok(tonic::Response::new(
125            state
126                .prefix_raw(&request.prefix)
127                .map_ok(|i: (String, Vec<u8>)| {
128                    let (key, value) = i;
129                    PrefixValueResponse { key, value }
130                })
131                .map_err(|e: anyhow::Error| {
132                    tonic::Status::unavailable(format!(
133                        "error getting prefix value from storage: {e}"
134                    ))
135                })
136                .boxed(),
137        ))
138    }
139
140    type WatchStream = ReceiverStream<Result<WatchResponse, tonic::Status>>;
141
142    #[instrument(skip(self, request))]
143    async fn watch(
144        &self,
145        request: tonic::Request<WatchRequest>,
146    ) -> Result<tonic::Response<Self::WatchStream>, Status> {
147        let request = request.into_inner();
148        tracing::debug!(?request);
149
150        const MAX_REGEX_LEN: usize = 1024;
151
152        let key_regex = match request.key_regex.as_str() {
153            "" => None,
154            _ => Some(
155                regex::RegexBuilder::new(&request.key_regex)
156                    .size_limit(MAX_REGEX_LEN)
157                    .build()
158                    .map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?,
159            ),
160        };
161
162        // Use the `bytes` regex to allow matching byte strings.
163        let nv_key_regex = match request.nv_key_regex.as_str() {
164            "" => None,
165            _ => Some(
166                regex::bytes::RegexBuilder::new(&request.nv_key_regex)
167                    .size_limit(MAX_REGEX_LEN)
168                    .unicode(false)
169                    .build()
170                    .map_err(|e| {
171                        Status::invalid_argument(format!("invalid nv_key_regex: {}", e))
172                    })?,
173            ),
174        };
175
176        let (tx, rx) = tokio::sync::mpsc::channel::<Result<WatchResponse, tonic::Status>>(10);
177
178        tokio::spawn(watch_changes(
179            self.storage.clone(),
180            key_regex,
181            nv_key_regex,
182            tx,
183        ));
184
185        Ok(tonic::Response::new(ReceiverStream::new(rx)))
186    }
187}
188
189async fn watch_changes(
190    storage: Storage,
191    key_regex: Option<regex::Regex>,
192    nv_key_regex: Option<regex::bytes::Regex>,
193    tx: tokio::sync::mpsc::Sender<Result<WatchResponse, tonic::Status>>,
194) -> anyhow::Result<()> {
195    let mut changes_rx = storage.subscribe_changes();
196    while !tx.is_closed() {
197        // Wait for a new set of changes, reporting an error if we don't get one.
198        if let Err(e) = changes_rx.changed().await {
199            tx.send(Err(tonic::Status::internal(e.to_string()))).await?;
200        }
201        let (version, changes) = changes_rx.borrow_and_update().clone();
202
203        if key_regex.is_some() || nv_key_regex.is_none() {
204            for (key, value) in changes.unwritten_changes().iter() {
205                if key_regex
206                    .as_ref()
207                    .unwrap_or(&Regex::new(r"").expect("empty regex ok"))
208                    .is_match(key)
209                {
210                    tx.send(Ok(WatchResponse {
211                        version,
212                        entry: Some(wr::Entry::Kv(wr::KeyValue {
213                            key: key.clone(),
214                            value: value.as_ref().cloned().unwrap_or_default(),
215                            deleted: value.is_none(),
216                        })),
217                    }))
218                    .await?;
219                }
220            }
221        }
222
223        if nv_key_regex.is_some() || key_regex.is_none() {
224            for (key, value) in changes.nonverifiable_changes().iter() {
225                if nv_key_regex
226                    .as_ref()
227                    .unwrap_or(&regex::bytes::Regex::new(r"").expect("empty regex ok"))
228                    .is_match(key)
229                {
230                    tx.send(Ok(WatchResponse {
231                        version,
232                        entry: Some(wr::Entry::NvKv(wr::NvKeyValue {
233                            key: key.clone(),
234                            value: value.as_ref().cloned().unwrap_or_default(),
235                            deleted: value.is_none(),
236                        })),
237                    }))
238                    .await?;
239                }
240            }
241        }
242    }
243    return Ok(());
244}