alamin655 committed
Commit 78858b0
Parents: 705ba81 660f856

Merge pull request #492 from spencerjibz/optimize-caching-code

Cargo.lock CHANGED
@@ -4146,7 +4146,7 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
 
 [[package]]
 name = "websurfx"
-version = "1.9.3"
+version = "1.9.4"
 dependencies = [
  "actix-cors",
  "actix-files",
Cargo.toml CHANGED
@@ -1,6 +1,6 @@
 [package]
 name = "websurfx"
-version = "1.9.3"
+version = "1.9.4"
 edition = "2021"
 description = "An open-source alternative to Searx that provides clean, ad-free, and organic results with incredible speed while keeping privacy and security in mind."
 repository = "https://github.com/neon-mmd/websurfx"
src/cache/cacher.rs CHANGED
@@ -4,6 +4,7 @@
 use error_stack::Report;
 #[cfg(feature = "memory-cache")]
 use mini_moka::sync::Cache as MokaCache;
+use mini_moka::sync::ConcurrentCacheExt;
 
 #[cfg(feature = "memory-cache")]
 use std::time::Duration;
@@ -61,8 +62,8 @@ pub trait Cacher: Send + Sync {
     /// failure.
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>>;
 
     /// A helper function which computes the hash of the url and formats and returns it as string.
@@ -332,14 +333,33 @@ impl Cacher for RedisCache {
 
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         use base64::Engine;
-        let bytes = self.pre_process_search_results(search_results)?;
-        let base64_string = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes);
-        let hashed_url_string = self.hash_url(url);
-        self.cache_json(&base64_string, &hashed_url_string).await
+
+        // size of search_results is expected to be equal to size of urls -> key/value pairs for cache;
+        let search_results_len = search_results.len();
+
+        let mut bytes = Vec::with_capacity(search_results_len);
+
+        for result in search_results {
+            let processed = self.pre_process_search_results(result)?;
+            bytes.push(processed);
+        }
+
+        let base64_strings = bytes
+            .iter()
+            .map(|bytes_vec| base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes_vec));
+
+        let mut hashed_url_strings = Vec::with_capacity(search_results_len);
+
+        for url in urls {
+            let hash = self.hash_url(url);
+            hashed_url_strings.push(hash);
+        }
+        self.cache_json(base64_strings, hashed_url_strings.into_iter())
+            .await
     }
 }
 /// TryInto implementation for SearchResults from Vec<u8>
@@ -391,12 +411,16 @@ impl Cacher for InMemoryCache {
 
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
-        let hashed_url_string = self.hash_url(url);
-        let bytes = self.pre_process_search_results(search_results)?;
-        self.cache.insert(hashed_url_string, bytes);
+        for (url, search_result) in urls.iter().zip(search_results.iter()) {
+            let hashed_url_string = self.hash_url(url);
+            let bytes = self.pre_process_search_results(search_result)?;
+            self.cache.insert(hashed_url_string, bytes);
+        }
+
+        self.cache.sync();
         Ok(())
     }
 }
@@ -434,11 +458,13 @@ impl Cacher for HybridCache {
 
     async fn cache_results(
         &mut self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
-        self.redis_cache.cache_results(search_results, url).await?;
-        self.memory_cache.cache_results(search_results, url).await?;
+        self.redis_cache.cache_results(search_results, urls).await?;
+        self.memory_cache
+            .cache_results(search_results, urls)
+            .await?;
 
         Ok(())
     }
@@ -460,8 +486,8 @@ impl Cacher for DisabledCache {
 
     async fn cache_results(
         &mut self,
-        _search_results: &SearchResults,
-        _url: &str,
+        _search_results: &[SearchResults],
+        _urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         Ok(())
     }
@@ -519,11 +545,11 @@ impl SharedCache {
     /// on a failure.
     pub async fn cache_results(
         &self,
-        search_results: &SearchResults,
-        url: &str,
+        search_results: &[SearchResults],
+        urls: &[String],
     ) -> Result<(), Report<CacheError>> {
         let mut mut_cache = self.cache.lock().await;
-        mut_cache.cache_results(search_results, url).await
+        mut_cache.cache_results(search_results, urls).await
     }
 }
 
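Note on the new `Cacher` API: `cache_results` now takes parallel slices (`&[SearchResults]`, `&[String]`) so one call can cache several pages at once, and the in-memory implementation pairs them up with `zip` before calling `cache.sync()`. Below is a minimal, self-contained sketch of that zip-based pairing pattern; the types are toys (`Vec<u8>` standing in for serialized `SearchResults`, `DefaultHasher` for the project's `hash_url` helper), not the project's actual code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

struct ToyCache {
    // hashed URL -> serialized search results
    store: HashMap<String, Vec<u8>>,
}

impl ToyCache {
    // Stand-in for the trait's `hash_url` helper.
    fn hash_url(&self, url: &str) -> String {
        let mut hasher = DefaultHasher::new();
        url.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    // Batched insert: one call caches every (url, result) pair.
    // As in the PR, the two slices are expected to be the same length.
    fn cache_results(&mut self, search_results: &[Vec<u8>], urls: &[String]) {
        for (url, bytes) in urls.iter().zip(search_results.iter()) {
            let key = self.hash_url(url);
            self.store.insert(key, bytes.clone());
        }
    }
}

fn main() {
    let mut cache = ToyCache { store: HashMap::new() };
    let urls = ["q=rust&page=0".to_string(), "q=rust&page=1".to_string()];
    let results = [b"page0".to_vec(), b"page1".to_vec()];
    cache.cache_results(&results, &urls);
    assert_eq!(cache.store.len(), 2);
}
```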
src/cache/redis_cacher.rs CHANGED
@@ -118,14 +118,18 @@ impl RedisCache {
     /// on a failure.
     pub async fn cache_json(
         &mut self,
-        json_results: &str,
-        key: &str,
+        json_results: impl Iterator<Item = String>,
+        keys: impl Iterator<Item = String>,
     ) -> Result<(), Report<CacheError>> {
         self.current_connection = Default::default();
+        let mut pipeline = redis::Pipeline::with_capacity(3);
 
-        let mut result: Result<(), RedisError> = self.connection_pool
-            [self.current_connection as usize]
-            .set_ex(key, json_results, self.cache_ttl.into())
+        for (key, json_result) in keys.zip(json_results) {
+            pipeline.set_ex(key, json_result, self.cache_ttl.into());
+        }
+
+        let mut result: Result<(), RedisError> = pipeline
+            .query_async(&mut self.connection_pool[self.current_connection as usize])
             .await;
 
         // Code to check whether the current connection being used is dropped with connection error
@@ -145,8 +149,10 @@ impl RedisCache {
                     CacheError::PoolExhaustionWithConnectionDropError,
                 ));
             }
-            result = self.connection_pool[self.current_connection as usize]
-                .set_ex(key, json_results, 60)
+            result = pipeline
+                .query_async(
+                    &mut self.connection_pool[self.current_connection as usize],
+                )
                 .await;
             continue;
         }
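The change to `cache_json` is the classic Redis pipelining pattern: every `SETEX` is queued on one `redis::Pipeline` and the whole batch is sent in a single round trip, instead of one awaited command per key. Here is a minimal sketch of the same idea, using the synchronous `redis` API for brevity (the PR uses `query_async` against its connection pool); `cache_json_batch` is a hypothetical helper name, not from the codebase, and the exact integer type `set_ex` expects for the TTL varies across `redis` crate versions.

```rust
// Hypothetical helper: batches (key, payload) pairs into one
// pipelined round trip instead of one SETEX command per key.
fn cache_json_batch(
    con: &mut redis::Connection,
    entries: &[(String, String)], // (hashed URL, base64 payload)
    ttl_seconds: u64,
) -> redis::RedisResult<()> {
    let mut pipeline = redis::Pipeline::with_capacity(entries.len());
    for (key, json) in entries {
        // Each call only queues the command locally; nothing is sent yet.
        pipeline.set_ex(key, json, ttl_seconds);
    }
    // A single network round trip executes every queued SETEX.
    pipeline.query(con)
}
```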
src/server/routes/search.rs CHANGED
@@ -40,6 +40,7 @@ pub async fn search(
     config: web::Data<Config>,
     cache: web::Data<SharedCache>,
 ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
+    use std::sync::Arc;
     let params = web::Query::<SearchParams>::from_query(req.query_string())?;
     match &params.q {
         Some(query) => {
@@ -79,12 +80,50 @@ pub async fn search(
 
             // .max(1) makes sure that the page >= 0.
             let page = params.page.unwrap_or(1).max(1) - 1;
+            let previous_page = page.saturating_sub(1);
+            let next_page = page + 1;
 
-            let (_, results, _) = join!(
-                get_results(page.saturating_sub(1)),
-                get_results(page),
-                get_results(page + 1)
-            );
+            let mut results = Arc::new((SearchResults::default(), String::default()));
+            if page != previous_page {
+                let (previous_results, current_results, next_results) = join!(
+                    get_results(previous_page),
+                    get_results(page),
+                    get_results(next_page)
+                );
+                let (parsed_previous_results, parsed_next_results) =
+                    (previous_results?, next_results?);
+
+                let (cache_keys, results_list) = (
+                    [
+                        parsed_previous_results.1,
+                        results.1.clone(),
+                        parsed_next_results.1,
+                    ],
+                    [
+                        parsed_previous_results.0,
+                        results.0.clone(),
+                        parsed_next_results.0,
+                    ],
+                );
+
+                results = Arc::new(current_results?);
+
+                tokio::spawn(async move { cache.cache_results(&results_list, &cache_keys).await });
+            } else {
+                let (current_results, next_results) =
+                    join!(get_results(page), get_results(page + 1));
+
+                let parsed_next_results = next_results?;
+
+                results = Arc::new(current_results?);
+
+                let (cache_keys, results_list) = (
+                    [results.1.clone(), parsed_next_results.1.clone()],
+                    [results.0.clone(), parsed_next_results.0],
+                );
+
+                tokio::spawn(async move { cache.cache_results(&results_list, &cache_keys).await });
+            }
 
             Ok(HttpResponse::Ok().content_type(ContentType::html()).body(
                 crate::templates::views::search::search(
@@ -92,7 +131,7 @@ pub async fn search(
                     &config.style.theme,
                     &config.style.animation,
                     query,
-                    &results?,
+                    &results.0,
                 )
                 .0,
             ))
@@ -124,7 +163,7 @@ async fn results(
     query: &str,
     page: u32,
     search_settings: &server_models::Cookie<'_>,
-) -> Result<SearchResults, Box<dyn std::error::Error>> {
+) -> Result<(SearchResults, String), Box<dyn std::error::Error>> {
     // eagerly parse cookie value to evaluate safe search level
     let safe_search_level = search_settings.safe_search_level;
 
@@ -143,7 +182,7 @@ async fn results(
     // check if fetched cache results was indeed fetched or it was an error and if so
     // handle the data accordingly.
     match cached_results {
-        Ok(results) => Ok(results),
+        Ok(results) => Ok((results, cache_key)),
         Err(_) => {
             if safe_search_level == 4 {
                 let mut results: SearchResults = SearchResults::default();
@@ -153,9 +192,11 @@ async fn results(
                 // Return early when query contains disallowed words,
                 if flag {
                     results.set_disallowed();
-                    cache.cache_results(&results, &cache_key).await?;
+                    cache
+                        .cache_results(&[results.clone()], &[cache_key.clone()])
+                        .await?;
                     results.set_safe_search_level(safe_search_level);
-                    return Ok(results);
+                    return Ok((results, cache_key));
                 }
             }
 
@@ -173,7 +214,7 @@ async fn results(
             &search_settings
                 .engines
                 .iter()
-                .filter_map(|engine| EngineHandler::new(&engine).ok())
+                .filter_map(|engine| EngineHandler::new(engine).ok())
                 .collect::<Vec<EngineHandler>>(),
             config.request_timeout,
             safe_search_level,
@@ -192,9 +233,11 @@ async fn results(
             {
                 results.set_filtered();
             }
-            cache.cache_results(&results, &cache_key).await?;
+            cache
+                .cache_results(&[results.clone()], &[cache_key.clone()])
+                .await?;
             results.set_safe_search_level(safe_search_level);
-            Ok(results)
+            Ok((results, cache_key))
         }
     }
 }
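The route change has two parts: it fetches the previous/next result pages alongside the requested one, and it moves all cache writes into a detached `tokio::spawn` task so the HTTP response is never blocked on caching. Below is a minimal sketch of that fire-and-forget pattern with toy types (assuming a `tokio` runtime with default features; the real handler likewise drops the `JoinHandle` returned by `spawn`).

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

type Store = Arc<Mutex<HashMap<String, String>>>;

// Returns the response right away; the cache write happens in the
// background, and an error there never delays or fails the reply.
async fn handle(cache: Store, key: String, body: String) -> String {
    let response = body.clone();
    // The JoinHandle is dropped, detaching the task, as in the PR.
    tokio::spawn(async move {
        cache.lock().await.insert(key, body);
    });
    response
}

#[tokio::main]
async fn main() {
    let cache: Store = Arc::new(Mutex::new(HashMap::new()));
    let page = handle(cache.clone(), "q=rust&page=0".into(), "results".into()).await;
    println!("{page}");
}
```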