neon_arch committed on
Commit 0781385
1 Parent(s): 897ab08

✨ feat: implement async multithreading and engine selection code

src/search_results_handler/aggregator.rs CHANGED
@@ -3,15 +3,20 @@
 
 use std::{collections::HashMap, time::Duration};
 
+use error_stack::Report;
 use rand::Rng;
-use tokio::join;
+use tokio::task::JoinHandle;
 
 use super::{
     aggregation_models::{RawSearchResult, SearchResult, SearchResults},
     user_agent::random_user_agent,
 };
 
-use crate::engines::{duckduckgo, searx};
+use crate::engines::{
+    duckduckgo,
+    engine_models::{EngineError, SearchEngine},
+    searx,
+};
 
 /// A function that aggregates all the scraped results from the above upstream engines and
 /// then removes duplicate results and if two results are found to be from two or more engines
@@ -37,10 +42,11 @@ use crate::engines::{duckduckgo, searx};
 /// function in either `searx` or `duckduckgo` or both otherwise returns a `SearchResults struct`
 /// containing appropriate values.
 pub async fn aggregate(
-    query: &str,
+    query: String,
     page: u32,
     random_delay: bool,
     debug: bool,
+    upstream_search_engines: Vec<String>,
 ) -> Result<SearchResults, Box<dyn std::error::Error>> {
     let user_agent: String = random_user_agent();
     let mut result_map: HashMap<String, RawSearchResult> = HashMap::new();
@@ -53,41 +59,88 @@ pub async fn aggregate(
     }
 
     // fetch results from upstream search engines simultaneously/concurrently.
-    let (ddg_map_results, searx_map_results) = join!(
-        duckduckgo::results(query, page, &user_agent),
-        searx::results(query, page, &user_agent)
-    );
+    let search_engines: Vec<Box<dyn SearchEngine>> = upstream_search_engines
+        .iter()
+        .map(|engine| match engine.to_lowercase().as_str() {
+            "duckduckgo" => Box::new(duckduckgo::DuckDuckGo) as Box<dyn SearchEngine>,
+            "searx" => Box::new(searx::Searx) as Box<dyn SearchEngine>,
+            // a match over `&str` must be exhaustive; unknown engine names are a config error
+            _ => panic!("Config Error: Incorrect config file option provided"),
+        })
+        .collect();
 
-    let ddg_map_results = ddg_map_results.unwrap_or_else(|e| {
-        if debug {
-            log::error!("Error fetching results from DuckDuckGo: {:?}", e);
-        }
-        HashMap::new()
-    });
+    let tasks: Vec<JoinHandle<Result<HashMap<String, RawSearchResult>, Report<EngineError>>>> =
+        search_engines
+            .iter()
+            .map(|search_engine| {
+                tokio::spawn(search_engine.results(query.clone(), page, user_agent.clone()))
+            })
+            .collect();
 
-    let searx_map_results = searx_map_results.unwrap_or_else(|e| {
-        if debug {
-            log::error!("Error fetching results from Searx: {:?}", e);
-        }
-        HashMap::new()
-    });
+    let mut outputs = Vec::with_capacity(search_engines.len());
 
-    result_map.extend(ddg_map_results);
+    for task in tasks {
+        outputs.push(task.await.ok())
+    }
 
-    searx_map_results.into_iter().for_each(|(key, value)| {
-        result_map
-            .entry(key)
-            .and_modify(|result| {
-                result.add_engines(value.clone().engine());
-            })
-            .or_insert_with(|| -> RawSearchResult {
-                RawSearchResult::new(
-                    value.title.clone(),
-                    value.visiting_url.clone(),
-                    value.description.clone(),
-                    value.engine.clone(),
-                )
-            });
+    let mut initial: bool = true;
+    let mut counter: usize = 0;
+    outputs.iter().for_each(|results| {
+        if initial {
+            match results {
+                Some(result) => {
+                    let new_result = result.clone();
+                    result_map.extend(new_result.as_ref().unwrap().clone());
+                    counter += 1;
+                    initial = false
+                }
+                None => {
+                    if debug {
+                        log::error!(
+                            "Error fetching results from {}",
+                            upstream_search_engines[counter]
+                        );
+                    };
+                    counter += 1
+                }
+            }
+        } else {
+            match results {
+                Some(result) => {
+                    let new_result = result.clone();
+                    new_result
+                        .as_ref()
+                        .unwrap()
+                        .clone()
+                        .into_iter()
+                        .for_each(|(key, value)| {
+                            result_map
+                                .entry(key)
+                                .and_modify(|result| {
+                                    result.add_engines(value.clone().engine());
+                                })
+                                .or_insert_with(|| -> RawSearchResult {
+                                    RawSearchResult::new(
+                                        value.title.clone(),
+                                        value.visiting_url.clone(),
+                                        value.description.clone(),
+                                        value.engine.clone(),
+                                    )
+                                });
+                        });
+                    counter += 1
+                }
+                None => {
+                    if debug {
+                        log::error!(
+                            "Error fetching results from {}",
+                            upstream_search_engines[counter]
+                        );
+                    };
+                    counter += 1
+                }
+            }
+        }
     });
 
     Ok(SearchResults::new(
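
The diff imports `EngineError` and `SearchEngine` from `crate::engines::engine_models`, but that module is not part of this change. Judging from how `aggregate` spawns `search_engine.results(query.clone(), page, user_agent.clone())` and types the join handles, the trait would look roughly like the sketch below; the `#[async_trait]` attribute and the single `RequestError` variant are assumptions for illustration, not code from this commit:

use std::collections::HashMap;
use std::fmt;

use async_trait::async_trait;
use error_stack::{Context, Report};

use super::aggregation_models::RawSearchResult;

/// Assumed error type: the real `EngineError` lives in `engine_models.rs`
/// and likely carries more variants than this.
#[derive(Debug)]
pub enum EngineError {
    RequestError,
}

impl fmt::Display for EngineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "error while requesting results from the upstream engine")
    }
}

// `Report<EngineError>` requires the error type to implement `error_stack::Context`.
impl Context for EngineError {}

/// The interface the aggregator relies on: every selectable engine scrapes
/// one page of results for a query and returns them keyed by visiting URL.
#[async_trait]
pub trait SearchEngine {
    async fn results(
        &self,
        query: String,
        page: u32,
        user_agent: String,
    ) -> Result<HashMap<String, RawSearchResult>, Report<EngineError>>;
}

Note that `tokio::spawn` requires the spawned future to be `Send + 'static`, so in practice the boxed trait objects would likely need `Send`/`Sync` bounds as well.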
 
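For context, a hypothetical call site under the new signature (the query, flags, and engine list below are illustrative values, not taken from this commit) would pass the user's configured engine names through to `aggregate`:

// Hypothetical caller: engine names would normally come from the user's config file.
let results: SearchResults = aggregate(
    "example query".to_string(), // query is now owned, so each spawned task can hold a clone
    1,                           // page
    true,                        // random_delay
    false,                       // debug
    vec!["duckduckgo".to_string(), "searx".to_string()], // upstream_search_engines
)
.await?;

Taking `query: String` instead of `&str` is what makes the `tokio::spawn` calls possible: each task gets its own clone of the query and user agent, so no borrow has to outlive the function.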