import boto3
import time
import os
import pandas as pd
import json
import logging
import datetime
from typing import List
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError, TokenRetrievalError

from tools.config import TEXTRACT_BULK_ANALYSIS_BUCKET, OUTPUT_FOLDER, AWS_REGION, DOCUMENT_REDACTION_BUCKET, LOAD_PREVIOUS_TEXTRACT_JOBS_S3, TEXTRACT_JOBS_S3_LOC, TEXTRACT_JOBS_LOCAL_LOC
#from tools.aws_textract import json_to_ocrresult

def analyse_document_with_textract_api(
    local_pdf_path: str,
    s3_input_prefix: str,
    s3_output_prefix: str,
    job_df:pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_BULK_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,    
    analyse_signatures:List[str] = [],
    successful_job_number:int=0,
    general_s3_bucket_name: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION # Optional: specify region if not default
    ):
    """
    Uploads a local PDF to S3 and starts an asynchronous Textract job (plain text
    detection, or document analysis with signature detection). Polling for completion
    and downloading the output JSON is handled separately by
    poll_bulk_textract_analysis_progress_and_download.

    Args:
        local_pdf_path (str): Path to the local PDF file.
        s3_input_prefix (str): S3 prefix (folder) to upload the input PDF to.
        s3_output_prefix (str): S3 prefix (folder) where Textract should write output.
        job_df (pd.DataFrame): Dataframe containing information from previous Textract API calls.
        s3_bucket_name (str, optional): S3 bucket used for the input PDF and the Textract output.
        local_output_dir (str, optional): Local directory in which to save the job log CSV.
        analyse_signatures (List[str], optional): If the list contains "Extract signatures",
            a document-analysis job with signature detection is started; otherwise a plain
            text-detection job is used. Defaults to an empty list (no signature analysis).
        successful_job_number (int): The number of successful jobs that have been submitted in this session.
        general_s3_bucket_name (str, optional): S3 bucket to which the job log CSV is uploaded.
        aws_region (str, optional): AWS region name. Defaults to the boto3 default region.

    Returns:
        tuple: A status message, the Textract job ID, the job type, the updated
            successful job count, and a flag indicating that a Textract API call was made.

    Raises:
        FileNotFoundError: If the local_pdf_path does not exist.
        botocore.exceptions.NoCredentialsError: If AWS credentials are not found.
        Exception: For other AWS errors, or if a matching job has already been submitted.
    """

    # This is a variable that is written to logs to indicate that a Textract API call was made
    is_a_textract_api_call = True

    # Keep only latest pdf path if it's a list
    if isinstance(local_pdf_path, list):
        local_pdf_path = local_pdf_path[-1]

    if not os.path.exists(local_pdf_path):
        raise FileNotFoundError(f"Input document not found: {local_pdf_path}")

    if not os.path.exists(local_output_dir):
        os.makedirs(local_output_dir)
        log_message = f"Created local output directory: {local_output_dir}"
        print(log_message)
        #logging.info(log_message)

    # Initialize boto3 clients
    session = boto3.Session(region_name=aws_region)
    s3_client = session.client('s3')
    textract_client = session.client('textract')

    # --- 1. Upload PDF to S3 ---
    pdf_filename = os.path.basename(local_pdf_path)
    s3_input_key = os.path.join(s3_input_prefix, pdf_filename).replace("\\", "/") # Ensure forward slashes for S3

    log_message = f"Uploading '{local_pdf_path}' to 's3://{s3_bucket_name}/{s3_input_key}'..."
    print(log_message)
    #logging.info(log_message)
    try:
        s3_client.upload_file(local_pdf_path, s3_bucket_name, s3_input_key)
        log_message = "Upload successful."
        print(log_message)
        #logging.info(log_message)
    except Exception as e:
        log_message = f"Failed to upload PDF to S3: {e}"
        print(log_message)
        #logging.error(log_message)
        raise

    # If a job for this file (with the same signature option) is already logged, don't resubmit
    if not job_df.empty and "file_name" in job_df.columns:
        matching_job_id_file_names = job_df.loc[
            (job_df["file_name"] == pdf_filename)
            & (job_df["signature_extraction"].astype(str) == str(analyse_signatures)),
            "file_name"
        ]

        if len(matching_job_id_file_names) > 0:
            raise Exception("Existing Textract outputs found. No need to re-analyse. Please download existing results from the list.")

    # --- 2. Start Textract Document Analysis ---
    message = "Starting Textract document analysis job..."
    print(message)
    #logging.info("Starting Textract document analysis job...")

    try:
        if "Extract signatures" in analyse_signatures:
            response = textract_client.start_document_analysis(
                DocumentLocation={
                    'S3Object': {
                        'Bucket': s3_bucket_name,
                        'Name': s3_input_key
                    }
                },
                FeatureTypes=['SIGNATURES'], # Analyse for signatures only
                OutputConfig={
                    'S3Bucket': s3_bucket_name,
                    'S3Prefix': s3_output_prefix
                }
                # Optional: Add NotificationChannel for SNS topic notifications
                # NotificationChannel={
                #     'SNSTopicArn': 'YOUR_SNS_TOPIC_ARN',
                #     'RoleArn': 'YOUR_IAM_ROLE_ARN_FOR_TEXTRACT_TO_ACCESS_SNS'
                # }
            )
            job_type = "document_analysis"

        else:
            response = textract_client.start_document_text_detection(
                DocumentLocation={
                    'S3Object': {
                        'Bucket': s3_bucket_name,
                        'Name': s3_input_key
                    }
                },
                OutputConfig={
                    'S3Bucket': s3_bucket_name,
                    'S3Prefix': s3_output_prefix
                }
                # Optional: Add NotificationChannel for SNS topic notifications
                # NotificationChannel={
                #     'SNSTopicArn': 'YOUR_SNS_TOPIC_ARN',
                #     'RoleArn': 'YOUR_IAM_ROLE_ARN_FOR_TEXTRACT_TO_ACCESS_SNS'
                # }
            )
            job_type = "document_text_detection"

        job_id = response['JobId']
        print(f"Textract job started with JobId: {job_id}")
        #logging.info(f"Textract job started with JobId: {job_id}")

        # Record the job in a local CSV log, then upload the log to S3
        log_csv_key_location = f"{s3_output_prefix}/textract_document_jobs.csv"
        job_location_full = f"s3://{s3_bucket_name}/{s3_output_prefix}/{job_id}/"

        log_df = pd.DataFrame([{
            'job_id': job_id,
            'file_name': pdf_filename,
            'job_type': job_type,
            'signature_extraction': analyse_signatures,
            's3_location': job_location_full,
            'job_date_time': datetime.datetime.now()
        }])

        # Local log file path
        log_file_path = os.path.join(local_output_dir, "textract_job_log_files.csv")

        # Append to the CSV if it exists, otherwise write it with a header
        file_exists = os.path.exists(log_file_path)
        log_df.to_csv(log_file_path, mode='a', index=False, header=not file_exists)

        # Upload the log file to S3 (overwriting any existing copy)
        s3_client.upload_file(log_file_path, general_s3_bucket_name, log_csv_key_location)
        print(f"Job ID written to {log_csv_key_location}")
        #logging.info(f"Job ID written to s3://{s3_bucket_name}/{s3_output_prefix}/textract_document_jobs.csv")

    except Exception as e:
        error = f"Failed to start Textract job: {e}"
        print(error)
        #logging.error(error)
        raise

    successful_job_number += 1

    return f"Textract analysis job submitted, job ID:{job_id}", job_id, job_type, successful_job_number, is_a_textract_api_call

def return_job_status(job_id:str,
                     response:dict,
                     attempts:int,
                     poll_interval_seconds: int = 0,
                     max_polling_attempts: int = 1
                     ):
    '''
    Poll Textract for the current status of a previously-submitted job.
    '''

    job_status = response['JobStatus']
    logging.info(f"Polling attempt {attempts}/{max_polling_attempts}. Job status: {job_status}")

    if job_status == 'IN_PROGRESS':
        time.sleep(poll_interval_seconds)
    elif job_status == 'SUCCEEDED':
        logging.info("Textract job succeeded.")
    elif job_status in ['FAILED', 'PARTIAL_SUCCESS']:
        status_message = response.get('StatusMessage', 'No status message provided.')
        warnings = response.get('Warnings', [])
        logging.error(f"Textract job ended with status: {job_status}. Message: {status_message}")
        if warnings:
            logging.warning(f"Warnings: {warnings}")
        # Decide if PARTIAL_SUCCESS should proceed or raise error
        # For simplicity here, we raise for both FAILED and PARTIAL_SUCCESS
        raise Exception(f"Textract job {job_id} failed or partially failed. Status: {job_status}. Message: {status_message}")
    else:
        # Should not happen based on documentation, but handle defensively
        raise Exception(f"Unexpected Textract job status: {job_status}")
    
    return job_status
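
# For example, return_job_status("abc123", {"JobStatus": "SUCCEEDED"}, attempts=1)
# returns "SUCCEEDED", while a FAILED or PARTIAL_SUCCESS response raises an
# Exception instead. ("abc123" is an illustrative job ID.)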

def download_textract_job_files(s3_client,
                                s3_bucket_name:str,
                                s3_output_key_prefix:str,
                                pdf_filename:str,
                                job_id:str,
                                local_output_dir:str):    
    '''
    Download and combine selected job files from the AWS Textract service.
    '''

    list_response = s3_client.list_objects_v2(
        Bucket=s3_bucket_name,
        Prefix=s3_output_key_prefix
    )

    output_files = list_response.get('Contents', [])
    if not output_files:
        # Textract can take a moment to write output after reporting SUCCEEDED,
        # so retry the listing once
        list_response = s3_client.list_objects_v2(
            Bucket=s3_bucket_name,
            Prefix=s3_output_key_prefix
        )
        output_files = list_response.get('Contents', [])

    if not output_files:
        logging.error(f"No output files found in s3://{s3_bucket_name}/{s3_output_key_prefix}")
        # You could alternatively try getting results via get_document_analysis pagination here
        # but sticking to the request to download from S3 output path.
        raise FileNotFoundError(f"Textract output files not found in S3 path: s3://{s3_bucket_name}/{s3_output_key_prefix}")

    # Textract may split its output across multiple JSON files, so collect and merge them all.
    # Filter out the prefix marker, directory markers, and access-check files.
    json_files_to_download = [
        f for f in output_files
        if f['Key'] != s3_output_key_prefix and not f['Key'].endswith('/') and 'access_check' not in f['Key']
    ]

    if not json_files_to_download:
        error = f"No JSON files found (only prefix marker?) in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        print(error)
        #logging.error(error)
        raise FileNotFoundError(error)

    combined_blocks = []

    # Textract names its output chunks '1', '2', ... so sort numerically where
    # possible; a plain string sort would put '10' before '2'
    def _chunk_sort_key(obj):
        name = os.path.basename(obj['Key'])
        return int(name) if name.isdigit() else float('inf')

    for f in sorted(json_files_to_download, key=_chunk_sort_key):
        obj = s3_client.get_object(Bucket=s3_bucket_name, Key=f['Key'])
        data = json.loads(obj['Body'].read())
        
        # Assuming Textract-style output with a "Blocks" key
        if "Blocks" in data:
            combined_blocks.extend(data["Blocks"])
        else:
            logging.warning(f"No 'Blocks' key in file: {f['Key']}")

    # Build final combined JSON structure
    combined_output = {
        "DocumentMetadata": {
            "Pages": len(set(block.get('Page', 1) for block in combined_blocks))
        },
        "Blocks": combined_blocks,
        "JobStatus": "SUCCEEDED"
    }

    output_filename_base = os.path.basename(pdf_filename)
    output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]
    local_output_filename = f"{output_filename_base_no_ext}_textract.json"
    local_output_path = os.path.join(local_output_dir, local_output_filename)

    with open(local_output_path, 'w') as f:
        json.dump(combined_output, f)

    print(f"Combined Textract output written to {local_output_path}")

    # logging.info(f"Downloading Textract output from 's3://{s3_bucket_name}/{s3_output_key}' to '{local_output_path}'...")
    # s3_client.download_file(s3_bucket_name, s3_output_key, local_output_path)
    # logging.info("Download successful.")
    downloaded_file_path = local_output_path

    # Log if multiple files were found, as user might need to handle them
    #if len(json_files_to_download) > 1:
    #    logging.warning(f"Multiple output files found in S3 output location. Downloaded the first: '{s3_output_key}'. Other files exist.")

    return downloaded_file_path

def check_for_provided_job_id(job_id:str):
    if not job_id:
        raise Exception("Please provide a job ID.")    
    return

def poll_bulk_textract_analysis_progress_and_download(
    job_id:str,
    job_type_dropdown:str,
    s3_output_prefix: str,
    pdf_filename:str,
    job_df:pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_BULK_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    load_s3_jobs_loc:str=TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc:str=TEXTRACT_JOBS_LOCAL_LOC,    
    aws_region: str = AWS_REGION, # Optional: specify region if not default
    load_jobs_from_s3:str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    poll_interval_seconds: int = 1,
    max_polling_attempts: int = 1
    ):
    '''
    Poll AWS for the status of a Textract API job. Return the status and, if the job
    has finished, combine and download the results into a locally stored JSON file
    for further processing by the app.
    '''

    if job_id:
        # Initialize boto3 clients
        session = boto3.Session(region_name=aws_region)
        s3_client = session.client('s3')
        textract_client = session.client('textract')

        # --- 3. Poll for Job Completion ---
        job_status = 'IN_PROGRESS'
        attempts = 0

        message = "Polling Textract for job completion status..."
        print(message)
        #logging.info("Polling Textract for job completion status...")

        # Update Textract document history df
        try:
            job_df = load_in_textract_job_details(load_s3_jobs=load_jobs_from_s3,
                                        load_s3_jobs_loc=load_s3_jobs_loc,
                                        load_local_jobs_loc=load_local_jobs_loc)
        except Exception as e:
            #logging.error(f"Failed to update job details dataframe: {e}")
            print(f"Failed to update job details dataframe: {e}")
            #raise

        while job_status == 'IN_PROGRESS' and attempts < max_polling_attempts:
            attempts += 1
            try:
                if job_type_dropdown=="document_analysis":
                    response = textract_client.get_document_analysis(JobId=job_id)
                    job_status = return_job_status(job_id, response, attempts, poll_interval_seconds, max_polling_attempts)
                elif job_type_dropdown=="document_text_detection":
                    response = textract_client.get_document_text_detection(JobId=job_id)
                    job_status = return_job_status(job_id, response, attempts, poll_interval_seconds, max_polling_attempts)
                else:
                    error_message = f"Unknown job type '{job_type_dropdown}', cannot poll job {job_id}"
                    print(error_message)
                    logging.error(error_message)
                    raise Exception(error_message)

            except textract_client.exceptions.InvalidJobIdException:
                error_message = f"Invalid JobId: {job_id}. This might happen if the job expired (older than 7 days) or never existed."
                print(error_message)
                logging.error(error_message)
                raise
            except Exception as e:
                error_message = f"Error while polling Textract status for job {job_id}: {e}"
                print(error_message)
                logging.error(error_message)
                raise

        downloaded_file_path = None
        if job_status == 'SUCCEEDED':
            # 3b - Replace the PDF file name with the one recorded for this job, if present

            if not job_df.empty and "file_name" in job_df.columns:
                matching_job_id_file_names = job_df.loc[job_df["job_id"] == job_id, "file_name"]

                if pdf_filename and not matching_job_id_file_names.empty:
                    if pdf_filename == matching_job_id_file_names.iloc[0]:
                        raise Exception("Existing Textract outputs found. No need to re-download.")

                if not matching_job_id_file_names.empty:
                    pdf_filename = matching_job_id_file_names.iloc[0]
                else:
                    pdf_filename = "unknown_file"

            # --- 4. Download Output JSON from S3 ---
            # Textract typically creates output under s3_output_prefix/job_id/
            # There might be multiple JSON files if pagination occurred during writing.
            # Usually, for smaller docs, there's one file, often named '1'.
            # For robust handling, list objects and find the JSON(s).


            s3_output_key_prefix = os.path.join(s3_output_prefix, job_id).replace("\\", "/") + "/"
            logging.info(f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}")

            try:
                downloaded_file_path = download_textract_job_files(s3_client,
                                                s3_bucket_name,
                                                s3_output_key_prefix,
                                                pdf_filename,
                                                job_id,
                                                local_output_dir)

            except Exception as e:
                #logging.error(f"Failed to download or process Textract output from S3: {e}")
                print(f"Failed to download or process Textract output from S3: {e}")
                raise

    else:
        raise Exception("No Job ID provided.")        

    return downloaded_file_path, job_status, job_df
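
# Example usage (a sketch; the job ID and file name below are illustrative
# placeholders, typically taken from the values returned when the job was submitted):
#
#   path, status, job_df = poll_bulk_textract_analysis_progress_and_download(
#       job_id="0123456789abcdef",
#       job_type_dropdown="document_analysis",
#       s3_output_prefix="output",
#       pdf_filename="example.pdf",
#       job_df=pd.DataFrame(),
#   )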

def load_in_textract_job_details(load_s3_jobs:str=LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
                                     load_s3_jobs_loc:str=TEXTRACT_JOBS_S3_LOC,
                                     load_local_jobs_loc:str=TEXTRACT_JOBS_LOCAL_LOC,
                                     document_redaction_bucket:str=DOCUMENT_REDACTION_BUCKET,
                                     aws_region:str=AWS_REGION):
    '''
    Load in a dataframe of jobs previously submitted to the Textract API service.
    '''

    job_df = pd.DataFrame(columns=['job_id','file_name','job_type','signature_extraction','s3_location','job_date_time'])

    # Initialize boto3 clients
    session = boto3.Session(region_name=aws_region)
    s3_client = session.client('s3')

    local_output_path = f'{load_local_jobs_loc}/textract_job_log_files.csv'

    # Note: the load_s3_jobs flag is read from config as a string, so compare against 'True'
    if load_s3_jobs == 'True':

        s3_output_key = f'{load_s3_jobs_loc}/textract_job_log_files.csv'
                
        try:
            s3_client.head_object(Bucket=document_redaction_bucket, Key=s3_output_key)
            print(f"File exists. Downloading from '{s3_output_key}' to '{local_output_path}'...")
            s3_client.download_file(document_redaction_bucket, s3_output_key, local_output_path)
            print("Download successful.")
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                print("Log file does not exist in S3.")
            else:
                print(f"Unexpected error occurred: {e}")
        except (NoCredentialsError, PartialCredentialsError, TokenRetrievalError) as e:
            print(f"AWS credential issue encountered: {e}")
            print("Skipping S3 log file download.")

    # If the log path exists, load it in
    if os.path.exists(local_output_path):
        print("Found log file in local path")
        job_df = pd.read_csv(local_output_path)

        if "job_date_time" in job_df.columns:
            job_df["job_date_time"] = pd.to_datetime(job_df["job_date_time"], errors='coerce')
            # Keep only jobs submitted in the last 7 days (Textract results expire after 7 days)
            cutoff_time = pd.Timestamp.now() - pd.Timedelta(days=7)
            job_df = job_df.loc[job_df["job_date_time"] >= cutoff_time,:]

    return job_df
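
# Example usage (a sketch; all arguments default to values from tools.config):
#
#   job_df = load_in_textract_job_details()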

def download_textract_output(job_id:str,
                             output_bucket:str,
                             output_prefix:str,
                             local_folder:str):
    """
    Checks the status of a Textract job and downloads the output ZIP file if the job is complete.

    :param job_id: The Textract job ID.
    :param output_bucket: The S3 bucket where the output is stored.
    :param output_prefix: The prefix (folder path) in S3 where the output file is stored.
    :param local_folder: The local directory where the ZIP file should be saved.
    """
    textract_client = boto3.client('textract')
    s3_client = boto3.client('s3')

    # Check job status
    while True:
        response = textract_client.get_document_analysis(JobId=job_id)
        status = response['JobStatus']
        
        if status == 'SUCCEEDED':
            print("Job completed successfully.")
            break
        elif status == 'FAILED':
            print("Job failed:", response.get("StatusMessage", "No error message provided."))
            return
        else:
            print(f"Job is still {status}.")
            time.sleep(10)  # Wait before polling again to avoid a busy loop

    # Find output ZIP file in S3
    output_file_key = f"{output_prefix}/{job_id}.zip"
    local_file_path = os.path.join(local_folder, f"{job_id}.zip")

    # Download file
    try:
        s3_client.download_file(output_bucket, output_file_key, local_file_path)
        print(f"Output file downloaded to: {local_file_path}")
    except Exception as e:
        print(f"Error downloading file: {e}")