Spaces:
Paused
Paused
# -*- coding: utf-8 -*- | |
# Copyright (c) 2024 OSU Natural Language Processing Group | |
# | |
# Licensed under the OpenRAIL-S License; | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# https://www.licenses.ai/ai-pubs-open-rails-vz1 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import re | |
import asyncio | |
from difflib import SequenceMatcher | |
from playwright.sync_api import Playwright, expect, sync_playwright | |
# from playwright.async_api import async_playwright | |
from pathlib import Path | |
import toml | |
import os | |
import traceback | |
async def normal_launch_async(playwright: "Playwright", headless=False, args=None):
    """Launch a Chromium browser through an async Playwright handle.

    Args:
        playwright: Object whose ``chromium.launch`` is awaitable (an async
            Playwright instance). The annotation is a string so it is not
            evaluated when the function is defined.
        headless: Whether to start the browser without a visible window.
            Defaults to ``False`` (headed).
        args: Optional list of extra command-line arguments forwarded to the
            browser process.

    Returns:
        The value produced by ``playwright.chromium.launch``.
    """
    # Forward the caller's ``headless`` choice; it used to be hard-coded to
    # False, which silently ignored the parameter.
    return await playwright.chromium.launch(
        traces_dir=None,
        headless=headless,
        args=args,
    )
async def normal_new_context_async(
    browser,
    storage_state=None,
    har_path=None,
    video_path=None,
    tracing=False,
    trace_screenshots=False,
    trace_snapshots=False,
    trace_sources=False,
    locale=None,
    geolocation=None,
    user_agent: str = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
    viewport: dict = {"width": 1280, "height": 720},
):
    """Create a browser context and optionally start tracing on it.

    Args:
        browser: Object exposing an awaitable ``new_context(**kwargs)``.
        storage_state: Forwarded as ``storage_state``.
        har_path: Forwarded as ``record_har_path``.
        video_path: Forwarded as ``record_video_dir``.
        tracing: When truthy, ``context.tracing.start`` is awaited before
            the context is returned.
        trace_screenshots: Forwarded to tracing as ``screenshots``.
        trace_snapshots: Forwarded to tracing as ``snapshots``.
        trace_sources: Forwarded to tracing as ``sources``.
        locale: Forwarded as ``locale``.
        geolocation: Forwarded as ``geolocation``.
        user_agent: Forwarded as ``user_agent``.
        viewport: Forwarded as ``viewport``. The default dict is shared
            between calls; this function only passes it along.

    Returns:
        The context object produced by ``browser.new_context``.
    """
    context_options = {
        "storage_state": storage_state,
        "user_agent": user_agent,
        "viewport": viewport,
        "locale": locale,
        "record_har_path": har_path,
        "record_video_dir": video_path,
        "geolocation": geolocation,
    }
    new_context = await browser.new_context(**context_options)

    if not tracing:
        return new_context

    trace_options = {
        "screenshots": trace_screenshots,
        "snapshots": trace_snapshots,
        "sources": trace_sources,
    }
    await new_context.tracing.start(**trace_options)
    return new_context
# | |
# def persistent_launch(playwright: Playwright, user_data_dir: str = ""): | |
# context = playwright.chromium.launch_persistent_context( | |
# user_data_dir=user_data_dir, | |
# headless=False, | |
# args=["--no-default-browser-check", | |
# "--no_sandbox", | |
# "--disable-blink-features=AutomationControlled", | |
# ], | |
# ignore_default_args=ignore_args, | |
# user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", | |
# viewport={"width": 1280, "height": 720}, | |
# bypass_csp=True, | |
# slow_mo=1000, | |
# chromium_sandbox=True, | |
# channel="chrome-dev" | |
# ) | |
# return context | |
# | |
# async def persistent_launch_async(playwright: Playwright, user_data_dir: str = "", record_video_dir="video"): | |
# context = await playwright.chromium.launch_persistent_context( | |
# user_data_dir=user_data_dir, | |
# headless=False, | |
# args=[ | |
# "--disable-blink-features=AutomationControlled", | |
# ], | |
# ignore_default_args=ignore_args, | |
# user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", | |
# # viewport={"width": 1280, "height": 720}, | |
# record_video_dir=record_video_dir, | |
# channel="chrome-dev" | |
# # slow_mo=1000, | |
# ) | |
# return context | |
def remove_extra_eol(text):
    """Flatten newlines and squeeze repeated whitespace.

    Every ``\\n`` becomes a single space, then each run of two or more
    whitespace characters is replaced by one space. A lone whitespace
    character that is not a newline (e.g. a single tab) is left as is.
    """
    flattened = ' '.join(text.split('\n'))
    return re.sub(r'\s{2,}', ' ', flattened)
