import os
import random
import shutil
import string
import sys
import zipfile
from urllib.parse import urljoin

import nltk
import requests

from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.open_ai_func import call_openai_api
from application.parser.schema.base import Document
from application.parser.token_func import group_split

# Download the NLTK data used by the document parsers; concurrent workers
# can race on creating the download directory, hence the FileExistsError guard.
try:
    nltk.download('punkt', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
except FileExistsError:
    pass


# Extract metadata from a file path: the full path serves as the title, and
# the second and third path components (user/job) as the store identifier.
def metadata_from_filename(title):
    store = '/'.join(title.split('/')[1:3])
    return {'title': title, 'store': store}
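# Example: metadata_from_filename('inputs/local/job1/install.rst') returns
# {'title': 'inputs/local/job1/install.rst', 'store': 'local/job1'}.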


# Generate a random string of ASCII letters with the given length.
def generate_random_string(length):
    return ''.join(random.choices(string.ascii_letters, k=length))

# Project root: three directory levels above this file.
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Define the main function for ingesting and processing documents.
def ingest_worker(self, directory, formats, name_job, filename, user):
    """
    Ingest and process documents.

    Args:
        self: Reference to the instance of the task.
        directory (str): Specifies the directory for ingesting ('inputs' or 'temp').
        formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]).
        name_job (str): Name of the job for this ingestion task.
        filename (str): Name of the file to be ingested.
        user (str): Identifier for the user initiating the ingestion.

    Returns:
        dict: Information about the completed ingestion task, including input parameters and a "limited" flag.
    """
    input_files = None
    recursive = True
    limit = None
    exclude = True  # skip hidden files during directory traversal
    sample = False
    token_check = True
    min_tokens = 150
    max_tokens = 1250
    full_path = os.path.join(directory, user, name_job)
    print(full_path, file=sys.stderr)

    # Fetch the uploaded file from the API server.
    file_data = {'name': name_job, 'file': filename, 'user': user}
    response = requests.get(urljoin(settings.API_URL, "/api/download"), params=file_data)
    print(response, file=sys.stderr)
    file = response.content

    os.makedirs(full_path, exist_ok=True)
    with open(os.path.join(full_path, filename), 'wb') as f:
        f.write(file)

    # If the upload is a .zip archive, extract it in place and drop the archive.
    if filename.endswith('.zip'):
        zip_path = os.path.join(full_path, filename)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(full_path)
        os.remove(zip_path)

    self.update_state(state='PROGRESS', meta={'current': 1})

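    # Parse all matching files under full_path, then regroup/split the
    # documents into chunks bounded by min_tokens and max_tokens.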
    raw_docs = SimpleDirectoryReader(input_dir=full_path, input_files=input_files, recursive=recursive,
                                     required_exts=formats, num_files_limit=limit,
                                     exclude_hidden=exclude, file_metadata=metadata_from_filename).load_data()
    raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)

    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]

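    # call_openai_api embeds the chunks and (per the upload step below)
    # leaves the vector index files in full_path.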
    call_openai_api(docs, full_path, self)
    self.update_state(state='PROGRESS', meta={'current': 100})

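    # Optional debugging aid: print the first few chunks (sample is
    # hard-coded to False above).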
    if sample:
        for i in range(min(5, len(raw_docs))):
            print(raw_docs[i].text)

    # Upload the generated index files (index.faiss / index.pkl in full_path)
    # to the API server, passing the user and job name in the form data.
    file_data = {'name': name_job, 'user': user}
    if settings.VECTOR_STORE == "faiss":
        with open(os.path.join(full_path, 'index.faiss'), 'rb') as file_faiss, \
                open(os.path.join(full_path, 'index.pkl'), 'rb') as file_pkl:
            files = {'file_faiss': file_faiss, 'file_pkl': file_pkl}
            response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
        response = requests.get(urljoin(settings.API_URL, "/api/delete_old"), params={'path': full_path})
    else:
        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)

    # Remove the local working directory now that the index is uploaded.
    shutil.rmtree(full_path)

    return {
        'directory': directory,
        'formats': formats,
        'name_job': name_job,
        'filename': filename,
        'user': user,
        'limited': False
    }
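

# A minimal invocation sketch, assuming ingest_worker is registered as a
# bound Celery task elsewhere in the application (the import path and task
# wiring below are illustrative assumptions, not part of this module):
#
#     from application.celery import celery
#
#     ingest = celery.task(bind=True)(ingest_worker)
#     ingest.delay('temp', ['.rst', '.md'], 'job1', 'install.rst', 'local')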