Create preprocessing.py
Browse files- utils/preprocessing.py +283 -0
utils/preprocessing.py
ADDED
@@ -0,0 +1,283 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from haystack.nodes.base import BaseComponent
|
2 |
+
from haystack.schema import Document
|
3 |
+
from haystack.nodes import PDFToTextOCRConverter, PDFToTextConverter
|
4 |
+
from haystack.nodes import TextConverter, DocxToTextConverter, PreProcessor
|
5 |
+
from typing import Callable, Dict, List, Optional, Text, Tuple, Union
|
6 |
+
from typing_extensions import Literal
|
7 |
+
import pandas as pd
|
8 |
+
import logging
|
9 |
+
import re
|
10 |
+
import string
|
11 |
+
from haystack.pipelines import Pipeline
|
12 |
+
|
13 |
+
def useOCR(file_path: str)-> Text:
|
14 |
+
"""
|
15 |
+
Converts image pdfs into text, Using the Farm-haystack[OCR]
|
16 |
+
|
17 |
+
Params
|
18 |
+
----------
|
19 |
+
file_path: file_path of uploade file, returned by add_upload function in
|
20 |
+
uploadAndExample.py
|
21 |
+
|
22 |
+
Returns the text file as string.
|
23 |
+
"""
|
24 |
+
|
25 |
+
|
26 |
+
converter = PDFToTextOCRConverter(remove_numeric_tables=True,
|
27 |
+
valid_languages=["eng"])
|
28 |
+
docs = converter.convert(file_path=file_path, meta=None)
|
29 |
+
return docs[0].content
|
30 |
+
|
31 |
+
|
32 |
+
|
33 |
+
|
34 |
+
class FileConverter(BaseComponent):
|
35 |
+
"""
|
36 |
+
Wrapper class to convert uploaded document into text by calling appropriate
|
37 |
+
Converter class, will use internally haystack PDFToTextOCR in case of image
|
38 |
+
pdf. Cannot use the FileClassifier from haystack as its doesnt has any
|
39 |
+
label/output class for image.
|
40 |
+
1. https://haystack.deepset.ai/pipeline_nodes/custom-nodes
|
41 |
+
2. https://docs.haystack.deepset.ai/docs/file_converters
|
42 |
+
3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/file_converter
|
43 |
+
4. https://docs.haystack.deepset.ai/reference/file-converters-api
|
44 |
+
"""
|
45 |
+
|
46 |
+
outgoing_edges = 1
|
47 |
+
|
48 |
+
def run(self, file_name: str , file_path: str, encoding: Optional[str]=None,
|
49 |
+
id_hash_keys: Optional[List[str]] = None,
|
50 |
+
) -> Tuple[dict,str]:
|
51 |
+
""" this is required method to invoke the component in
|
52 |
+
the pipeline implementation.
|
53 |
+
|
54 |
+
Params
|
55 |
+
----------
|
56 |
+
file_name: name of file
|
57 |
+
file_path: file_path of uploade file, returned by add_upload function in
|
58 |
+
uploadAndExample.py
|
59 |
+
|
60 |
+
See the links provided in Class docstring/description to see other params
|
61 |
+
|
62 |
+
Return
|
63 |
+
---------
|
64 |
+
output: dictionary, with key as identifier and value could be anything
|
65 |
+
we need to return. In this case its the List of Hasyatck Document
|
66 |
+
|
67 |
+
output_1: As there is only one outgoing edge, we pass 'output_1' string
|
68 |
+
"""
|
69 |
+
try:
|
70 |
+
if file_name.endswith('.pdf'):
|
71 |
+
converter = PDFToTextConverter(remove_numeric_tables=True)
|
72 |
+
if file_name.endswith('.txt'):
|
73 |
+
converter = TextConverter(remove_numeric_tables=True)
|
74 |
+
if file_name.endswith('.docx'):
|
75 |
+
converter = DocxToTextConverter()
|
76 |
+
except Exception as e:
|
77 |
+
logging.error(e)
|
78 |
+
return
|
79 |
+
|
80 |
+
|
81 |
+
|
82 |
+
documents = []
|
83 |
+
|
84 |
+
|
85 |
+
# encoding is empty, probably should be utf-8
|
86 |
+
document = converter.convert(
|
87 |
+
file_path=file_path, meta=None,
|
88 |
+
encoding=encoding, id_hash_keys=id_hash_keys
|
89 |
+
)[0]
|
90 |
+
|
91 |
+
text = document.content
|
92 |
+
|
93 |
+
# in case of scanned/images only PDF the content might contain only
|
94 |
+
# the page separator (\f or \x0c). We check if is so and use
|
95 |
+
# use the OCR to get the text.
|
96 |
+
filtered = re.sub(r'\x0c', '', text)
|
97 |
+
|
98 |
+
if filtered == "":
|
99 |
+
logging.info("Using OCR")
|
100 |
+
text = useOCR(file_path)
|
101 |
+
|
102 |
+
documents.append(Document(content=text,
|
103 |
+
meta={"name": file_name},
|
104 |
+
id_hash_keys=id_hash_keys))
|
105 |
+
|
106 |
+
logging.info('file conversion succesful')
|
107 |
+
output = {'documents': documents}
|
108 |
+
return output, 'output_1'
|
109 |
+
|
110 |
+
def run_batch():
|
111 |
+
"""
|
112 |
+
we dont have requirement to process the multiple files in one go
|
113 |
+
therefore nothing here, however to use the custom node we need to have
|
114 |
+
this method for the class.
|
115 |
+
"""
|
116 |
+
|
117 |
+
return
|
118 |
+
|
119 |
+
|
120 |
+
def basic(s:str, remove_punc:bool = False):
|
121 |
+
|
122 |
+
"""
|
123 |
+
Performs basic cleaning of text.
|
124 |
+
Params
|
125 |
+
----------
|
126 |
+
s: string to be processed
|
127 |
+
removePunc: to remove all Punctuation including ',' and '.' or not
|
128 |
+
|
129 |
+
Returns: processed string: see comments in the source code for more info
|
130 |
+
"""
|
131 |
+
|
132 |
+
# Remove URLs
|
133 |
+
s = re.sub(r'^https?:\/\/.*[\r\n]*', ' ', s, flags=re.MULTILINE)
|
134 |
+
s = re.sub(r"http\S+", " ", s)
|
135 |
+
|
136 |
+
# Remove new line characters
|
137 |
+
s = re.sub('\n', ' ', s)
|
138 |
+
|
139 |
+
# Remove punctuations
|
140 |
+
if remove_punc == True:
|
141 |
+
translator = str.maketrans(' ', ' ', string.punctuation)
|
142 |
+
s = s.translate(translator)
|
143 |
+
# Remove distracting single quotes and dotted pattern
|
144 |
+
s = re.sub("\'", " ", s)
|
145 |
+
s = s.replace("..","")
|
146 |
+
|
147 |
+
return s.strip()
|
148 |
+
|
149 |
+
def paraLengthCheck(paraList, max_len = 100):
|
150 |
+
"""
|
151 |
+
There are cases where preprocessor cannot respect word limit, when using
|
152 |
+
respect sentence boundary flag due to missing sentence boundaries.
|
153 |
+
Therefore we run one more round of split here for those paragraphs
|
154 |
+
|
155 |
+
Params
|
156 |
+
---------------
|
157 |
+
paraList : list of paragraphs/text
|
158 |
+
max_len : max length to be respected by sentences which bypassed
|
159 |
+
preprocessor strategy
|
160 |
+
|
161 |
+
"""
|
162 |
+
new_para_list = []
|
163 |
+
for passage in paraList:
|
164 |
+
# check if para exceeds words limit
|
165 |
+
if len(passage.content.split()) > max_len:
|
166 |
+
# we might need few iterations example if para = 512 tokens
|
167 |
+
# we need to iterate 5 times to reduce para to size limit of '100'
|
168 |
+
iterations = int(len(passage.content.split())/max_len)
|
169 |
+
for i in range(iterations):
|
170 |
+
temp = " ".join(passage.content.split()[max_len*i:max_len*(i+1)])
|
171 |
+
new_para_list.append((temp,passage.meta['page']))
|
172 |
+
temp = " ".join(passage.content.split()[max_len*(i+1):])
|
173 |
+
new_para_list.append((temp,passage.meta['page']))
|
174 |
+
else:
|
175 |
+
# paragraphs which dont need any splitting
|
176 |
+
new_para_list.append((passage.content, passage.meta['page']))
|
177 |
+
|
178 |
+
logging.info("New paragraphs length {}".format(len(new_para_list)))
|
179 |
+
return new_para_list
|
180 |
+
|
181 |
+
class UdfPreProcessor(BaseComponent):
|
182 |
+
"""
|
183 |
+
class to preprocess the document returned by FileConverter. It will check
|
184 |
+
for splitting strategy and splits the document by word or sentences and then
|
185 |
+
synthetically create the paragraphs.
|
186 |
+
1. https://docs.haystack.deepset.ai/docs/preprocessor
|
187 |
+
2. https://docs.haystack.deepset.ai/reference/preprocessor-api
|
188 |
+
3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/preprocessor
|
189 |
+
"""
|
190 |
+
outgoing_edges = 1
|
191 |
+
|
192 |
+
def run(self, documents:List[Document], remove_punc:bool=False,
|
193 |
+
split_by: Literal["sentence", "word"] = 'sentence',
|
194 |
+
split_length:int = 2, split_respect_sentence_boundary:bool = False,
|
195 |
+
split_overlap:int = 0):
|
196 |
+
|
197 |
+
""" this is required method to invoke the component in
|
198 |
+
the pipeline implementation.
|
199 |
+
|
200 |
+
Params
|
201 |
+
----------
|
202 |
+
documents: documents from the output dictionary returned by Fileconverter
|
203 |
+
remove_punc: to remove all Punctuation including ',' and '.' or not
|
204 |
+
split_by: document splitting strategy either as word or sentence
|
205 |
+
split_length: when synthetically creating the paragrpahs from document,
|
206 |
+
it defines the length of paragraph.
|
207 |
+
split_respect_sentence_boundary: Used when using 'word' strategy for
|
208 |
+
splititng of text.
|
209 |
+
split_overlap: Number of words or sentences that overlap when creating
|
210 |
+
the paragraphs. This is done as one sentence or 'some words' make sense
|
211 |
+
when read in together with others. Therefore the overlap is used.
|
212 |
+
|
213 |
+
Return
|
214 |
+
---------
|
215 |
+
output: dictionary, with key as identifier and value could be anything
|
216 |
+
we need to return. In this case the output will contain 4 objects
|
217 |
+
the paragraphs text list as List, Haystack document, Dataframe and
|
218 |
+
one raw text file.
|
219 |
+
|
220 |
+
output_1: As there is only one outgoing edge, we pass 'output_1' string
|
221 |
+
|
222 |
+
"""
|
223 |
+
|
224 |
+
if split_by == 'sentence':
|
225 |
+
split_respect_sentence_boundary = False
|
226 |
+
|
227 |
+
else:
|
228 |
+
split_respect_sentence_boundary = split_respect_sentence_boundary
|
229 |
+
|
230 |
+
preprocessor = PreProcessor(
|
231 |
+
clean_empty_lines=True,
|
232 |
+
clean_whitespace=True,
|
233 |
+
clean_header_footer=True,
|
234 |
+
split_by=split_by,
|
235 |
+
split_length=split_length,
|
236 |
+
split_respect_sentence_boundary= split_respect_sentence_boundary,
|
237 |
+
split_overlap=split_overlap,
|
238 |
+
|
239 |
+
# will add page number only in case of PDF not for text/docx file.
|
240 |
+
add_page_number=True
|
241 |
+
)
|
242 |
+
|
243 |
+
for i in documents:
|
244 |
+
# # basic cleaning before passing it to preprocessor.
|
245 |
+
# i = basic(i)
|
246 |
+
docs_processed = preprocessor.process([i])
|
247 |
+
for item in docs_processed:
|
248 |
+
item.content = basic(item.content, remove_punc= remove_punc)
|
249 |
+
|
250 |
+
df = pd.DataFrame(docs_processed)
|
251 |
+
all_text = " ".join(df.content.to_list())
|
252 |
+
para_list = df.content.to_list()
|
253 |
+
logging.info('document split into {} paragraphs'.format(len(para_list)))
|
254 |
+
output = {'documents': docs_processed,
|
255 |
+
'dataframe': df,
|
256 |
+
'text': all_text,
|
257 |
+
'paraList': para_list
|
258 |
+
}
|
259 |
+
return output, "output_1"
|
260 |
+
def run_batch():
|
261 |
+
"""
|
262 |
+
we dont have requirement to process the multiple files in one go
|
263 |
+
therefore nothing here, however to use the custom node we need to have
|
264 |
+
this method for the class.
|
265 |
+
"""
|
266 |
+
return
|
267 |
+
|
268 |
+
def processingpipeline():
|
269 |
+
"""
|
270 |
+
Returns the preprocessing pipeline. Will use FileConverter and UdfPreProcesor
|
271 |
+
from utils.preprocessing
|
272 |
+
"""
|
273 |
+
|
274 |
+
preprocessing_pipeline = Pipeline()
|
275 |
+
file_converter = FileConverter()
|
276 |
+
custom_preprocessor = UdfPreProcessor()
|
277 |
+
|
278 |
+
preprocessing_pipeline.add_node(component=file_converter,
|
279 |
+
name="FileConverter", inputs=["File"])
|
280 |
+
preprocessing_pipeline.add_node(component = custom_preprocessor,
|
281 |
+
name ='UdfPreProcessor', inputs=["FileConverter"])
|
282 |
+
|
283 |
+
return preprocessing_pipeline
|