File size: 10,324 Bytes
c170de8
 
 
db93c31
1e7805c
996ff84
1e7805c
996ff84
 
db93c31
e69126c
519ebe0
d33129c
1e7805c
76795c4
c170de8
996ff84
1de52de
996ff84
76795c4
 
03384d4
996ff84
 
03384d4
996ff84
519ebe0
03384d4
 
 
c170de8
 
996ff84
03384d4
 
 
 
 
 
 
 
 
e69126c
03384d4
d33129c
 
 
 
 
 
 
 
03384d4
d33129c
 
 
 
 
 
 
 
03384d4
e69126c
76795c4
d33129c
76795c4
03384d4
e69126c
 
 
 
76795c4
 
03384d4
 
 
 
 
 
 
 
 
 
996ff84
 
c170de8
 
03384d4
 
 
 
 
 
 
 
996ff84
 
 
 
 
 
c170de8
 
03384d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
996ff84
c170de8
 
 
 
03384d4
 
 
 
 
d33129c
996ff84
d33129c
03384d4
519ebe0
03384d4
519ebe0
d33129c
519ebe0
03384d4
 
996ff84
d33129c
996ff84
03384d4
 
 
d33129c
03384d4
 
d33129c
03384d4
 
db93c31
c170de8
 
996ff84
 
c170de8
 
 
 
 
03384d4
 
 
 
 
 
db93c31
3c7edb8
03384d4
 
d33129c
996ff84
76795c4
03384d4
519ebe0
03384d4
d33129c
03384d4
519ebe0
03384d4
996ff84
03384d4
996ff84
 
03384d4
 
 
d33129c
03384d4
 
 
 
 
 
 
 
996ff84
 
 
c170de8
996ff84
 
e69126c
996ff84
 
c170de8
996ff84
03384d4
 
 
 
 
 
 
996ff84
 
 
db93c31
c170de8
996ff84
03384d4
 
 
 
 
 
 
 
 
 
 
d33129c
996ff84
 
 
 
03384d4
519ebe0
03384d4
 
 
 
 
 
 
 
 
 
 
 
996ff84
 
320f5f4
996ff84
d33129c
996ff84
519ebe0
996ff84
c170de8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
//! This module provides the functionality to cache the results fetched and aggregated from the
//! upstream search engines, either in memory or serialized as JSON in Redis.

use error_stack::Report;
#[cfg(feature = "memory-cache")]
use mini_moka::sync::Cache as MokaCache;
#[cfg(feature = "memory-cache")]
use std::time::Duration;
use tokio::sync::Mutex;

use crate::{config::parser::Config, models::aggregation_models::SearchResults};

use super::error::CacheError;
#[cfg(feature = "redis-cache")]
use super::redis_cacher::RedisCache;

/// Different implementations for caching, currently it is possible to cache in-memory, in Redis
/// or in both at once. Which variants exist is decided at compile time by the `redis-cache` and
/// `memory-cache` cargo features, so besides `Disabled` exactly one other variant is compiled in
/// when at least one of the two features is enabled.
#[derive(Clone)]
pub enum Cache {
    /// Caching is disabled: lookups always report a missing value and stores are no-ops.
    Disabled,
    #[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
    /// Encapsulates the Redis based cache (only the `redis-cache` feature is enabled).
    Redis(RedisCache),
    #[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
    /// Contains the in-memory cache which maps the search url to its `SearchResults` (only the
    /// `memory-cache` feature is enabled).
    InMemory(MokaCache<String, SearchResults>),
    #[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
    /// Contains both the Redis based cache and the in-memory cache (both features are enabled).
    Hybrid(RedisCache, MokaCache<String, SearchResults>),
}

