front_ponto_eletronico/routes/user.py
2025-04-29 15:23:24 -03:00

347 lines
12 KiB
Python

from collections import defaultdict
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required
from flasgger.utils import swag_from
from models.address import Address
from models.contact import Contact
from models.license import License
from models.service_role_permission import ServiceRolePermission
from models.service_roles import ServiceRole
from models.user import User, db
from utils import logged_user_id
# Blueprint that collects the user endpoints of this module; every route
# registered on it is served under the '/users' URL prefix.
user_bp = Blueprint('user', __name__, url_prefix='/users')
@user_bp.route('/', methods=['POST'])
@jwt_required()
@swag_from('../docs/users/create.yml')  # Attaches the Swagger documentation
def create_user():
    """Create a user from the JSON request body.

    Body keys read: ``username``, ``password``, ``email``,
    ``profile_image``, ``admission_date``, ``parent_id``,
    ``service_instance_id``, ``status``, ``cargo``, ``department`` and
    ``scheduling_id``. The latter is stored as ``shift_id``; the key
    ``shift_id`` is accepted as a fallback so the payload matches what the
    update endpoint and the read endpoints use.

    Returns:
        A ``(response, status)`` tuple: 201 with the new user's id on
        success; 400 when the body is not a JSON object, when ``username``
        or ``password`` is missing/empty, or when the username is taken.
    """
    data = request.get_json()
    # `null`, a list or a scalar are valid JSON but have no `.get`; reject
    # them explicitly instead of failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'error': 'Corpo da requisição deve ser um objeto JSON'}), 400

    username = data.get('username')
    password = data.get('password')
    # Both values are required: the username identifies the account and the
    # password is handed to set_password() below.
    if not username or not password:
        return jsonify({'error': 'Usuário e senha são obrigatórios'}), 400

    # Usernames must be unique.
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Usuário já existe'}), 400

    user = User(
        username=username,
        email=data.get('email'),
        profile_image=data.get('profile_image'),
        admission_date=data.get('admission_date'),
        parent_id=data.get('parent_id'),
        service_instance_id=data.get('service_instance_id'),
        status=data.get('status'),
        department=data.get('department'),
        # 'scheduling_id' is the historical request key; fall back to
        # 'shift_id' only when it is absent.
        shift_id=data.get('scheduling_id', data.get('shift_id')),
        cargo=data.get('cargo'),
    )
    user.set_password(password)

    db.session.add(user)
    db.session.commit()

    return jsonify({'message': 'Usuário criado com sucesso', 'user_id': user.id}), 201
@user_bp.route('/subordinate', methods=['POST'])
@jwt_required()
@logged_user_id  # NOTE(review): presumably supplies the authenticated user's id as the view argument - confirm in utils
@swag_from('../docs/users/create_subordinate.yml')
def create_subordinate(logged_user_id):
    """Create a user whose parent is the user identified by the argument.

    Args:
        logged_user_id: Id used as the new user's ``parent_id``. Inside this
            function the name shadows the imported ``logged_user_id``
            decorator; the parameter name is kept as-is because the
            decorator's calling convention is not visible here.

    Body keys read: ``username``, ``password``, ``email``,
    ``profile_image``, ``admission_date``, ``service_instance_id``,
    ``status``, ``department``, ``cargo`` and ``scheduling_id`` (stored as
    ``shift_id``; the key ``shift_id`` is accepted as a fallback).

    Returns:
        A ``(response, status)`` tuple: 201 with the new user's id on
        success; 400 when the body is not a JSON object, when ``username``
        or ``password`` is missing/empty, or when the username is taken.
        Aborts with 404 when the parent user does not exist.
    """
    data = request.get_json()
    parent_id = logged_user_id
    # Called only for its side effect: abort with 404 if the parent is gone.
    User.query.get_or_404(parent_id)

    # `null`, a list or a scalar are valid JSON but have no `.get`; reject
    # them explicitly instead of failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify({'error': 'Corpo da requisição deve ser um objeto JSON'}), 400

    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({'error': 'Usuário e senha são obrigatórios'}), 400

    # Usernames must be unique.
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Usuário já existe'}), 400

    subordinate = User(
        username=username,
        email=data.get('email'),
        profile_image=data.get('profile_image'),
        parent_id=parent_id,
        service_instance_id=data.get('service_instance_id'),
        status=data.get('status'),
        department=data.get('department'),
        # 'scheduling_id' is the historical request key; fall back to
        # 'shift_id' only when it is absent.
        shift_id=data.get('scheduling_id', data.get('shift_id')),
        admission_date=data.get('admission_date'),
        cargo=data.get('cargo'),
    )
    subordinate.set_password(password)

    db.session.add(subordinate)
    db.session.commit()

    return jsonify({'message': 'Subordinado criado com sucesso', 'user_id': subordinate.id}), 201
