# Spaces:
# Runtime error
# Runtime error
#VAR
# Maximum number of text-search results DDG() requests from DuckDuckGo.
MAX_RESULTS = 10
import asyncio | |
import logging | |
import warnings | |
from typing import Dict, Generator, Optional | |
import nest_asyncio | |
import json | |
import logging | |
import sys | |
from collections import deque | |
from datetime import datetime, timezone | |
from decimal import Decimal | |
from itertools import cycle | |
from typing import AsyncGenerator, Deque, Dict, Optional, Set, Tuple | |
from curl_cffi import requests | |
from docstring_inheritance import GoogleDocstringInheritanceMeta | |
from lxml import html | |
import json | |
import re | |
from html import unescape | |
from typing import Optional | |
from urllib.parse import unquote | |
from dataclasses import dataclass | |
from typing import Dict, Optional | |
from random import randint | |
class DuckDuckGoSearchException(Exception):
    """Base exception for duckduckgo_search errors (ratelimits, HTTP failures, parse failures)."""
@dataclass
class MapsResult:
    """Represents a result from the maps search.

    Declared as a dataclass so that every field is initialized on each
    instance: maps() yields ``result.__dict__``, and without the decorator
    unassigned fields (e.g. ``image``/``desc`` when the result has no embed)
    would be missing from the yielded dict.
    """

    # All fields default to None so partially populated results are valid.
    title: Optional[str] = None
    address: Optional[str] = None
    country_code: Optional[str] = None
    latitude: Optional[str] = None
    longitude: Optional[str] = None
    url: Optional[str] = None
    desc: Optional[str] = None
    phone: Optional[str] = None
    image: Optional[str] = None
    source: Optional[str] = None
    hours: Optional[Dict[str, str]] = None
    category: Optional[str] = None
    facebook: Optional[str] = None
    instagram: Optional[str] = None
    twitter: Optional[str] = None