impl Cache {
    /// A function that builds the cache selected by the cargo features enabled at compile time.
    ///
    /// # Arguments
    ///
    /// * `_config` - It takes the config struct as an argument. Only its `redis_url` field is
    /// read, and only when the `redis-cache` feature is enabled.
    ///
    /// # Returns
    ///
    /// It returns a newly initialized variant based on the feature enabled by the user: `Hybrid`
    /// when both `redis-cache` and `memory-cache` are enabled, `Redis` or `InMemory` when only
    /// one of them is enabled and `Disabled` when neither is.
    ///
    /// # Panics
    ///
    /// Panics if a Redis backed variant is selected and `RedisCache::new` fails.
    pub async fn build(_config: &Config) -> Self {
        #[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
        {
            log::info!("Using a hybrid cache");
            // NOTE(review): `5` is presumably the connection pool size - confirm against
            // `RedisCache::new`.
            Cache::new_hybrid(
                RedisCache::new(&_config.redis_url, 5)
                    .await
                    .expect("Redis cache configured"),
            )
        }
        #[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
        {
            log::info!("Listening redis server on {}", &_config.redis_url);
            // NOTE(review): `5` is presumably the connection pool size - confirm against
            // `RedisCache::new`.
            Cache::new(
                RedisCache::new(&_config.redis_url, 5)
                    .await
                    .expect("Redis cache configured"),
            )
        }
        #[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
        {
            log::info!("Using an in-memory cache");
            Cache::new_in_memory()
        }
        #[cfg(not(any(feature = "memory-cache", feature = "redis-cache")))]
        {
            log::info!("Caching is disabled");
            Cache::Disabled
        }
    }

    /// A function that creates the in-memory store used by both the `InMemory` and the `Hybrid`
    /// variants, so that the two variants can never drift apart in their settings.
    ///
    /// # Returns
    ///
    /// It returns a store configured with a maximum capacity of 1000 and a time to live of 60
    /// seconds per entry.
    #[cfg(feature = "memory-cache")]
    fn new_memory_store() -> MokaCache<String, SearchResults> {
        MokaCache::builder()
            .max_capacity(1000)
            .time_to_live(Duration::from_secs(60))
            .build()
    }

    /// A function that wraps an already initialized Redis cache into the `Redis` variant.
    ///
    /// # Arguments
    ///
    /// * `redis_cache` - It takes the newly initialized `RedisCache` struct as an argument.
    ///
    /// # Returns
    ///
    /// It returns a `Redis` variant holding the given `RedisCache` struct.
    #[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
    pub fn new(redis_cache: RedisCache) -> Self {
        Cache::Redis(redis_cache)
    }

    /// A function that initializes the `in memory` cache which is used to cache the results in
    /// memory with the search engine thus improving performance by making retrieval and caching of
    /// results faster.
    ///
    /// # Returns
    ///
    /// It returns a `InMemory` variant with the newly initialized in memory cache type.
    #[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
    pub fn new_in_memory() -> Self {
        Cache::InMemory(Self::new_memory_store())
    }

    /// A function that initializes both in memory cache and redis client connection for being used
    /// for managing hybrid cache which increases resiliency of the search engine by allowing the
    /// cache to switch to `in memory` caching if the `redis` cache server is temporarily
    /// unavailable.
    ///
    /// # Arguments
    ///
    /// * `redis_cache` - It takes `redis` client connection struct as an argument.
    ///
    /// # Returns
    ///
    /// It returns a tuple variant `Hybrid` storing both the `redis` client connection struct and
    /// the in-memory cache type.
    #[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
    pub fn new_hybrid(redis_cache: RedisCache) -> Self {
        Cache::Hybrid(redis_cache, Self::new_memory_store())
    }

    /// A function which fetches the cached search results stored under the given url.
    ///
    /// # Arguments
    ///
    /// * `_url` - It takes the url as a string slice which is used as the cache key.
    ///
    /// # Error
    ///
    /// Returns the `SearchResults` from the cache if the program executes normally otherwise
    /// returns a `CacheError` if the results cannot be retrieved from the cache. The `Disabled`
    /// variant always returns `CacheError::MissingValue` and json fetched from Redis that cannot
    /// be deserialized is reported as `CacheError::SerializationError`.
    pub async fn cached_json(&mut self, _url: &str) -> Result<SearchResults, Report<CacheError>> {
        match self {
            Cache::Disabled => Err(Report::new(CacheError::MissingValue)),
            #[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
            Cache::Redis(redis_cache) => {
                let json = redis_cache.cached_json(_url).await?;
                let results = serde_json::from_str::<SearchResults>(&json)
                    .map_err(|_| CacheError::SerializationError)?;
                Ok(results)
            }
            #[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
            Cache::InMemory(in_memory) => in_memory
                .get(&_url.to_string())
                .ok_or_else(|| Report::new(CacheError::MissingValue)),
            #[cfg(all(feature = "redis-cache", feature = "memory-cache"))]
            Cache::Hybrid(redis_cache, in_memory) => match redis_cache.cached_json(_url).await {
                Ok(json) => {
                    let results = serde_json::from_str::<SearchResults>(&json)
                        .map_err(|_| CacheError::SerializationError)?;
                    Ok(results)
                }
                // Any Redis failure (including a plain miss) falls back to the in-memory store.
                Err(_) => in_memory
                    .get(&_url.to_string())
                    .ok_or_else(|| Report::new(CacheError::MissingValue)),
            },
        }
    }

