#!/usr/bin/env python
# coding: utf-8

# In[ ]:

import os
import sys
import math
from random import randint
import random
import time
from datetime import datetime
import re, string, unicodedata
import nltk
import contractions
import inflect
from bs4 import BeautifulSoup
from nltk import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.stem.isri import ISRIStemmer
from nltk.stem.porter import PorterStemmer
from nltk.stem.snowball import SnowballStemmer
from nltk.stem import LancasterStemmer, WordNetLemmatizer
from nltk.tag import StanfordNERTagger
from nltk.tokenize import word_tokenize, sent_tokenize
import spacy
import torch
from collections import defaultdict
import pickle
import numpy as np

sys.path.append(os.path.abspath("../lib"))
from util import *
from mlutil import *

lcc = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o",
       "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"]
ucc = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O",
       "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"]
dig = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
spc = ["@", "#", "$", "%", "^", "&", "*", "(", ")", "_", "+", "{", "}", "[", "]", "|", ":", "<", ">", "?", ";", ",", "."]

class TextPreProcessor:
    """
    text preprocessor
    """
    def __init__(self, stemmer="lancaster", verbose=False):
        self.stemmer = stemmer
        self.verbose = verbose
        self.lemmatizer = WordNetLemmatizer()
    def stripHtml(self, text):
        soup = BeautifulSoup(text, "html.parser")
        return soup.get_text()
    def removeBetweenSquareBrackets(self, text):
        return re.sub(r'\[[^]]*\]', '', text)
    def denoiseText(self, text):
        text = self.stripHtml(text)
        text = self.removeBetweenSquareBrackets(text)
        return text
    def replaceContractions(self, text):
        """Replace contractions in string of text"""
        return contractions.fix(text)
    def tokenize(self, text):
        words = nltk.word_tokenize(text)
        return words
    def removeNonAscii(self, words):
        """Remove non-ASCII characters from list of tokenized words"""
        newWords = []
        for word in words:
            newWord = unicodedata.normalize('NFKD', word).encode('ascii', 'ignore').decode('ascii')
            newWords.append(newWord)
        return newWords
    def replaceNonAsciiFromText(self, text):
        """ replaces non ascii characters with blanks """
        return ''.join([i if ord(i) < 128 else ' ' for i in text])
    def removeNonAsciiFromText(self, text):
        """ removes non ascii characters """
        return ''.join([i if ord(i) < 128 else '' for i in text])
    def allow(self, words):
        """ allow only specific characters """
        allowed = [word for word in words if re.match(r'^[A-Za-z0-9\.\,\:\;\!\?\(\)\'\-\$\@\%\"]+$', word) is not None]
        return allowed
    def toLowercase(self, words):
        """Convert all characters to lowercase from list of tokenized words"""
        newWords = [word.lower() for word in words]
        return newWords
    def removePunctuation(self, words):
        """Remove punctuation from list of tokenized words"""
        newWords = []
        for word in words:
            newWord = re.sub(r'[^\w\s]', '', word)
            if newWord != '':
                newWords.append(newWord)
        return newWords
    def replaceNumbers(self, words):
        """Replace all integer occurrences in list of tokenized words with textual representation"""
        p = inflect.engine()
        newWords = []
        for word in words:
            if word.isdigit():
                newWord = p.number_to_words(word)
                newWords.append(newWord)
            else:
                newWords.append(word)
        return newWords
    def removeStopwords(self, words):
        """Remove stop words from list of tokenized words"""
        stopWords = set(stopwords.words('english'))
        newWords = []
        for word in words:
            if word not in stopWords:
                newWords.append(word)
        return newWords
    def removeCustomStopwords(self, words, stopWords):
        """Remove custom stop words from list of tokenized words"""
        removed = [word for word in words if word not in stopWords]
        return removed
    def removeLowFreqWords(self, words, minFreq):
        """Remove low frequency words from list of tokenized words"""
        frequency = defaultdict(int)
        for word in words:
            frequency[word] += 1
        removed = [word for word in words if frequency[word] > minFreq]
        return removed
    def removeNumbers(self, words):
        """Remove numbers"""
        removed = [word for word in words if not isNumber(word)]
        return removed
    def removeShortWords(self, words, minLength):
        """Remove short words"""
        removed = [word for word in words if len(word) >= minLength]
        return removed
    def keepAllowedWords(self, words, keepWords):
        """Keep words from the given list only"""
        kept = [word for word in words if word in keepWords]
        return kept
    def stemWords(self, words):
        """Stem words in list of tokenized words"""
        if self.stemmer == "lancaster":
            stemmer = LancasterStemmer()
        elif self.stemmer == "snowball":
            stemmer = SnowballStemmer("english")
        elif self.stemmer == "porter":
            stemmer = PorterStemmer()
        else:
            raise ValueError("invalid stemmer " + self.stemmer)
        stems = [stemmer.stem(word) for word in words]
        return stems
    def lemmatizeWords(self, words):
        """Lemmatize tokens in list of tokenized words"""
        lemmas = [self.lemmatizer.lemmatize(word) for word in words]
        return lemmas
    def lemmatizeVerbs(self, words):
        """Lemmatize verbs in list of tokenized words"""
        lemmas = [self.lemmatizer.lemmatize(word, pos='v') for word in words]
        return lemmas
    def normalize(self, words):
        words = self.removeNonAscii(words)
        words = self.toLowercase(words)
        words = self.removePunctuation(words)
        words = self.replaceNumbers(words)
        words = self.removeStopwords(words)
        return words
    def posTag(self, textTokens):
        tags = nltk.pos_tag(textTokens)
        return tags
    def extractEntity(self, textTokens, classifierPath, jarPath):
        st = StanfordNERTagger(classifierPath, jarPath)
        entities = st.tag(textTokens)
        return entities
    def documentFeatures(self, document, wordFeatures):
        documentWords = set(document)
        features = {}
        for word in wordFeatures:
            features[word] = (word in documentWords)
        return features
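
# A minimal usage sketch for TextPreProcessor, assuming the NLTK resources
# "punkt", "stopwords" and "wordnet" have already been downloaded; the sample
# sentence is illustrative only.
def _demoTextPreProcessor():
    tp = TextPreProcessor()
    text = "The 3 quick brown foxes aren't jumping over 12 lazy dogs."
    text = tp.replaceContractions(text)
    words = tp.tokenize(text)
    words = tp.toLowercase(words)
    words = tp.removePunctuation(words)
    words = tp.replaceNumbers(words)
    words = tp.removeStopwords(words)
    words = tp.lemmatizeWords(words)
    print(words)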

class NGram:
    """
    word ngram
    """
    def __init__(self, vocFilt, verbose=False):
        """
        initialize
        """
        self.vocFilt = vocFilt
        self.nGramCounter = dict()
        self.nGramFreq = dict()
        self.corpSize = 0
        self.vocabulary = set()
        self.freqDone = False
        self.verbose = verbose
        self.vecWords = None
        self.nonZeroCount = 0
    def countDocNGrams(self, words):
        """
        count ngrams in a doc
        """
        if self.verbose:
            print("doc size " + str(len(words)))
        nGrams = self.toNGram(words)
        for nGram in nGrams:
            count = self.nGramCounter.get(nGram, 0)
            self.nGramCounter[nGram] = count + 1
            self.corpSize += 1
        self.vocabulary.update(words)
    def remLowCount(self, minCount):
        """
        removes items with count below threshold
        """
        self.nGramCounter = dict(filter(lambda item: item[1] >= minCount, self.nGramCounter.items()))
    def getVocabSize(self):
        """
        get vocabulary size
        """
        return len(self.nGramCounter)
    def getNGramFreq(self):
        """
        get normalized count
        """
        if self.verbose:
            print("counter size " + str(len(self.nGramCounter)))
        if not self.freqDone:
            for item in self.nGramCounter.items():
                self.nGramFreq[item[0]] = float(item[1]) / self.corpSize
            self.freqDone = True
        return self.nGramFreq
    def getNGramIndex(self, show):
        """
        convert to list
        """
        if self.vecWords is None:
            self.vecWords = list(self.nGramCounter)
        if show:
            for vw in enumerate(self.vecWords):
                print(vw)
    def getVector(self, words, byCount, normalized):
        """
        convert to vector
        """
        if self.vecWords is None:
            self.vecWords = list(self.nGramCounter)
        nGrams = self.toNGram(words)
        if self.verbose:
            print("vocabulary size {}".format(len(self.vecWords)))
            print("ngrams")
            print(nGrams)
        self.nonZeroCount = 0
        vec = list(map(lambda vw: self.getVecElem(vw, nGrams, byCount, normalized), self.vecWords))
        return vec
    def getVecElem(self, vw, nGrams, byCount, normalized):
        """
        get vector element
        """
        if vw in nGrams:
            if byCount:
                if normalized:
                    el = self.nGramFreq[vw]
                else:
                    el = self.nGramCounter[vw]
            else:
                el = 1
            self.nonZeroCount += 1
        else:
            if byCount and normalized:
                el = 0.0
            else:
                el = 0
        return el
    def getNonZeroCount(self):
        """
        get non zero vector element count
        """
        return self.nonZeroCount
    def toBiGram(self, words):
        """
        convert to bigrams
        """
        if self.verbose:
            print("doc size " + str(len(words)))
        biGrams = list()
        for i in range(len(words) - 1):
            w1 = words[i]
            w2 = words[i + 1]
            if self.vocFilt is None or (w1 in self.vocFilt and w2 in self.vocFilt):
                nGram = (w1, w2)
                biGrams.append(nGram)
        return biGrams
    def toTriGram(self, words):
        """
        convert to trigrams
        """
        if self.verbose:
            print("doc size " + str(len(words)))
        triGrams = list()
        for i in range(len(words) - 2):
            w1 = words[i]
            w2 = words[i + 1]
            w3 = words[i + 2]
            if self.vocFilt is None or (w1 in self.vocFilt and w2 in self.vocFilt and w3 in self.vocFilt):
                nGram = (w1, w2, w3)
                triGrams.append(nGram)
        return triGrams
    def save(self, saveFile):
        """
        save to file
        """
        with open(saveFile, "wb") as sf:
            pickle.dump(self, sf)
    @staticmethod
    def load(saveFile):
        """
        load from file
        """
        with open(saveFile, "rb") as sf:
            nGrams = pickle.load(sf)
        return nGrams

class CharNGram:
    """
    character ngram
    """
    def __init__(self, domains, ngsize, verbose=False):
        """
        initialize
        """
        self.chDomain = list()
        self.ws = "#"
        self.chDomain.append(self.ws)
        for d in domains:
            if d == "lcc":
                self.chDomain.extend(lcc)
            elif d == "ucc":
                self.chDomain.extend(ucc)
            elif d == "dig":
                self.chDomain.extend(dig)
            elif d == "spc":
                self.chDomain.extend(spc)
            else:
                raise ValueError("invalid character type " + d)
        self.ngsize = ngsize
        self.radixPow = None
        self.cntVecSize = None
    def addSpChar(self, spChar):
        """
        add special characters
        """
        self.chDomain.extend(spChar)
    def setWsRepl(self, ws):
        """
        set white space replacement character
        """
        self.ws = ws
        self.chDomain[0] = self.ws
    def finalize(self):
        """
        final setup
        """
        domSize = len(self.chDomain)
        self.cntVecSize = int(math.pow(domSize, self.ngsize))
        if self.radixPow is None:
            self.radixPow = list()
            for i in range(self.ngsize - 1, 0, -1):
                self.radixPow.append(int(math.pow(domSize, i)))
            self.radixPow.append(1)
    def toMgramCount(self, text):
        """
        get ngram count list
        """
        ngCounts = [0] * self.cntVecSize
        ngram = list()
        totNgCount = 0
        for ch in text:
            if ch.isspace():
                l = len(ngram)
                if l == 0 or ngram[l - 1] != self.ws:
                    ngram.append(self.ws)
            else:
                ngram.append(ch)
            if len(ngram) == self.ngsize:
                i = self.__getNgramIndex(ngram)
                assert i < self.cntVecSize, "ngram index out of range index " + str(i) + " size " + str(self.cntVecSize)
                ngCounts[i] += 1
                ngram.clear()
                totNgCount += 1
        return ngCounts
    def __getNgramIndex(self, ngram):
        """
        get index of an ngram into a list of size equal to the total number of possible ngrams
        """
        assert len(ngram) == len(self.radixPow), "ngram size mismatch"
        ngi = 0
        for ch, rp in zip(ngram, self.radixPow):
            i = self.chDomain.index(ch)
            ngi += i * rp
        return ngi
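
# A minimal usage sketch for CharNGram, using the character domains defined at
# the top of this module; the input string is illustrative only.
def _demoCharNGram():
    cng = CharNGram(["lcc", "dig"], 2)
    cng.finalize()
    counts = cng.toMgramCount("the quick brown fox 99")
    # vector of counts, one slot per possible character bigram
    print(len(counts), sum(counts))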

class TfIdf:
    """
    TF IDF
    """
    def __init__(self, vocFilt, doIdf, verbose=False):
        """
        initialize
        """
        self.vocFilt = vocFilt
        self.doIdf = doIdf
        self.wordCounter = {}
        self.wordFreq = {}
        self.wordInDocCount = {}
        self.docCount = 0
        self.corpSize = 0
        self.freqDone = False
        self.vocabulary = set()
        self.wordIndex = None
        self.verbose = verbose
        self.vecWords = None
    def countDocWords(self, words):
        """
        count words in a doc
        """
        if self.verbose:
            print("doc size " + str(len(words)))
        for word in words:
            if self.vocFilt is None or word in self.vocFilt:
                count = self.wordCounter.get(word, 0)
                self.wordCounter[word] = count + 1
        self.corpSize += len(words)
        self.vocabulary.update(words)
        if self.doIdf:
            self.docCount += 1
            for word in set(words):
                count = self.wordInDocCount.get(word, 0)
                self.wordInDocCount[word] = count + 1
        self.freqDone = False
    def getWordFreq(self):
        """
        get tfidf for corpus
        """
        if self.verbose:
            print("counter size " + str(len(self.wordCounter)))
        if not self.freqDone:
            for item in self.wordCounter.items():
                self.wordFreq[item[0]] = float(item[1]) / self.corpSize
            if self.doIdf:
                for k in self.wordFreq.keys():
                    self.wordFreq[k] *= math.log(self.docCount / self.wordInDocCount[k])
            self.freqDone = True
        return self.wordFreq
    def getCount(self, word):
        """
        get count
        """
        if word in self.wordCounter:
            count = self.wordCounter[word]
        else:
            raise ValueError("word not found in count table " + word)
        return count
    def getFreq(self, word):
        """
        get normalized frequency
        """
        if word in self.wordFreq:
            freq = self.wordFreq[word]
        else:
            raise ValueError("word not found in frequency table " + word)
        return freq
    def resetCounter(self):
        """
        reset counter
        """
        self.wordCounter = {}
    def buildVocabulary(self, words):
        """
        build vocabulary
        """
        self.vocabulary.update(words)
    def getVocabulary(self):
        """
        return vocabulary
        """
        return self.vocabulary
    def creatWordIndex(self):
        """
        index for all words in vocabulary
        """
        self.wordIndex = {word: idx for idx, word in enumerate(list(self.vocabulary))}
    def getVector(self, words, byCount, normalized):
        """
        get vector
        """
        if self.vecWords is None:
            self.vecWords = list(self.wordCounter)
        vec = list(map(lambda vw: self.getVecElem(vw, words, byCount, normalized), self.vecWords))
        return vec
    def getVecElem(self, vw, words, byCount, normalized):
        """
        vector element
        """
        el = 0
        if vw in words:
            if byCount:
                if normalized:
                    el = self.wordFreq[vw]
                else:
                    el = self.wordCounter[vw]
            else:
                el = 1
        return el
    def save(self, saveFile):
        """
        save to file
        """
        with open(saveFile, "wb") as sf:
            pickle.dump(self, sf)
    @staticmethod
    def load(saveFile):
        """
        load from file
        """
        with open(saveFile, "rb") as sf:
            tfidf = pickle.load(sf)
        return tfidf
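
# A minimal usage sketch for TfIdf; the two token lists stand in for tokenized
# documents and are illustrative only.
def _demoTfIdf():
    tfidf = TfIdf(None, True)
    tfidf.countDocWords(["the", "cat", "sat", "on", "the", "mat"])
    tfidf.countDocWords(["the", "dog", "chased", "the", "cat"])
    freq = tfidf.getWordFreq()
    # "the" appears in both docs, so its idf factor log(2/2) drives its weight to 0
    print(freq["the"], freq["dog"])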

class BiGram(NGram):
    """
    bigram
    """
    def __init__(self, vocFilt, verbose=False):
        """
        initialize
        """
        super(BiGram, self).__init__(vocFilt, verbose)
    def toNGram(self, words):
        """
        convert to ngrams
        """
        return self.toBiGram(words)

class TriGram(NGram):
    """
    trigram
    """
    def __init__(self, vocFilt, verbose=False):
        """
        initialize
        """
        super(TriGram, self).__init__(vocFilt, verbose)
    def toNGram(self, words):
        """
        convert to ngrams
        """
        return self.toTriGram(words)
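
# A minimal usage sketch for the NGram subclasses; the token list is
# illustrative only, and no vocabulary filter is applied.
def _demoBiGram():
    bg = BiGram(None)
    words = ["the", "cat", "sat", "on", "the", "cat"]
    bg.countDocNGrams(words)
    freq = bg.getNGramFreq()
    # ("the", "cat") occurs twice out of five bigrams
    print(freq[("the", "cat")])
    vec = bg.getVector(words, True, True)
    print(vec)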

class DocSentences:
    """
    sentence processor
    """
    def __init__(self, filePath, minLength, verbose, text=None):
        """
        initialize
        """
        if filePath:
            self.filePath = filePath
            with open(filePath, 'r') as contentFile:
                content = contentFile.read()
        elif text:
            content = text
        else:
            raise ValueError("either file path or text must be provided")
        self.verbose = verbose
        tp = TextPreProcessor()
        content = tp.removeNonAsciiFromText(content)
        sentences = sent_tokenize(content)
        self.sentences = list(filter(lambda s: len(nltk.word_tokenize(s)) >= minLength, sentences))
        if self.verbose:
            print("num of sentences after length filter " + str(len(self.sentences)))
        self.sentencesAsTokens = [clean(s, tp, verbose) for s in self.sentences]
    def getSentencesAsTokens(self):
        """
        get sentence tokens
        """
        return self.sentencesAsTokens
    def getSentences(self):
        """
        get sentences
        """
        return self.sentences
    def getTermFreqTable(self):
        """
        build term count table for all words
        """
        termTable = TfIdf(None, False)
        sentWords = self.getSentencesAsTokens()
        for seWords in sentWords:
            termTable.countDocWords(seWords)
        return termTable
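
# A minimal usage sketch for DocSentences, driven by an in-memory string rather
# than a file; it assumes the NLTK resources "punkt", "stopwords" and "wordnet"
# are available, since clean() uses them.
def _demoDocSentences():
    text = "Natural language processing is fun. Tokenizing sentences is the first step. Short one."
    ds = DocSentences(None, 3, False, text=text)
    print(ds.getSentences())
    print(ds.getSentencesAsTokens())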

class WordVectorContainer:
    """
    word vector container for a set of documents
    """
    def __init__(self, dirPath, verbose):
        """
        initialize
        """
        self.docs = list()
        self.wordVectors = list()
        self.numWordVectors = None
        self.tp = TextPreProcessor()
        self.similarityAlgo = "cosine"
        self.simAlgoNormalizer = None
        self.termTable = None
        self.verbose = verbose
        if dirPath is not None:
            self.addDir(dirPath)
    def addDir(self, dirPath):
        """
        add content of all files in a directory
        """
        docs, filePaths = getFileContent(dirPath, self.verbose)
        self.docs.extend(docs)
        self.wordVectors.extend([clean(doc, self.tp, self.verbose) for doc in docs])
    def addFile(self, filePath):
        """
        add file content
        """
        with open(filePath, 'r') as contentFile:
            content = contentFile.read()
        self.wordVectors.append(clean(content, self.tp, self.verbose))
    def addText(self, text):
        """
        add text
        """
        self.wordVectors.append(clean(text, self.tp, self.verbose))
    def addWords(self, words):
        """
        add words
        """
        self.wordVectors.append(words)
    def withSimilarityAlgo(self, algo, normalizer=None):
        """
        set similarity algo
        """
        self.similarityAlgo = algo
        self.simAlgoNormalizer = normalizer
    def getDocsWords(self):
        """
        get word vectors
        """
        return self.wordVectors
    def getDocs(self):
        """
        get docs
        """
        return self.docs
    def getTermFreqTable(self):
        """
        term count table for all words
        """
        self.termTable = TfIdf(None, False)
        for words in self.wordVectors:
            self.termTable.countDocWords(words)
        self.termTable.getWordFreq()
        return self.termTable
    def getPairWiseSimilarity(self, byCount, normalized):
        """
        pair wise similarity
        """
        self.getNumWordVectors(byCount, normalized)
        size = len(self.wordVectors)
        simArray = np.empty(shape=(size, size))
        for i in range(size):
            simArray[i][i] = 1.0
        for i in range(size):
            for j in range(i + 1, size):
                if self.similarityAlgo == "cosine":
                    sim = cosineSimilarity(self.numWordVectors[i], self.numWordVectors[j])
                elif self.similarityAlgo == "jaccard":
                    sim = jaccardSimilarity(self.wordVectors[i], self.wordVectors[j], self.simAlgoNormalizer[0], self.simAlgoNormalizer[1])
                else:
                    raise ValueError("invalid similarity algorithm")
                simArray[i][j] = sim
                simArray[j][i] = sim
        return simArray
    def getInterSetSimilarity(self, byCount, normalized, split):
        """
        inter set pair wise similarity
        """
        self.getNumWordVectors(byCount, normalized)
        if not self.similarityAlgo == "jaccard":
            firstNumVec = self.numWordVectors[:split]
            secNumVec = self.numWordVectors[split:]
            fiSize = len(firstNumVec)
            seSize = len(secNumVec)
        else:
            firstVec = self.wordVectors[:split]
            secVec = self.wordVectors[split:]
            fiSize = len(firstVec)
            seSize = len(secVec)
        simArray = np.empty(shape=(fiSize, seSize))
        for i in range(fiSize):
            for j in range(seSize):
                if self.similarityAlgo == "cosine":
                    sim = cosineSimilarity(firstNumVec[i], secNumVec[j])
                elif self.similarityAlgo == "jaccard":
                    sim = jaccardSimilarity(firstVec[i], secVec[j], self.simAlgoNormalizer[0], self.simAlgoNormalizer[1])
                else:
                    raise ValueError("invalid similarity algorithm")
                simArray[i][j] = sim
        return simArray
    def getNumWordVectors(self, byCount, normalized):
        """
        get numeric vectors
        """
        if not self.similarityAlgo == "jaccard":
            if self.numWordVectors is None:
                self.numWordVectors = list(map(lambda wv: self.termTable.getVector(wv, byCount, normalized), self.wordVectors))
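
# A minimal usage sketch for WordVectorContainer; it assumes cosineSimilarity()
# is provided by the wildcard-imported mlutil module (it is referenced above)
# and that the NLTK resources used by clean() are available. The texts are
# illustrative only.
def _demoWordVectorContainer():
    wvc = WordVectorContainer(None, False)
    wvc.addText("The cat sat on the mat and purred loudly.")
    wvc.addText("A dog chased the cat across the yard yesterday.")
    wvc.getTermFreqTable()
    sim = wvc.getPairWiseSimilarity(True, True)
    print(sim)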

class TextFragmentGenerator:
    """
    fragments documents into whole doc, paragraphs or passages
    """
    def __init__(self, level, minParNl, passSize, verbose=False):
        """
        initialize
        """
        self.level = level
        self.minParNl = minParNl
        self.passSize = passSize
        self.fragments = None
        self.verbose = verbose
    def loadDocs(self, fpaths):
        """
        loads documents from one file, multiple files or all files under a directory
        """
        fPaths = fpaths.split(",")
        if len(fPaths) == 1:
            if os.path.isfile(fPaths[0]):
                #one file
                if self.verbose:
                    print("got one file from path")
                dnames = fPaths
                docStr = getOneFileContent(fPaths[0])
                dtexts = [docStr]
            else:
                #all files under directory
                if self.verbose:
                    print("got all files under directory from path")
                dtexts, dnames = getFileContent(fPaths[0])
                if self.verbose:
                    print("found {} files".format(len(dtexts)))
        else:
            #list of files
            if self.verbose:
                print("got list of files from path")
            dnames = fPaths
            dtexts = list(map(getOneFileContent, fPaths))
            if self.verbose:
                print("found {} files".format(len(dtexts)))
        ndocs = (dtexts, dnames)
        if self.verbose:
            print("docs")
            for dn, dt in zip(dnames, dtexts):
                print(dn + "\t" + dt[:40])
        return ndocs
    def generateFragmentsFromFiles(self, fpaths):
        """
        fragments documents into whole doc, paragraphs or passages
        """
        dtexts, dnames = self.loadDocs(fpaths)
        return self.generateFragments(dtexts, dnames)
    def generateFragmentsFromNamedDocs(self, ndocs):
        """
        fragments documents into whole doc, paragraphs or passages
        """
        dtexts = list(map(lambda nd: nd[1], ndocs))
        dnames = list(map(lambda nd: nd[0], ndocs))
        return self.generateFragments(dtexts, dnames)
    def generateFragments(self, dtexts, dnames):
        """
        fragments documents into whole doc, paragraphs or passages
        """
        if self.level == "para" or self.level == "passage":
            #split into paras
            dptexts = list()
            dpnames = list()
            for dt, dn in zip(dtexts, dnames):
                paras = getParas(dt, self.minParNl)
                if self.verbose:
                    print(dn)
                    print("no of paras {}".format(len(paras)))
                dptexts.extend(paras)
                pnames = list(map(lambda i: dn + ":" + str(i), range(len(paras))))
                dpnames.extend(pnames)
            dtexts = dptexts
            dnames = dpnames
        if self.level == "passage":
            #split each para into passages
            dptexts = list()
            dpnames = list()
            for dt, dn in zip(dtexts, dnames):
                sents = sent_tokenize(dt.strip())
                if self.verbose:
                    print(dn)
                    print("no of sentences {}".format(len(sents)))
                span = self.passSize
                if len(sents) <= span:
                    pass
                else:
                    for i in range(0, len(sents) - span, 1):
                        dptext = None
                        for j in range(span):
                            if dptext is None:
                                dptext = sents[i + j] + ". "
                            else:
                                dptext = dptext + sents[i + j] + ". "
                        dpname = dn + ":" + str(i)
                        dptexts.append(dptext)
                        dpnames.append(dpname)
            dtexts = dptexts
            dnames = dpnames
        self.fragments = list(zip(dnames, dtexts))
        return self.fragments
    def showFragments(self):
        """
        show fragments
        """
        print("showing all " + self.level + " for the first 40 characters")
        for dn, dt in self.fragments:
            print(dn + "\t" + dt[:40])
    def isDocLevel(self):
        """
        true if fragment is at doc level
        """
        return self.level != "para" and self.level != "passage"
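
# A minimal usage sketch for TextFragmentGenerator, working on an in-memory
# named document instead of files; the text is illustrative only.
def _demoTextFragmentGenerator():
    text = "First para sentence one. Sentence two.\n\nSecond para sentence one. Sentence two. Sentence three."
    tfg = TextFragmentGenerator("para", 2, 3)
    fragments = tfg.generateFragmentsFromNamedDocs([("doc1", text)])
    for name, frag in fragments:
        print(name + "\t" + frag[:40])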

def clean(doc, preprocessor, verbose):
    """
    clean document text to create term array
    """
    if verbose:
        print("--raw doc")
        print(doc)
    doc = preprocessor.removeNonAsciiFromText(doc)
    words = preprocessor.tokenize(doc)
    words = preprocessor.allow(words)
    words = preprocessor.toLowercase(words)
    words = preprocessor.removeStopwords(words)
    words = preprocessor.removeShortWords(words, 3)
    words = preprocessor.removePunctuation(words)
    words = preprocessor.lemmatizeWords(words)
    if verbose:
        print("--after pre processing")
        print(words)
    return words

def getSentences(filePath):
    """
    get sentences from a file, splitting on periods
    """
    with open(filePath, 'r') as contentFile:
        content = contentFile.read()
    sentences = content.split('.')
    return sentences

def getParas(text, minParNl=2):
    """
    split text into paras
    """
    regx = "\n+" if minParNl == 1 else "\n{2,}"
    paras = re.split(regx, text.replace("\r\n", "\n"))
    return paras
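
# A minimal usage sketch for the module level helpers clean() and getParas();
# it assumes the NLTK resources "punkt", "stopwords" and "wordnet" are
# available, and the text is illustrative only.
def _demoHelpers():
    text = "Cats are independent animals.\n\nDogs, on the other hand, are famously loyal companions."
    paras = getParas(text)
    print("num paras " + str(len(paras)))
    tp = TextPreProcessor()
    for para in paras:
        print(clean(para, tp, False))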