martinjosifoski commited on
Commit
bfb6e70
·
1 Parent(s): 74a9992

First commit.

Browse files
ChromaDBFlow.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Dict, List, Any
3
+
4
+ import uuid
5
+
6
+ from langchain.embeddings import OpenAIEmbeddings
7
+
8
+ from chromadb import Client as ChromaClient
9
+
10
+ from flows.base_flows import AtomicFlow
11
+
12
+
13
+ class ChromaDBFlow(AtomicFlow):
14
+
15
+ def __init__(self, **kwargs):
16
+ super().__init__(**kwargs)
17
+ self.client = ChromaClient()
18
+ self.collection = self.client.get_or_create_collection(name=self.flow_config["name"])
19
+
20
+ def get_input_keys(self) -> List[str]:
21
+ return self.flow_config["input_keys"]
22
+
23
+ def get_output_keys(self) -> List[str]:
24
+ return self.flow_config["output_keys"]
25
+
26
+ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
27
+
28
+ api_information = self._get_from_state("api_information")
29
+
30
+ if api_information.backend_used == "openai":
31
+ embeddings = OpenAIEmbeddings(openai_api_key=api_information.api_key)
32
+ else:
33
+ # ToDo: Add support for Azure
34
+ embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))
35
+ response = {}
36
+
37
+ operation = input_data["operation"]
38
+ if operation not in ["write", "read"]:
39
+ raise ValueError(f"Operation '{operation}' not supported")
40
+
41
+ content = input_data["content"]
42
+ if operation == "read":
43
+ if not isinstance(content, str):
44
+ raise ValueError(f"content(query) must be a string during read, got {type(content)}: {content}")
45
+ if content == "":
46
+ response["retrieved"] = [[""]]
47
+ return response
48
+ query = content
49
+ query_result = self.collection.query(
50
+ query_embeddings=embeddings.embed_query(query),
51
+ n_results=self.flow_config["n_results"]
52
+ )
53
+
54
+ response["retrieved"] = [doc for doc in query_result["documents"]]
55
+
56
+ elif operation == "write":
57
+ if content != "":
58
+ if not isinstance(content, list):
59
+ content = [content]
60
+ documents = content
61
+ self.collection.add(
62
+ ids=[str(uuid.uuid4()) for _ in range(len(documents))],
63
+ embeddings=embeddings.embed_documents(documents),
64
+ documents=documents
65
+ )
66
+ response["retrieved"] = ""
67
+
68
+ return response
ChromaDBFlow.yaml ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ name: chroma_db
2
+ description: ChromaDB is a document store that uses vector embeddings to store and retrieve documents
3
+
4
+ input_keys:
5
+ - operation
6
+ - content
7
+ output_keys:
8
+ - retrieved
9
+
10
+ n_results: 5 # number of results to retrieve when query
README.md ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ license: mit
3
+ ---
4
+ ## Description
5
+ ToDo
6
+
7
+ < Flow description >
8
+
9
+ ## Configuration parameters
10
+
11
+ < Name 1 > (< Type 1 >): < Description 1 >. Required parameter.
12
+
13
+ < Name 2 > (< Type 2 >): < Description 2 >. Default value is: < value 2 >
14
+
15
+ ## Input interface
16
+
17
+ < Name 1 > (< Type 1 >): < Description 1 >.
18
+
19
+ (Note that the interface might depend on the state of the Flow.)
20
+
21
+ ## Output interface
22
+
23
+ < Name 1 > (< Type 1 >): < Description 1 >.
24
+
25
+ (Note that the interface might depend on the state of the Flow.)
VectorStoreFlow.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from copy import deepcopy
2
+ from typing import Dict, List, Any, Optional
3
+
4
+ import faiss
5
+
6
+ from langchain.docstore import InMemoryDocstore
7
+ from langchain.embeddings import OpenAIEmbeddings
8
+ from langchain.schema import Document
9
+ from langchain.vectorstores import Chroma, FAISS
10
+ from langchain.vectorstores.base import VectorStoreRetriever
11
+
12
+ from flows.base_flows import AtomicFlow
13
+
14
+
15
+ class VectorStoreFlow(AtomicFlow):
16
+ REQUIRED_KEYS_CONFIG = ["type", "api_keys"]
17
+
18
+ vector_db: VectorStoreRetriever
19
+
20
+ def __init__(self, vector_db, **kwargs):
21
+ super().__init__(**kwargs)
22
+ self.vector_db = vector_db
23
+
24
+ @classmethod
25
+ def _set_up_retriever(cls, config: Dict[str, Any]) -> Dict[str, Any]:
26
+ embeddings = OpenAIEmbeddings(openai_api_key=config["api_keys"]["openai"])
27
+ kwargs = {}
28
+
29
+ vs_type = config["type"]
30
+
31
+ if vs_type == "chroma":
32
+ vectorstore = Chroma(config["name"], embedding_function=embeddings)
33
+ elif vs_type == "faiss":
34
+ index = faiss.IndexFlatL2(config.get("embedding_size", 1536))
35
+ vectorstore = FAISS(
36
+ embedding_function=embeddings.embed_query,
37
+ index=index,
38
+ docstore=InMemoryDocstore({}),
39
+ index_to_docstore_id={}
40
+ )
41
+ else:
42
+ raise NotImplementedError(f"Vector store '{vs_type}' not implemented")
43
+
44
+ kwargs["vector_db"] = vectorstore.as_retriever(**config.get("retriever_config", {}))
45
+
46
+ return kwargs
47
+
48
+ @classmethod
49
+ def instantiate_from_config(cls, config: Dict[str, Any]):
50
+ flow_config = deepcopy(config)
51
+
52
+ kwargs = {"flow_config": flow_config}
53
+
54
+ kwargs.update(cls._set_up_retriever(flow_config))
55
+
56
+ return cls(**kwargs)
57
+
58
+ @staticmethod
59
+ def package_documents(documents: List[str]) -> List[Document]:
60
+ # TODO(yeeef): support metadata
61
+ return [Document(page_content=doc, metadata={"": ""}) for doc in documents]
62
+
63
+ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
64
+ response = {}
65
+
66
+ operation = input_data["operation"]
67
+ assert operation in ["write", "read"], f"Operation '{operation}' not supported"
68
+
69
+ content = input_data["content"]
70
+ if operation == "read":
71
+ assert isinstance(content, str), f"Content must be a string, got {type(content)}"
72
+ query = content
73
+ retrieved_documents = self.vector_db.get_relevant_documents(query)
74
+ response["retrieved"] = [doc.page_content for doc in retrieved_documents]
75
+ elif operation == "write":
76
+ if isinstance(content, str):
77
+ content = [content]
78
+ assert isinstance(content, list), f"Content must be a list of strings, got {type(content)}"
79
+ documents = content
80
+ documents = self.package_documents(documents)
81
+ self.vector_db.add_documents(documents)
82
+ response["retrieved"] = ""
83
+
84
+ return response
VectorStoreFlow.yaml ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ name: "VectorStoreFlow"
2
+ description: "VectorStoreFlow"
3
+
4
+ input_keys:
5
+ - "operation" # read or write
6
+ - "content"
7
+
8
+ output_keys:
9
+ - "retrieved"
10
+
11
+ type: "chroma"
12
+ api_keys:
13
+ openai: "YOUR_OPENAI_API_KEY"
14
+
15
+
__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .VectorStoreFlow import VectorStoreFlow
2
+ from .ChromaDBFlow import ChromaDBFlow
pip_requirements.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # ToDo