    /// A function which caches the results by using the `url` as the key and
    /// the `SearchResults` as the value and stores it in the cache
    ///
    /// # Arguments
    ///
    /// * `_search_results` - It takes the `SearchResults` that should be cached as an argument.
    /// * `_url` - It takes the url as a string slice which is used as the cache key.
    ///
    /// # Error
    ///
    /// Returns a unit type if the program caches the given search results without a failure
    /// otherwise it returns a `CacheError` if the search results cannot be cached due to a
    /// failure. The `Disabled` variant stores nothing and always succeeds. The `Hybrid` variant
    /// only writes to the in-memory store when the write to Redis fails and then still reports
    /// success.
    pub async fn cache_results(
        &mut self,
        _search_results: &SearchResults,
        _url: &str,
    ) -> Result<(), Report<CacheError>> {
        match self {
            Cache::Disabled => Ok(()),
            #[cfg(all(feature = "redis-cache", not(feature = "memory-cache")))]
            Cache::Redis(redis_cache) => {
                let json = serde_json::to_string(_search_results)
                    .map_err(|_| CacheError::SerializationError)?;
                redis_cache.cache_results(&json, _url).await
            }
            #[cfg(all(feature = "memory-cache", not(feature = "redis-cache")))]
            Cache::InMemory(cache) => {
                cache.insert(_url.to_string(), _search_results.clone());
                Ok(())
            }
            #[cfg(all(feature = "memory-cache", feature = "redis-cache"))]
            Cache::Hybrid(redis_cache, cache) => {
                let json = serde_json::to_string(_search_results)
                    .map_err(|_| CacheError::SerializationError)?;
                // The Redis error is deliberately dropped: the in-memory store takes over.
                if redis_cache.cache_results(&json, _url).await.is_err() {
                    cache.insert(_url.to_string(), _search_results.clone());
                }
                Ok(())
            }
        }
    }
}

/// A structure to share a single `Cache` between threads - every access goes through the
/// asynchronous `Mutex` that protects it, so the cache can be used through a shared reference.
pub struct SharedCache {
    /// The internal cache protected from concurrent access by a mutex
    cache: Mutex<Cache>,
}

impl SharedCache {
    /// A function that wraps a `Cache` implementation into a new `SharedCache`.
    ///
    /// # Arguments
    ///
    /// * `cache` - It takes the `Cache` enum variant as an argument with the preferred cache type.
    ///
    /// Returns a newly constructed `SharedCache` struct.
    pub fn new(cache: Cache) -> Self {
        let cache = Mutex::new(cache);
        Self { cache }
    }

    /// A getter function which retrieves the cached `SearchResults` from the internal cache.
    ///
    /// # Arguments
    ///
    /// * `url` - It takes the search url as an argument which will be used as the key to fetch the
    /// cached results from the cache.
    ///
    /// # Error
    ///
    /// Returns a `SearchResults` struct containing the search results from the cache if nothing
    /// goes wrong otherwise returns a `CacheError`.
    pub async fn cached_json(&self, url: &str) -> Result<SearchResults, Report<CacheError>> {
        // The lock guard is a temporary of this expression, so it is held until the lookup ends.
        self.cache.lock().await.cached_json(url).await
    }

    /// A setter function which caches the results by using the `url` as the key and
    /// `SearchResults` as the value.
    ///
    /// # Arguments
    ///
    /// * `search_results` - It takes the `SearchResults` as an argument which are results that
    /// need to be cached.
    /// * `url` - It takes the search url as an argument which will be used as the key for storing
    /// results in the cache.
    ///
    /// # Error
    ///
    /// Returns a unit type if the results are cached successfully otherwise returns a `CacheError`
    /// on a failure.
    pub async fn cache_results(
        &self,
        search_results: &SearchResults,
        url: &str,
    ) -> Result<(), Report<CacheError>> {
        // The lock guard is a temporary of this expression, so it is held until the store ends.
        self.cache
            .lock()
            .await
            .cache_results(search_results, url)
            .await
    }
}