flaskapp / main.py
5m4ck3r's picture
Update main.py
c38cffb verified
from flask import Flask, request, jsonify
import os
from hiou import hugging
import json
import io
import random
import string
from mimetypes import guess_type
import requests
import time
from PIL import Image
def compress_image(input_stream: io.BytesIO, max_size_mb=4) -> io.BytesIO:
    """Re-encode an image as JPEG, lowering quality until it fits a size cap.

    Args:
        input_stream: Binary stream containing an image Pillow can open.
        max_size_mb: Maximum allowed size of the encoded result, in MiB.

    Returns:
        A ``BytesIO`` rewound to position 0 holding the JPEG data. If even
        the lowest quality tried is still too large, the last (smallest)
        attempt is returned after printing a warning.
    """
    with Image.open(input_stream) as source:
        # Take a real copy of the pixels so the data outlives the ``with``
        # block. JPEG cannot store alpha/palette modes, so those are
        # converted to RGB first.
        if source.mode in ("L", "RGB", "CMYK"):
            img = source.copy()
        else:
            img = source.convert("RGB")

    quality = 95
    step = 5
    output_stream = io.BytesIO()
    while True:
        # Rewind AND truncate: without truncation, leftover bytes from a
        # previous, larger encode would stay in the buffer and inflate the
        # measured size.
        output_stream.seek(0)
        output_stream.truncate()
        img.save(output_stream, format="JPEG", quality=quality)
        output_stream_size_mb = len(output_stream.getvalue()) / (1024 * 1024)
        if output_stream_size_mb <= max_size_mb:
            output_stream.seek(0)
            print(f"Image compressed to {output_stream_size_mb:.2f} MB")
            return output_stream
        quality -= step
        if quality < step:
            print("Cannot compress the image to the desired size.")
            output_stream.seek(0)
            return output_stream
def read_image_from_stream(image_data, subscription_key, endpoint):
    """Submit raw image bytes to the v3.2 ``read/analyze`` endpoint.

    Args:
        image_data: Readable binary stream; its full content is sent as the
            request body.
        subscription_key: Value sent in ``Ocp-Apim-Subscription-Key``.
        endpoint: Base URL of the service (no trailing slash is added).

    Returns:
        The value of the ``Operation-Location`` response header.

    Raises:
        Exception: If the service answers with any status other than 202.
    """
    request_headers = {
        'Ocp-Apim-Subscription-Key': subscription_key,
        'Content-Type': 'application/octet-stream',
    }
    payload = image_data.read()
    analyze_url = f"{endpoint}/vision/v3.2/read/analyze"
    response = requests.post(analyze_url, headers=request_headers, data=payload)

    # Anything but "202 Accepted" means the job was not queued.
    if response.status_code != 202:
        raise Exception(f"Unexpected response status: {response.status_code}, {response.text}")
    return response.headers["Operation-Location"]
def get_read_result(operation_location, subscription_key):
    """Poll a read operation until it reaches a terminal state.

    Args:
        operation_location: URL of the operation result to poll.
        subscription_key: Value sent in ``Ocp-Apim-Subscription-Key``.

    Returns:
        The decoded JSON body of the final poll. Its ``status`` is either
        ``'succeeded'`` or ``'failed'``; callers must check which.

    Raises:
        Exception: On a non-200 HTTP response or an unrecognised status.
    """
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key
    }
    while True:
        response = requests.get(operation_location, headers=headers)
        if response.status_code != 200:
            raise Exception(f"Unexpected response status: {response.status_code}")

        read_result = response.json()
        status = read_result['status']
        # 'failed' is terminal: polling again would never change it, so it is
        # returned to the caller instead of looping forever.
        if status in ('succeeded', 'failed'):
            return read_result
        if status not in ('notStarted', 'running'):
            raise Exception(f"Unexpected status: {status}")
        time.sleep(1)  # Still in progress; wait before polling again