@user_bp.route('/', methods=['GET'])
@jwt_required()
@logged_user_id
@swag_from('../docs/users/hierarchy_without_id.yml')
def list_hierarchy(user_id):
    """Return all descendants of the given user as one flat JSON list.

    Args:
        user_id: Id of the user whose subordinates are listed.

    Returns:
        A ``(response, status)`` tuple: 200 with the flat list (the user
        itself is not part of it), or 404 when the user does not exist.
    """
    def iter_descendants(node):
        """Yield one summary dict per descendant, each parent before its own children."""
        for subordinate in node.children:
            yield {
                'id': subordinate.id,
                'username': subordinate.username,
                'email': subordinate.email,
                'admission_date': subordinate.admission_date,
                'service_instance_id': subordinate.service_instance_id,
                'status': subordinate.status,
                'cargo': subordinate.cargo,
                'department': subordinate.department,
                'shift_id': subordinate.shift_id,
            }
            # Descend into this subordinate's own subordinates.
            yield from iter_descendants(subordinate)

    root = User.query.filter_by(id=user_id).first()
    if not root:
        return jsonify({"error": "User not found"}), 404

    # Only descendants are emitted, so the root user is left out.
    return jsonify(list(iter_descendants(root))), 200
@user_bp.route('/hierarchy_by_user_id', methods=['GET'])
@jwt_required()
@swag_from('../docs/users/hierarchy_by_id.yml')
def list_hierarchy_by_id():
    """Return the nested subordinate tree below a given responsible user.

    Query string:
        responsible_id: Integer id of the user whose direct subordinates
            form the roots of the returned trees.

    Returns:
        A ``(response, status)`` tuple: 200 with a list of nested user
        dicts (each carrying its own ``subordinates`` list), or 400 when
        ``responsible_id`` is missing or is not an integer.
    """
    def build_hierarchy(user):
        """Serialize a user together with all of its subordinates, recursively."""
        return {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'subordinates': [build_hierarchy(sub) for sub in user.children],
            'service_instance_id': user.service_instance_id,
            'status': user.status,
            'admission_date': user.admission_date,
            'department': user.department,
            'shift_id': user.shift_id,
            'cargo': user.cargo,
        }

    # `type=int` yields None both when the parameter is absent and when it
    # cannot be parsed as an integer.
    responsible_id = request.args.get('responsible_id', type=int)
    if responsible_id is None:
        # Without this guard the filter below would become
        # `parent_id IS NULL` and return the tree of every root user.
        return jsonify({'error': 'Parâmetro responsible_id é obrigatório e deve ser um inteiro'}), 400

    # Direct subordinates of the requested user are the roots of the result.
    root_users = User.query.filter_by(parent_id=responsible_id).all()
    hierarchy = [build_hierarchy(user) for user in root_users]
    return jsonify(hierarchy), 200
@user_bp.route('/<int:user_id>', methods=['PUT'])
@jwt_required()
@swag_from('../docs/users/update.yml')
def update_user(user_id):
    """Update the fields of a user that are present in the JSON body.

    Args:
        user_id: Id of the user to update (taken from the URL).

    Body keys honoured: ``username``, ``email``, ``profile_image``,
    ``admission_date``, ``password``, ``status``, ``department``,
    ``shift_id`` (``scheduling_id`` is accepted as an alias, matching the
    key the create endpoints read) and ``cargo``. Keys that are absent
    leave the corresponding field untouched.

    Returns:
        A ``(response, status)`` tuple: 200 on success; 400 when the body
        is not a JSON object, when the new username is empty, or when it is
        already taken. Aborts with 404 when the user does not exist.
    """
    data = request.get_json()
    user = User.query.get_or_404(user_id)

    # `null`, a list or a scalar are valid JSON but do not support the key
    # lookups below; reject them instead of failing with HTTP 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'Corpo da requisição deve ser um objeto JSON'}), 400

    if 'username' in data and data['username'] != user.username:
        new_username = data['username']
        if not new_username:
            return jsonify({'error': 'Nome de usuário inválido'}), 400
        # Usernames must stay unique.
        if User.query.filter_by(username=new_username).first():
            return jsonify({'error': 'Usuário com esse nome já existe'}), 400
        user.username = new_username

    # Plain attributes are copied verbatim when the key is present.
    for field in ('email', 'profile_image', 'admission_date', 'status',
                  'department', 'shift_id', 'cargo'):
        if field in data:
            setattr(user, field, data[field])

    # Alias used by the create endpoints; 'shift_id' wins when both are sent.
    if 'shift_id' not in data and 'scheduling_id' in data:
        user.shift_id = data['scheduling_id']

    # A missing, null or empty password leaves the current one untouched so
    # that a blank form field cannot wipe the credential.
    if data.get('password'):
        user.set_password(data['password'])

    db.session.commit()
    return jsonify({'message': 'Usuário atualizado com sucesso'}), 200
