Zsombor Gegesy commited on
Commit
996ff84
1 Parent(s): d1d2d4e

Cache refactor - add an in-memory cache, so redis is not needed

Browse files
Cargo.toml CHANGED
@@ -33,6 +33,7 @@ dhat = {version="0.3.2", optional = true}
33
  mimalloc = { version = "0.1.38", default-features = false }
34
  async-once-cell = {version="0.5.3"}
35
  actix-governor = {version="0.4.1"}
 
36
 
37
  [dev-dependencies]
38
  rusty-hook = "^0.11.2"
 
33
  mimalloc = { version = "0.1.38", default-features = false }
34
  async-once-cell = {version="0.5.3"}
35
  actix-governor = {version="0.4.1"}
36
+ mini-moka = "0.10"
37
 
38
  [dev-dependencies]
39
  rusty-hook = "^0.11.2"
src/bin/websurfx.rs CHANGED
@@ -5,7 +5,9 @@
5
 
6
  use mimalloc::MiMalloc;
7
  use std::net::TcpListener;
8
- use websurfx::{config::parser::Config, run};
 
 
9
 
10
  /// A dhat heap memory profiler
11
  #[cfg(feature = "dhat-heap")]
@@ -30,6 +32,14 @@ async fn main() -> std::io::Result<()> {
30
 
31
  // Initialize the parsed config file.
32
  let config = Config::parse(false).unwrap();
 
 
 
 
 
 
 
 
33
 
34
  log::info!(
35
  "started server on port {} and IP {}",
@@ -44,5 +54,5 @@ async fn main() -> std::io::Result<()> {
44
 
45
  let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?;
46
 
47
- run(listener, config)?.await
48
  }
 
5
 
6
  use mimalloc::MiMalloc;
7
  use std::net::TcpListener;
8
+ use websurfx::{
9
+ cache::cacher::Cache, cache::redis_cacher::RedisCache, config::parser::Config, run,
10
+ };
11
 
12
  /// A dhat heap memory profiler
13
  #[cfg(feature = "dhat-heap")]
 
32
 
33
  // Initialize the parsed config file.
34
  let config = Config::parse(false).unwrap();
35
+ let cache = match &config.redis_url {
36
+ Some(url) => Cache::new(
37
+ RedisCache::new(url, 5)
38
+ .await
39
+ .expect("Redis cache configured"),
40
+ ),
41
+ None => Cache::new_in_memory(),
42
+ };
43
 
44
  log::info!(
45
  "started server on port {} and IP {}",
 
54
 
55
  let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?;
56
 
57
+ run(listener, config, cache)?.await
58
  }
src/cache/cacher.rs CHANGED
@@ -2,107 +2,53 @@
2
  //! from the upstream search engines in a json format.
3
 
4
  use error_stack::Report;
5
- use futures::future::try_join_all;
6
- use md5::compute;
7
- use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
8
 
9
- use super::error::PoolError;
10
 
11
- /// A named struct which stores the redis Connection url address to which the client will
12
- /// connect to.
13
  #[derive(Clone)]
14
- pub struct RedisCache {
15
- /// It stores a pool of connections ready to be used.
16
- connection_pool: Vec<ConnectionManager>,
17
- /// It stores the size of the connection pool (in other words the number of
18
- /// connections that should be stored in the pool).
19
- pool_size: u8,
20
- /// It stores the index of which connection is being used at the moment.
21
- current_connection: u8,
22
  }
23
 
24
- impl RedisCache {
25
- /// Constructs a new `SearchResult` with the given arguments needed for the struct.
26
- ///
27
- /// # Arguments
28
- ///
29
- /// * `redis_connection_url` - It takes the redis Connection url address.
30
- /// * `pool_size` - It takes the size of the connection pool (in other words the number of
31
- /// connections that should be stored in the pool).
32
- pub async fn new(
33
- redis_connection_url: &str,
34
- pool_size: u8,
35
- ) -> Result<Self, Box<dyn std::error::Error>> {
36
- let client = Client::open(redis_connection_url)?;
37
- let mut tasks: Vec<_> = Vec::new();
38
-
39
- for _ in 0..pool_size {
40
- tasks.push(client.get_tokio_connection_manager());
41
- }
42
-
43
- let redis_cache = RedisCache {
44
- connection_pool: try_join_all(tasks).await?,
45
- pool_size,
46
- current_connection: Default::default(),
47
- };
48
- Ok(redis_cache)
49
  }
50
 
51
- /// A helper function which computes the hash of the url and formats and returns it as string.
52
- ///
53
- /// # Arguments
54
- ///
55
- /// * `url` - It takes an url as string.
56
- fn hash_url(&self, url: &str) -> String {
57
- format!("{:?}", compute(url))
58
  }
59
 
60
- /// A function which fetches the cached json results as json string from the redis server.
61
  ///
62
  /// # Arguments
63
  ///
64
  /// * `url` - It takes an url as a string.
65
  pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
66
- self.current_connection = Default::default();
67
- let hashed_url_string: &str = &self.hash_url(url);
68
-
69
- let mut result: Result<String, RedisError> = self.connection_pool
70
- [self.current_connection as usize]
71
- .get(hashed_url_string)
72
- .await;
73
-
74
- // Code to check whether the current connection being used is dropped with connection error
75
- // or not. if it drops with the connection error then the current connection is replaced
76
- // with a new connection from the pool which is then used to run the redis command then
77
- // that connection is also checked whether it is dropped or not if it is not then the
78
- // result is passed as a `Result` or else the same process repeats again and if all of the
79
- // connections in the pool result in connection drop error then a custom pool error is
80
- // returned.
81
- loop {
82
- match result {
83
- Err(error) => match error.is_connection_dropped() {
84
- true => {
85
- self.current_connection += 1;
86
- if self.current_connection == self.pool_size {
87
- return Err(Report::new(
88
- PoolError::PoolExhaustionWithConnectionDropError,
89
- ));
90
- }
91
- result = self.connection_pool[self.current_connection as usize]
92
- .get(hashed_url_string)
93
- .await;
94
- continue;
95
- }
96
- false => return Err(Report::new(PoolError::RedisError(error))),
97
- },
98
- Ok(res) => return Ok(res),
99
- }
100
  }
101
  }
102
 
103
- /// A function which caches the results by using the hashed `url` as the key and
104
- /// `json results` as the value and stores it in redis server with ttl(time to live)
105
- /// set to 60 seconds.
106
  ///
107
  /// # Arguments
108
  ///
@@ -110,43 +56,46 @@ impl RedisCache {
110
  /// * `url` - It takes the url as a String.
111
  pub async fn cache_results(
112
  &mut self,
113
- json_results: &str,
114
  url: &str,
115
  ) -> Result<(), Report<PoolError>> {
116
- self.current_connection = Default::default();
117
- let hashed_url_string: &str = &self.hash_url(url);
 
 
 
 
 
 
 
118
 
119
- let mut result: Result<(), RedisError> = self.connection_pool
120
- [self.current_connection as usize]
121
- .set_ex(hashed_url_string, json_results, 60)
122
- .await;
123
 
124
- // Code to check whether the current connection being used is dropped with connection error
125
- // or not. if it drops with the connection error then the current connection is replaced
126
- // with a new connection from the pool which is then used to run the redis command then
127
- // that connection is also checked whether it is dropped or not if it is not then the
128
- // result is passed as a `Result` or else the same process repeats again and if all of the
129
- // connections in the pool result in connection drop error then a custom pool error is
130
- // returned.
131
- loop {
132
- match result {
133
- Err(error) => match error.is_connection_dropped() {
134
- true => {
135
- self.current_connection += 1;
136
- if self.current_connection == self.pool_size {
137
- return Err(Report::new(
138
- PoolError::PoolExhaustionWithConnectionDropError,
139
- ));
140
- }
141
- result = self.connection_pool[self.current_connection as usize]
142
- .set_ex(hashed_url_string, json_results, 60)
143
- .await;
144
- continue;
145
- }
146
- false => return Err(Report::new(PoolError::RedisError(error))),
147
- },
148
- Ok(_) => return Ok(()),
149
- }
150
  }
151
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
  }
 
2
  //! from the upstream search engines in a json format.
3
 
4
  use error_stack::Report;
5
+ use mini_moka::sync::Cache as MokaCache;
6
+ use std::time::Duration;
7
+ use tokio::sync::Mutex;
8
 
9
+ use super::{error::PoolError, redis_cacher::RedisCache};
10
 
11
+ /// Different implementations for caching, currently it is possible to cache in-memory or in Redis.
 
12
  #[derive(Clone)]
13
+ pub enum Cache {
14
+ /// Encapsulates the Redis based cache
15
+ Redis(RedisCache),
16
+ /// Contains the in-memory cache.
17
+ InMemory(MokaCache<String, String>),
 
 
 
18
  }
19
 
20
+ impl Cache {
21
+ /// Creates a new cache, which wraps the given RedisCache.
22
+ pub fn new(redis_cache: RedisCache) -> Self {
23
+ Cache::Redis(redis_cache)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  }
25
 
26
+ /// Creates an in-memory cache
27
+ pub fn new_in_memory() -> Self {
28
+ let cache = MokaCache::builder()
29
+ .max_capacity(1000)
30
+ .time_to_live(Duration::from_secs(60))
31
+ .build();
32
+ Cache::InMemory(cache)
33
  }
34
 
35
+ /// A function which fetches the cached json results as json string.
36
  ///
37
  /// # Arguments
38
  ///
39
  /// * `url` - It takes an url as a string.
40
  pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
41
+ match self {
42
+ Cache::Redis(redis_cache) => redis_cache.cached_json(url).await,
43
+ Cache::InMemory(in_memory) => match in_memory.get(&url.to_string()) {
44
+ Some(res) => Ok(res),
45
+ None => Err(Report::new(PoolError::MissingValue)),
46
+ },
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  }
48
  }
49
 
50
+ /// A function which caches the results by using the `url` as the key and
51
+ /// `json results` as the value and stores it in the cache
 
52
  ///
53
  /// # Arguments
54
  ///
 
56
  /// * `url` - It takes the url as a String.
57
  pub async fn cache_results(
58
  &mut self,
59
+ json_results: String,
60
  url: &str,
61
  ) -> Result<(), Report<PoolError>> {
62
+ match self {
63
+ Cache::Redis(redis_cache) => redis_cache.cache_results(&json_results, url).await,
64
+ Cache::InMemory(cache) => {
65
+ cache.insert(url.to_string(), json_results);
66
+ Ok(())
67
+ }
68
+ }
69
+ }
70
+ }
71
 
72
+ /// A structure to efficiently share the cache between threads - as it is protected by a Mutex.
73
+ pub struct SharedCache {
74
+ cache: Mutex<Cache>,
75
+ }
76
 
77
+ impl SharedCache {
78
+ /// Creates a new SharedCache from a Cache implementation
79
+ pub fn new(cache: Cache) -> Self {
80
+ Self {
81
+ cache: Mutex::new(cache),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  }
83
  }
84
+
85
+ /// A function which fetches the cached json results as json string.
86
+ pub async fn cached_json(&self, url: &str) -> Result<String, Report<PoolError>> {
87
+ let mut mut_cache = self.cache.lock().await;
88
+ mut_cache.cached_json(url).await
89
+ }
90
+
91
+ /// A function which caches the results by using the `url` as the key and
92
+ /// `json results` as the value and stores it in the cache
93
+ pub async fn cache_results(
94
+ &self,
95
+ json_results: String,
96
+ url: &str,
97
+ ) -> Result<(), Report<PoolError>> {
98
+ let mut mut_cache = self.cache.lock().await;
99
+ mut_cache.cache_results(json_results, url).await
100
+ }
101
  }
src/cache/error.rs CHANGED
@@ -12,6 +12,7 @@ pub enum PoolError {
12
  /// This variant handles the errors which occurs when all the connections
13
  /// in the connection pool return a connection dropped redis error.
14
  PoolExhaustionWithConnectionDropError,
 
15
  }
16
 
17
  impl fmt::Display for PoolError {
@@ -30,6 +31,9 @@ impl fmt::Display for PoolError {
30
  "Error all connections from the pool dropped with connection error"
31
  )
32
  }
 
 
 
33
  }
34
  }