def process_image(image_path, subscription_key, endpoint):
    """Run the read (OCR) operation on an image and return its text.

    Args:
        image_path: Despite the name, this is handed to
            ``read_image_from_stream``, which calls ``.read()`` on it, so it
            must be a readable binary stream.
        subscription_key: Service subscription key.
        endpoint: Base URL of the service.

    Returns:
        All recognised lines joined by single spaces, or a fixed failure
        message when the final status is not ``'succeeded'``.
    """
    submitted_location = read_image_from_stream(image_path, subscription_key, endpoint)

    # Only the trailing operation id is kept; the result URL is rebuilt
    # against the configured endpoint.
    operation_id = submitted_location.rsplit("/", 1)[-1]
    result_url = f"{endpoint}/vision/v3.2/read/analyzeResults/{operation_id}"

    read_result = get_read_result(result_url, subscription_key)
    if read_result['status'] != 'succeeded':
        return "Processing failed or did not succeed."

    texts = [
        line['text']
        for page in read_result['analyzeResult']['readResults']
        for line in page['lines']
    ]
    # Flatten to one line: join with spaces and drop embedded newlines.
    return " ".join(texts).replace("\n", " ")
def get_answer(image: io.BytesIO, question: str):
    """Upload an image through the ``hugging`` client and ask a question.

    Args:
        image: In-memory image; must carry a ``name`` attribute, which is
            used as the original file name and for MIME-type guessing.
        question: Text placed in the request payload after the file entry
            and the extra JSON taken from the ``DATA`` environment variable.

    Returns:
        The first element of the ``"data"`` entry of the client's output.
    """
    original_name = image.name
    client = hugging(os.getenv('SPID'))
    uploaded_url = client.upload(image)

    # Measure the payload by seeking to the end, then rewind.
    image.seek(0, 2)
    size_in_bytes = image.tell()
    image.seek(0)

    mime_type, _encoding = guess_type(image.name)
    file_entry = {
        "meta": {
            "_type": "gradio.FileData"
        },
        "mime_type": mime_type,
        "orig_name": original_name,
        # The part after the first "=" in the upload link is used as the path.
        "path": uploaded_url.split("=", 1)[1],
        "size": size_in_bytes,
        "url": uploaded_url
    }

    extra_entry = json.loads(os.getenv('DATA'))
    payload = [file_entry, extra_entry, question, {"tab_index": 0}]

    # NOTE(review): meaning of the numeric arguments (2, 15) is defined by the
    # external `hugging` client and is not visible here.
    client.filnal_setup(payload, 2, 15)
    client.start()
    return client.output.get("data")[0]
def valuate_qna(p: str, r: str) -> str:
    """Send the pair ``(p, r)`` through the ``hugging`` client.

    The client is configured from the ``SPID2`` environment variable.

    Returns:
        The first element of the client's ``"data"`` output, or ``None``
        when the output has no ``"data"`` entry.
    """
    client = hugging(os.getenv('SPID2'))
    # NOTE(review): meaning of the numeric arguments (0, 12) is defined by the
    # external `hugging` client and is not visible here.
    client.filnal_setup([p, r], 0, 12)
    client.start()
    results = client.output.get("data", [None])
    return results[0]
def ocr(image: io.BytesIO) -> str:
    """Extract text from ``image`` using the key and endpoint from the
    ``AZKEY`` and ``AZURL`` environment variables."""
    subscription_key = os.getenv('AZKEY')
    endpoint = os.getenv('AZURL')
    return process_image(image, subscription_key, endpoint)