def _group_permissions(service_role_permissions):
    """Group permission rows by service role and, within it, by service instance.

    Args:
        service_role_permissions: Iterable of rows exposing ``service_role``,
            ``service_instance``, ``permission`` and ``permission_type``.

    Returns:
        A list with one dict per (service role, service instance) pair, in
        first-seen order of roles and, inside a role, of instances. Each
        dict carries the ids and names of both plus its ``permissions``.
    """
    groups_by_role = {}
    for srp in service_role_permissions:
        role = srp.service_role
        instance = srp.service_instance
        instances = groups_by_role.setdefault(role.id, {})
        group = instances.get(instance.id)
        if group is None:
            # Names are taken from the first row seen for the pair.
            group = instances[instance.id] = {
                "service_role_id": role.id,
                "service_role_name": role.name,
                "service_instance_id": instance.id,
                "service_instance_name": instance.name,
                "permissions": [],
            }
        group["permissions"].append({
            'id': srp.permission.id,
            'type': srp.permission_type,  # Permission type (read, write, delete)
        })
    return [
        group
        for instances in groups_by_role.values()
        for group in instances.values()
    ]


@user_bp.route('/<int:user_id>', methods=['GET'])
@jwt_required()
@swag_from('../docs/users/get_by_id.yml')
def get_user(user_id):
    """Return a user with its licenses, permissions, contacts and addresses.

    Args:
        user_id: Id of the user to fetch (taken from the URL).

    Returns:
        A ``(response, status)`` tuple: 200 with ``{'user': {...}}``.
        Aborts with 404 when the user does not exist.
    """
    user = User.query.get_or_404(user_id)

    # The previous code only looked up a parent when `parent_id` was falsy,
    # a lookup that is not expected to match any row, so every path ended up
    # loading the user's own records. That effective behaviour is kept,
    # without the extra query and the duplicated branches.
    # NOTE(review): if users with a parent were meant to inherit the
    # parent's contacts/addresses/licenses, the condition should have been
    # `if user.parent_id:` - confirm the intended behaviour before changing.
    contacts = Contact.query.filter_by(user_id=user.id).all()
    addresses = Address.query.filter_by(user_id=user.id).all()
    licenses = License.query.filter_by(user_id=user.id).all()

    # Permission rows attached to the user, grouped for the response.
    service_role_permissions = ServiceRolePermission.query.filter_by(user_id=user.id).all()
    grouped_permissions = _group_permissions(service_role_permissions)

    user_data = {
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'parent_id': user.parent_id,
        'admission_date': user.admission_date,
        'department': user.department,
        'shift_id': user.shift_id,
        'cargo': user.cargo,
        'status': user.status,
        'licenses': [
            {
                'license_key': lic.license_key,
                'service_id': lic.service.id,
                'service_name': lic.service.name,
                'start_date': lic.start_date,
                'end_date': lic.end_date,
            }
            for lic in licenses
        ],
        'permissions': grouped_permissions,
        'contacts': [
            {
                'phone': contact.phone,
                'contact_type': contact.contact_type,
                'description': contact.description,
            }
            for contact in contacts
        ],
        'addresses': [
            {
                'street': address.street,
                'city': address.city,
                'state': address.state,
                'zip_code': address.zip_code,
                'country': address.country,
                'type': address.address_type.name,
                'number': address.number,
                'complement': address.complement,
                'neighborhood': address.neighborhood,
            }
            for address in addresses
        ],
    }
    return jsonify({'user': user_data}), 200
@user_bp.route('/<int:user_id>', methods=['DELETE'])
@jwt_required()
@swag_from('../docs/users/delete.yml')
def delete_user(user_id):
    """Soft-delete a user by setting its status to ``"inativo"``.

    The row itself is kept; only the ``status`` column changes.

    Args:
        user_id: Id of the user to deactivate (taken from the URL).

    Returns:
        A ``(response, status)`` tuple: 200 once the user is marked
        inactive, or 400 when it already was. Aborts with 404 when the
        user does not exist.
    """
    user = User.query.get_or_404(user_id)

    # `status` is optional when a user is created, so it can be None here;
    # treat that as "not inactive" instead of raising AttributeError.
    if (user.status or '').lower() == "inativo":
        return jsonify({'message': 'Usuário já está inativo'}), 400

    user.status = "inativo"
    db.session.commit()
    return jsonify({'message': 'Usuário excluído com sucesso'}), 200