import json import math import time import dateparser from datetime import datetime, timezone from importlib import import_module from typing import Any, Dict, Optional, Union from bs4 import BeautifulSoup from bs4.element import Comment from dateutil.relativedelta import relativedelta DATETIME_STRING_PATTERN = "%Y-%m-%dT%H:%M:%SZ" DEFAULT_LOOKUP_PERIOD = "1h" # Used from https://stackoverflow.com/a/52081812 and modified def flatten_dict( dictionary: Dict[str, Any], round_the_float: bool = True, float_round_format_str: str = ".2f", separator: str = "_", ) -> Dict[str, Any]: out: Dict[str, Any] = {} for key, val in dictionary.items(): if isinstance(val, dict): val = [val] if isinstance(val, list): for sub_dict in val: deeper = flatten_dict(sub_dict).items() out.update({key + separator + key2: val2 for key2, val2 in deeper}) elif isinstance(val, float) and round_the_float: out[key] = format(val, float_round_format_str) else: out[key] = val return out def obj_to_json(obj: Any, sort_keys: bool = False, indent: Optional[int] = None) -> Union[bytes, None]: if obj is None: return None return json.dumps( obj, default=datetime_handler, ensure_ascii=False, sort_keys=sort_keys, indent=indent, ).encode("utf8") def obj_to_markdown( obj: Any, level: int = 1, str_enclose_start: Optional[str] = None, str_enclose_end: Optional[str] = None, ) -> str: key_prefix = "*" * level markdowns = [] if is_collection(obj): add_key = True if hasattr(obj, "__dict__"): item_view = obj.__dict__.items() elif isinstance(obj, dict): item_view = obj.items() else: add_key = False item_view = enumerate(obj) for key, val in item_view: if add_key: header = f"{key_prefix} {key}" else: header = key_prefix if is_collection(val): child_markdown = obj_to_markdown( obj=val, level=level + 1, str_enclose_start=str_enclose_start, str_enclose_end=str_enclose_end, ) markdowns.append(f"{header}\n{child_markdown}") elif str_enclose_start is not None and isinstance(val, str): markdowns.append( f"{header}:\n{str_enclose_start}{val}{str_enclose_end}" ) else: markdowns.append(f"{header}: {val}") elif str_enclose_start is not None and isinstance(obj, str): markdowns.append(f"{key_prefix}:\n{str_enclose_start}{obj}{str_enclose_end}") else: markdowns.append(f"{key_prefix}: {obj}") return "\n".join(markdowns) def is_collection(obj: Any) -> bool: return isinstance(obj, (dict, list)) or hasattr(obj, "__dict__") # Copied from searchtweets-v2 and bit modified def convert_utc_time(datetime_str: str) -> datetime: """ Handles datetime argument conversion to the Labs API format, which is `YYYY-MM-DDTHH:mm:ssZ`. Flexible passing of date formats in the following types:: - YYYYmmDDHHMM - YYYY-mm-DD - YYYY-mm-DD HH:MM - YYYY-mm-DDTHH:MM - 2m (set start_time to two months ago) - 3d (set start_time to three days ago) - 12h (set start_time to twelve hours ago) - 15m (set start_time to fifteen minutes ago) Args: datetime_str (str): valid formats are listed above. Returns: string of ISO formatted date. """ try: if len(datetime_str) <= 5: _date = datetime.utcnow() # parse out numeric character. num = int(datetime_str[:-1]) if "d" in datetime_str: _date = _date + relativedelta(days=-num) elif "h" in datetime_str: _date = _date + relativedelta(hours=-num) elif "m" in datetime_str: _date = _date + relativedelta(minutes=-num) elif "M" in datetime_str: _date = _date + relativedelta(months=-num) elif "Y" in datetime_str: _date = _date + relativedelta(years=-num) elif not {"-", ":"} & set(datetime_str): _date = datetime.strptime(datetime_str, "%Y%m%d%H%M") elif "T" in datetime_str: _date = datetime.strptime(datetime_str, DATETIME_STRING_PATTERN) else: _date = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M") except ValueError: _date = datetime.strptime(datetime_str, "%Y-%m-%d") return _date.replace(tzinfo=timezone.utc) def convert_datetime_str_to_epoch(datetime_str: str) -> Optional[int]: parsed_datetime = dateparser.parse(datetime_str) if not parsed_datetime: return None unix_timestamp = time.mktime(parsed_datetime.timetuple()) return math.trunc(unix_timestamp) def tag_visible(element: Any) -> bool: if element.parent.name in [ "style", "script", "head", "title", "meta", "[document]", ]: return False if isinstance(element, Comment): return False return True def text_from_html(body: Union[str, bytes]) -> str: soup = BeautifulSoup(body, "html.parser") texts = soup.findAll(text=True) visible_texts = filter(tag_visible, texts) return " ".join(t.strip() for t in visible_texts) def dict_to_object( dictionary: Dict[str, Any], class_name_key: Optional[str] = "_target_", full_class_name: Optional[str] = None, ) -> Any: new_dict: Dict[str, Any] = dict() for k, v in dictionary.items(): if k == class_name_key: full_class_name = v elif isinstance(v, Dict): new_dict[k] = dict_to_object(dictionary=v, class_name_key=class_name_key) else: new_dict[k] = v if full_class_name is None: return new_dict module_name, class_name = tuple(full_class_name.rsplit(".", 1)) module = import_module(module_name) class_ref = getattr(module, class_name) return class_ref(**new_dict) def datetime_handler(x: Any) -> Optional[Any]: if x is None: return None elif isinstance(x, datetime): return x.isoformat() return vars(x) if hasattr(x, "__dict__") else x