# Flask application object; the route decorators below register their
# handlers on it.
app = Flask(__name__)
@app.route('/getans', methods=['POST'])
def gettans_task():
    """Answer a question about an uploaded image.

    The image is taken from the multipart field ``file`` or, failing that,
    from the raw request body (optionally named via the ``name`` query
    parameter). The question is read from the ``QU`` request header and the
    API key from the ``KEY`` request header.

    Returns:
        JSON ``{"status": True, "ANS": ...}`` on success; a 400 response when
        no image is supplied; a 404 response when the API key is missing or
        wrong.
    """
    uploaded = request.files.get('file')
    if uploaded is not None:
        if uploaded.filename == '':
            return jsonify({"error": "No selected file"}), 400
        file_content = io.BytesIO(uploaded.read())
        file_content.name = uploaded.filename
    elif len(request.data) > 0:
        file_content = io.BytesIO(request.data)
        # Raw bodies carry no file name; fall back to a random ".png" name.
        filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
        file_content.name = request.args.get("name", filename)
    else:
        return jsonify({"status" : False, "msg" : "No file found"}), 400

    # Look headers up on request.headers directly: it is case-insensitive,
    # whereas a plain dict copy only holds the normalised spelling (e.g.
    # "Key") and would never match "KEY".
    expected_key = os.getenv("KEY")
    # Reject on mismatch (the previous `not a != b` test accepted mismatches),
    # and also when no key is configured so the endpoint is never left open.
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    question = request.headers.get('QU', '')
    print(f"QUESTION ASKED : {question}")
    answer = get_answer(file_content, question)
    return jsonify({"status" : True, "ANS" : answer})
@app.route('/ocr', methods=['POST'])
def task_ocr():
    """Run OCR on an uploaded image and return the recognised text.

    The image is taken from the multipart field ``file`` or, failing that,
    from the raw request body (optionally named via the ``name`` query
    parameter). The API key is read from the ``KEY`` request header.

    Returns:
        JSON ``{"status": True, "data": ...}`` on success; a 400 response
        when no image is supplied; a 404 response when the API key is missing
        or wrong, or when the image exceeds 4 MiB.
    """
    uploaded = request.files.get('file')
    if uploaded is not None:
        if uploaded.filename == '':
            return jsonify({"error": "No selected file"}), 400
        file_content = io.BytesIO(uploaded.read())
        file_content.name = uploaded.filename
    elif len(request.data) > 0:
        file_content = io.BytesIO(request.data)
        # Raw bodies carry no file name; fall back to a random ".png" name.
        filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
        file_content.name = request.args.get("name", filename)
    else:
        return jsonify({"status" : False, "msg" : "No file found"}), 400

    # Look the header up on request.headers directly: it is case-insensitive,
    # whereas a plain dict copy only holds the normalised spelling (e.g.
    # "Key") and would never match "KEY".
    expected_key = os.getenv("KEY")
    # Reject on mismatch (the previous `not a != b` test accepted mismatches),
    # and also when no key is configured so the endpoint is never left open.
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    max_bytes = 4 * 1024 * 1024  # 4 MiB upload limit enforced by this route
    if len(file_content.getvalue()) > max_bytes:
        return jsonify({"status" : False, "msg" : "File size is greater then 4MB"}), 404

    answer = ocr(file_content)
    return jsonify({"status" : True, "data" : answer})
@app.route('/check', methods=['POST', 'GET'])
def check_f():
    """Evaluate a ``p``/``r`` pair supplied as a JSON object.

    The request body must be a JSON object; its ``p`` and ``r`` members are
    passed to ``valuate_qna``. The API key is read from the ``KEY`` request
    header.

    Returns:
        JSON ``{"status": True, "data": ...}`` on success; a 400 response
        when the body is not a JSON object; a 404 response when the API key
        is missing or wrong.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body, so the shape can be validated explicitly before calling .get().
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"status" : False, "msg" : "Invalid JSON body"}), 400

    # Look the header up on request.headers directly: it is case-insensitive,
    # whereas a plain dict copy only holds the normalised spelling (e.g.
    # "Key") and would never match "KEY".
    expected_key = os.getenv("KEY")
    # Reject on mismatch (the previous `not a != b` test accepted mismatches),
    # and also when no key is configured so the endpoint is never left open.
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    data = valuate_qna(payload.get("p"), payload.get("r"))
    return jsonify(
        {
            "status" : True,
            "data" : data
        }
    )
@app.route('/')
def home():
    """Serve a static JSON welcome message at the site root."""
    greeting = {"message": "Welcome to my Flask API!"}
    return jsonify(greeting)
@app.route('/confirm')
def confirm_product():
    """Always respond with a JSON object whose ``status`` is true."""
    confirmation = {'status': True}
    return jsonify(confirmation)