413 lines
16 KiB
Python
413 lines
16 KiB
Python
import os
|
|
import csv
|
|
import time
|
|
import logging
|
|
|
|
import numpy as np
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
from minio import Minio
|
|
import face_recognition
|
|
from deepface import DeepFace
|
|
from facenet_pytorch import MTCNN
|
|
from flask import Blueprint, request, jsonify
|
|
|
|
from services.face_service import compare_faces_service
|
|
from services.storage_service import minio_client, BUCKET
|
|
from services.storage_service import upload_image_to_minio
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
face_bp = Blueprint('face_bp', __name__)
|
|
mtcnn = MTCNN(image_size=160, margin=0)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
threshold = float(os.getenv("THRESHOLD", 0.6))
|
|
|
|
|
|
@face_bp.route('/compare_faces', methods=['POST'])
|
|
def compare_faces():
|
|
logging.info("🖼️ Recebendo imagens para comparação...")
|
|
|
|
if 'image1' not in request.files or 'image2' not in request.files:
|
|
return jsonify({"error": "Missing images. Keys should be 'image1' and 'image2'."}), 400
|
|
|
|
image1 = request.files['image1']
|
|
image2 = request.files['image2']
|
|
|
|
try:
|
|
result = compare_faces_service(image1, image2)
|
|
return jsonify(result), 200
|
|
except ValueError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
except Exception as e:
|
|
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
|
|
|
|
|
|
|
|
@face_bp.route('/register_face', methods=['POST'])
|
|
def register_face():
|
|
person_id = request.form.get("person_id")
|
|
image_file = request.files.get("image")
|
|
|
|
if not person_id or not image_file:
|
|
return jsonify({"error": "Missing person_id or image"}), 400
|
|
|
|
# Salva no MinIO
|
|
try:
|
|
image_path = upload_image_to_minio(image_file, person_id)
|
|
except Exception as e:
|
|
return jsonify({"error": "Failed to upload image", "details": str(e)}), 500
|
|
|
|
# Extrai encoding
|
|
image_file.seek(0)
|
|
image = face_recognition.load_image_file(image_file)
|
|
encodings = face_recognition.face_encodings(image)
|
|
|
|
if not encodings:
|
|
return jsonify({"error": "No face detected in image"}), 400
|
|
|
|
# OBS: encoding poderia ser salvo num banco, aqui apenas retornamos
|
|
encoding = encodings[0].tolist() # JSON serializable
|
|
|
|
return jsonify({
|
|
"person_id": person_id,
|
|
"image_path": image_path,
|
|
"face_encoding": encoding
|
|
}), 200
|
|
|
|
|
|
|
|
|
|
|
|
@face_bp.route('/compare_face_with_registered', methods=['POST'])
|
|
def compare_face_with_registered():
|
|
logger.info("🔍 Iniciando comparação com registros salvos")
|
|
|
|
person_id = request.form.get("person_id")
|
|
image_file = request.files.get("image")
|
|
|
|
if not person_id or not image_file:
|
|
logger.warning("⚠️ Requisição inválida: 'person_id' ou 'image' ausente")
|
|
return jsonify({"error": "Missing person_id or image"}), 400
|
|
|
|
logger.debug(f"📤 Person ID recebido: {person_id}")
|
|
|
|
# Processa imagem recebida
|
|
try:
|
|
logger.info("🧠 Carregando imagem enviada para extração facial")
|
|
image = face_recognition.load_image_file(image_file)
|
|
input_encoding = face_recognition.face_encodings(image)
|
|
if not input_encoding:
|
|
logger.warning("⚠️ Nenhuma face detectada na imagem de entrada")
|
|
return jsonify({"error": "No face found in input image"}), 400
|
|
input_encoding = input_encoding[0]
|
|
logger.debug("✅ Encoding da imagem enviada obtido com sucesso")
|
|
except Exception as e:
|
|
logger.exception("❌ Erro ao processar a imagem enviada")
|
|
return jsonify({"error": "Failed to process input image", "details": str(e)}), 500
|
|
|
|
# Busca imagens registradas no MinIO
|
|
try:
|
|
logger.info("📂 Buscando imagens registradas no MinIO para o usuário")
|
|
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
|
|
if not registered_objects:
|
|
logger.warning("⚠️ Nenhuma imagem registrada encontrada para este usuário")
|
|
return jsonify({"error": "No registered images found for this person_id"}), 404
|
|
except Exception as e:
|
|
logger.exception("❌ Erro ao listar objetos no MinIO")
|
|
return jsonify({"error": "Failed to access MinIO", "details": str(e)}), 500
|
|
|
|
matches = []
|
|
for obj in registered_objects:
|
|
try:
|
|
logger.debug(f"🔄 Comparando com imagem registrada: {obj.object_name}")
|
|
response = minio_client.get_object(BUCKET, obj.object_name)
|
|
buffer = BytesIO(response.read())
|
|
reg_image = face_recognition.load_image_file(buffer)
|
|
encodings = face_recognition.face_encodings(reg_image)
|
|
if not encodings:
|
|
logger.warning(f"⚠️ Nenhuma face encontrada em {obj.object_name}, ignorando")
|
|
continue
|
|
|
|
encoding = encodings[0]
|
|
distance = face_recognition.face_distance([encoding], input_encoding)[0]
|
|
similarity = 1 - distance
|
|
threshold = float(os.getenv("THRESHOLD", 0.6))
|
|
match_result = bool(distance <= threshold)
|
|
|
|
logger.debug(f"📏 Similaridade: {similarity:.4f} | Match: {match_result}")
|
|
|
|
matches.append({
|
|
"registered_image": str(obj.object_name),
|
|
"similarity_score": round(float(similarity), 4),
|
|
"match": match_result
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"❌ Erro ao comparar com imagem {obj.object_name}: {str(e)}")
|
|
|
|
if not matches:
|
|
logger.warning("⚠️ Nenhuma comparação válida foi possível")
|
|
return jsonify({"error": "No valid registered faces found"}), 404
|
|
|
|
best_match = sorted(matches, key=lambda m: m["similarity_score"], reverse=True)[0]
|
|
logger.info(f"✅ Melhor match encontrado: {best_match['registered_image']} com score {best_match['similarity_score']}")
|
|
|
|
return jsonify(best_match), 200
|
|
|
|
|
|
|
|
|
|
@face_bp.route('/benchmark_face_match_working', methods=['POST'])
|
|
def benchmark_face_match_working():
|
|
logger.info("🔬 Iniciando benchmark facial")
|
|
|
|
person_id = request.form.get("person_id")
|
|
image_file = request.files.get("image")
|
|
|
|
if not person_id or not image_file:
|
|
return jsonify({"error": "Missing person_id or image"}), 400
|
|
|
|
try:
|
|
pil_input = Image.open(image_file).convert("RGB")
|
|
input_np = np.array(pil_input)
|
|
except Exception as e:
|
|
logger.exception("Erro ao carregar imagem")
|
|
return jsonify({"error": "Invalid image", "details": str(e)}), 400
|
|
|
|
try:
|
|
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
|
|
if not registered_objects:
|
|
return jsonify({"error": "No registered images found"}), 404
|
|
except Exception as e:
|
|
logger.exception("Erro ao acessar MinIO")
|
|
return jsonify({"error": "MinIO access error", "details": str(e)}), 500
|
|
|
|
deepface_models = [
|
|
"ArcFace", "Facenet", "Facenet512", "VGG-Face",
|
|
"OpenFace", "Dlib", "SFace" # Removido "DeepFace"
|
|
]
|
|
|
|
all_results = []
|
|
csv_rows = []
|
|
|
|
for obj in registered_objects:
|
|
logger.info(f"📂 Comparando com imagem: {obj.object_name}")
|
|
result_entry = {
|
|
"registered_image": str(obj.object_name),
|
|
"models": {}
|
|
}
|
|
|
|
try:
|
|
response = minio_client.get_object(BUCKET, obj.object_name)
|
|
buffer = BytesIO(response.read())
|
|
pil_registered = Image.open(buffer).convert("RGB")
|
|
reg_np = np.array(pil_registered)
|
|
except Exception as e:
|
|
logger.error(f"Erro ao carregar imagem: {e}")
|
|
continue
|
|
|
|
# Modelo 1: dlib
|
|
try:
|
|
t0 = time.time()
|
|
enc_input = face_recognition.face_encodings(input_np)
|
|
enc_reg = face_recognition.face_encodings(reg_np)
|
|
if enc_input and enc_reg:
|
|
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
|
|
sim = 1 - dist
|
|
match = dist <= threshold
|
|
duration = round(time.time() - t0, 4)
|
|
result_entry["models"]["dlib"] = {
|
|
"similarity_score": round(sim, 4),
|
|
"match": bool(match),
|
|
"duration_sec": duration
|
|
}
|
|
csv_rows.append(["dlib", sim, duration, match])
|
|
else:
|
|
raise ValueError("Encodings não encontrados")
|
|
except Exception as e:
|
|
result_entry["models"]["dlib"] = {"error": str(e)}
|
|
|
|
|
|
# DeepFace models
|
|
for model in deepface_models:
|
|
model_key = f"deepface_{model.lower()}"
|
|
try:
|
|
t0 = time.time()
|
|
analysis = DeepFace.verify(
|
|
np.array(pil_input),
|
|
np.array(pil_registered),
|
|
model_name=model,
|
|
enforce_detection=False
|
|
)
|
|
sim = 1 - analysis['distance']
|
|
match = analysis['verified']
|
|
duration = round(time.time() - t0, 4)
|
|
result_entry["models"][model_key] = {
|
|
"similarity_score": round(sim, 4),
|
|
"match": bool(match),
|
|
# "distance_metric": analysis.get("distance_metric", ""),
|
|
"distance_metric": analysis.get("distance_metric", "cosine"),
|
|
"duration_sec": duration
|
|
}
|
|
csv_rows.append([model_key, sim, duration, match])
|
|
except Exception as e:
|
|
result_entry["models"][model_key] = {"error": str(e)}
|
|
|
|
all_results.append(result_entry)
|
|
|
|
# Salva CSV
|
|
csv_path = "benchmark_results.csv"
|
|
try:
|
|
with open(csv_path, "w", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(["model", "similarity_score", "duration_sec", "match"])
|
|
writer.writerows(csv_rows)
|
|
logger.info(f"✅ CSV salvo em {csv_path}")
|
|
except Exception as e:
|
|
logger.error(f"❌ Falha ao salvar CSV: {e}")
|
|
|
|
return jsonify(all_results), 200
|
|
|
|
|
|
@face_bp.route('/benchmark_face_match', methods=['POST'])
|
|
def benchmark_face_match():
|
|
import csv
|
|
|
|
logger.info("🧪 Iniciando benchmark com múltiplas imagens de entrada")
|
|
person_id = request.form.get("person_id")
|
|
image_files = request.files.getlist("images[]")
|
|
|
|
if not person_id or not image_files:
|
|
return jsonify({"error": "Missing person_id or images[]"}), 400
|
|
|
|
# Validação e conversão de imagens de entrada
|
|
input_images = []
|
|
for img_file in image_files:
|
|
try:
|
|
pil_img = Image.open(img_file).convert("RGB")
|
|
input_images.append(np.array(pil_img))
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Imagem inválida: {img_file.filename} | {str(e)}")
|
|
|
|
if not input_images:
|
|
return jsonify({"error": "No valid input images"}), 400
|
|
|
|
try:
|
|
registered_objects = list(minio_client.list_objects(BUCKET, prefix=f"{person_id}/", recursive=True))
|
|
if not registered_objects:
|
|
return jsonify({"error": "No registered images found"}), 404
|
|
except Exception as e:
|
|
logger.exception("❌ Erro ao acessar MinIO")
|
|
return jsonify({"error": "MinIO access error", "details": str(e)}), 500
|
|
|
|
deepface_models = [
|
|
"ArcFace", "Facenet", "Facenet512", "VGG-Face",
|
|
"OpenFace", "Dlib", "SFace"
|
|
]
|
|
all_model_stats = {}
|
|
|
|
# Inicializa estatísticas por modelo
|
|
for model in ["dlib", "mtcnn+dlib"] + [f"deepface_{m.lower()}" for m in deepface_models]:
|
|
all_model_stats[model] = {
|
|
"similarities": [],
|
|
"durations": [],
|
|
"matches": []
|
|
}
|
|
|
|
for input_np in input_images:
|
|
for obj in registered_objects:
|
|
try:
|
|
response = minio_client.get_object(BUCKET, obj.object_name)
|
|
buffer = BytesIO(response.read())
|
|
pil_registered = Image.open(buffer).convert("RGB")
|
|
reg_np = np.array(pil_registered)
|
|
except Exception as e:
|
|
logger.warning(f"❌ Erro ao carregar imagem registrada {obj.object_name}: {e}")
|
|
continue
|
|
|
|
# dlib
|
|
try:
|
|
t0 = time.time()
|
|
enc_input = face_recognition.face_encodings(input_np)
|
|
enc_reg = face_recognition.face_encodings(reg_np)
|
|
if enc_input and enc_reg:
|
|
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
|
|
sim = 1 - dist
|
|
match = dist <= threshold
|
|
duration = time.time() - t0
|
|
all_model_stats["dlib"]["similarities"].append(sim)
|
|
all_model_stats["dlib"]["durations"].append(duration)
|
|
all_model_stats["dlib"]["matches"].append(match)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Dlib falhou: {e}")
|
|
|
|
# mtcnn + dlib
|
|
try:
|
|
t0 = time.time()
|
|
t_input = mtcnn(Image.fromarray(input_np))
|
|
t_reg = mtcnn(pil_registered)
|
|
if t_input is None or t_reg is None:
|
|
raise ValueError("MTCNN não detectou rosto")
|
|
arr_input = (t_input.permute(1,2,0).numpy()*255).astype(np.uint8)
|
|
arr_reg = (t_reg.permute(1,2,0).numpy()*255).astype(np.uint8)
|
|
enc_input = face_recognition.face_encodings(arr_input)
|
|
enc_reg = face_recognition.face_encodings(arr_reg)
|
|
if enc_input and enc_reg:
|
|
dist = face_recognition.face_distance([enc_reg[0]], enc_input[0])[0]
|
|
sim = 1 - dist
|
|
match = dist <= threshold
|
|
duration = time.time() - t0
|
|
all_model_stats["mtcnn+dlib"]["similarities"].append(sim)
|
|
all_model_stats["mtcnn+dlib"]["durations"].append(duration)
|
|
all_model_stats["mtcnn+dlib"]["matches"].append(match)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ mtcnn+dlib falhou: {e}")
|
|
|
|
# DeepFace models
|
|
for model in deepface_models:
|
|
model_key = f"deepface_{model.lower()}"
|
|
try:
|
|
t0 = time.time()
|
|
analysis = DeepFace.verify(
|
|
input_np,
|
|
reg_np,
|
|
model_name=model,
|
|
enforce_detection=False
|
|
)
|
|
sim = 1 - analysis['distance']
|
|
match = analysis['verified']
|
|
duration = time.time() - t0
|
|
all_model_stats[model_key]["similarities"].append(sim)
|
|
all_model_stats[model_key]["durations"].append(duration)
|
|
all_model_stats[model_key]["matches"].append(match)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ DeepFace ({model}) falhou: {e}")
|
|
|
|
# Calcular médias
|
|
results = []
|
|
for model, stats in all_model_stats.items():
|
|
if stats["similarities"]:
|
|
avg_sim = sum(stats["similarities"]) / len(stats["similarities"])
|
|
avg_time = sum(stats["durations"]) / len(stats["durations"])
|
|
match_rate = sum(stats["matches"]) / len(stats["matches"])
|
|
results.append({
|
|
"model": model,
|
|
"avg_similarity_score": round(avg_sim, 4),
|
|
"avg_duration_sec": round(avg_time, 4),
|
|
"avg_match_rate": round(match_rate, 4)
|
|
})
|
|
|
|
# Salvar CSV
|
|
csv_path = "benchmark_results.csv"
|
|
try:
|
|
with open(csv_path, "w", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=["model", "avg_similarity_score", "avg_duration_sec", "avg_match_rate"])
|
|
writer.writeheader()
|
|
writer.writerows(results)
|
|
logger.info(f"✅ CSV salvo com {len(results)} modelos em {csv_path}")
|
|
except Exception as e:
|
|
logger.error(f"❌ Falha ao salvar CSV: {e}")
|
|
|
|
return jsonify(results), 200
|