import os import nltk import yaml import pandas as pd import streamlit as st from txtai.embeddings import Documents, Embeddings from txtai.pipeline import Segmentation, Summary, Tabular, Textractor, Translation from txtai.workflow import ServiceTask, Task, UrlTask, Workflow class Process: @staticmethod @st.cache(ttl=60 * 60, max_entries=3, allow_output_mutation=True, show_spinner=False) def get(components, data): """ Lookup or creates a new workflow process instance """ process = Process(data) with st.spinner("Building workflow...."): process.build(components) return process def __init__(self, data): """ Create new Process """ self.components = {} self.pipelines = {} self. workflow = [] self.embeddings = None self.documents = None self.data = data def build(self, components): """ Builds a workflow using components """ tasks = [] for component in components: component = dict(component) wtype = component.pop(type) self.components[wtype] = component if wtype == "embeddings": self.embeddings = Embeddings({**component}) self.documents = Documents() tasks.append(Task(self.documents.add, unpack=False)) elif wtype == "segmentation": self.pipelines[wtype] = Segmentation(**self.components[wtype]) tasks.append(Task(self.pipelines[wtype])) elif wtype == "service": tasks.append(ServiceTask(**self.components[wtype])) elif wtype == "summary": self.pipelines[wtype] = Summary(component.pop("path")) tasks.append(Task(lambda x: self.pipelines["summary"](x, **self.components["summary"]))) elif wtype == "tabular": self.pipelines[wtype] = Tabular(**self.components[wtype]) tasks.append(Task(self.pipelines[wtype])) elif wtype == "textractor": self.pipelines[wtype] = Textractor(**self.components[wtype]) tasks.append(UrlTask(self.pipelines[wtype])) elif wtype == "translation": self.pipelines[wtype] = Translation() tasks.append(Task(lambda x: self.pipelines["translation"](x, **self.components["translation"]))) self.workflow = Workflow(tasks) def run(self, data): """ Runs a workflow using data as input """ if data and self.workflow: # Builds tuples for embedding index if self.documents: data = [(x, element, None) for x, element in enumerate(data)] # Process workflow for result in self.workflow(data): if not self.documents: st.write(result) # Build embedding index if self.documents: # Cache data self.data = list(self.documents) with st.spinner("Building embedding index...."): self.embeddings.index(self.documents) self.documents.close() # Clear workflow self.documents, self.pipelines, self.workflow = None, None, None def search(self, query): """ Runs a search for query """ if self.embeddings and query: st.markdown( """ """, unsafe_allow_html=True, ) limit = min(5, len(self.data)) results = [] for result in self.embeddings.search(query, limit): # Tuples are returned when an index doesn't have stored content if isinstance(result, tuple): uid, score = result results.append({"text": self.find(uid), "score": f"{score:.2}"}) else: if "id" in result and "text" in result: result["text"] = self.content(result.pop("id"), result["text"]) if "score" in result and result["score"]: result["score"] = f'{result["score"]:.2}' results.append(result) df = pd.DataFrame(results) st.write(df.to_html(escape=False), unsafe_allow_html=True) def find(self, key): """ Lookup record from cached data by uid key """ # Lookup text by id text = [text for uid, text, _ in self.data if uid == key][0] return self.content(key, text) def content(self, uid, text): """ Builds a content reference for uid and text """ if uid and uid.lower().startswith("http"): return f"{text}" return text class Application: """ Main application """ def __init__(self, directory): """ Creates a new application """ # Workflow configuration directory self.directory = directory def default(self, names): """ Gets default workflow index """ # Gets names as lowercase to match case sensitive lnames = [name.lower() for name in names] # Get default workflow param params = st.experimental_get_query_params() index = params.get("default") index = index[0].lower() if index else 0 # Lookup index of workflow name, add 1 to account for "--" if index and index in lnames: return lnames.index(index) + 1 # Workflow not found, default to index 0 return 0 def load(self, components): """ Load an existing workflow file """ with open(os.path.join(self.directory, "config.yml"), encoding="utf-8") as f: config = yaml.safe_load(f) names = [row["name"] for row in config] files = [row["file"] for row in config] selected = st.selectbox("Load workflow", ["--"] + names, self.default(names)) if selected != "--": index = [x for x, name in enumerate(names) if name == selected][0] with open(os.path.join(self.directory, files[index]), encoding="utf-8") as f: workflow = yaml.safe_load(f) st.markdown("---") # Get tasks for first workflow tasks = list(workflow["workflow"].values())[0]["tasks"] selected = [] for task in tasks: name = task.get("action", task.get("task")) if name in components: selected.append(name) elif name in ["index", "upsert"]: selected.append("embeddings") return (selected, workflow) return (None, None) def state(self, key): """ Lookup a session state variable """ if key in st.session_state: return st.session_state[key] return None def appsetting(self, workflow, name): """ Looks up an application configuration setting """ if workflow: config = workflow.get("app") if config: return config.get(name) return None def setting(self, config, name, default=None): """ Looks up a component configuration settings """ return config.get(name, default) if config else default def text(self, label, component, config, name, default=None): """ Create a new text input field """ default = self.setting(config, name, default) if not default: default = "" elif isinstance(default, list): default = ",".join(default) elif isinstance(default, dict): default = ",".join(default.keys()) st.caption(label) st.code(default, language="yaml") return default def number(self, label, component, config, name, default=None): """ Creates a new numeric input field """ value = self.text(label, component, config, name, default) return int(value) if value else None def boolean(self, label, component, config, name, default=None): """ Creates a new checkbox field """ default = self.setting(config, name, default) st.caption(label) st.markdown(":white_check_mark:" if default else ":white_large_square:") return default def select(self, label, component, config, name, options, default=0): """ Creates a new select box field """ index = self.setting(config, name) index = [x for x, option in enumerate(options) if option == default] # Derive default index default = index[0] if index else default st.caption(label) st.code(options[default], language="yaml") return options[default] def split(self, text): """ Splits text on commas and returns a list """ return [x.strip() for x in text.split(",")] def options(self, component, workflow, index): """ Extracts component settings into a component configuration dict """ options = {"type": component} config = None if workflow: if component in ["service", "translation"]: tasks = list(workflow["workflow"].values())[0]["tasks"] tasks = [task for task in tasks if task.get("task") == component or task.get("action") == component] if tasks: config = tasks[0] else: config = workflow.get(component) if component == "embeddings": st.markdown(f"** {index + 1}.) Embeddings Index** \n*Index workflow output*") options["path"] = self.text("Embeddings model path", component, config, "path", "sentence-transformers/nli-mpnet-base-v2") options["upsert"] = self.boolean("Upsert", component, config, "upsert") options["content"] = self.boolean("Content", component, config, "content") elif component in ("segmentation", "textractor"): if component == "segmentation": st.markdown(f"** {index + 1}.) Segment** \n*Split text into semantic units*") else: st.markdown(f"** {index + 1}.) Textract** \n*Extract text from documents") options["sentences"] = self.boolean("Split sentences", component, config, "sentences") options["lines"] = self.boolean("Split lines", component, config, "lines") options["paragraphs"] = self.boolean("Split paragraphs", component, config, "paragraphs") options["joint"] = self.boolean("Join tokenized", component, config, "join") options["minlength"] = self.number("Min section length", component, config, "minlength") elif component == "service": st.markdown(f"** {index + 1}.) Service** \n*Extract data from an API*") options["url"] = self.text("URL", component, config, "url") options["method"] = self.select("Method", component, config, "method", ["get", "post"], 0) options["params"] = self.text("URL parameters", component, config, "params") options["batch"] = self.boolean("Run as batch", component, config, "batch", True) options["extract"] = self.text("Subsection(s) to extract", component, config, "extract") if options["params"]: options["params"] = {key: None for key in self.split(options["params"])} if options["extract"]: options["extract"] = self.split(options["extract"]) elif component == "summary": st.markdown(f"** {index + 1}.) Summary** \n*Abstractive text summarization*") options["path"] = self.text("Model", component, config, "path", "sshleifer/distilbart-cnn-12-6") options["minlength"] = self.number("Min length", component, config, "minlength") options["maxlength"] = self.number("Max length", component, config, "maxlength") elif component == "tabular": st.markdown(f"** {index + 1}.) Tabular** \n*Split tabular data into rows and columns*") options["idcolumn"] = self.text("Id columns", component, config, "idcolumn") options["textcolumns"] = self.text("Text columns", component, config, "textcolumns") options["content"] = self.text("Content", component, config, "content") if options["textcolumns"]: options["textcolumns"] = self.split(options["textcolumns"]) if options["content"]: options["content"] = self.split(options["content"]) if len(options["content"]) == 1 and options["content"][0] == "1": options["content"] = options["content"][0] elif component == "translation": st.markdown(f"** {index + 1}.) Translate** \n*Machine translation*") options["target"] = self.text("Target language code", component, config, "args", "en") st.markdown("---") return options def yaml(self, components): """ Builds yaml string for components """ data = {"app": {"data": self.state("data"), "query": self.state("query")}} tasks = [] name = None for component in components: component = dict(component) name = wtype = component.pop("type") if wtype == "embeddings": upsert = component.pop("upsert") data[wtype] = component data["writable"] = True name = "index" tasks.append({"action": "upsert" if upsert else "index"}) elif wtype == "segmentation": data[wtype] = component tasks.append({"action": wtype}) elif wtype == "service": config = dict(**component) config["task"] = wtype tasks.append(config) elif wtype == "summary": data[wtype] = {"path": component.pop("path")} tasks.append({"action": wtype}) elif wtype == "tabular": data[wtype] = component tasks.append({"action": wtype}) elif wtype == "textractor": data[wtype] = component tasks.append({"action": wtype, "tasks": "url"}) elif wtype == "translation": data[wtype] = component tasks.append({"action": wtype, "args": list(component.values())}) # Add in workflow data["workflow"] = {name: {"tasks": tasks}} return (name, yaml.dump(data)) def data(self, workflow): """ Gets input data """ # Get default data setting data = self.appsetting(workflow, "data") if not self.appsetting(workflow, "query"): data = st.text_input("Input", value=data) # Save data state st.session_state["data"] = data # Wrap data as list for workflow processing return [data] def query(self, workflow, index): """ Gets input query """ default = self.appsetting(workflow, "query") default = default if default else "" # Get query if this is an indexing workflow query = st.text_input("Query", value=default) if index else None # Save query state st.session_state["query"] = query return query def process(self, workflow, components, index): """ Processes the current application action """ # Get input data and initialize query data = self.data(workflow) query = self.query(workflow, index) # Get workflow process process = Process.get(components, data if index else None) # Run workflow process process.run(data) # Run search if index: process.search(query) def run(self): """ Runs Streamlit application """ with st.sidebar: st.markdown("# Workflow builder for Station \n*Build and apply workflows to data about articles* ") st.markdown("This is a demo for Station and the data used is from [Hugging Face](https://huggingface.co/datasets/ag_news/viewer/default/train).") st.markdown("---") # Component configuration components = ["embeddings", "segmentation", "service", "summary", "tabular", "textractor", "translation"] selected, workflow = self.load(components) if selected: # Get selected options components = [self.options(component, workflow, x) for x, component in enumerate(selected)] if selected: # Process current action self.process(workflow, components, "embeddings" in selected) with st.sidebar: # Generate export button after workflow is complete _, config = self.yaml(components) st.download_button("Export", config, file_name="workflow.yaml", help="Export the API workflow as YAML") else: st.info("Selected a workflow from the sidebar") if __name__ == "__main__": os.environ["TOKENIZERS_PARALLELISM"] = "false" try: nltk.sent_tokenize("This is a test. Split") except: nltk.download("punkt") # Create and run application app = Application("workflows") app.run()