LaoCzi commited on
Commit
408009a
·
1 Parent(s): f55cc54

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +274 -0
app.py ADDED
@@ -0,0 +1,274 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+ import os
3
+ from functools import lru_cache
4
+
5
+ import gradio as gr
6
+ import requests
7
+
8
+
9
+
10
+ LOGIN = os.getenv("LOGIN")
11
+ TOKEN = os.getenv("TOKEN")
12
+
13
+ BASE_URL = 'https://nethunt.com/api/v1/zapier'
14
+
15
+ FIELDS = {
16
+ 'company_code': 'ЄДРПОУ',
17
+ 'company_codes': 'Всі ЄДРПОУ',
18
+ 'domain': 'Корпоративний домен',
19
+ 'linkedin': 'LinkedIn',
20
+ 'email': 'Email',
21
+ 'phone': 'Телефон',
22
+ 'company_name': 'Name',
23
+ 'first_name': 'Имя',
24
+ 'last_name': 'Фамилия',
25
+ 'manager': 'Відповідальний'}
26
+
27
+ FOLDERS = {
28
+ 'companies': 'Companies',
29
+ 'contacts': 'Contacts'}
30
+
31
+ OUTPUT_LIMIT = 5
32
+
33
+
34
+ def _get_query(
35
+ endpoint: str,
36
+ params: dict[str, str] = None,
37
+ json: dict[str, str] = None,
38
+ login: str = LOGIN,
39
+ token: str = TOKEN,
40
+ base_url: str = BASE_URL
41
+ ) -> list:
42
+
43
+ url = f'{base_url}{endpoint}'
44
+ responce = requests.get(
45
+ url=url, params=params, json=json, auth=(login, token))
46
+ responce.raise_for_status()
47
+
48
+ return responce.json()
49
+
50
+
51
+ @lru_cache(maxsize=None)
52
+ def _get_readable_folders(**kwargs) -> list[dict]:
53
+ endpoint = '/triggers/writable-folder'
54
+ return _get_query(endpoint, **kwargs)
55
+
56
+
57
+ @lru_cache(maxsize=None)
58
+ def _get_folder_id(folder_name: str, **kwargs) -> str:
59
+ readable_folders = _get_readable_folders(**kwargs)
60
+ for folder in readable_folders:
61
+ if folder['name'] == folder_name:
62
+ return folder['id']
63
+ raise ValueError('Wrong folder name.')
64
+
65
+
66
+ def _get_nethunt_records(
67
+ folder_name: str,
68
+ query: str,
69
+ limit: int = OUTPUT_LIMIT,
70
+ **kwargs
71
+ ) -> list[dict]:
72
+
73
+ folder_id = _get_folder_id(folder_name)
74
+ endpoint = f'/searches/find-record/{folder_id}'
75
+ params = {
76
+ 'query': query,
77
+ 'limit': limit}
78
+
79
+ return _get_query(endpoint, params, **kwargs)
80
+
81
+
82
+ def _clean_found_records(
83
+ folder_name: str,
84
+ found_records: str,
85
+ check_type: str
86
+ ) -> list:
87
+
88
+ result_list = []
89
+ for v in found_records:
90
+ record = {
91
+ 'folder_name': folder_name,
92
+ 'duplicate_type': check_type,
93
+ 'record_id': v.get('id'),
94
+ 'link': v.get('link'),
95
+ 'manager': v.get('fields', {}).get(FIELDS['manager'], '')}
96
+
97
+ if folder_name == FOLDERS['companies']:
98
+ company_name = v.get('fields', {}) \
99
+ .get(FIELDS['company_name'], '')
100
+
101
+ record['name'] = company_name
102
+
103
+ if folder_name == FOLDERS['contacts']:
104
+ first_name = v.get('fields', {}) \
105
+ .get(FIELDS['first_name'], '')
106
+ last_name = v.get('fields', {}) \
107
+ .get(FIELDS['last_name'], '')
108
+
109
+ record['name'] = (first_name + ' ' + last_name).strip()
110
+
111
+ result_list.append(record)
112
+
113
+ return result_list
114
+
115
+
116
+ def _check_field(
117
+ folder_name: str,
118
+ query: str,
119
+ check_type: str
120
+ ) -> list:
121
+
122
+ found_records = _get_nethunt_records(folder_name, query=query)
123
+ if len(found_records) == 0:
124
+ return []
125
+
126
+ return _clean_found_records(folder_name, found_records, check_type)
127
+
128
+
129
+ def _check_company_code(code: str) -> list:
130
+ code = code.strip()
131
+
132
+ folder = FOLDERS['companies']
133
+ query = f'''
134
+ ("{FIELDS['company_code']}":"{code}")
135
+ OR ("{FIELDS['company_codes']}":"{code}")'''
136
+
137
+ return _check_field(folder, query, 'company_code')
138
+
139
+
140
+ def _check_company_domain(domain: str) -> list:
141
+ domain = domain.replace('@', '').strip()
142
+
143
+ folder = FOLDERS['companies']
144
+ query = f'''"{FIELDS['domain']}":"{domain}"'''
145
+
146
+ return _check_field(folder, query, 'domain')
147
+
148
+
149
+ def _check_company_linkedin(linkedin: str) -> list:
150
+ linkedin = linkedin.strip()
151
+
152
+ folder = FOLDERS['companies']
153
+ query = f'''"{FIELDS['linkedin']}":"{linkedin}"'''
154
+
155
+ return _check_field(folder, query, 'company_linkedin')
156
+
157
+
158
+ def _check_contact_email(email: str) -> list:
159
+ email = email.lower().strip()
160
+
161
+ folder = FOLDERS['contacts']
162
+ query = f'''"{FIELDS['email']}":"{email}"'''
163
+
164
+ return _check_field(folder, query, 'email')
165
+
166
+
167
+ def _check_contact_phone(phone: str) -> list:
168
+ phone = re.sub(r'[^\d+]', '', phone)
169
+
170
+ folder = FOLDERS['contacts']
171
+ query = f'''"{FIELDS['phone']}":"{phone}"'''
172
+
173
+ return _check_field(folder, query, 'phone')
174
+
175
+
176
+ def _check_contact_linkedin(linkedin: str) -> list:
177
+ linkedin = linkedin.strip()
178
+
179
+ folder = FOLDERS['contacts']
180
+ query = f'''"{FIELDS['linkedin']}":"{linkedin}"'''
181
+
182
+ return _check_field(folder, query, 'contact_linkedin')
183
+
184
+
185
+ def check_record(
186
+ company_code: str = None,
187
+ domain: str = None,
188
+ company_linkedin: str = None,
189
+ email: str = None,
190
+ phone: str = None,
191
+ contact_linkedin: str = None
192
+ ) -> list:
193
+
194
+ found_records = []
195
+ checks = {
196
+ _check_company_code: company_code,
197
+ _check_company_domain: domain,
198
+ _check_company_linkedin: company_linkedin,
199
+ _check_contact_email: email,
200
+ _check_contact_phone: phone,
201
+ _check_contact_linkedin: contact_linkedin}
202
+
203
+ for check, value in checks.items():
204
+ if value:
205
+ found_records.extend(check(value))
206
+
207
+ # Апгрейд для вывода
208
+
209
+ result = '<span>'
210
+ count = 0
211
+ for item in found_records:
212
+ if item['folder_name'] == 'Companies':
213
+ icon = '🏢'
214
+ elif item['folder_name'] == 'Contacts':
215
+ icon = '👨‍💼'
216
+
217
+ count = count + 1
218
+ result += (
219
+ f"{icon} "
220
+ f"<b>{item['name']}</b> | "
221
+ f"Manager: {item['manager']} | "
222
+ f"Found by: {item['duplicate_type']} | "
223
+ f"Link: <a href=\"{str(item['link'])}\">"
224
+ f"{item['record_id']}</a><br><br>")
225
+
226
+ if count == OUTPUT_LIMIT:
227
+ break
228
+
229
+ if count == 0:
230
+ result += "<h2>Nothing found</h2>"
231
+ result += "</span>"
232
+
233
+ return result
234
+
235
+
236
+ def generate(
237
+ company_code, domain, company_linkedin,
238
+ email, phone, contact_linkedin):
239
+ return check_record(
240
+ company_code=company_code,
241
+ domain=domain,
242
+ company_linkedin=company_linkedin,
243
+ email=email,
244
+ phone=phone,
245
+ contact_linkedin=contact_linkedin)
246
+
247
+
248
+ title = "NetHunt search"
249
+ css = """
250
+ footer {visibility: hidden}
251
+
252
+ """
253
+
254
+ with gr.Blocks(css=css, title=title) as demo:
255
+ gr.Markdown(
256
+ """
257
+ # NetHuntCrm Search
258
+ """
259
+ )
260
+ with gr.Row():
261
+ with gr.Column():
262
+ input_1 = gr.Text(label="Сompany Code")
263
+ input_2 = gr.Textbox(label="Corp Domain")
264
+ input_3 = gr.Textbox(label="Company Linkedin")
265
+ with gr.Column():
266
+ input_4 = gr.Textbox(label="Email")
267
+ input_5 = gr.Textbox(label="Phone")
268
+ input_6 = gr.Textbox(label="Contact LinkedIn")
269
+ greet_btn = gr.Button("Search")
270
+ output = gr.outputs.HTML()
271
+ input = [input_1, input_2, input_3, input_4, input_5, input_6]
272
+ greet_btn.click(fn=generate, inputs=input, outputs=output)
273
+
274
+ demo.launch(debug=True, share=False)