def get_first_line(s):
    """Return the first line of *s*, shortened to eight words if longer.

    When the first line has more than eight whitespace-separated tokens, the
    first eight are joined with single spaces and ``'...'`` is appended.
    Otherwise the first line is returned unchanged (original spacing kept).
    """
    head, _, _ = s.partition('\n')
    words = head.split()
    if len(words) <= 8:
        return head
    return ' '.join(words[:8]) + '...'
async def _format_salient_attributes(locator, attribute_names):
    """Return ``name="value" `` pairs for each non-empty attribute of *locator*."""
    parts = []
    for name in attribute_names:
        value = await locator.get_attribute(name, timeout=0)
        if value:
            parts.append(f'{name}="{value.strip()}" ')
    return "".join(parts)


async def get_element_description(element, tag_name, role_value, type_value):
    """Build a short text description of a web element.

    The description is assembled from the first source that yields text:

    1. ``select`` elements: the selected option's text plus the list of all
       option texts, prefixed with the parent description.
    2. The element's ``text_content`` (whitespace-normalised). If that text
       is longer than 80 characters, ``inner_text`` is used instead when it
       is non-empty.
    3. The element's salient attributes, prefixed with the parent
       description.
    4. The salient attributes of the element's first child.

    For ``input``/``textarea`` elements whose role and type are not one of
    the non-text kinds (submit, reset, checkbox, radio, button, file), the
    current input value is prepended to the results of steps 2-4.

    Args:
        element: Locator-like object for the element.
        tag_name: Lower-case tag name of the element.
        role_value: Value of the element's ``role`` attribute, or ``None``.
        type_value: Value of the element's ``type`` attribute, or ``None``.

    Returns:
        The description string, or ``None`` when nothing descriptive is found.
    """
    salient_attributes = (
        "alt",
        "aria-describedby",
        "aria-label",
        "aria-role",
        "input-checked",
        "label",
        "name",
        "option_selected",
        "placeholder",
        "readonly",
        "text-value",
        "title",
        "value",
    )

    # Parent context: "parent_node: <text>" cut to its first line and at most
    # eight tokens (the prefix counts as one token).
    parent_value = "parent_node: "
    parent_locator = element.locator('xpath=..')
    if await parent_locator.count() > 0:
        # There is at most one parent node.
        parent_text = (await parent_locator.inner_text(timeout=0) or "").strip()
        if parent_text:
            parent_value += parent_text
    parent_value = remove_extra_eol(get_first_line(parent_value)).strip()
    # Nothing but the bare prefix means the parent contributed no text.
    parent_value = "" if parent_value == "parent_node:" else parent_value + " "

    if tag_name == "select":
        # selectedIndex is -1 when nothing is selected (e.g. no options), so
        # guard the lookup instead of letting the page-side expression throw.
        selected_text = await element.evaluate(
            "select => { const opt = select.options[select.selectedIndex];"
            " return opt ? opt.textContent : ''; }",
            timeout=0,
        )
        if selected_text:
            options = await element.evaluate(
                "select => Array.from(select.options).map(option => option.text)",
                timeout=0,
            )
            options_text = " | ".join(options)
            if not options_text:
                options_text = await element.text_content(timeout=0)
                if not options_text:
                    options_text = await element.inner_text(timeout=0) or ""
            return (
                parent_value
                + "Selected Options: "
                + remove_extra_eol(selected_text.strip())
                + " - Options: "
                + options_text
            )
        # No selected option text: fall through to the generic description.

    input_value = ""
    none_input_type = ("submit", "reset", "checkbox", "radio", "button", "file")
    if tag_name in ("input", "textarea"):
        if role_value not in none_input_type and type_value not in none_input_type:
            current_value = await element.input_value(timeout=0)
            if current_value:
                input_value = 'input value="' + current_value + '" '

    text = (await element.text_content(timeout=0) or '').strip()
    if text:
        text = remove_extra_eol(text)
        if len(text) > 80:
            # Long text content: use inner_text instead when it is non-empty,
            # otherwise fall through to the attribute-based description.
            text_in = (await element.inner_text(timeout=0) or '').strip()
            if text_in:
                return input_value + remove_extra_eol(text_in)
        else:
            return input_value + text

    # Fall back to the element's own salient attributes.
    attribute_text = await _format_salient_attributes(element, salient_attributes)
    text = (parent_value + attribute_text).strip()
    if text:
        return input_value + remove_extra_eol(text.strip())

    # Last resort: salient attributes of the first child element.
    first_child_locator = element.locator('xpath=./child::*[1]')
    if await first_child_locator.count() > 0:
        attribute_text += await _format_salient_attributes(first_child_locator, salient_attributes)
        text = (parent_value + attribute_text).strip()
        if text:
            return input_value + remove_extra_eol(text.strip())

    return None
