import re from functools import lru_cache import gradio as gr import requests import os TOKEN_VALUE=os.getenv("TOKEN_VALUE") LOGIN = os.getenv("LOGIN") TOKEN = os.getenv("TOKEN") BASE_URL = 'https://nethunt.com/api/v1/zapier' FIELDS = { 'company_code': 'ЄДРПОУ', 'company_codes': 'Всі ЄДРПОУ', 'domain': 'Корпоративний домен', 'linkedin': 'LinkedIn', 'email': 'Email', 'phone': 'Телефон', 'company_name': 'Name', 'first_name': 'Имя', 'last_name': 'Фамилия', 'manager': 'Відповідальний'} FOLDERS = { 'companies': 'Companies', 'contacts': 'Contacts'} OUTPUT_LIMIT = 5 def _get_query( endpoint: str, params: dict[str, str] = None, json: dict[str, str] = None, login: str = LOGIN, token: str = TOKEN, base_url: str = BASE_URL ) -> list: url = f'{base_url}{endpoint}' responce = requests.get( url=url, params=params, json=json, auth=(login, token)) responce.raise_for_status() return responce.json() @lru_cache(maxsize=None) def _get_readable_folders(**kwargs) -> list[dict]: endpoint = '/triggers/writable-folder' return _get_query(endpoint, **kwargs) @lru_cache(maxsize=None) def _get_folder_id(folder_name: str, **kwargs) -> str: readable_folders = _get_readable_folders(**kwargs) for folder in readable_folders: if folder['name'] == folder_name: return folder['id'] raise ValueError('Wrong folder name.') def _get_nethunt_records( folder_name: str, query: str, limit: int = OUTPUT_LIMIT, **kwargs ) -> list[dict]: folder_id = _get_folder_id(folder_name) endpoint = f'/searches/find-record/{folder_id}' params = { 'query': query, 'limit': limit} return _get_query(endpoint, params, **kwargs) def _clean_found_records( folder_name: str, found_records: str, check_type: str ) -> list: result_list = [] for v in found_records: record = { 'folder_name': folder_name, 'duplicate_type': check_type, 'record_id': v.get('id'), 'link': v.get('link'), 'manager': v.get('fields', {}).get(FIELDS['manager'], '')} if folder_name == FOLDERS['companies']: company_name = v.get('fields', {}) \ .get(FIELDS['company_name'], '') record['name'] = company_name if folder_name == FOLDERS['contacts']: first_name = v.get('fields', {}) \ .get(FIELDS['first_name'], '') last_name = v.get('fields', {}) \ .get(FIELDS['last_name'], '') record['name'] = (first_name + ' ' + last_name).strip() result_list.append(record) return result_list def _check_field( folder_name: str, query: str, check_type: str ) -> list: found_records = _get_nethunt_records(folder_name, query=query) if len(found_records) == 0: return [] return _clean_found_records(folder_name, found_records, check_type) def _check_company_code(code: str) -> list: code = code.strip() folder = FOLDERS['companies'] query = f''' ("{FIELDS['company_code']}":"{code}") OR ("{FIELDS['company_codes']}":"{code}")''' return _check_field(folder, query, 'company_code') def _check_company_domain(domain: str) -> list: domain = domain.replace('@', '').strip() folder = FOLDERS['companies'] query = f'''"{FIELDS['domain']}":"{domain}"''' return _check_field(folder, query, 'domain') def _check_company_linkedin(linkedin: str) -> list: linkedin = linkedin.strip() folder = FOLDERS['companies'] query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' return _check_field(folder, query, 'company_linkedin') def _check_contact_email(email: str) -> list: email = email.lower().strip() folder = FOLDERS['contacts'] query = f'''"{FIELDS['email']}":"{email}"''' return _check_field(folder, query, 'email') def _check_contact_phone(phone: str) -> list: phone = re.sub(r'[^\d+]', '', phone) folder = FOLDERS['contacts'] query = f'''"{FIELDS['phone']}":"{phone}"''' return _check_field(folder, query, 'phone') def _check_contact_linkedin(linkedin: str) -> list: linkedin = linkedin.strip() folder = FOLDERS['contacts'] query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' return _check_field(folder, query, 'contact_linkedin') def check_record( company_code: str = None, domain: str = None, company_linkedin: str = None, email: str = None, phone: str = None, contact_linkedin: str = None ) -> list: found_records = [] checks = { _check_company_code: company_code, _check_company_domain: domain, _check_company_linkedin: company_linkedin, _check_contact_email: email, _check_contact_phone: phone, _check_contact_linkedin: contact_linkedin} for check, value in checks.items(): if value: found_records.extend(check(value)) # Апгрейд для вывода result = '' count = 0 for item in found_records: if item['folder_name'] == 'Companies': icon = '🏢' elif item['folder_name'] == 'Contacts': icon = '👨‍💼' count = count + 1 result += ( f"{icon} " f"{item['name']} | " f"Manager: {item['manager']} | " f"Found by: {item['duplicate_type']} | " f"Link: " f"{item['record_id']}

") if count == OUTPUT_LIMIT: break if count == 0: result += "

Nothing found

" result += "
" return result def generate( company_code, domain, company_linkedin, email, phone, contact_linkedin, token): if (TOKEN_VALUE == token): return check_record( company_code=company_code, domain=domain, company_linkedin=company_linkedin, email=email, phone=phone, contact_linkedin=contact_linkedin) else: return "Token error" title = "NetHunt search" css = """ footer {visibility: hidden} """ with gr.Blocks(css=css, title=title) as demo: gr.Markdown( """ # NetHuntCrm Search """ ) with gr.Row(): with gr.Column(): input_7 = gr.Text(label="Token") input_1 = gr.Text(label="Сompany Code") input_2 = gr.Textbox(label="Corp Domain") gr.Markdown(""" """) greet_btn = gr.Button("Search") with gr.Column(): input_4 = gr.Textbox(label="Email") input_5 = gr.Textbox(label="Phone") input_6 = gr.Textbox(label="Contact LinkedIn") input_3 = gr.Textbox(label="Company LinkedIn") output = gr.outputs.HTML() input = [input_1, input_2, input_3, input_4, input_5, input_6, input_7] greet_btn.click(fn=generate, inputs=input, outputs=output) demo.launch(debug=True, share=False)