REGEX_500_IN_URL = re.compile(r"(?:\d{3}-\d{2}\.js)") | |
REGEX_STRIP_TAGS = re.compile("<.*?>") | |
REGEX_VQD = re.compile(rb"""vqd=['"]?([^&"']+)""") | |
def _extract_vqd(html_bytes: bytes, keywords: str) -> Optional[str]: | |
"""Extract vqd from html using a regular expression.""" | |
try: | |
match = REGEX_VQD.search(html_bytes) | |
if match: | |
return match.group(1).decode() | |
except Exception: | |
pass | |
raise DuckDuckGoSearchException( | |
f"_extract_vqd() {keywords=} Could not extract vqd.") | |
def _text_extract_json(html_bytes: bytes, keywords: str) -> Optional[str]: | |
"""text(backend="api") -> extract json from html.""" | |
try: | |
start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24 | |
end = html_bytes.index(b");DDG.duckbar.load(", start) | |
data = html_bytes[start:end] | |
return json.loads(data) | |
except Exception as ex: | |
raise DuckDuckGoSearchException( | |
f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex | |
def _is_500_in_url(url: str) -> bool:
    """Return True if the url contains a ratelimit marker like '506-00.js'."""
    return REGEX_500_IN_URL.search(url) is not None
def _normalize(raw_html: str) -> str:
    """Strip HTML tags from raw_html and unescape HTML entities; '' for falsy input."""
    if not raw_html:
        return ""
    return unescape(REGEX_STRIP_TAGS.sub("", raw_html))
def _normalize_url(url: str) -> str: | |
"""Unquote URL and replace spaces with '+'.""" | |
return unquote(url.replace(" ", "+")) if url else "" | |
# Module-level logger for the async client.
logger = logging.getLogger("duckduckgo_search.AsyncDDGS")
# Not working on Windows, NotImplementedError (https://curl-cffi.readthedocs.io/en/latest/faq/)
# curl_cffi's async transport requires the selector event loop on Windows.
if sys.platform.lower().startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class AsyncDDGS(metaclass=GoogleDocstringInheritanceMeta):
    """webscout_search async class to get search results from duckduckgo.com."""

    def __init__(self, headers=None, proxies=None, timeout=10) -> None:
        """Initialize the AsyncDDGS object.

        Args:
            headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None.
            proxies (Union[dict, str], optional): Proxies for the HTTP client (can be dict or str). Defaults to None.
            timeout (int, optional): Timeout value for the HTTP client. Defaults to 10.
        """
        # NOTE(review): the `headers` parameter is discarded — it is
        # immediately overwritten with a random-number User-Agent below.
        # Confirm whether caller-supplied headers should be honored.
        useragent = f'{randint(0, 1000000)}'
        headers = {'User-Agent': useragent}
        # A non-dict (string or None) proxy value is applied to both schemes.
        self.proxies = proxies if proxies and isinstance(proxies, dict) else {
            "http": proxies,
            "https": proxies
        }
        self._asession = requests.AsyncSession(headers=headers,
                                               proxies=self.proxies,
                                               timeout=timeout,
                                               impersonate="chrome")
        self._asession.headers["Referer"] = "https://duckduckgo.com/"

    async def __aenter__(self) -> "AsyncDDGS":
        """A context manager method that is called when entering the 'with' statement."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Closes the session."""
        return self._asession.close()

    async def _aget_url(self, method: str, url: str,
                        **kwargs) -> Optional[bytes]:
        # Perform one HTTP request; return the raw body bytes on HTTP 200.
        # (Annotation corrected: this returns response *content* bytes, not a
        # Response object — see `return resp_content` below.)
        # Raises DuckDuckGoSearchException on any error or suspected ratelimit.
        try:
            # Fresh random User-Agent per request.
            useragent = f'{randint(0, 1000000)}'
            headers = {'User-Agent': useragent}
            resp = await self._asession.request(method,
                                                url,
                                                stream=True,
                                                **kwargs,
                                                headers=headers)
            resp.raise_for_status()
            resp_content = await resp.acontent()
            logger.debug(
                f"_aget_url() {url} {resp.status_code} {resp.http_version} {resp.elapsed} {len(resp_content)}"
            )
            # A 'NNN-NN.js' redirect or HTTP 202 is how DDG signals a ratelimit.
            if _is_500_in_url(str(resp.url)) or resp.status_code == 202:
                raise DuckDuckGoSearchException("Ratelimit")
            if resp.status_code == 200:
                return resp_content
        except Exception as ex:
            raise DuckDuckGoSearchException(
                f"_aget_url() {url} {type(ex).__name__}: {ex}") from ex

    async def _aget_vqd(self, keywords: str) -> Optional[str]:
        """Get vqd value for a search query."""
        resp_content = await self._aget_url("POST",
                                            "https://duckduckgo.com",
                                            data={"q": keywords})
        if resp_content:
            return _extract_vqd(resp_content, keywords)

    async def text(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        backend: str = "api",
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """DuckDuckGo text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            backend: api, html, lite. Defaults to api.
                api - collect data from https://duckduckgo.com,
                html - collect data from https://html.duckduckgo.com,
                lite - collect data from https://lite.duckduckgo.com.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        if backend == "api":
            results = self._text_api(keywords, region, safesearch, timelimit,
                                     max_results)
        elif backend == "html":
            results = self._text_html(keywords, region, safesearch, timelimit,
                                      max_results)
        elif backend == "lite":
            results = self._text_lite(keywords, region, timelimit, max_results)
        # NOTE(review): an unrecognized backend leaves `results` unbound and
        # the loop below raises NameError — consider validating `backend`.
        async for result in results:
            yield result

    async def _text_api(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"
        vqd = await self._aget_vqd(keywords)
        payload = {
            "q": keywords,
            "kl": region,
            "l": region,
            "bing_market": region,
            "s": "0",
            "df": timelimit,
            "vqd": vqd,
            # "o": "json",
            "sp": "0",
        }
        safesearch = safesearch.lower()
        if safesearch == "moderate":
            payload["ex"] = "-1"
        elif safesearch == "off":
            payload["ex"] = "-2"
        elif safesearch == "on":  # strict
            payload["p"] = "1"
        # `cache` holds hrefs already yielded (dedup across pages).
        cache = set()
        # At most 11 pages; pagination driven by the "n" (next page) row field.
        for _ in range(11):
            resp_content = await self._aget_url("GET",
                                                "https://links.duckduckgo.com/d.js",
                                                params=payload)
            if resp_content is None:
                return
            page_data = _text_extract_json(resp_content, keywords)
            if page_data is None:
                return
            result_exists, next_page_url = False, None
            for row in page_data:
                href = row.get("u", None)
                # Skip duplicates and the special google-search row DDG injects.
                if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
                    cache.add(href)
                    body = _normalize(row["a"])
                    if body:
                        result_exists = True
                        yield {
                            "title": _normalize(row["t"]),
                            "href": _normalize_url(href),
                            "body": body,
                        }
                        if max_results and len(cache) >= max_results:
                            return
                else:
                    # Rows without a usable "u" carry the next-page url in "n".
                    next_page_url = row.get("n", None)
            if max_results is None or result_exists is False or next_page_url is None:
                return
            # Extract the "s" offset from the next-page url for the next request.
            payload["s"] = next_page_url.split("s=")[1].split("&")[0]

    async def _text_html(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"
        self._asession.headers["Referer"] = "https://html.duckduckgo.com/"
        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "q": keywords,
            "s": "0",
            "kl": region,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
        }
        cache: Set[str] = set()
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://html.duckduckgo.com/html",
                                                data=payload)
            if resp_content is None:
                return
            tree = html.fromstring(resp_content)
            if tree.xpath('//div[@class="no-results"]/text()'):
                return
            result_exists = False
            for e in tree.xpath('//div[contains(@class, "results_links")]'):
                href = e.xpath('.//a[contains(@class, "result__a")]/@href')
                href = href[0] if href else None
                # Filter duplicates, the injected google-search row, and ads.
                if (href and href not in cache
                        and href != f"http://www.google.com/search?q={keywords}"
                        and not href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                    cache.add(href)
                    title = e.xpath('.//a[contains(@class, "result__a")]/text()')
                    body = e.xpath('.//a[contains(@class, "result__snippet")]//text()')
                    result_exists = True
                    yield {
                        "title": _normalize(title[0]) if title else None,
                        "href": _normalize_url(href),
                        "body": _normalize("".join(body)) if body else None,
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # The last nav-link form carries the hidden inputs for the next page;
            # the payload is rebuilt entirely from them.
            next_page = tree.xpath('.//div[@class="nav-link"]')
            next_page = next_page[-1] if next_page else None
            if next_page is None:
                return
            names = next_page.xpath('.//input[@type="hidden"]/@name')
            values = next_page.xpath('.//input[@type="hidden"]/@value')
            payload = {n: v for n, v in zip(names, values)}

    async def _text_lite(
        self,
        keywords: str,
        region: str = "wt-wt",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"
        self._asession.headers["Referer"] = "https://lite.duckduckgo.com/"
        payload = {
            "q": keywords,
            "s": "0",
            "o": "json",
            "api": "d.js",
            "kl": region,
            "df": timelimit,
        }
        cache: Set[str] = set()
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://lite.duckduckgo.com/lite/",
                                                data=payload)
            if resp_content is None:
                return
            if b"No more results." in resp_content:
                return
            tree = html.fromstring(resp_content)
            result_exists = False
            # Table rows repeat in blocks of four: 1=link, 2=snippet, 3=meta, 4=spacer.
            data = zip(cycle(range(1, 5)), tree.xpath("//table[last()]//tr"))
            for i, e in data:
                if i == 1:
                    href = e.xpath(".//a//@href")
                    href = href[0] if href else None
                    if (href is None or href in cache
                            or href == f"http://www.google.com/search?q={keywords}"
                            or href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                        # Advance the shared iterator past the rest of this block.
                        [next(data, None) for _ in range(3)]  # skip block(i=1,2,3,4)
                    else:
                        cache.add(href)
                        title = e.xpath(".//a//text()")[0]
                elif i == 2:
                    body = e.xpath(".//td[@class='result-snippet']//text()")
                    body = "".join(body).strip()
                elif i == 3:
                    result_exists = True
                    yield {
                        "title": _normalize(title),
                        "href": _normalize_url(href),
                        "body": _normalize(body),
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_page_s = tree.xpath(
                "//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value")
            if not next_page_s:
                return
            payload["s"] = next_page_s[0]
            payload["vqd"] = _extract_vqd(resp_content, keywords)

    async def images(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        size: Optional[str] = None,
        color: Optional[str] = None,
        type_image: Optional[str] = None,
        layout: Optional[str] = None,
        license_image: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout images search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: Day, Week, Month, Year. Defaults to None.
            size: Small, Medium, Large, Wallpaper. Defaults to None.
            color: color, Monochrome, Red, Orange, Yellow, Green, Blue,
                Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None.
            type_image: photo, clipart, gif, transparent, line.
                Defaults to None.
            layout: Square, Tall, Wide. Defaults to None.
            license_image: any (All Creative Commons), Public (PublicDomain),
                Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially),
                Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and
                Use Commercially). Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with image search results.
        """
        assert keywords, "keywords is mandatory"
        vqd = await self._aget_vqd(keywords)
        safesearch_base = {"on": 1, "moderate": 1, "off": -1}
        # Each filter is encoded as "name:value" and joined into the "f" param.
        timelimit = f"time:{timelimit}" if timelimit else ""
        size = f"size:{size}" if size else ""
        color = f"color:{color}" if color else ""
        type_image = f"type:{type_image}" if type_image else ""
        layout = f"layout:{layout}" if layout else ""
        license_image = f"license:{license_image}" if license_image else ""
        payload = {
            "l": region,
            "o": "json",
            "q": keywords,
            "vqd": vqd,
            "f": f"{timelimit},{size},{color},{type_image},{layout},{license_image}",
            "p": safesearch_base[safesearch.lower()],
        }
        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/i.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return
            result_exists = False
            for row in page_data:
                image_url = row.get("image", None)
                if image_url and image_url not in cache:
                    cache.add(image_url)
                    result_exists = True
                    yield {
                        "title": row["title"],
                        "image": _normalize_url(image_url),
                        "thumbnail": _normalize_url(row["thumbnail"]),
                        "url": _normalize_url(row["url"]),
                        "height": row["height"],
                        "width": row["width"],
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # NOTE(review): `next` shadows the builtin here (and in videos/news).
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]

    async def videos(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        resolution: Optional[str] = None,
        duration: Optional[str] = None,
        license_videos: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout videos search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            resolution: high, standart. Defaults to None.
            duration: short, medium, long. Defaults to None.
            license_videos: creativeCommon, youtube. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with videos search results
        """
        assert keywords, "keywords is mandatory"
        vqd = await self._aget_vqd(keywords)
        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        timelimit = f"publishedAfter:{timelimit}" if timelimit else ""
        resolution = f"videoDefinition:{resolution}" if resolution else ""
        duration = f"videoDuration:{duration}" if duration else ""
        license_videos = f"videoLicense:{license_videos}" if license_videos else ""
        payload = {
            "l": region,
            "o": "json",
            "s": 0,
            "q": keywords,
            "vqd": vqd,
            "f": f"{timelimit},{resolution},{duration},{license_videos}",
            "p": safesearch_base[safesearch.lower()],
        }
        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/v.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return
            result_exists = False
            for row in page_data:
                # Deduplicate by the video content url.
                if row["content"] not in cache:
                    cache.add(row["content"])
                    result_exists = True
                    yield row
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]

    async def news(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout news search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with news search results.
        """
        assert keywords, "keywords is mandatory"
        vqd = await self._aget_vqd(keywords)
        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "l": region,
            "o": "json",
            "noamp": "1",
            "q": keywords,
            "vqd": vqd,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
            "s": 0,
        }
        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/news.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return
            result_exists = False
            for row in page_data:
                if row["url"] not in cache:
                    cache.add(row["url"])
                    image_url = row.get("image", None)
                    result_exists = True
                    yield {
                        # Epoch seconds -> ISO-8601 UTC timestamp.
                        "date": datetime.fromtimestamp(row["date"],
                                                       timezone.utc).isoformat(),
                        "title": row["title"],
                        "body": _normalize(row["excerpt"]),
                        "url": _normalize_url(row["url"]),
                        "image": _normalize_url(image_url) if image_url else None,
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next = resp_json.get("next", None)
            if next is None:
                return
            payload["s"] = next.split("s=")[-1].split("&")[0]

    async def answers(
            self, keywords: str) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout instant answers. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.

        Yields:
            dict with instant answers results.
        """
        assert keywords, "keywords is mandatory"
        payload = {
            "q": f"what is {keywords}",
            "format": "json",
        }
        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): yields None and then falls through, passing None to
            # json.loads below (caught by the except) — probably meant `return`.
            yield None
        try:
            page_data = json.loads(resp_content)
        except Exception:
            page_data = None
        if page_data:
            answer = page_data.get("AbstractText", None)
            url = page_data.get("AbstractURL", None)
            if answer:
                yield {
                    "icon": None,
                    "text": answer,
                    "topic": None,
                    "url": url,
                }
        # related:
        payload = {
            "q": f"{keywords}",
            "format": "json",
        }
        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): same fall-through as above — probably meant `return`.
            yield None
        try:
            page_data = json.loads(resp_content).get("RelatedTopics", None)
        except Exception:
            page_data = None
        if page_data:
            for row in page_data:
                topic = row.get("Name", None)
                if not topic:
                    icon = row["Icon"].get("URL", None)
                    yield {
                        "icon": f"https://duckduckgo.com{icon}" if icon else None,
                        "text": row["Text"],
                        "topic": None,
                        "url": row["FirstURL"],
                    }
                else:
                    # Named rows group several sub-topics.
                    for subrow in row["Topics"]:
                        icon = subrow["Icon"].get("URL", None)
                        yield {
                            "icon": f"https://duckduckgo.com{icon}" if icon else None,
                            "text": subrow["Text"],
                            "topic": topic,
                            "url": subrow["FirstURL"],
                        }

    async def suggestions(
        self,
        keywords: str,
        region: str = "wt-wt") -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout suggestions. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".

        Yields:
            dict with suggestions results.
        """
        assert keywords, "keywords is mandatory"
        payload = {
            "q": keywords,
            "kl": region,
        }
        resp_content = await self._aget_url("GET",
                                            "https://duckduckgo.com/ac",
                                            params=payload)
        if resp_content is None:
            # NOTE(review): yields None then still attempts json.loads(None)
            # below (silently swallowed) — probably meant `return`.
            yield None
        try:
            page_data = json.loads(resp_content)
            for r in page_data:
                yield r
        except Exception:
            pass

    async def maps(
        self,
        keywords: str,
        place: Optional[str] = None,
        street: Optional[str] = None,
        city: Optional[str] = None,
        county: Optional[str] = None,
        state: Optional[str] = None,
        country: Optional[str] = None,
        postalcode: Optional[str] = None,
        latitude: Optional[str] = None,
        longitude: Optional[str] = None,
        radius: int = 0,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout maps search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query
            place: if set, the other parameters are not used. Defaults to None.
            street: house number/street. Defaults to None.
            city: city of search. Defaults to None.
            county: county of search. Defaults to None.
            state: state of search. Defaults to None.
            country: country of search. Defaults to None.
            postalcode: postalcode of search. Defaults to None.
            latitude: geographic coordinate (north-south position). Defaults to None.
            longitude: geographic coordinate (east-west position); if latitude and
                longitude are set, the other parameters are not used. Defaults to None.
            radius: expand the search square by the distance in kilometers. Defaults to 0.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with maps search results
        """
        assert keywords, "keywords is mandatory"
        vqd = await self._aget_vqd(keywords)
        # if longitude and latitude are specified, skip the request about bbox to the nominatim api
        if latitude and longitude:
            # Accept decimal-comma coordinates; start with a zero-size bbox.
            lat_t = Decimal(latitude.replace(",", "."))
            lat_b = Decimal(latitude.replace(",", "."))
            lon_l = Decimal(longitude.replace(",", "."))
            lon_r = Decimal(longitude.replace(",", "."))
            if radius == 0:
                radius = 1
        # otherwise request about bbox to nominatim api
        else:
            if place:
                params: Dict[str, Optional[str]] = {
                    "q": place,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            else:
                params = {
                    "street": street,
                    "city": city,
                    "county": county,
                    "state": state,
                    "country": country,
                    "postalcode": postalcode,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            try:
                resp_content = await self._aget_url(
                    "GET",
                    "https://nominatim.openstreetmap.org/search.php",
                    params=params,
                )
                if resp_content is None:
                    # NOTE(review): yields None then passes None to json.loads
                    # (caught by the except below) — probably meant `return`.
                    yield None
                coordinates = json.loads(resp_content)[0]["boundingbox"]
                lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2])
                lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3])
            except Exception as ex:
                logger.debug(
                    f"ddg_maps() keywords={keywords} {type(ex).__name__} {ex}")
                return
        # if a radius is specified, expand the search square
        # 0.008983 degrees of latitude ~ 1 km.
        lat_t += Decimal(radius) * Decimal(0.008983)
        lat_b -= Decimal(radius) * Decimal(0.008983)
        lon_l -= Decimal(radius) * Decimal(0.008983)
        lon_r += Decimal(radius) * Decimal(0.008983)
        logger.debug(f"bbox coordinates\n{lat_t} {lon_l}\n{lat_b} {lon_r}")
        # сreate a queue of search squares (bboxes)
        work_bboxes: Deque[Tuple[Decimal, Decimal, Decimal, Decimal]] = deque()
        work_bboxes.append((lat_t, lon_l, lat_b, lon_r))
        # bbox iterate
        cache = set()
        while work_bboxes:
            lat_t, lon_l, lat_b, lon_r = work_bboxes.pop()
            params = {
                "q": keywords,
                "vqd": vqd,
                "tg": "maps_places",
                "rt": "D",
                "mkexp": "b",
                "wiki_info": "1",
                "is_requery": "1",
                "bbox_tl": f"{lat_t},{lon_l}",
                "bbox_br": f"{lat_b},{lon_r}",
                "strict_bbox": "1",
            }
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/local.js",
                                                params=params)
            if resp_content is None:
                return
            try:
                page_data = json.loads(resp_content).get("results", [])
            except Exception:
                return
            if page_data is None:
                return
            for res in page_data:
                result = MapsResult()
                result.title = res["name"]
                result.address = res["address"]
                # Deduplicate on "title address" across bboxes.
                if f"{result.title} {result.address}" in cache:
                    continue
                else:
                    cache.add(f"{result.title} {result.address}")
                    result.country_code = res["country_code"]
                    result.url = _normalize_url(res["website"])
                    result.phone = res["phone"]
                    result.latitude = res["coordinates"]["latitude"]
                    result.longitude = res["coordinates"]["longitude"]
                    result.source = _normalize_url(res["url"])
                    if res["embed"]:
                        result.image = res["embed"].get("image", "")
                        result.desc = res["embed"].get("description", "")
                    result.hours = res["hours"]
                    result.category = res["ddg_category"]
                    result.facebook = f"www.facebook.com/profile.php?id={x}" if (
                        x := res["facebook_id"]) else None
                    result.instagram = f"https://www.instagram.com/{x}" if (
                        x := res["instagram_id"]) else None
                    result.twitter = f"https://twitter.com/{x}" if (
                        x := res["twitter_id"]) else None
                    yield result.__dict__
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None:
                return
            # divide the square into 4 parts and add to the queue
            # A full page (>=15 hits) means the bbox is dense: subdivide it.
            if len(page_data) >= 15:
                lat_middle = (lat_t + lat_b) / 2
                lon_middle = (lon_l + lon_r) / 2
                bbox1 = (lat_t, lon_l, lat_middle, lon_middle)
                bbox2 = (lat_t, lon_middle, lat_middle, lon_r)
                bbox3 = (lat_middle, lon_l, lat_b, lon_middle)
                bbox4 = (lat_middle, lon_middle, lat_b, lon_r)
                work_bboxes.extendleft([bbox1, bbox2, bbox3, bbox4])

    async def translate(self,
                        keywords: str,
                        from_: Optional[str] = None,
                        to: str = "en") -> Optional[Dict[str, Optional[str]]]:
        """webscout translate.

        Args:
            keywords: string or a list of strings to translate
            from_: translate from (defaults automatically). Defaults to None.
            to: what language to translate. Defaults to "en".

        Returns:
            dict with translated keywords.
        """
        assert keywords, "keywords is mandatory"
        # The translation endpoint expects the vqd of a "translate" query.
        vqd = await self._aget_vqd("translate")
        payload = {
            "vqd": vqd,
            "query": "translate",
            "to": to,
        }
        if from_:
            payload["from"] = from_
        resp_content = await self._aget_url(
            "POST",
            "https://duckduckgo.com/translation.js",
            params=payload,
            data=keywords.encode(),
        )
        if resp_content is None:
            return None
        try:
            page_data = json.loads(resp_content)
            page_data["original"] = keywords
        except Exception:
            page_data = None
        return page_data
# Module-level logger for the sync wrapper class (rebinds the name above).
logger = logging.getLogger("duckduckgo_search.DDGS")
# Patch asyncio so run_until_complete() works inside an already-running loop.
nest_asyncio.apply()
class DDGS(AsyncDDGS):
    """Synchronous facade over AsyncDDGS.

    Each search method obtains the parent's async generator and drains it on
    the instance's event loop (nest_asyncio allows this even inside a running
    loop, though AsyncDDGS is preferred there).
    """

    def __init__(self, headers=None, proxies=None, timeout=10):
        if asyncio.get_event_loop().is_running():
            warnings.warn(
                "DDGS running in an async loop. This may cause errors. Use AsyncDDGS instead.",
                stacklevel=2)
        super().__init__(headers, proxies, timeout)
        self._loop = asyncio.get_event_loop()

    def __enter__(self) -> "DDGS":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Schedule the async cleanup on the loop.
        self._loop.create_task(self.__aexit__(exc_type, exc_val, exc_tb))

    def _iter_over_async(self, async_gen):
        """Drain an async generator synchronously, yielding each item."""
        advance = async_gen.__anext__
        while True:
            try:
                item = self._loop.run_until_complete(advance())
            except StopAsyncIteration:
                return
            yield item

    def text(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().text(*args, **kwargs))

    def images(self, *args,
               **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().images(*args, **kwargs))

    def videos(self, *args,
               **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().videos(*args, **kwargs))

    def news(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().news(*args, **kwargs))

    def answers(self, *args,
                **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().answers(*args, **kwargs))

    def suggestions(self, *args,
                    **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().suggestions(*args, **kwargs))

    def maps(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        return self._iter_over_async(super().maps(*args, **kwargs))

    def translate(self, *args, **kwargs) -> Optional[Dict[str, Optional[str]]]:
        # translate() is a plain coroutine, not a generator: run it to completion.
        return self._loop.run_until_complete(super().translate(*args, **kwargs))
# Function to generate response based on user input
def Gemini(messages, model):
    """Generate a model reply for the conversation and append it to history.

    Args:
        messages: list of message dicts in Gemini "parts"/"role" format;
            the model's reply is appended to this list in place.
        model: object exposing generate_content(messages) -> response with
            a ``.text`` attribute (e.g. a google-generativeai model).

    Returns:
        The response text.
    """
    response = model.generate_content(messages)
    # Record the model's turn in the shared history.
    # (Removed a stray no-op `messages` expression statement that did nothing.)
    messages.append({
        "parts": [{
            "text": response.text
        }],
        "role": "model",
    })
    return response.text
from rich import print | |
from time import time as t | |
#pip install requests | |
#pip install bs4 | |
import requests as rq | |
from bs4 import BeautifulSoup | |
# CSS class names Google uses for answer boxes / featured snippets;
# Online_Scraper tries each in order until one yields text.
classes=["Ab33Nc","zCubwf","hgKElc","LTKOO sY7ric","Z0LcW","vk_bk","gsrt vk_bk FzvWSb YwPhnf","pclqee","tw-Data-text tw-text-small tw-ta",
"IZ6rdc","O5uR6d LTKOO","vlzY6d","webanswers-webanswers_table__webanswers-table",
"dDoNo ikb4Bb gsrt","sXLaOe","LWkfKe","VQF4g","qv3Wpe","kno-rdesc"]
# Desktop Chrome user agent — Google serves different markup to unknown agents.
useragent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
# Shared HTTP session so connections are reused across scrapes.
sess = rq.session()
#scrape data from google search results | |
def Online_Scraper(query,PRINT=True):
    """Scrape a Google answer box for `query`; return its text or None.

    Fetches the Google results page and tries each known answer-box CSS
    class in turn, returning the first one that yields text.
    """
    normalized = query.replace(" + "," plus ").replace(" - "," minus ")
    response = sess.get("https://www.google.co.in/search?q=" + normalized,
                        headers={'User-Agent': useragent})
    soup = BeautifulSoup(response.content, 'html.parser')
    for css_class in classes:
        # find() may return None -> AttributeError; treat as "not present".
        try:
            text = soup.find(class_=css_class).get_text()
        except Exception:
            continue
        if PRINT:
            print(f"by class {css_class}")
        return text
    return None
def DDG(query):
    """Run a DuckDuckGo text search and return results that have a body.

    Args:
        query: search keywords.

    Returns:
        List of result dicts (title/href/body) whose body is not None,
        capped at MAX_RESULTS.
    """
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=MAX_RESULTS)
        # `is not None` instead of `!= None` (identity check for None).
        return [hit for hit in results if hit["body"] is not None]
def RealTimeGemini(query: str, messages: Optional[list] = None, model=None):
    """Answer a query with Gemini, augmented with real-time web search results.

    Scrapes a Google answer box (falling back to DuckDuckGo), injects the
    findings into the conversation, and asks the model for a reply.

    Args:
        query: the user's question (required, must be a str).
        messages: optional prior conversation in Gemini "parts"/"role" format.
            Defaults to None (fresh conversation) — avoids the shared
            mutable-default-argument pitfall of `messages=[]`.
        model: Gemini model object exposing generate_content().

    Returns:
        dict with the search results, the Gemini response and timings.
    """
    assert query, "Query is required"
    assert isinstance(query, str), "Query must be a string"
    if messages is None:
        messages = []
    print(messages)
    realquery = query
    ReturnObj = {}
    C = t()
    # Fast path: Google answer box; fall back to DuckDuckGo text search.
    results = Online_Scraper(realquery)
    if results is None:
        try:
            results = DDG(realquery)
        except Exception:  # was a bare except: don't swallow KeyboardInterrupt
            results = "No results found"
    #ADD TO RETURN OBJECT
    ReturnObj["DDGSResults"] = results
    ReturnObj["DDGSExecutionTime"] = t() - C
    ReturnObj["Query"] = realquery
    ReturnObj["SearchQuery"] = query
    C = t()
    # Prepend the search context as a user/model exchange, then ask the query.
    messages = [{
        "parts": [{
            "text": f"Search on Google -> {realquery}\nAnswer -> ```{results.__str__()}```\n ***real time information you must use to reply***"
        }],
        "role": "user"
    }, {
        "parts": [{
            "text": "ok i know its websearch results i will tell you whenevery you ask me about it"
        }],
        "role": "model"
    }] + messages
    messages.append({
        "parts": [{
            "text": query
        }],
        "role": "user"
    })
    responce = Gemini(messages, model)
    #ADD TO RETURN OBJECT
    ReturnObj["GeminiResponce"] = responce
    ReturnObj["GeminiExecutionTime"] = t() - C
    return ReturnObj
# if __name__ == "__main__": | |
# while 1: | |
# a = input("Enter your query: ") | |
# print(RealTimeGemini(a)) | |
# while 1: | |
# X=input("Enter your query: ") | |
# C=t() | |
# print(Online_Scraper(X)) | |
# print(C-t()) |