35
  }
 
12
  /// This variant handles the errors which occurs when all the connections
13
  /// in the connection pool return a connection dropped redis error.
14
  PoolExhaustionWithConnectionDropError,
15
+ MissingValue,
16
  }
17
 
18
  impl fmt::Display for PoolError {
 
31
  "Error all connections from the pool dropped with connection error"
32
  )
33
  }
34
+ PoolError::MissingValue => {
35
+ write!(f, "The value is missing from the cache")
36
+ }
37
  }
38
  }
39
  }
src/cache/mod.rs CHANGED
@@ -3,3 +3,4 @@
3
 
4
  pub mod cacher;
5
  pub mod error;
 
 
3
 
4
  pub mod cacher;
5
  pub mod error;
6
+ pub mod redis_cacher;
src/cache/redis_cacher.rs ADDED
@@ -0,0 +1,152 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ //! This module provides the functionality to cache the aggregated results fetched and aggregated
2
+ //! from the upstream search engines in a json format.
3
+
4
+ use error_stack::Report;
5
+ use futures::future::try_join_all;
6
+ use md5::compute;
7
+ use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
8
+
9
+ use super::error::PoolError;
10
+
11
+ /// A named struct which stores the redis Connection url address to which the client will
12
+ /// connect to.
13
+ #[derive(Clone)]
14
+ pub struct RedisCache {
15
+ /// It stores a pool of connections ready to be used.
16
+ connection_pool: Vec<ConnectionManager>,
17
+ /// It stores the size of the connection pool (in other words the number of
18
+ /// connections that should be stored in the pool).
19
+ pool_size: u8,
20
+ /// It stores the index of which connection is being used at the moment.
21
+ current_connection: u8,
22
+ }
23
+
24
+ impl RedisCache {
25
+ /// A function which fetches the cached json results as json string.
26
+ ///
27
+ /// # Arguments
28
+ ///
29
+ /// * `redis_connection_url` - It takes the redis Connection url address.
30
+ /// * `pool_size` - It takes the size of the connection pool (in other words the number of
31
+ /// connections that should be stored in the pool).
32
+ pub async fn new(
33
+ redis_connection_url: &str,
34
+ pool_size: u8,
35
+ ) -> Result<Self, Box<dyn std::error::Error>> {
36
+ let client = Client::open(redis_connection_url)?;
37
+ let mut tasks: Vec<_> = Vec::new();
38
+
39
+ for _ in 0..pool_size {
40
+ tasks.push(client.get_tokio_connection_manager());
41
+ }
42
+
43
+ let redis_cache = RedisCache {
44
+ connection_pool: try_join_all(tasks).await?,
45
+ pool_size,
46
+ current_connection: Default::default(),
47
+ };
48
+ Ok(redis_cache)
49
+ }
50
+
51
+ /// A helper function which computes the hash of the url and formats and returns it as string.
52
+ ///
53
+ /// # Arguments
54
+ ///
55
+ /// * `url` - It takes an url as string.
56
+ fn hash_url(&self, url: &str) -> String {
57
+ format!("{:?}", compute(url))
58
+ }
59
+
60
+ /// A function which fetches the cached json results as json string from the redis server.
61
+ ///
62
+ /// # Arguments
63
+ ///
64
+ /// * `url` - It takes an url as a string.
65
+ pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
66
+ self.current_connection = Default::default();
67
+ let hashed_url_string: &str = &self.hash_url(url);
68
+
69
+ let mut result: Result<String, RedisError> = self.connection_pool
70
+ [self.current_connection as usize]
71
+ .get(hashed_url_string)
72
+ .await;
73
+
74
+ // Code to check whether the current connection being used is dropped with connection error
75
+ // or not. if it drops with the connection error then the current connection is replaced
76
+ // with a new connection from the pool which is then used to run the redis command then
77
+ // that connection is also checked whether it is dropped or not if it is not then the
78
+ // result is passed as a `Result` or else the same process repeats again and if all of the
79
+ // connections in the pool result in connection drop error then a custom pool error is
80
+ // returned.
81
+ loop {
82
+ match result {
83
+ Err(error) => match error.is_connection_dropped() {
84
+ true => {
85
+ self.current_connection += 1;
86
+ if self.current_connection == self.pool_size {
87
+ return Err(Report::new(
88
+ PoolError::PoolExhaustionWithConnectionDropError,
89
+ ));
90
+ }
91
+ result = self.connection_pool[self.current_connection as usize]
92
+ .get(hashed_url_string)
93
+ .await;
94
+ continue;
95
+ }
96
+ false => return Err(Report::new(PoolError::RedisError(error))),
97
+ },
98
+ Ok(res) => return Ok(res),
99
+ }
100
+ }
101
+ }
102
+
103
+ /// A function which caches the results by using the hashed `url` as the key and
104
+ /// `json results` as the value and stores it in redis server with ttl(time to live)
105
+ /// set to 60 seconds.
106
+ ///
107
+ /// # Arguments
108
+ ///
109
+ /// * `json_results` - It takes the json results string as an argument.
110
+ /// * `url` - It takes the url as a String.
111
+ pub async fn cache_results(
112
+ &mut self,
113
+ json_results: &str,
114
+ url: &str,
115
+ ) -> Result<(), Report<PoolError>> {
116
+ self.current_connection = Default::default();
117
+ let hashed_url_string: &str = &self.hash_url(url);
118
+
119
+ let mut result: Result<(), RedisError> = self.connection_pool
120
+ [self.current_connection as usize]
121
+ .set_ex(hashed_url_string, json_results, 60)
122
+ .await;
123
+
124
+ // Code to check whether the current connection being used is dropped with connection error
125
+ // or not. if it drops with the connection error then the current connection is replaced
126
+ // with a new connection from the pool which is then used to run the redis command then
127
+ // that connection is also checked whether it is dropped or not if it is not then the
128
+ // result is passed as a `Result` or else the same process repeats again and if all of the
129
+ // connections in the pool result in connection drop error then a custom pool error is
130
+ // returned.
131
+ loop {
132
+ match result {
133
+ Err(error) => match error.is_connection_dropped() {
134
+ true => {
135
+ self.current_connection += 1;
136
+ if self.current_connection == self.pool_size {
137
+ return Err(Report::new(
138
+ PoolError::PoolExhaustionWithConnectionDropError,
139
+ ));
140
+ }
141
+ result = self.connection_pool[self.current_connection as usize]
142
+ .set_ex(hashed_url_string, json_results, 60)
143
+ .await;
144
+ continue;
145
+ }
146
+ false => return Err(Report::new(PoolError::RedisError(error))),
147
+ },
148
+ Ok(_) => return Ok(()),
149
+ }
150
+ }
151
+ }
152
+ }
src/config/parser.rs CHANGED
@@ -19,7 +19,7 @@ pub struct Config {
19
  pub style: Style,
20
  /// It stores the redis connection url address on which the redis
21
  /// client should connect.
22
- pub redis_url: String,
23
  /// It stores the option to whether enable or disable production use.
24
  pub aggregator: AggregatorConfig,
25
  /// It stores the option to whether enable or disable logs.
@@ -99,7 +99,7 @@ impl Config {
99
  globals.get::<_, String>("theme")?,
100
  globals.get::<_, String>("colorscheme")?,
101
  ),
102
- redis_url: globals.get::<_, String>("redis_url")?,
103
  aggregator: AggregatorConfig {
104
  random_delay: globals.get::<_, bool>("production_use")?,
105
  },
 
19
  pub style: Style,
20
  /// It stores the redis connection url address on which the redis
21
  /// client should connect.
22
+ pub redis_url: Option<String>,
23
  /// It stores the option to whether enable or disable production use.
24
  pub aggregator: AggregatorConfig,
25
  /// It stores the option to whether enable or disable logs.
 
99
  globals.get::<_, String>("theme")?,
100
  globals.get::<_, String>("colorscheme")?,
101
  ),
