cadastro-checkin-PE/routes/face_routes_benchmark_uses.py

413 lines
16 KiB
Python

import os
import csv
import time
import logging
import numpy as np
from PIL import Image
from io import BytesIO
from minio import Minio
import face_recognition
from deepface import DeepFace
from facenet_pytorch import MTCNN
from flask import Blueprint, request, jsonify
from services.face_service import compare_faces_service
from services.storage_service import minio_client, BUCKET
from services.storage_service import upload_image_to_minio
logging.basicConfig(level=logging.INFO)
face_bp = Blueprint('face_bp', __name__)
mtcnn = MTCNN(image_size=160, margin=0)
logger = logging.getLogger(__name__)
threshold = float(os.getenv("THRESHOLD", 0.6))
@face_bp.route('/compare_faces', methods=['POST'])
def compare_faces():
logging.info("🖼️ Recebendo imagens para comparação...")
if 'image1' not in request.files or 'image2' not in request.files:
return jsonify({"error": "Missing images. Keys should be 'image1' and 'image2'."}), 400
image1 = request.files['image1']
image2 = request.files['image2']
try:
result = compare_faces_service(image1, image2)
return jsonify(result), 200
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
@face_bp.route('/register_face', methods=['POST'])
def register_face():
person_id = request.form.get("person_id")
image_file = request.files.get("image")
if not person_id or not image_file:
return jsonify({"error": "Missing person_id or image"}), 400
# Salva no MinIO
try:
image_path = upload_image_to_minio(image_file, person_id)
except Exception as e:
return jsonify({"error": "Failed to upload image", "details": str(e)}), 500
# Extrai encoding
image_file.seek(0)
image = face_recognition.load_image_file(image_file)
encodings = face_recognition.face_encodings(image)
if not encodings:
return jsonify({"error": "No face detected in image"}), 400
# OBS: encoding poderia ser salvo num banco, aqui apenas retornamos
encoding = encodings[0].tolist() # JSON serializable
return jsonify({
"person_id": person_id,
"image_path": image_path,
"face_encoding": encoding
}), 200
@face_bp.route('/compare_face_with_registered', methods=['POST'])
def compare_face_with_registered():
logger.info("🔍 Iniciando comparação com registros salvos")
person_id = request.form.get("person_id")
image_file = request.files.get("image")
if not person_id or not image_file:
logger.warning("⚠️ Requisição inválida: 'person_id' ou 'image' ausente")
return jsonify({"error": "Missing person_id or image"}), 400
logger.debug(f"📤 Person ID recebido: {person_id}")
# Processa imagem recebida
try:
logger.info("🧠 Carregando imagem enviada para extração facial")
image = face_recognition.load_image_file(image_file)
input_encoding = face_recognition.face_encodings(image)
if not input_encoding:
logger.warning("⚠️ Nenhuma face detectada na imagem de entrada")
return jsonify({"error": "No face found in input image"}), 400
input_encoding = input_encoding[0]
logger.debug("✅ Encoding da imagem enviada obtido com sucesso")
except Exception as e:
logger.exception("❌ Erro ao processar a imagem enviada")
return jsonify({"error": "Failed to process input image", "details": str(e)}), 500
# Busca imagens registradas no MinIO
try:
logger.info("📂 Buscando imagens registradas no MinIO para o usuário")
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
if not registered_objects:
logger.warning("⚠️ Nenhuma imagem registrada encontrada para este usuário")
return jsonify({"error": "No registered images found for this person_id"}), 404
except Exception as e:
logger.exception("❌ Erro ao listar objetos no MinIO")
return jsonify({"error": "Failed to access MinIO", "details": str(e)}), 500
matches = []
for obj in registered_objects:
try:
logger.debug(f"🔄 Comparando com imagem registrada: {obj.object_name}")
response = minio_client.get_object(BUCKET, obj.object_name)
buffer = BytesIO(response.read())
reg_image = face_recognition.load_image_file(buffer)
encodings = face_recognition.face_encodings(reg_image)
if not encodings:
logger.warning(f"⚠️ Nenhuma face encontrada em {obj.object_name}, ignorando")
continue
encoding = encodings[0]
distance = face_recognition.face_distance([encoding], input_encoding)[0]
similarity = 1 - distance
threshold = float(os.getenv("THRESHOLD", 0.6))
match_result = bool(distance <= threshold)
logger.debug(f"📏 Similaridade: {similarity:.4f} | Match: {match_result}")
matches.append({
"registered_image": str(obj.object_name),
"similarity_score": round(float(similarity), 4),
"match": match_result
})
except Exception as e:
logger.error(f"❌ Erro ao comparar com imagem {obj.object_name}: {str(e)}")
if not matches:
logger.warning("⚠️ Nenhuma comparação válida foi possível")
return jsonify({"error": "No valid registered faces found"}), 404
best_match = sorted(matches, key=lambda m: m["similarity_score"], reverse=True)[0]
logger.info(f"✅ Melhor match encontrado: {best_match['registered_image']} com score {best_match['similarity_score']}")
return jsonify(best_match), 200
@face_bp.route('/benchmark_face_match_working', methods=['POST'])
def benchmark_face_match_working():
logger.info("🔬 Iniciando benchmark facial")
person_id = request.form.get("person_id")
image_file = request.files.get("image")
if not person_id or not image_file:
return jsonify({"error": "Missing person_id or image"}), 400
try:
pil_input = Image.open(image_file).convert("RGB")
input_np = np.array(pil_input)
except Exception as e:
logger.exception("Erro ao carregar imagem")
return jsonify({"error": "Invalid image", "details": str(e)}), 400
try:
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
if not registered_objects:
return jsonify({"error": "No registered images found"}), 404
except Exception as e:
logger.exception("Erro ao acessar MinIO")
return jsonify({"error": "MinIO access error", "details": str(e)}), 500
deepface_models = [
"ArcFace", "Facenet", "Facenet512", "VGG-Face",
"OpenFace", "Dlib", "SFace" # Removido "DeepFace"
]
all_results = []
csv_rows = []
for obj in registered_objects:
logger.info(f"📂 Comparando com imagem: {obj.object_name}")
result_entry = {
"registered_image": str(obj.object_name),
"models": {}
}
try:
response = minio_client.get_object(BUCKET, obj.object_name)
buffer = BytesIO(response.read())
pil_registered = Image.open(buffer).convert("RGB")
reg_np = np.array(pil_registered)
except Exception as e:
logger.error(f"Erro ao carregar imagem: {e}")
continue
# Modelo 1: dlib
try:
t0 = time.time()
enc_input = face_recognition.face_encodings(input_np)
enc_reg = face_recognition.face_encodings(reg_np)
if enc_input and enc_reg:
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
sim = 1 - dist
match = dist <= threshold
duration = round(time.time() - t0, 4)
result_entry["models"]["dlib"] = {
"similarity_score": round(sim, 4),
"match": bool(match),
"duration_sec": duration
}
csv_rows.append(["dlib", sim, duration, match])
else:
raise ValueError("Encodings não encontrados")
except Exception as e:
result_entry["models"]["dlib"] = {"error": str(e)}
# DeepFace models
for model in deepface_models:
model_key = f"deepface_{model.lower()}"
try:
t0 = time.time()
analysis = DeepFace.verify(
np.array(pil_input),
np.array(pil_registered),
model_name=model,
enforce_detection=False
)
sim = 1 - analysis['distance']
match = analysis['verified']
duration = round(time.time() - t0, 4)
result_entry["models"][model_key] = {
"similarity_score": round(sim, 4),
"match": bool(match),
# "distance_metric": analysis.get("distance_metric", ""),
"distance_metric": analysis.get("distance_metric", "cosine"),
"duration_sec": duration
}
csv_rows.append([model_key, sim, duration, match])
except Exception as e:
result_entry["models"][model_key] = {"error": str(e)}
all_results.append(result_entry)
# Salva CSV
csv_path = "benchmark_results.csv"
try:
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["model", "similarity_score", "duration_sec", "match"])
writer.writerows(csv_rows)
logger.info(f"✅ CSV salvo em {csv_path}")
except Exception as e:
logger.error(f"❌ Falha ao salvar CSV: {e}")
return jsonify(all_results), 200
@face_bp.route('/benchmark_face_match', methods=['POST'])
def benchmark_face_match():
import csv
logger.info("🧪 Iniciando benchmark com múltiplas imagens de entrada")
person_id = request.form.get("person_id")
image_files = request.files.getlist("images[]")
if not person_id or not image_files:
return jsonify({"error": "Missing person_id or images[]"}), 400
# Validação e conversão de imagens de entrada
input_images = []
for img_file in image_files:
try:
pil_img = Image.open(img_file).convert("RGB")
input_images.append(np.array(pil_img))
except Exception as e:
logger.warning(f"⚠️ Imagem inválida: {img_file.filename} | {str(e)}")
if not input_images:
return jsonify({"error": "No valid input images"}), 400
try:
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
if not registered_objects:
return jsonify({"error": "No registered images found"}), 404
except Exception as e:
logger.exception("❌ Erro ao acessar MinIO")
return jsonify({"error": "MinIO access error", "details": str(e)}), 500
deepface_models = [
"ArcFace", "Facenet", "Facenet512", "VGG-Face",
"OpenFace", "Dlib", "SFace"
]
all_model_stats = {}
# Inicializa estatísticas por modelo
for model in ["dlib", "mtcnn+dlib"] + [f"deepface_{m.lower()}" for m in deepface_models]:
all_model_stats[model] = {
"similarities": [],
"durations": [],
"matches": []
}
for input_np in input_images:
for obj in registered_objects:
try:
response = minio_client.get_object(BUCKET, obj.object_name)
buffer = BytesIO(response.read())
pil_registered = Image.open(buffer).convert("RGB")
reg_np = np.array(pil_registered)
except Exception as e:
logger.warning(f"❌ Erro ao carregar imagem registrada {obj.object_name}: {e}")
continue
# dlib
try:
t0 = time.time()
enc_input = face_recognition.face_encodings(input_np)
enc_reg = face_recognition.face_encodings(reg_np)
if enc_input and enc_reg:
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
sim = 1 - dist
match = dist <= threshold
duration = time.time() - t0
all_model_stats["dlib"]["similarities"].append(sim)
all_model_stats["dlib"]["durations"].append(duration)
all_model_stats["dlib"]["matches"].append(match)
except Exception as e:
logger.warning(f"⚠️ Dlib falhou: {e}")
# mtcnn + dlib
try:
t0 = time.time()
t_input = mtcnn(Image.fromarray(input_np))
t_reg = mtcnn(pil_registered)
if t_input is None or t_reg is None:
raise ValueError("MTCNN não detectou rosto")
arr_input = (t_input.permute(1,2,0).numpy()*255).astype(np.uint8)
arr_reg = (t_reg.permute(1,2,0).numpy()*255).astype(np.uint8)
enc_input = face_recognition.face_encodings(arr_input)
enc_reg = face_recognition.face_encodings(arr_reg)
if enc_input and enc_reg:
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
sim = 1 - dist
match = dist <= threshold
duration = time.time() - t0
all_model_stats["mtcnn+dlib"]["similarities"].append(sim)
all_model_stats["mtcnn+dlib"]["durations"].append(duration)
all_model_stats["mtcnn+dlib"]["matches"].append(match)
except Exception as e:
logger.warning(f"⚠️ mtcnn+dlib falhou: {e}")
# DeepFace models
for model in deepface_models:
model_key = f"deepface_{model.lower()}"
try:
t0 = time.time()
analysis = DeepFace.verify(
input_np,
reg_np,
model_name=model,
enforce_detection=False
)
sim = 1 - analysis['distance']
match = analysis['verified']
duration = time.time() - t0
all_model_stats[model_key]["similarities"].append(sim)
all_model_stats[model_key]["durations"].append(duration)
all_model_stats[model_key]["matches"].append(match)
except Exception as e:
logger.warning(f"⚠️ DeepFace ({model}) falhou: {e}")
# Calcular médias
results = []
for model, stats in all_model_stats.items():
if stats["similarities"]:
avg_sim = sum(stats["similarities"]) / len(stats["similarities"])
avg_time = sum(stats["durations"]) / len(stats["durations"])
match_rate = sum(stats["matches"]) / len(stats["matches"])
results.append({
"model": model,
"avg_similarity_score": round(avg_sim, 4),
"avg_duration_sec": round(avg_time, 4),
"avg_match_rate": round(match_rate, 4)
})
# Salvar CSV
csv_path = "benchmark_results.csv"
try:
with open(csv_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["model", "avg_similarity_score", "avg_duration_sec", "avg_match_rate"])
writer.writeheader()
writer.writerows(results)
logger.info(f"✅ CSV salvo com {len(results)} modelos em {csv_path}")
except Exception as e:
logger.error(f"❌ Falha ao salvar CSV: {e}")
return jsonify(results), 200