ishworrsubedii commited on
Commit
a2c5029
·
1 Parent(s): 869e26b

add: new endpoint ntoctoconbined

Browse files
src/api/nto_api.py CHANGED
@@ -76,7 +76,7 @@ async def clothing_try_on_v2(image: UploadFile = File(...), clothing_type: str =
76
  return JSONResponse(status_code=500, content={"error": f"Error reading image", "code": 500})
77
 
78
  try:
79
- mask = await pipeline.shoulderPointMaskGeneration_(image=image)
80
  logger.info(">>> MASK GENERATION COMPLETED <<<")
81
  except Exception as e:
82
  logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
@@ -129,60 +129,55 @@ async def clothing_try_on_v2(image: UploadFile = File(...), clothing_type: str =
129
 
130
  @nto_cto_router.post("/clothingTryOn")
131
  async def clothing_try_on(image: UploadFile = File(...),
132
- mask: UploadFile = File(...),
133
- clothing_type: str = Form(...)):
134
  logger.info("-" * 50)
135
  logger.info(">>> CLOTHING TRY ON STARTED <<<")
136
  start_time = time.time()
137
 
138
- # Helper function to convert image to base64
139
- def image_to_base64(img: Image.Image, format="WEBP", quality=85) -> str:
140
- with BytesIO() as buffer:
141
- img.save(buffer, format=format, quality=quality)
142
- return f"data:image/{format.lower()};base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"
143
-
144
  try:
145
- # Load images concurrently using asyncio
146
- image_data, mask_data = await asyncio.gather(
147
- image.read(),
148
- mask.read()
149
- )
150
-
151
- image = Image.open(BytesIO(image_data)).convert("RGB")
152
- mask = Image.open(BytesIO(mask_data)).convert("RGB")
153
  logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
154
  except Exception as e:
155
  logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
156
- return JSONResponse(status_code=500, content={"error": "Error reading image or mask", "code": 500})
157
 
158
  try:
159
  actual_image = image.copy()
160
  jewellery_mask = Image.fromarray(np.bitwise_and(np.array(mask), np.array(image)))
161
  arr_orig = np.array(grayscale(mask))
162
 
163
- image = Image.fromarray(
164
- cv2.inpaint(np.array(image), arr_orig, 15, cv2.INPAINT_TELEA)
165
- ).resize((512, 512))
166
 
167
  arr = arr_orig.copy()
168
  mask_y = np.where(arr == arr[arr != 0][0])[0][0]
169
  arr[mask_y:, :] = 255
 
170
  mask = Image.fromarray(arr).resize((512, 512))
171
-
172
  logger.info(">>> IMAGE PROCESSING COMPLETED <<<")
173
  except Exception as e:
174
  logger.error(f">>> IMAGE PROCESSING ERROR: {str(e)} <<<")
175
- return JSONResponse(status_code=500, content={"error": "Error processing image or mask", "code": 500})
 
176
 
177
  try:
178
- mask_data_uri = image_to_base64(mask)
179
- image_data_uri = image_to_base64(image)
 
 
 
 
 
 
180
  logger.info(">>> IMAGE ENCODING COMPLETED <<<")
181
  except Exception as e:
182
  logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
183
- return JSONResponse(status_code=500, content={"error": "Error encoding images", "code": 500})
 
184
 
185
- input_data = {
186
  "mask": mask_data_uri,
187
  "image": image_data_uri,
188
  "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
@@ -191,65 +186,33 @@ async def clothing_try_on(image: UploadFile = File(...),
191
  }
192
 
193
  try:
194
- output = replicate_run_cto(input_data)
195
  logger.info(">>> REPLICATE PROCESSING COMPLETED <<<")
196
  except Exception as e:
197
  logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
198
- return JSONResponse(content={"error": "Error running clothing try on", "code": 500}, status_code=500)
199
 
200
  try:
201
- # Fetch the output image URL
202
- output_url = output[0]
203
- if not isinstance(output_url, str):
204
- raise ValueError("Invalid output URL format from replicate")
205
-
206
- # Download the output image
207
- async with aiohttp.ClientSession() as session:
208
- async with session.get(output_url) as response:
209
- if response.status != 200:
210
- raise HTTPException(status_code=response.status,
211
- detail="Failed to fetch output image")
212
- output_bytes = await response.read()
213
-
214
- # Process the output image
215
- output_image = Image.open(BytesIO(output_bytes)).resize(actual_image.size)
216
-
217
- # Convert arrays and process final image
218
- mask_array = np.array(Image.fromarray(arr_orig).convert("RGB"))
219
- output_array = np.array(output_image)
220
-
221
- # Perform bitwise operations
222
- mask_inverse = np.bitwise_not(mask_array)
223
- intermediate = np.bitwise_and(output_array, mask_inverse)
224
- result = Image.fromarray(np.bitwise_or(intermediate, np.array(jewellery_mask)))
225
-
226
- # Convert result to base64
227
- result_base64 = image_to_base64(result)
228
-
229
  total_inference_time = round((time.time() - start_time), 2)
230
  logger.info(">>> OUTPUT IMAGE PROCESSING COMPLETED <<<")
231
 
232
  response = {
233
- "output": result_base64,
234
  "code": 200,
235
  "inference_time": total_inference_time
236
  }
237
- except ValueError as ve:
238
- logger.error(f">>> OUTPUT IMAGE PROCESSING ERROR: {str(ve)} <<<")
239
- return JSONResponse(status_code=500,
240
- content={"error": "Invalid response from image generation service",
241
- "code": 500})
242
  except Exception as e:
243
  logger.error(f">>> OUTPUT IMAGE PROCESSING ERROR: {str(e)} <<<")
244
- return JSONResponse(status_code=500,
245
- content={"error": "Error processing output image",
246
- "code": 500})
247
- finally:
248
- # Clean up resources
249
- for var in ['output_image', 'output_array', 'intermediate', 'mask_array', 'mask_inverse']:
250
- if var in locals():
251
- del locals()[var]
252
- gc.collect()
253
 
254
  logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
255
  logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
@@ -257,7 +220,6 @@ async def clothing_try_on(image: UploadFile = File(...),
257
 
258
  return JSONResponse(content=response, status_code=200)
259
 
260
-
261
  @nto_cto_router.post("/productData/{storeId}")
262
  async def product_data(
263
  storeId: str,
@@ -501,11 +463,10 @@ async def necklace_try_on_id(necklace_try_on_id: NecklaceTryOnIDEntity = Depends
501
  # Upload both images concurrently
502
  upload_tasks = [
503
  supabase_upload_and_return_url(prefix="necklace_try_on", image=result),
504
- supabase_upload_and_return_url(prefix="necklace_try_on_mask", image=mask)
505
  ]
506
- result_url, mask_url = await asyncio.gather(*upload_tasks)
507
 
508
- if not result_url or not mask_url:
509
  raise Exception("Failed to upload one or both images")
510
 
511
  logger.info(f">>> RESULT IMAGES SAVED IN {round((time.time() - start_time_saving), 2)}s <<<")
@@ -519,11 +480,10 @@ async def necklace_try_on_id(necklace_try_on_id: NecklaceTryOnIDEntity = Depends
519
  response = {
520
  "code": 200,
521
  "output": f"{result_url}",
522
- "mask": f"{mask_url}",
523
  "inference_time": total_backend_time
524
  }
525
 
526
-
527
 
528
  except Exception as e:
529
  logger.error(f">>> RESPONSE GENERATION ERROR: {str(e)} <<<")
@@ -534,7 +494,7 @@ async def necklace_try_on_id(necklace_try_on_id: NecklaceTryOnIDEntity = Depends
534
  logger.info("-" * 50)
535
 
536
  return JSONResponse(content=response, status_code=200)
537
-
538
  finally:
539
  if 'result' in locals(): del result
540
  if 'mask' in locals(): del mask
@@ -654,3 +614,102 @@ async def necklace_try_on_with_points(necklace_try_on_id: NecklaceTryOnIDEntity
654
  logger.info("-" * 50)
655
 
656
  return JSONResponse(content=response, status_code=200)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  return JSONResponse(status_code=500, content={"error": f"Error reading image", "code": 500})
77
 
78
  try:
79
+ mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=image)
80
  logger.info(">>> MASK GENERATION COMPLETED <<<")
81
  except Exception as e:
82
  logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
 
129
 
130
  @nto_cto_router.post("/clothingTryOn")
131
  async def clothing_try_on(image: UploadFile = File(...),
132
+ mask: UploadFile = File(...), clothing_type: str = Form(...)):
 
133
  logger.info("-" * 50)
134
  logger.info(">>> CLOTHING TRY ON STARTED <<<")
135
  start_time = time.time()
136
 
 
 
 
 
 
 
137
  try:
138
+ image_bytes = await image.read()
139
+ mask_bytes = await mask.read()
140
+ image, mask = Image.open(BytesIO(image_bytes)).convert("RGB"), Image.open(BytesIO(mask_bytes)).convert("RGB")
 
 
 
 
 
141
  logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
142
  except Exception as e:
143
  logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
144
+ return JSONResponse(status_code=500, content={"error": f"Error reading image or mask", "code": 500})
145
 
146
  try:
147
  actual_image = image.copy()
148
  jewellery_mask = Image.fromarray(np.bitwise_and(np.array(mask), np.array(image)))
149
  arr_orig = np.array(grayscale(mask))
150
 
151
+ image = cv2.inpaint(np.array(image), arr_orig, 15, cv2.INPAINT_TELEA)
152
+ image = Image.fromarray(image).resize((512, 512))
 
153
 
154
  arr = arr_orig.copy()
155
  mask_y = np.where(arr == arr[arr != 0][0])[0][0]
156
  arr[mask_y:, :] = 255
157
+
158
  mask = Image.fromarray(arr).resize((512, 512))
 
159
  logger.info(">>> IMAGE PROCESSING COMPLETED <<<")
160
  except Exception as e:
161
  logger.error(f">>> IMAGE PROCESSING ERROR: {str(e)} <<<")
162
+ return JSONResponse(status_code=500,
163
+ content={"error": f"Error processing image or mask", "code": 500})
164
 
165
  try:
166
+ mask_img_base_64, act_img_base_64 = BytesIO(), BytesIO()
167
+ mask.save(mask_img_base_64, format="WEBP")
168
+ image.save(act_img_base_64, format="WEBP")
169
+ mask_bytes_ = base64.b64encode(mask_img_base_64.getvalue()).decode("utf-8")
170
+ image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
171
+
172
+ mask_data_uri = f"data:image/webp;base64,{mask_bytes_}"
173
+ image_data_uri = f"data:image/webp;base64,{image_bytes_}"
174
  logger.info(">>> IMAGE ENCODING COMPLETED <<<")
175
  except Exception as e:
176
  logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
177
+ return JSONResponse(status_code=500,
178
+ content={"error": f"Error encoding images", "code": 500})
179
 
180
+ input = {
181
  "mask": mask_data_uri,
182
  "image": image_data_uri,
183
  "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
 
186
  }
187
 
188
  try:
189
+ output = replicate_run_cto(input)
190
  logger.info(">>> REPLICATE PROCESSING COMPLETED <<<")
191
  except Exception as e:
192
  logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
193
+ return JSONResponse(content={"error": f"Error running clothing try on", "code": 500}, status_code=500)
194
 
195
  try:
196
+ response = requests.get(output[0])
197
+ output_image = Image.open(BytesIO(response.content)).resize(actual_image.size)
198
+ output_image = np.bitwise_and(np.array(output_image),
199
+ np.bitwise_not(np.array(Image.fromarray(arr_orig).convert("RGB"))))
200
+ result = Image.fromarray(np.bitwise_or(np.array(output_image), np.array(jewellery_mask)))
201
+
202
+ in_mem_file = BytesIO()
203
+ result.save(in_mem_file, format="WEBP", quality=85)
204
+ base_64_output = base64.b64encode(in_mem_file.getvalue()).decode('utf-8')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205
  total_inference_time = round((time.time() - start_time), 2)
206
  logger.info(">>> OUTPUT IMAGE PROCESSING COMPLETED <<<")
207
 
208
  response = {
209
+ "output": f"data:image/WEBP;base64,{base_64_output}",
210
  "code": 200,
211
  "inference_time": total_inference_time
212
  }
 
 
 
 
 
213
  except Exception as e:
214
  logger.error(f">>> OUTPUT IMAGE PROCESSING ERROR: {str(e)} <<<")
215
+ return JSONResponse(status_code=500, content={"error": f"Error processing output image", "code": 500})
 
 
 
 
 
 
 
 
216
 
217
  logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
218
  logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
 
220
 
221
  return JSONResponse(content=response, status_code=200)
222
 
 
223
  @nto_cto_router.post("/productData/{storeId}")
224
  async def product_data(
225
  storeId: str,
 
463
  # Upload both images concurrently
464
  upload_tasks = [
465
  supabase_upload_and_return_url(prefix="necklace_try_on", image=result),
 
466
  ]
467
+ result_url = await asyncio.gather(*upload_tasks)
468
 
469
+ if not result_url:
470
  raise Exception("Failed to upload one or both images")
471
 
472
  logger.info(f">>> RESULT IMAGES SAVED IN {round((time.time() - start_time_saving), 2)}s <<<")
 
480
  response = {
481
  "code": 200,
482
  "output": f"{result_url}",
 
483
  "inference_time": total_backend_time
484
  }
485
 
486
+
487
 
488
  except Exception as e:
489
  logger.error(f">>> RESPONSE GENERATION ERROR: {str(e)} <<<")
 
494
  logger.info("-" * 50)
495
 
496
  return JSONResponse(content=response, status_code=200)
497
+
498
  finally:
499
  if 'result' in locals(): del result
500
  if 'mask' in locals(): del mask
 
614
  logger.info("-" * 50)
615
 
616
  return JSONResponse(content=response, status_code=200)
617
+
618
+
619
+ @nto_cto_router.post("/clothingAndNecklaceTryOn")
620
+ async def clothing_and_necklace_try_on(
621
+ image: UploadFile = File(...),
622
+ necklace: UploadFile = File(...),
623
+ clothing_type: str = Form(...)
624
+ ):
625
+ logger.info("-" * 50)
626
+ logger.info(">>> CLOTHING AND NECKLACE TRY ON STARTED <<<")
627
+ start_time = time.time()
628
+
629
+ def image_to_base64(img: Image.Image) -> str:
630
+ buffer = BytesIO()
631
+ img.save(buffer, format="WEBP", quality=85, optimize=True)
632
+ return f"data:image/webp;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"
633
+
634
+ try:
635
+ # Load both images concurrently
636
+ person_bytes, necklace_bytes = await asyncio.gather(
637
+ image.read(),
638
+ necklace.read()
639
+ )
640
+
641
+ # Convert bytes to PIL Images
642
+ person_image = Image.open(BytesIO(person_bytes)).convert("RGB").resize((512, 512))
643
+ necklace_image = Image.open(BytesIO(necklace_bytes)).convert("RGBA")
644
+ logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
645
+
646
+ # Generate mask and get shoulder points in one go
647
+ mask, left_point, right_point = await pipeline.shoulderPointMaskGeneration_(image=person_image)
648
+ logger.info(">>> MASK AND POINTS GENERATION COMPLETED <<<")
649
+
650
+ # Prepare base64 encodings concurrently
651
+ mask_data_uri, image_data_uri = await asyncio.gather(
652
+ asyncio.to_thread(image_to_base64, mask),
653
+ asyncio.to_thread(image_to_base64, person_image)
654
+ )
655
+
656
+ # Run CTO
657
+ cto_output = replicate_run_cto({
658
+ "mask": mask_data_uri,
659
+ "image": image_data_uri,
660
+ "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
661
+ "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
662
+ "num_inference_steps": 20 # reduced from 25 for optimization
663
+ })
664
+
665
+ if not cto_output or not isinstance(cto_output, (list, tuple)) or not cto_output[0]:
666
+ raise ValueError("Invalid output from clothing try-on")
667
+
668
+ # Download CTO result
669
+ async with aiohttp.ClientSession() as session:
670
+ async with session.get(str(cto_output[0])) as response:
671
+ if response.status != 200:
672
+ raise HTTPException(status_code=response.status, detail="Failed to fetch CTO output")
673
+ cto_result_bytes = await response.read()
674
+
675
+ # Process CTO result and perform NTO with points
676
+ with BytesIO(cto_result_bytes) as buf:
677
+ cto_result_image = Image.open(buf).convert("RGB")
678
+
679
+ # Use necklaceTryOnWithPoints directly with the points we already have
680
+ result, headerText, _ = await pipeline.necklaceTryOnWithPoints_(
681
+ image=cto_result_image,
682
+ jewellery=necklace_image,
683
+ left_shoulder=left_point,
684
+ right_shoulder=right_point,
685
+ storename="default"
686
+ )
687
+ result.show()
688
+
689
+ if result is None:
690
+ raise ValueError("Failed to process necklace try-on")
691
+
692
+ final_base64 = await asyncio.to_thread(image_to_base64, result)
693
+
694
+ logger.info(f"Left Shoulder: {left_point}, Right Shoulder: {right_point}")
695
+
696
+ response = {
697
+ "code": 200,
698
+ "output": final_base64,
699
+ "inference_time": round((time.time() - start_time), 2)
700
+ }
701
+
702
+ except ValueError as ve:
703
+ logger.error(f">>> PROCESSING ERROR: {str(ve)} <<<")
704
+ return JSONResponse(status_code=400, content={"error": str(ve), "code": 400})
705
+ except Exception as e:
706
+ logger.error(f">>> PROCESSING ERROR: {str(e)} <<<")
707
+ return JSONResponse(status_code=500, content={"error": "Error during image processing", "code": 500})
708
+ finally:
709
+ gc.collect()
710
+
711
+ logger.info(f">>> TOTAL INFERENCE TIME: {response['inference_time']}s <<<")
712
+ logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
713
+ logger.info("-" * 50)
714
+
715
+ return JSONResponse(content=response, status_code=200)
src/components/necklaceTryOn.py CHANGED
@@ -96,179 +96,48 @@ class NecklaceTryOn:
96
  logger.error(f"{CustomException(e)}:: {storename}")
97
  return [None, "error", None]
98
 
99
- def necklaceTryOnV3(self, image: Image.Image, jewellery: Image.Image, storename: str) -> list[
100
- Union[Image.Image, str]]:
101
  try:
102
- logger.info(f">>> NECKLACE TRY ON STARTED :: {storename} <<<")
103
-
104
- # reading the images
105
- image, jewellery = image.convert("RGB"), jewellery.convert("RGBA")
106
- image = np.array(image.resize((4000, 4000)))
107
  copy_image = image.copy()
108
- jewellery = np.array(jewellery)
109
-
110
- logger.info(f"NECKLACE TRY ON :: detecting pose and landmarks :: {storename}")
111
 
 
112
  image = self.detector.findPose(image)
113
  lmList, _ = self.detector.findPosition(image, bboxWithHands=False, draw=False)
114
- meshDetector = FaceMeshDetector(staticMode=True, maxFaces=1)
115
- img, faces = meshDetector.findFaceMesh(image, draw=False)
116
- left_lip_point = faces[0][61]
117
- right_lip_point = faces[0][291]
118
- pt12, pt11, pt10, pt9 = (
119
- lmList[12][:2],
120
- lmList[11][:2],
121
- lmList[10][:2],
122
- lmList[9][:2],
123
- )
124
-
125
- mid_lips = (
126
- int((left_lip_point[0] + right_lip_point[0]) / 2), int((left_lip_point[1] + right_lip_point[1]) / 2))
127
-
128
- mid_lips_x1 = int(pt12[0] + (mid_lips[0] - pt12[0]) / 2)
129
- mid_lips_y1 = int(pt12[1] + (mid_lips[1] - pt12[1]) / 2)
130
-
131
- mid_lips_x2 = int(pt11[0] + (mid_lips[0] - pt11[0]) / 2)
132
- mid_lips_y2 = int(pt11[1] + (mid_lips[1] - pt11[1]) / 2)
133
-
134
- # left right lip
135
- left_right_lip_org_x11 = int(pt12[0] + (right_lip_point[0] - pt12[0]) / 2)
136
- left_right_lip_org_y11 = int(pt12[1] + (right_lip_point[1] - pt12[1]) / 2)
137
-
138
- left_right_lip_org_x12 = int(pt11[0] + (left_lip_point[0] - pt11[0]) / 2)
139
- left_right_lip_org_y12 = int(pt11[1] + (left_lip_point[1] - pt11[1]) / 2)
140
-
141
- # left right lip 2
142
- left_right_lip_org_x21 = int(pt12[0] + (left_lip_point[0] - pt12[0]) / 2)
143
- left_right_lip_org_y21 = int(pt12[1] + (left_lip_point[1] - pt12[1]) / 2)
144
-
145
- left_right_lip_org_x22 = int(pt11[0] + (right_lip_point[0] - pt11[0]) / 2)
146
- left_right_lip_org_y22 = int(pt11[1] + (right_lip_point[1] - pt11[1]) / 2)
147
-
148
- logger.info(f"NECKLACE TRY ON :: estimating neck points :: {storename}")
149
-
150
- avg_x1 = int((mid_lips_x1 + left_right_lip_org_x11 + left_right_lip_org_x21) / 3)
151
- avg_y1 = int((mid_lips_y1 + left_right_lip_org_y11 + left_right_lip_org_y21) / 3)
152
-
153
- avg_x2 = int((mid_lips_x2 + left_right_lip_org_x12 + left_right_lip_org_x22) / 3)
154
- avg_y2 = int((mid_lips_y2 + left_right_lip_org_y12 + left_right_lip_org_y22) / 3)
155
-
156
- logger.info(f"NECKLACE TRY ON :: scaling the necklace image :: {storename}")
157
-
158
- if avg_y2 < avg_y1:
159
- angle = math.ceil(
160
- self.detector.findAngle(
161
- p1=(avg_x2, avg_y2), p2=(avg_x1, avg_y1), p3=(avg_x2, avg_y1)
162
- )[0]
163
- )
164
- else:
165
- angle = math.ceil(
166
- self.detector.findAngle(
167
- p1=(avg_x2, avg_y2), p2=(avg_x1, avg_y1), p3=(avg_x2, avg_y1)
168
- )[0]
169
- )
170
- angle = angle * -1
171
-
172
- xdist = avg_x2 - avg_x1
173
- origImgRatio = xdist / jewellery.shape[1]
174
- ydist = jewellery.shape[0] * origImgRatio
175
-
176
- logger.info(f"NECKLACE TRY ON :: adding offset based on the necklace shape :: {storename}")
177
-
178
- image_gray = cv2.cvtColor(jewellery, cv2.COLOR_BGRA2GRAY)
179
- for offset_orig in range(image_gray.shape[1]):
180
- pixel_value = image_gray[0, :][offset_orig]
181
- if (pixel_value != 255) & (pixel_value != 0):
182
- break
183
- else:
184
- continue
185
- offset = int(0.8 * xdist * (offset_orig / jewellery.shape[1]))
186
- jewellery = cv2.resize(
187
- jewellery, (int(xdist), int(ydist)), interpolation=cv2.INTER_AREA
188
- )
189
- jewellery = cvzone.rotateImage(jewellery, angle)
190
- y_coordinate = avg_y1 - offset
191
- available_space = copy_image.shape[0] - y_coordinate
192
- extra = jewellery.shape[0] - available_space
193
 
194
- logger.info(f"NECKLACE TRY ON :: generating necklace placement status :: {storename}")
195
-
196
- if extra > 0:
197
- headerText = "To see more of the necklace, please step back slightly."
198
- else:
199
- headerText = "success"
200
-
201
- logger.info(f"NECKLACE TRY ON :: generating output :: {storename}")
202
 
203
- result = cvzone.overlayPNG(copy_image, jewellery, (avg_x1, y_coordinate))
204
- image = Image.fromarray(result.astype(np.uint8))
205
- logo = Image.open(returnBytesData(url=self.necklaceTryOnConfig.logoURL.format({storename})))
206
- result = addWatermark(background=image, logo=logo)
 
 
207
 
208
- blackedNecklace = np.zeros(shape=copy_image.shape)
209
- cvzone.overlayPNG(blackedNecklace, jewellery, (avg_x1, y_coordinate))
210
- blackedNecklace = cv2.cvtColor(blackedNecklace.astype(np.uint8), cv2.COLOR_BGR2GRAY)
211
- binaryMask = blackedNecklace * ((blackedNecklace > 5) * 255)
212
- binaryMask[binaryMask >= 255] = 255
213
- binaryMask[binaryMask < 255] = 0
214
 
215
- mask = Image.fromarray(binaryMask.astype(np.uint8)).convert("RGB")
 
 
216
 
217
- gc.collect()
 
218
 
219
- return [result, headerText, mask]
220
 
221
  except Exception as e:
222
- logger.error(f"{CustomException(e)}:: {storename}")
223
  raise CustomException(e)
224
 
225
- def shoulderPointMaskGeneration(self, image: Image.Image) -> Image.Image:
226
- image = np.array(image)
227
- copy_image = image.copy()
228
-
229
- logger.info(f"SHOULDER POINT MASK GENERATION :: detecting pose and landmarks")
230
-
231
- image = self.detector.findPose(image)
232
- lmList, _ = self.detector.findPosition(image, bboxWithHands=False, draw=False)
233
-
234
- img, faces = self.meshDetector.findFaceMesh(image, draw=False)
235
- leftLandmarkIndex = 172
236
- rightLandmarkIndex = 397
237
-
238
- leftLandmark, rightLandmark = faces[0][leftLandmarkIndex], faces[0][rightLandmarkIndex]
239
- landmarksDistance = int(
240
- ((leftLandmark[0] - rightLandmark[0]) ** 2 + (leftLandmark[1] - rightLandmark[1]) ** 2) ** 0.5)
241
-
242
- logger.info(f"SHOULDER POINT MASK GENERATION :: estimating neck points")
243
-
244
- avg_x1 = int(leftLandmark[0] - landmarksDistance * 0.12)
245
- avg_x2 = int(rightLandmark[0] + landmarksDistance * 0.12)
246
-
247
- avg_y1 = int(leftLandmark[1] + landmarksDistance * 0.5)
248
- avg_y2 = int(rightLandmark[1] + landmarksDistance * 0.5)
249
-
250
- offset = 50
251
- avg_y1 -= offset
252
- avg_y2 -= offset
253
-
254
- logger.info(f"SHOULDER POINT MASK GENERATION :: generating shoulder point mask")
255
-
256
- logger.info("SHOULDER POINT MASK GENERATION :: generating shoulder point mask")
257
-
258
- mask = np.zeros_like(image[:, :, 0])
259
-
260
- mask[avg_y1:, :] = 255
261
-
262
- pts = np.array([[0, 0], [image.shape[1], 0], [avg_x2, avg_y2], [avg_x1, avg_y1]], np.int32)
263
- pts = pts.reshape((-1, 1, 2))
264
- cv2.fillPoly(mask, [pts], 0)
265
-
266
- black_n_white_mask = np.zeros_like(image[:, :, 0])
267
- black_n_white_mask[avg_y1:, :] = 255
268
- logger.info("SHOULDER POINT MASK GENERATION :: mask generated successfully")
269
-
270
- return Image.fromarray(black_n_white_mask.astype(np.uint8))
271
-
272
  def canvasPoints(self, image: Image.Image, jewellery: Image.Image, storename: str) -> dict:
273
  try:
274
  logger.info(f">>> NECKLACE TRY ON STARTED :: {storename} <<<")
 
96
  logger.error(f"{CustomException(e)}:: {storename}")
97
  return [None, "error", None]
98
 
99
+ def shoulderPointMaskGeneration(self, image: Image.Image) -> tuple[Image.Image, tuple[int, int], tuple[int, int]]:
 
100
  try:
101
+ image = np.array(image)
 
 
 
 
102
  copy_image = image.copy()
 
 
 
103
 
104
+ logger.info("SHOULDER POINT MASK GENERATION :: detecting pose and landmarks")
105
  image = self.detector.findPose(image)
106
  lmList, _ = self.detector.findPosition(image, bboxWithHands=False, draw=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
 
108
+ img, faces = self.meshDetector.findFaceMesh(image, draw=False)
109
+ leftLandmark, rightLandmark = faces[0][172], faces[0][397]
110
+
111
+ # Using the same distance calculation as necklaceTryOn
112
+ landmarksDistance = np.linalg.norm(np.array(leftLandmark) - np.array(rightLandmark))
 
 
 
113
 
114
+ logger.info("SHOULDER POINT MASK GENERATION :: estimating neck points")
115
+ # Using the same point calculation logic as necklaceTryOn
116
+ avg_x1 = int(leftLandmark[0] - landmarksDistance * 0.12)
117
+ avg_x2 = int(rightLandmark[0] + landmarksDistance * 0.12)
118
+ avg_y1 = int(leftLandmark[1] + landmarksDistance * 0.5)
119
+ avg_y2 = int(rightLandmark[1] + landmarksDistance * 0.5)
120
 
121
+ logger.info("SHOULDER POINT MASK GENERATION :: generating shoulder point mask")
122
+ mask = np.zeros_like(image[:, :, 0])
123
+ mask[avg_y1:, :] = 255
124
+ pts = np.array([[0, 0], [image.shape[1], 0], [avg_x2, avg_y2], [avg_x1, avg_y1]], np.int32)
125
+ pts = pts.reshape((-1, 1, 2))
126
+ cv2.fillPoly(mask, [pts], 0)
127
 
128
+ black_n_white_mask = np.zeros_like(image[:, :, 0])
129
+ black_n_white_mask[avg_y1:, :] = 255
130
+ logger.info("SHOULDER POINT MASK GENERATION :: mask generated successfully")
131
 
132
+ left_point = (avg_x1, avg_y1)
133
+ right_point = (avg_x2, avg_y2)
134
 
135
+ return Image.fromarray(black_n_white_mask.astype(np.uint8)), left_point, right_point
136
 
137
  except Exception as e:
138
+ logger.error(f"SHOULDER POINT MASK GENERATION ERROR: {str(e)}")
139
  raise CustomException(e)
140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  def canvasPoints(self, image: Image.Image, jewellery: Image.Image, storename: str) -> dict:
142
  try:
143
  logger.info(f">>> NECKLACE TRY ON STARTED :: {storename} <<<")
src/pipelines/completePipeline.py CHANGED
@@ -13,9 +13,9 @@ class Pipeline:
13
  storename=storename)
14
  return [result, headerText, mask]
15
 
16
- async def shoulderPointMaskGeneration_(self, image: Image.Image) -> Image.Image:
17
- mask = self.necklaceTryOnObj.shoulderPointMaskGeneration(image=image)
18
- return mask
19
 
20
  async def canvasPoint(self, image: Image.Image, jewellery: Image.Image, storename: str) -> dict:
21
  points = self.necklaceTryOnObj.canvasPoints(image=image, jewellery=jewellery, storename=storename)
 
13
  storename=storename)
14
  return [result, headerText, mask]
15
 
16
+ async def shoulderPointMaskGeneration_(self, image: Image.Image):
17
+ mask,left,right = self.necklaceTryOnObj.shoulderPointMaskGeneration(image=image)
18
+ return mask,left,right
19
 
20
  async def canvasPoint(self, image: Image.Image, jewellery: Image.Image, storename: str) -> dict:
21
  points = self.necklaceTryOnObj.canvasPoints(image=image, jewellery=jewellery, storename=storename)