async def get_element_data(element, tag_name, viewport_size, seen_elements=None, coordinates=None):
    """Collect position and description data for one candidate element.

    Args:
        element: Locator-like object for the element.
        tag_name: Tag (selector) the element was found with. If it is not one
            of the known interactive tags, the real tag name is read from
            the page.
        viewport_size: Mapping with ``"width"`` and ``"height"``; elements
            whose box is not fully inside it are skipped.
        seen_elements: Optional container of centre points to skip. ``None``
            (the default) means nothing has been seen yet.
        coordinates: Optional ``(x, y)`` point; when given, only an element
            whose bounding box contains the point is returned.

    Returns:
        A dict with keys ``center_point`` (x, y), ``description``,
        ``tag_with_role`` (tag plus role/type attributes), ``box``
        ``[x0, y0, x1, y1]``, ``selector`` (the element itself) and ``tag``;
        or ``None`` when the element is filtered out or any error occurs
        (the traceback is printed).
    """
    interactive_tags = ('a', 'button', 'input', 'select', 'textarea', 'adc-tab')
    text_tags = (
        'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'td', "div", "em", "center",
        "strong", "b", "i", "small", "mark", "abbr", "cite", "q",
        "blockquote", "span", "nobr",
    )
    try:
        # A missing bounding box is replaced by a box that fails the checks below.
        rect = await element.bounding_box() or {'x': -1, 'y': -1, 'width': 0, 'height': 0}
        left, top = rect['x'], rect['y']
        right, bottom = left + rect['width'], top + rect['height']

        # Skip tiny elements and anything not fully inside the viewport.
        if (
            left < 0
            or top < 0
            or rect['width'] <= 4
            or rect['height'] <= 4
            or bottom > viewport_size["height"]
            or right > viewport_size["width"]
        ):
            return None

        if coordinates is not None:
            if not (left <= coordinates[0] <= right and top <= coordinates[1] <= bottom):
                return None
            print(coordinates)
            print(rect)

        box_model = [left, top, right, bottom]
        center_point = (
            round((left + right) / 2, 3),
            round((top + bottom) / 2, 3),
        )

        if await element.is_hidden(timeout=0) or await element.is_disabled(timeout=0):
            return None

        if seen_elements is not None and center_point in seen_elements:
            return None

        if tag_name in interactive_tags:
            real_tag_name = tag_name
        else:
            real_tag_name = await element.evaluate("element => element.tagName.toLowerCase()", timeout=0)
            if real_tag_name in interactive_tags:
                # Already covered by the dedicated interactive-tag pass.
                return None
        tag_head = real_tag_name

        if real_tag_name in text_tags:
            return None

        role_value = await element.get_attribute('role', timeout=0)
        type_value = await element.get_attribute('type', timeout=0)
        description = await get_element_description(element, real_tag_name, role_value, type_value)
        if not description:
            return None

        if role_value:
            tag_head += ' role="' + role_value + '"'
        if type_value:
            tag_head += ' type="' + type_value + '"'

        if coordinates is not None:
            # The hit-test already passed above; this is only debug output.
            print(tag_head)
            print(description)
            print(box_model)

        return {
            "center_point": center_point,
            "description": description,
            "tag_with_role": tag_head,
            "box": box_model,
            "selector": element,
            "tag": real_tag_name,
        }
    except Exception as e:
        # Best effort: a failure on one element must not abort the whole scan.
        print(traceback.format_exc())
        print(e)
        return None
