neon_arch committed
Commit 351f39a
2 parents: d1d2d4e e69126c

Merge pull request #223 from gzsombor/cache-refactor


✨ Add four caching options for the search engine based on feature-based compilation
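
The four options map to the Cargo features introduced in Cargo.toml below: `memory-cache` (the default, via mini-moka), `redis-cache`, `hybrid-cache` (both), and no caching at all. A sketch of the corresponding builds (feature names are from this diff; the cargo invocations are the usual ones):

    cargo build                                                  # default: in-memory cache (mini-moka)
    cargo build --no-default-features --features redis-cache     # Redis only
    cargo build --no-default-features --features hybrid-cache    # in-memory plus Redis
    cargo build --no-default-features                            # caching disabled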

Cargo.lock CHANGED
@@ -446,6 +446,12 @@ version = "3.14.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
 
+ [[package]]
+ name = "bytecount"
+ version = "0.6.3"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
+
  [[package]]
  name = "byteorder"
  version = "1.4.3"
@@ -478,6 +484,37 @@ dependencies = [
  "bytes 1.5.0",
  ]
 
+ [[package]]
+ name = "camino"
+ version = "1.1.6"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
+ dependencies = [
+  "serde",
+ ]
+
+ [[package]]
+ name = "cargo-platform"
+ version = "0.1.3"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479"
+ dependencies = [
+  "serde",
+ ]
+
+ [[package]]
+ name = "cargo_metadata"
+ version = "0.14.2"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
+ dependencies = [
+  "camino",
+  "cargo-platform",
+  "semver 1.0.18",
+  "serde",
+  "serde_json",
+ ]
+
  [[package]]
  name = "cast"
  version = "0.3.0"
@@ -973,6 +1010,15 @@ dependencies = [
  "libc",
  ]
 
+ [[package]]
+ name = "error-chain"
+ version = "0.12.4"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
+ dependencies = [
+  "version_check",
+ ]
+
  [[package]]
  name = "error-stack"
  version = "0.4.1"
@@ -1256,6 +1302,12 @@ version = "0.28.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
 
+ [[package]]
+ name = "glob"
+ version = "0.3.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
+
  [[package]]
  name = "governor"
  version = "0.5.1"
@@ -1834,6 +1886,21 @@ dependencies = [
  "unicase",
  ]
 
+ [[package]]
+ name = "mini-moka"
+ version = "0.10.2"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "23e0b72e7c9042467008b10279fc732326bd605459ae03bda88825909dd19b56"
+ dependencies = [
+  "crossbeam-channel",
+  "crossbeam-utils 0.8.16",
+  "dashmap",
+  "skeptic",
+  "smallvec 1.11.0",
+  "tagptr",
+  "triomphe",
+ ]
+
  [[package]]
  name = "miniz_oxide"
  version = "0.7.1"
@@ -2383,6 +2450,17 @@ dependencies = [
  "url 2.4.1",
  ]
 
+ [[package]]
+ name = "pulldown-cmark"
+ version = "0.9.3"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998"
+ dependencies = [
+  "bitflags 1.3.2",
+  "memchr",
+  "unicase",
+ ]
+
  [[package]]
  name = "quanta"
  version = "0.9.3"
@@ -2900,6 +2978,9 @@ name = "semver"
  version = "1.0.18"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
+ dependencies = [
+  "serde",
+ ]
 
  [[package]]
  name = "semver-parser"
@@ -3020,6 +3101,21 @@ version = "0.3.11"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
 
+ [[package]]
+ name = "skeptic"
+ version = "0.13.7"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
+ dependencies = [
+  "bytecount",
+  "cargo_metadata",
+  "error-chain",
+  "glob",
+  "pulldown-cmark",
+  "tempfile",
+  "walkdir",
+ ]
+
  [[package]]
  name = "slab"
  version = "0.4.9"
@@ -3197,6 +3293,12 @@ dependencies = [
  "libc",
  ]
 
+ [[package]]
+ name = "tagptr"
+ version = "0.2.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
+
  [[package]]
  name = "tempfile"
  version = "3.8.0"
@@ -3553,6 +3655,12 @@ dependencies = [
  "once_cell",
  ]
 
+ [[package]]
+ name = "triomphe"
+ version = "0.1.9"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f"
+
  [[package]]
  name = "try-lock"
  version = "0.2.4"
@@ -3815,6 +3923,7 @@ dependencies = [
  "log",
  "md5",
  "mimalloc",
+ "mini-moka",
  "mlua",
  "once_cell",
  "rand 0.8.5",
Cargo.toml CHANGED
@@ -20,7 +20,7 @@ fake-useragent = {version="0.1.3"}
  env_logger = {version="0.10.0"}
  log = {version="0.4.20"}
  mlua = {version="0.8.10", features=["luajit"]}
- redis = {version="0.23.3", features=["tokio-comp","connection-manager"]}
+ redis = {version="0.23.3", features=["tokio-comp","connection-manager"], optional = true}
  md5 = {version="0.7.0"}
  rand={version="0.8.5"}
  once_cell = {version="1.18.0"}
@@ -33,6 +33,7 @@ dhat = {version="0.3.2", optional = true}
  mimalloc = { version = "0.1.38", default-features = false }
  async-once-cell = {version="0.5.3"}
  actix-governor = {version="0.4.1"}
+ mini-moka = { version="0.10", optional = true}
 
  [dev-dependencies]
  rusty-hook = "^0.11.2"
@@ -66,4 +67,8 @@ rpath = false
  strip = "debuginfo"
 
  [features]
+ default = ["memory-cache"]
  dhat-heap = ["dep:dhat"]
+ memory-cache = ["dep:mini-moka"]
+ redis-cache = ["dep:redis"]
+ hybrid-cache = ["memory-cache", "redis-cache"]
src/bin/websurfx.rs CHANGED
@@ -5,7 +5,7 @@
 
  use mimalloc::MiMalloc;
  use std::net::TcpListener;
- use websurfx::{config::parser::Config, run};
+ use websurfx::{cache::cacher::Cache, config::parser::Config, run};
 
  /// A dhat heap memory profiler
  #[cfg(feature = "dhat-heap")]
@@ -31,6 +31,8 @@ async fn main() -> std::io::Result<()> {
  // Initialize the parsed config file.
  let config = Config::parse(false).unwrap();
 
+ let cache = Cache::build(&config).await;
+
  log::info!(
  "started server on port {} and IP {}",
  config.port,
@@ -44,5 +46,5 @@ async fn main() -> std::io::Result<()> {
 
  let listener = TcpListener::bind((config.binding_ip.clone(), config.port))?;
 
- run(listener, config)?.await
+ run(listener, config, cache)?.await
  }
src/cache/cacher.rs CHANGED
@@ -2,107 +2,95 @@
  //! from the upstream search engines in a json format.
 
  use error_stack::Report;
- use futures::future::try_join_all;
- use md5::compute;
- use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
+ #[cfg(feature = "memory-cache")]
+ use mini_moka::sync::Cache as MokaCache;
+ #[cfg(feature = "memory-cache")]
+ use std::time::Duration;
+ use tokio::sync::Mutex;
+
+ use crate::{config::parser::Config, models::aggregation_models::SearchResults};
 
  use super::error::PoolError;
+ #[cfg(feature = "redis-cache")]
+ use super::redis_cacher::RedisCache;
 
- /// A named struct which stores the redis Connection url address to which the client will
- /// connect to.
+ /// Different implementations for caching, currently it is possible to cache in-memory or in Redis.
  #[derive(Clone)]
- pub struct RedisCache {
- /// It stores a pool of connections ready to be used.
- connection_pool: Vec<ConnectionManager>,
- /// It stores the size of the connection pool (in other words the number of
- /// connections that should be stored in the pool).
- pool_size: u8,
- /// It stores the index of which connection is being used at the moment.
- current_connection: u8,
+ pub enum Cache {
+ /// Caching is disabled
+ Disabled,
+ #[cfg(feature = "redis-cache")]
+ /// Encapsulates the Redis based cache
+ Redis(RedisCache),
+ #[cfg(feature = "memory-cache")]
+ /// Contains the in-memory cache.
+ InMemory(MokaCache<String, SearchResults>),
  }
 
- impl RedisCache {
- /// Constructs a new `SearchResult` with the given arguments needed for the struct.
- ///
- /// # Arguments
- ///
- /// * `redis_connection_url` - It takes the redis Connection url address.
- /// * `pool_size` - It takes the size of the connection pool (in other words the number of
- /// connections that should be stored in the pool).
- pub async fn new(
- redis_connection_url: &str,
- pool_size: u8,
- ) -> Result<Self, Box<dyn std::error::Error>> {
- let client = Client::open(redis_connection_url)?;
- let mut tasks: Vec<_> = Vec::new();
-
- for _ in 0..pool_size {
- tasks.push(client.get_tokio_connection_manager());
+ impl Cache {
+ /// Builds the cache from the given configuration.
+ pub async fn build(_config: &Config) -> Self {
+ #[cfg(feature = "redis-cache")]
+ if let Some(url) = &_config.redis_url {
+ log::info!("Using Redis running at {} for caching", &url);
+ return Cache::new(
+ RedisCache::new(url, 5)
+ .await
+ .expect("Redis cache configured"),
+ );
+ }
+ #[cfg(feature = "memory-cache")]
+ {
+ log::info!("Using an in-memory cache");
+ return Cache::new_in_memory();
+ }
+ #[cfg(not(feature = "memory-cache"))]
+ {
+ log::info!("Caching is disabled");
+ Cache::Disabled
  }
+ }
 
- let redis_cache = RedisCache {
- connection_pool: try_join_all(tasks).await?,
- pool_size,
- current_connection: Default::default(),
- };
- Ok(redis_cache)
+ /// Creates a new cache, which wraps the given RedisCache.
+ #[cfg(feature = "redis-cache")]
+ pub fn new(redis_cache: RedisCache) -> Self {
+ Cache::Redis(redis_cache)
  }
 
- /// A helper function which computes the hash of the url and formats and returns it as string.
- ///
- /// # Arguments
- ///
- /// * `url` - It takes an url as string.
- fn hash_url(&self, url: &str) -> String {
- format!("{:?}", compute(url))
+ /// Creates an in-memory cache
+ #[cfg(feature = "memory-cache")]
+ pub fn new_in_memory() -> Self {
+ let cache = MokaCache::builder()
+ .max_capacity(1000)
+ .time_to_live(Duration::from_secs(60))
+ .build();
+ Cache::InMemory(cache)
  }
 
- /// A function which fetches the cached json results as json string from the redis server.
+ /// A function which fetches the cached search results.
  ///
  /// # Arguments
  ///
  /// * `url` - It takes an url as a string.
- pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
- self.current_connection = Default::default();
- let hashed_url_string: &str = &self.hash_url(url);
-
- let mut result: Result<String, RedisError> = self.connection_pool
- [self.current_connection as usize]
- .get(hashed_url_string)
- .await;
-
- // Code to check whether the current connection being used is dropped with connection error
- // or not. if it drops with the connection error then the current connection is replaced
- // with a new connection from the pool which is then used to run the redis command then
- // that connection is also checked whether it is dropped or not if it is not then the
- // result is passed as a `Result` or else the same process repeats again and if all of the
- // connections in the pool result in connection drop error then a custom pool error is
- // returned.
- loop {
- match result {
- Err(error) => match error.is_connection_dropped() {
- true => {
- self.current_connection += 1;
- if self.current_connection == self.pool_size {
- return Err(Report::new(
- PoolError::PoolExhaustionWithConnectionDropError,
- ));
- }
- result = self.connection_pool[self.current_connection as usize]
- .get(hashed_url_string)
- .await;
- continue;
- }
- false => return Err(Report::new(PoolError::RedisError(error))),
- },
- Ok(res) => return Ok(res),
+ pub async fn cached_json(&mut self, url: &str) -> Result<SearchResults, Report<PoolError>> {
+ match self {
+ Cache::Disabled => Err(Report::new(PoolError::MissingValue)),
+ #[cfg(feature = "redis-cache")]
+ Cache::Redis(redis_cache) => {
+ let json = redis_cache.cached_json(url).await?;
+ Ok(serde_json::from_str::<SearchResults>(&json)
+ .map_err(|_| PoolError::SerializationError)?)
  }
+ #[cfg(feature = "memory-cache")]
+ Cache::InMemory(in_memory) => match in_memory.get(&url.to_string()) {
+ Some(res) => Ok(res),
+ None => Err(Report::new(PoolError::MissingValue)),
+ },
  }
  }
 
- /// A function which caches the results by using the hashed `url` as the key and
- /// `json results` as the value and stores it in redis server with ttl(time to live)
- /// set to 60 seconds.
+ /// A function which caches the results by using the `url` as the key and
+ /// `json results` as the value and stores it in the cache
  ///
  /// # Arguments
  ///
@@ -110,43 +98,54 @@ impl RedisCache
  /// * `url` - It takes the url as a String.
  pub async fn cache_results(
  &mut self,
- json_results: &str,
+ search_results: &SearchResults,
  url: &str,
  ) -> Result<(), Report<PoolError>> {
- self.current_connection = Default::default();
- let hashed_url_string: &str = &self.hash_url(url);
-
- let mut result: Result<(), RedisError> = self.connection_pool
- [self.current_connection as usize]
- .set_ex(hashed_url_string, json_results, 60)
- .await;
-
- // Code to check whether the current connection being used is dropped with connection error
- // or not. if it drops with the connection error then the current connection is replaced
- // with a new connection from the pool which is then used to run the redis command then
- // that connection is also checked whether it is dropped or not if it is not then the
- // result is passed as a `Result` or else the same process repeats again and if all of the
- // connections in the pool result in connection drop error then a custom pool error is
- // returned.
- loop {
- match result {
- Err(error) => match error.is_connection_dropped() {
- true => {
- self.current_connection += 1;
- if self.current_connection == self.pool_size {
- return Err(Report::new(
- PoolError::PoolExhaustionWithConnectionDropError,
- ));
- }
- result = self.connection_pool[self.current_connection as usize]
- .set_ex(hashed_url_string, json_results, 60)
- .await;
- continue;
- }
- false => return Err(Report::new(PoolError::RedisError(error))),
- },
- Ok(_) => return Ok(()),
- }
+ match self {
+ Cache::Disabled => Ok(()),
+ #[cfg(feature = "redis-cache")]
+ Cache::Redis(redis_cache) => {
+ let json = serde_json::to_string(search_results)
+ .map_err(|_| PoolError::SerializationError)?;
+ redis_cache.cache_results(&json, url).await
+ }
+ #[cfg(feature = "memory-cache")]
+ Cache::InMemory(cache) => {
+ cache.insert(url.to_string(), search_results.clone());
+ Ok(())
+ }
+ }
+ }
+ }
+
+ /// A structure to efficiently share the cache between threads - as it is protected by a Mutex.
+ pub struct SharedCache {
+ /// The internal cache protected from concurrent access by a mutex
+ cache: Mutex<Cache>,
+ }
+
+ impl SharedCache {
+ /// Creates a new SharedCache from a Cache implementation
+ pub fn new(cache: Cache) -> Self {
+ Self {
+ cache: Mutex::new(cache),
  }
  }
+
+ /// A function which retrieves the cached SearchResults from the internal cache.
+ pub async fn cached_json(&self, url: &str) -> Result<SearchResults, Report<PoolError>> {
+ let mut mut_cache = self.cache.lock().await;
+ mut_cache.cached_json(url).await
+ }
+
+ /// A function which caches the results by using the `url` as the key and
+ /// `SearchResults` as the value.
+ pub async fn cache_results(
+ &self,
+ search_results: &SearchResults,
+ url: &str,
+ ) -> Result<(), Report<PoolError>> {
+ let mut mut_cache = self.cache.lock().await;
+ mut_cache.cache_results(search_results, url).await
+ }
  }
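
Taken together, the cache is built once at startup and shared across handlers. A minimal usage sketch (not part of this commit), assuming the default `memory-cache` feature, a tokio runtime, and that the `cache` and `models` modules are public, as the imports in this diff suggest:

    use websurfx::cache::cacher::{Cache, SharedCache};
    use websurfx::models::aggregation_models::SearchResults;

    #[tokio::main]
    async fn main() {
        // Cache::build(&config) would pick Redis, in-memory, or Disabled;
        // here the in-memory variant is constructed directly.
        let shared = SharedCache::new(Cache::new_in_memory());

        let url = "http://127.0.0.1:8080/search?q=rust&page=1";
        // A miss surfaces as PoolError::MissingValue.
        assert!(shared.cached_json(url).await.is_err());

        // Results are stored under the full request URL as the key...
        shared
            .cache_results(&SearchResults::default(), url)
            .await
            .expect("in-memory insert should not fail");
        // ...and a lookup within the 60 second TTL is a hit.
        assert!(shared.cached_json(url).await.is_ok());
    }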
src/cache/error.rs CHANGED
@@ -2,21 +2,28 @@
  //! the redis server using an async connection pool.
  use std::fmt;
 
+ #[cfg(feature = "redis-cache")]
  use redis::RedisError;
 
  /// A custom error type used for handling redis async pool associated errors.
  #[derive(Debug)]
  pub enum PoolError {
  /// This variant handles all errors related to `RedisError`,
+ #[cfg(feature = "redis-cache")]
  RedisError(RedisError),
  /// This variant handles the errors which occurs when all the connections
  /// in the connection pool return a connection dropped redis error.
  PoolExhaustionWithConnectionDropError,
+ /// Whenever serialization or deserialization fails during communication with the cache.
+ SerializationError,
+ /// Returned when the value is missing.
+ MissingValue,
  }
 
  impl fmt::Display for PoolError {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  match self {
+ #[cfg(feature = "redis-cache")]
  PoolError::RedisError(redis_error) => {
  if let Some(detail) = redis_error.detail() {
  write!(f, "{}", detail)
@@ -30,6 +37,12 @@ impl fmt::Display for PoolError {
  "Error all connections from the pool dropped with connection error"
  )
  }
+ PoolError::MissingValue => {
+ write!(f, "The value is missing from the cache")
+ }
+ PoolError::SerializationError => {
+ write!(f, "Unable to serialize, deserialize from the cache")
+ }
  }
  }
  }
src/cache/mod.rs CHANGED
@@ -3,3 +3,5 @@
 
  pub mod cacher;
  pub mod error;
+ #[cfg(feature = "redis-cache")]
+ pub mod redis_cacher;
src/cache/redis_cacher.rs ADDED
@@ -0,0 +1,152 @@
+ //! This module provides the functionality to cache the aggregated results fetched and aggregated
+ //! from the upstream search engines in a json format.
+
+ use error_stack::Report;
+ use futures::future::try_join_all;
+ use md5::compute;
+ use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisError};
+
+ use super::error::PoolError;
+
+ /// A named struct which stores the redis Connection url address to which the client will
+ /// connect to.
+ #[derive(Clone)]
+ pub struct RedisCache {
+ /// It stores a pool of connections ready to be used.
+ connection_pool: Vec<ConnectionManager>,
+ /// It stores the size of the connection pool (in other words the number of
+ /// connections that should be stored in the pool).
+ pool_size: u8,
+ /// It stores the index of which connection is being used at the moment.
+ current_connection: u8,
+ }
+
+ impl RedisCache {
+ /// Constructs a new `RedisCache` with the given arguments needed for the struct.
+ ///
+ /// # Arguments
+ ///
+ /// * `redis_connection_url` - It takes the redis Connection url address.
+ /// * `pool_size` - It takes the size of the connection pool (in other words the number of
+ /// connections that should be stored in the pool).
+ pub async fn new(
+ redis_connection_url: &str,
+ pool_size: u8,
+ ) -> Result<Self, Box<dyn std::error::Error>> {
+ let client = Client::open(redis_connection_url)?;
+ let mut tasks: Vec<_> = Vec::new();
+
+ for _ in 0..pool_size {
+ tasks.push(client.get_tokio_connection_manager());
+ }
+
+ let redis_cache = RedisCache {
+ connection_pool: try_join_all(tasks).await?,
+ pool_size,
+ current_connection: Default::default(),
+ };
+ Ok(redis_cache)
+ }
+
+ /// A helper function which computes the hash of the url and formats and returns it as string.
+ ///
+ /// # Arguments
+ ///
+ /// * `url` - It takes an url as string.
+ fn hash_url(&self, url: &str) -> String {
+ format!("{:?}", compute(url))
+ }
+
+ /// A function which fetches the cached json results as json string from the redis server.
+ ///
+ /// # Arguments
+ ///
+ /// * `url` - It takes an url as a string.
+ pub async fn cached_json(&mut self, url: &str) -> Result<String, Report<PoolError>> {
+ self.current_connection = Default::default();
+ let hashed_url_string: &str = &self.hash_url(url);
+
+ let mut result: Result<String, RedisError> = self.connection_pool
+ [self.current_connection as usize]
+ .get(hashed_url_string)
+ .await;
+
+ // Code to check whether the current connection being used is dropped with connection error
+ // or not. if it drops with the connection error then the current connection is replaced
+ // with a new connection from the pool which is then used to run the redis command then
+ // that connection is also checked whether it is dropped or not if it is not then the
+ // result is passed as a `Result` or else the same process repeats again and if all of the
+ // connections in the pool result in connection drop error then a custom pool error is
+ // returned.
+ loop {
+ match result {
+ Err(error) => match error.is_connection_dropped() {
+ true => {
+ self.current_connection += 1;
+ if self.current_connection == self.pool_size {
+ return Err(Report::new(
+ PoolError::PoolExhaustionWithConnectionDropError,
+ ));
+ }
+ result = self.connection_pool[self.current_connection as usize]
+ .get(hashed_url_string)
+ .await;
+ continue;
+ }
+ false => return Err(Report::new(PoolError::RedisError(error))),
+ },
+ Ok(res) => return Ok(res),
+ }
+ }
+ }
+
+ /// A function which caches the results by using the hashed `url` as the key and
+ /// `json results` as the value and stores it in redis server with ttl(time to live)
+ /// set to 60 seconds.
+ ///
+ /// # Arguments
+ ///
+ /// * `json_results` - It takes the json results string as an argument.
+ /// * `url` - It takes the url as a String.
+ pub async fn cache_results(
+ &mut self,
+ json_results: &str,
+ url: &str,
+ ) -> Result<(), Report<PoolError>> {
+ self.current_connection = Default::default();
+ let hashed_url_string: &str = &self.hash_url(url);
+
+ let mut result: Result<(), RedisError> = self.connection_pool
+ [self.current_connection as usize]
+ .set_ex(hashed_url_string, json_results, 60)
+ .await;
+
+ // Code to check whether the current connection being used is dropped with connection error
+ // or not. if it drops with the connection error then the current connection is replaced
+ // with a new connection from the pool which is then used to run the redis command then
+ // that connection is also checked whether it is dropped or not if it is not then the
+ // result is passed as a `Result` or else the same process repeats again and if all of the
+ // connections in the pool result in connection drop error then a custom pool error is
+ // returned.
+ loop {
+ match result {
+ Err(error) => match error.is_connection_dropped() {
+ true => {
+ self.current_connection += 1;
+ if self.current_connection == self.pool_size {
+ return Err(Report::new(
+ PoolError::PoolExhaustionWithConnectionDropError,
+ ));
+ }
+ result = self.connection_pool[self.current_connection as usize]
+ .set_ex(hashed_url_string, json_results, 60)
+ .await;
+ continue;
+ }
+ false => return Err(Report::new(PoolError::RedisError(error))),
+ },
+ Ok(_) => return Ok(()),
+ }
+ }
+ }
+ }
src/config/parser.rs CHANGED
@@ -19,7 +19,7 @@ pub struct Config {
  pub style: Style,
  /// It stores the redis connection url address on which the redis
  /// client should connect.
- pub redis_url: String,
+ pub redis_url: Option<String>,
  /// It stores the option to whether enable or disable production use.
  pub aggregator: AggregatorConfig,
  /// It stores the option to whether enable or disable logs.
@@ -99,7 +99,7 @@ impl Config {
  globals.get::<_, String>("theme")?,
  globals.get::<_, String>("colorscheme")?,
  ),
- redis_url: globals.get::<_, String>("redis_url")?,
+ redis_url: globals.get::<_, String>("redis_url").ok(),
  aggregator: AggregatorConfig {
  random_delay: globals.get::<_, bool>("production_use")?,
  },
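
Because `redis_url` is now `Option<String>` and the lookup ends in `.ok()`, omitting `redis_url` from the Lua config simply disables Redis instead of failing `Config::parse`. A hypothetical standalone illustration of the same `.ok()` pattern (names and values are made up):

    fn main() {
        // A failed lookup becomes `None` rather than an error.
        let redis_url: Option<String> = std::env::var("REDIS_URL").ok();
        match redis_url {
            Some(url) => println!("caching in Redis at {url}"),
            None => println!("no redis_url set; using the in-memory cache"),
        }
    }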
src/lib.rs CHANGED
@@ -21,6 +21,7 @@ use actix_cors::Cors;
  use actix_files as fs;
  use actix_governor::{Governor, GovernorConfigBuilder};
  use actix_web::{dev::Server, http::header, middleware::Logger, web, App, HttpServer};
+ use cache::cacher::{Cache, SharedCache};
  use config::parser::Config;
  use handlebars::Handlebars;
  use handler::paths::{file_path, FileType};
@@ -39,13 +40,14 @@ use handler::paths::{file_path, FileType};
  ///
  /// ```rust
  /// use std::net::TcpListener;
- /// use websurfx::{config::parser::Config, run};
+ /// use websurfx::{config::parser::Config, run, cache::cacher::Cache};
  ///
  /// let config = Config::parse(true).unwrap();
  /// let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind address");
- /// let server = run(listener,config).expect("Failed to start server");
+ /// let cache = Cache::new_in_memory();
+ /// let server = run(listener,config,cache).expect("Failed to start server");
  /// ```
- pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
+ pub fn run(listener: TcpListener, config: Config, cache: Cache) -> std::io::Result<Server> {
  let mut handlebars: Handlebars<'_> = Handlebars::new();
 
  let public_folder_path: &str = file_path(FileType::Theme)?;
@@ -58,6 +60,8 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
 
  let cloned_config_threads_opt: u8 = config.threads;
 
+ let cache = web::Data::new(SharedCache::new(cache));
+
  let server = HttpServer::new(move || {
  let cors: Cors = Cors::default()
  .allow_any_origin()
@@ -73,6 +77,7 @@ pub fn run(listener: TcpListener, config: Config) -> std::io::Result<Server> {
  .wrap(Logger::default()) // added logging middleware for logging.
  .app_data(handlebars_ref.clone())
  .app_data(web::Data::new(config.clone()))
+ .app_data(cache.clone())
  .wrap(cors)
  .wrap(Governor::new(
  &GovernorConfigBuilder::default()
src/models/aggregation_models.rs CHANGED
@@ -102,7 +102,7 @@ impl EngineErrorInfo {
  /// A named struct to store, serialize, deserialize the all the search results scraped and
  /// aggregated from the upstream search engines.
  /// `SearchResult` structs.
- #[derive(Serialize, Deserialize, Default)]
+ #[derive(Serialize, Deserialize, Default, Clone)]
  #[serde(rename_all = "camelCase")]
  pub struct SearchResults {
  /// Stores the individual serializable `SearchResult` struct into a vector of
src/server/routes/search.rs CHANGED
@@ -1,7 +1,7 @@
  //! This module handles the search route of the search engine website.
 
  use crate::{
- cache::cacher::RedisCache,
+ cache::cacher::SharedCache,
  config::parser::Config,
  handler::paths::{file_path, FileType},
  models::{aggregation_models::SearchResults, engine_models::EngineHandler},
@@ -17,10 +17,6 @@ use std::{
  };
  use tokio::join;
 
- // ---- Constants ----
- /// Initialize redis cache connection once and store it on the heap.
- static REDIS_CACHE: async_once_cell::OnceCell<RedisCache> = async_once_cell::OnceCell::new();
-
  /// A named struct which deserializes all the user provided search parameters and stores them.
  #[derive(Deserialize)]
  pub struct SearchParams {
@@ -89,6 +85,7 @@ pub async fn search(
  hbs: web::Data<Handlebars<'_>>,
  req: HttpRequest,
  config: web::Data<Config>,
+ cache: web::Data<SharedCache>,
  ) -> Result<HttpResponse, Box<dyn std::error::Error>> {
  let params = web::Query::<SearchParams>::from_query(req.query_string())?;
  match &params.q {
@@ -125,6 +122,7 @@ pub async fn search(
  safe_search
  ),
  &config,
+ &cache,
  query,
  page - 1,
  req.clone(),
@@ -136,6 +134,7 @@ pub async fn search(
  config.binding_ip, config.port, query, page, safe_search
  ),
  &config,
+ &cache,
  query,
  page,
  req.clone(),
@@ -151,6 +150,7 @@ pub async fn search(
  safe_search
  ),
  &config,
+ &cache,
  query,
  page + 1,
  req.clone(),
@@ -185,26 +185,18 @@ pub async fn search(
  async fn results(
  url: String,
  config: &Config,
+ cache: &web::Data<SharedCache>,
  query: &str,
  page: u32,
  req: HttpRequest,
  safe_search: u8,
  ) -> Result<SearchResults, Box<dyn std::error::Error>> {
- // Initialize redis cache connection struct
- let mut redis_cache: RedisCache = REDIS_CACHE
- .get_or_init(async {
- // Initialize redis cache connection pool only one and store it in the heap.
- RedisCache::new(&config.redis_url, 5).await.unwrap()
- })
- .await
- .clone();
  // fetch the cached results json.
- let cached_results_json: Result<String, error_stack::Report<crate::cache::error::PoolError>> =
- redis_cache.clone().cached_json(&url).await;
+ let cached_results = cache.cached_json(&url).await;
  // check if fetched cache results was indeed fetched or it was an error and if so
  // handle the data accordingly.
- match cached_results_json {
- Ok(results) => Ok(serde_json::from_str::<SearchResults>(&results)?),
+ match cached_results {
+ Ok(results) => Ok(results),
  Err(_) => {
  if safe_search == 4 {
  let mut results: SearchResults = SearchResults::default();
@@ -216,9 +208,7 @@ async fn results(
  results.set_disallowed();
  results.add_style(&config.style);
  results.set_page_query(query);
- redis_cache
- .cache_results(&serde_json::to_string(&results)?, &url)
- .await?;
+ cache.cache_results(&results, &url).await?;
  return Ok(results);
  }
  }
@@ -266,9 +256,7 @@ async fn results(
  results.set_filtered();
  }
  results.add_style(&config.style);
- redis_cache
- .cache_results(&serde_json::to_string(&results)?, &url)
- .await?;
+ cache.cache_results(&results, &url).await?;
  Ok(results)
  }
  }
tests/index.rs CHANGED
@@ -9,7 +9,12 @@ fn spawn_app() -> String {
  let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
  let port = listener.local_addr().unwrap().port();
  let config = Config::parse(false).unwrap();
- let server = run(listener, config).expect("Failed to bind address");
+ let server = run(
+ listener,
+ config,
+ websurfx::cache::cacher::Cache::new_in_memory(),
+ )
+ .expect("Failed to bind address");
 
  tokio::spawn(server);
  format!("http://127.0.0.1:{}/", port)