102
+ redis_url: globals.get::<_, String>("redis_url").ok(),
103
  aggregator: AggregatorConfig {
104
  random_delay: globals.get::<_, bool>("production_use")?,
105
  },
src/lib.rs CHANGED
@@ -21,6 +21,7 @@ use actix_cors::Cors;
21
  use actix_files as fs;
22
  use actix_governor::{Governor, GovernorConfigBuilder};
23
  use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer};
 
24
  use config::parser::Config;
25
  use handlebars::Handlebars;
26
  use handler::paths::{file_path, FileType};
@@ -45,7 +46,7 @@ use handler::paths::{file_path, FileType};
45
  /// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
46
  /// let server = run(listener,config).expect("Failed to start server");
47
  /// ```
48
- pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
49
  let mut handlebars: Handlebars<'_> = Handlebars::new();
50
 
51
  let public_folder_path: &str = file_path(FileType::Theme)?;
@@ -58,6 +59,8 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
58
 
59
  let cloned_config_threads_opt: u8 = config.threads;
60
 
 
 
61
  let server = HttpServer::new(move || {
62
  let cors: Cors = Cors::default()
63
  .allow_any_origin()
@@ -73,6 +76,7 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
73
  .wrap(Logger::default()) // added logging middleware for logging.
74
  .app_data(handlebars_ref.clone())
75
  .app_data(web::Data::new(config.clone()))
 
76
  .wrap(cors)
77
  .wrap(Governor::new(
78
  &GovernorConfigBuilder::default()
 
21
  use actix_files as fs;
22
  use actix_governor::{Governor, GovernorConfigBuilder};
23
  use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer};
24
+ use cache::cacher::{Cache, SharedCache};
25
  use config::parser::Config;
26
  use handlebars::Handlebars;
27
  use handler::paths::{file_path, FileType};
 
46
  /// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
47
  /// let server = run(listener,config).expect("Failed to start server");
48
  /// ```
49
+ pub fn run(listener: TcpListener, config: Config, cache: Cache) -> std::io::Result<Server> {
50
  let mut handlebars: Handlebars<'_> = Handlebars::new();
51
 
52
  let public_folder_path: &str = file_path(FileType::Theme)?;
 
59
 
60
  let cloned_config_threads_opt: u8 = config.threads;
61
 
62
+ let cache = web::Data::new(SharedCache::new(cache));
63
+
64
  let server = HttpServer::new(move || {
65
  let cors: Cors = Cors::default()
66
  .allow_any_origin()
 
76
  .wrap(Logger::default()) // added logging middleware for logging.
77
  .app_data(handlebars_ref.clone())
78
  .app_data(web::Data::new(config.clone()))
79
+ .app_data(cache.clone())
80
  .wrap(cors)
81
  .wrap(Governor::new(
82
  &GovernorConfigBuilder::default()
src/server/routes/search.rs CHANGED
@@ -1,7 +1,7 @@
1
  //! This module handles the search route of the search engine website.
2
 
3
  use crate::{
4
- cache::cacher::RedisCache,
5
  config::parser::Config,
6
  handler::paths::{file_path, FileType},
7
  models::{aggregation_models::SearchResults, engine_models::EngineHandler},
@@ -17,10 +17,6 @@ use std::{
17
  };
18
  use tokio::join;
19
 
20
- // ---- Constants ----
21
- /// Initialize redis cache connection once and store it on the heap.
22
- static REDIS_CACHE: async_once_cell::OnceCell<RedisCache> = async_once_cell::OnceCell::new();
23
-
24
  /// A named struct which deserializes all the user provided search parameters and stores them.
25
  #[derive(Deserialize)]
26
  pub struct SearchParams {
@@ -89,6 +85,7 @@ pub async fn search(
89
  hbs: web::Data<Handlebars<'_>>,
90
  req: HttpRequest,
91
  config: web::Data<Config>,
 
92
  ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
93
  let params = web::Query::<SearchParams>::from_query(req.query_string())?;
94
  match &params.q {
@@ -125,6 +122,7 @@ pub async fn search(
125
  safe_search
126
  ),
127
  &config,
 
128
  query,
129
  page - 1,
130
  req.clone(),
@@ -136,6 +134,7 @@ pub async fn search(
136
  config.binding_ip, config.port, query, page, safe_search
137
  ),
138
  &config,
 
139
  query,
140
  page,
141
  req.clone(),
@@ -151,6 +150,7 @@ pub async fn search(
151
  safe_search
152
  ),
153
  &config,
 
154
  query,
155
  page + 1,
156
  req.clone(),
@@ -185,27 +185,19 @@ pub async fn search(
185
  async fn results(
186
  url: String,
187
  config: &Config,
 
188
  query: &str,
189
  page: u32,
190
  req: HttpRequest,
191
  safe_search: u8,
192
  ) -> Result<SearchResults, Box<dyn std::error::Error>> {
193
- // Initialize redis cache connection struct
194
- let mut redis_cache: RedisCache = REDIS_CACHE
195
- .get_or_init(async {
196
- // Initialize redis cache connection pool only one and store it in the heap.
197
- RedisCache::new(&config.redis_url, 5).await.unwrap()
198
- })
199
- .await
200
- .clone();
201
  // fetch the cached results json.
202
- let cached_results_json: Result<String, error_stack::Report<crate::cache::error::PoolError>> =
203
- redis_cache.clone().cached_json(&url).await;
204
  // check if fetched cache results was indeed fetched or it was an error and if so
205
  // handle the data accordingly.
206
  match cached_results_json {
207
- Ok(results) => Ok(serde_json::from_str::<SearchResults>(&results)?),
208
- Err(_) => {
209
  if safe_search == 4 {
210
  let mut results: SearchResults = SearchResults::default();
211
  let mut _flag: bool =
@@ -216,8 +208,8 @@ async fn results(
216
  results.set_disallowed();
217
  results.add_style(&config.style);
218
  results.set_page_query(query);
219
- redis_cache
220
- .cache_results(&serde_json::to_string(&results)?, &url)
221
  .await?;
222
  return Ok(results);
223
  }
@@ -266,9 +258,8 @@ async fn results(
266
  results.set_filtered();
267
  }
268
  results.add_style(&config.style);
269
- redis_cache
270
- .cache_results(&serde_json::to_string(&results)?, &url)
271
- .await?;
272
  Ok(results)
273
  }
274
  }
 
1
  //! This module handles the search route of the search engine website.
2
 
3
  use crate::{
4
+ cache::cacher::SharedCache,
5
  config::parser::Config,
6
  handler::paths::{file_path, FileType},
7
  models::{aggregation_models::SearchResults, engine_models::EngineHandler},
 
17
  };
18
  use tokio::join;
19
 
 
 
 
 
20
  /// A named struct which deserializes all the user provided search parameters and stores them.
21
  #[derive(Deserialize)]
22
  pub struct SearchParams {
 
85
  hbs: web::Data<Handlebars<'_>>,
86
  req: HttpRequest,
87
  config: web::Data<Config>,
88
+ cache: web::Data<SharedCache>,
89
  ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
90
  let params = web::Query::<SearchParams>::from_query(req.query_string())?;
91
  match &params.q {
 
122
  safe_search
123
  ),
124
  &config,
125
+ &cache,
126
  query,
127
  page - 1,
128
  req.clone(),
 
134
  config.binding_ip, config.port, query, page, safe_search
135
  ),
136
  &config,
137
+ &cache,
138
  query,
139
  page,
140
  req.clone(),
 
150
  safe_search
151
  ),
152
  &config,
153
+ &cache,
154
  query,
155
  page + 1,
156
  req.clone(),
 
185
  async fn results(
186
  url: String,
187
  config: &Config,
188
+ cache: &web::Data<SharedCache>,
189
  query: &str,
190
  page: u32,
191
  req: HttpRequest,
192
  safe_search: u8,
193
  ) -> Result<SearchResults, Box<dyn std::error::Error>> {
 
 
 
 
 
 
 
 
194
  // fetch the cached results json.
195
+ let cached_results_json = cache.cached_json(&url).await.ok();
 
196
  // check if fetched cache results was indeed fetched or it was an error and if so
197
  // handle the data accordingly.
198
  match cached_results_json {
199
+ Some(results) => Ok(serde_json::from_str::<SearchResults>(&results)?),
200
+ None => {
201
  if safe_search == 4 {
202
  let mut results: SearchResults = SearchResults::default();
203
  let mut _flag: bool =
 
208
  results.set_disallowed();
209
  results.add_style(&config.style);
210
  results.set_page_query(query);
211
+ cache
212
+ .cache_results(serde_json::to_string(&results)?, &url)
213
  .await?;
214
  return Ok(results);
215
  }
 
258
  results.set_filtered();
259
  }
260
  results.add_style(&config.style);
261
+ let json_results = serde_json::to_string(&results)?;
262
+ cache.cache_results(json_results, &url).await?;
 
263
  Ok(results)
264
  }
265
  }
tests/index.rs CHANGED
@@ -9,7 +9,12 @@ fn spawn_app() -> String {
9
  let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
10
  let port = listener.local_addr().unwrap().port();
11
  let config = Config::parse(false).unwrap();
12
- let server = run(listener, config).expect("Failed to bind address");
 
 
 
 
 
13
 
14
  tokio::spawn(server);
15
  format!("http://127.0.0.1:{}/", port)
 
9
  let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
10
  let port = listener.local_addr().unwrap().port();
11
  let config = Config::parse(false).unwrap();
12
+ let server = run(
13
+ listener,
14
+ config,
15
+ websurfx::cache::cacher::Cache::new_in_memory(),
16
+ )
17
+ .expect("Failed to bind address");
18
 
19
  tokio::spawn(server);
20
  format!("http://127.0.0.1:{}/", port)