async def get_interactive_elements_with_playwright(page, viewport_size, coordinates=None):
    """Gather data for the interactive elements on *page*.

    Looks up ``a``, ``button``, ``input``, ``select`` and ``textarea``
    elements, runs ``get_element_data`` on all of them concurrently, and
    returns the non-empty results in order, keeping only the first element
    for each distinct centre point.

    Args:
        page: Page-like object exposing ``locator(selector)``.
        viewport_size: Forwarded to ``get_element_data``.
        coordinates: Optional point forwarded to ``get_element_data``.

    Returns:
        A list of element-data dicts.
    """
    print("Get Interactive elements around: ", coordinates)

    pending = []
    for css_tag in ('a', 'button', 'input', 'select', 'textarea'):
        matches = page.locator(css_tag)
        total = await matches.count()
        pending.extend(
            get_element_data(
                matches.nth(position),
                css_tag,
                viewport_size,
                seen_elements=[],
                coordinates=coordinates,
            )
            for position in range(total)
        )

    gathered = await asyncio.gather(*pending)

    # Deduplicate after the fact: each task was given its own empty list.
    visited_centers = set()
    unique_elements = []
    for entry in gathered:
        if not entry or entry["center_point"] in visited_centers:
            continue
        visited_centers.add(entry["center_point"])
        unique_elements.append(entry)
    return unique_elements
async def get_select_elements_with_playwright(page, viewport_size):
    """Gather data for the ``select`` elements on *page*.

    Runs ``get_element_data`` concurrently on every ``select`` element and
    returns the non-empty results in order, keeping only the first element
    for each distinct centre point.

    Args:
        page: Page-like object exposing ``locator(selector)``.
        viewport_size: Forwarded to ``get_element_data``.

    Returns:
        A list of element-data dicts.
    """
    pending = []
    for css_tag in ('select',):
        matches = page.locator(css_tag)
        total = await matches.count()
        position = 0
        while position < total:
            pending.append(
                get_element_data(
                    matches.nth(position),
                    css_tag,
                    viewport_size,
                    seen_elements=[],
                    coordinates=None,
                )
            )
            position += 1

    gathered = await asyncio.gather(*pending)

    visited_centers = set()
    unique_elements = []
    for entry in gathered:
        if entry and entry["center_point"] not in visited_centers:
            visited_centers.add(entry["center_point"])
            unique_elements.append(entry)
    return unique_elements
async def select_option(selector, value):
    """Select the option whose text is most similar to *value*.

    Each ``option`` under *selector* is scored against *value* with
    ``SequenceMatcher(...).ratio()``; the first option with the highest
    score is selected by index.

    Args:
        selector: Locator-like object for the ``select`` element.
        value: Text to match against the option texts.

    Returns:
        The chosen option's text with whitespace normalised and stripped.
    """
    # NOTE(review): with zero options the index stays -1 and is still passed
    # to select_option — confirm that this is the intended failure mode.
    chosen_index, chosen_text, chosen_score = -1, "", -1

    option_nodes = selector.locator("option")
    option_total = await option_nodes.count()
    for position in range(option_total):
        candidate = await option_nodes.nth(position).inner_text()
        score = SequenceMatcher(None, candidate, value).ratio()
        if score > chosen_score:
            chosen_index, chosen_text, chosen_score = position, candidate, score

    await selector.select_option(index=chosen_index, timeout=10000)
    return remove_extra_eol(chosen_text).strip()
def saveconfig(config, save_file):
    """Save a configuration to *save_file*.

    Args:
        config: Either a configuration dict, which is written as TOML, or a
            path to an existing configuration file, which is copied.
        save_file: Destination path, including the file name (``str`` or
            ``Path``).

    When *config* is a dict, the ``api_key`` entry of its ``"openai"``
    section (if that section exists) is replaced by a placeholder in the
    written file. The caller's dict is not modified.

    Raises:
        OSError: If the destination cannot be written or the source file
            cannot be copied.
    """
    # Local imports keep this block self-contained.
    import copy
    import shutil

    if isinstance(save_file, str):
        save_file = Path(save_file)

    if isinstance(config, dict):
        # Redact a deep copy so the caller keeps its real API key in memory.
        redacted = copy.deepcopy(config)
        openai_section = redacted.get("openai")
        if isinstance(openai_section, dict):
            openai_section["api_key"] = "Your API key here"
        with open(save_file, 'w', encoding="utf-8") as f:
            toml.dump(redacted, f)
    else:
        # shutil.copy avoids the shell, so paths with spaces or shell
        # metacharacters work, and it is not tied to a Unix ``cp`` binary.
        shutil.copy(str(config), str(save_file))