Diego Carpintero commited on
Commit
dda22df
·
1 Parent(s): 7a6e4a3

abstract tools into a class

Browse files
Files changed (1) hide show
  1. tools.py +90 -0
tools.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Dict, List, Tuple
3
+
4
+ from PIL import Image
5
+ import pytesseract
6
+ import requests
7
+ from dotenv import load_dotenv, find_dotenv
8
+
9
+
10
+ class Tools:
11
+ def __init__(self):
12
+ load_dotenv(find_dotenv())
13
+
14
+ self.safebrowsing_key = os.getenv("SAFEBROWSING_API_KEY")
15
+ self.api_base_url = "https://safebrowsing.googleapis.com/v4"
16
+ self.client_id = "minerva"
17
+ self.client_version = "0.1.0"
18
+ self.threat_types = [
19
+ "MALWARE",
20
+ "SOCIAL_ENGINEERING",
21
+ "UNWANTED_SOFTWARE",
22
+ "POTENTIALLY_HARMFUL_APPLICATION"
23
+ ]
24
+
25
+ def ocr(self, image_path: str) -> str:
26
+ """Extract text from image using OCR
27
+ """
28
+ try:
29
+ image = Image.open(image_path)
30
+ text = pytesseract.image_to_string(image)
31
+ return text
32
+ except Exception as e:
33
+ return f"Error in text extraction: {str(e)}"
34
+
35
+ def expand_url(self, url: str) -> str:
36
+ """Expand shortened URL
37
+ """
38
+ try:
39
+ response = requests.head(url, allow_redirects=True)
40
+ return response.url
41
+ except requests.exceptions.RequestException as e:
42
+ return url # Return original URL if expansion fails
43
+
44
+ def is_url_safe(self, url: str) -> Tuple[bool, List[Dict[str, str]]]:
45
+ """Check if URL is safe using Google Safe Browsing API
46
+ """
47
+ if not self.safebrowsing_key:
48
+ raise ValueError("SAFEBROWSING_API_KEY is missing.")
49
+
50
+ api_endpoint = f"{self.api_base_url}/threatMatches:find?key={self.safebrowsing_key}"
51
+ expanded_url = self.expand_url(url)
52
+
53
+ request_body = {
54
+ "client": {
55
+ "clientId": self.client_id,
56
+ "clientVersion": self.client_version
57
+ },
58
+ "threatInfo": {
59
+ "threatTypes": self.threat_types,
60
+ "platformTypes": ["ANY_PLATFORM"],
61
+ "threatEntryTypes": ["URL"],
62
+ "threatEntries": [
63
+ {"url": url},
64
+ {"url": expanded_url} if expanded_url != url else {}
65
+ ]
66
+ }
67
+ }
68
+
69
+ try:
70
+ response = requests.post(api_endpoint, json=request_body)
71
+ response.raise_for_status()
72
+
73
+ result = response.json()
74
+
75
+ if not result:
76
+ return True, []
77
+
78
+ threats = []
79
+ if "matches" in result:
80
+ for match in result["matches"]:
81
+ threats.append({
82
+ "threat_type": match.get("threatType"),
83
+ "platform_type": match.get("platformType"),
84
+ "threat_entry_type": match.get("threatEntryType")
85
+ })
86
+
87
+ return False, threats
88
+
89
+ except requests.exceptions.RequestException as e:
90
+ raise Exception(f"Error checking URL safety: {str(e)}")