"""User management endpoints, mounted under ``/users``.

Every endpoint requires a valid JWT. The Swagger specification for each
view is loaded from the YAML file named in its ``swag_from`` decorator.
"""
from collections import defaultdict

from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required
from flasgger.utils import swag_from

from models.address import Address
from models.contact import Contact
from models.license import License
from models.service_role_permission import ServiceRolePermission
from models.user import User, db
from utils import logged_user_id

user_bp = Blueprint('user', __name__, url_prefix='/users')

# Fields copied verbatim from the request body when updating a user.
_SIMPLE_UPDATE_FIELDS = ('email', 'phone', 'profile_image', 'birth_date')


def _json_body():
    """Return the request body as a dict, or ``None`` if it is not a JSON object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


def _invalid_body_response():
    """Build the 400 response used when the request body is not a JSON object."""
    return jsonify({'error': 'Corpo da requisição inválido'}), 400


def _user_summary(user):
    """Return the basic public fields of ``user`` as a plain dict."""
    return {
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'birth_date': user.birth_date,
        'phone': user.phone,
    }


def _iter_descendants(user):
    """Yield every descendant of ``user`` depth-first, parents before children."""
    for child in user.children:
        yield child
        yield from _iter_descendants(child)


def _create_user_from_payload(data, parent_id):
    """Validate ``data`` and persist a new user under ``parent_id``.

    Returns:
        A ``(user, error_response)`` pair. Exactly one element is ``None``:
        ``user`` on failure, ``error_response`` on success.
    """
    username = data.get('username')
    password = data.get('password')

    # Reject incomplete payloads up front rather than letting a ``None``
    # username/password reach ``set_password`` or the database insert.
    if not username or not password:
        return None, (jsonify({'error': 'username e password são obrigatórios'}), 400)

    if User.query.filter_by(username=username).first():
        return None, (jsonify({'error': 'Usuário já existe'}), 400)

    user = User(
        username=username,
        email=data.get('email'),
        phone=data.get('phone'),
        profile_image=data.get('profile_image'),
        birth_date=data.get('birth_date'),
        parent_id=parent_id,
    )
    user.set_password(password)

    db.session.add(user)
    db.session.commit()
    return user, None


def _group_permissions(user_id):
    """Return the user's permissions grouped by service role and service instance.

    Groups are ordered by first appearance of the role, then by first
    appearance of the instance within that role.
    """
    # role_id -> instance_id -> response entry for that (role, instance) pair
    groups = defaultdict(dict)

    for srp in ServiceRolePermission.query.filter_by(user_id=user_id).all():
        role = srp.service_role
        instance = srp.service_instance
        entry = groups[role.id].setdefault(instance.id, {
            "service_role_id": role.id,
            "service_role_name": role.name,
            "service_instance_id": instance.id,
            "service_instance_name": instance.name,
            "permissions": [],
        })
        entry["permissions"].append({
            'id': srp.permission.id,
            'type': srp.permission_type,  # permission kind, e.g. read/write/delete
        })

    return [entry for instances in groups.values() for entry in instances.values()]


@user_bp.route('/', methods=['POST'])
@jwt_required()
@swag_from('../docs/users/create.yml')
def create_user():
    """Create a user from the JSON body; ``parent_id`` is taken from the body.

    Returns 201 with the new user's id, or 400 when the body is invalid,
    required fields are missing, or the username is already taken.
    """
    data = _json_body()
    if data is None:
        return _invalid_body_response()

    user, error = _create_user_from_payload(data, data.get('parent_id'))
    if error:
        return error

    return jsonify({'message': 'Usuário criado com sucesso', 'user_id': user.id}), 201


@user_bp.route('/subordinate', methods=['POST'])
@jwt_required()
@logged_user_id
@swag_from('../docs/users/create_subordinate.yml')
def create_subordinate(logged_user_id):
    """Create a user whose parent is the currently logged-in user.

    NOTE(review): the parameter shadows the ``logged_user_id`` decorator and
    differs from ``list_hierarchy``, which receives ``user_id`` from the same
    decorator. This only works if the decorator passes the id positionally;
    confirm against ``utils.logged_user_id``.
    """
    data = _json_body()
    if data is None:
        return _invalid_body_response()

    parent_id = logged_user_id
    # Abort with 404 if the logged-in user no longer exists.
    User.query.get_or_404(parent_id)

    subordinate, error = _create_user_from_payload(data, parent_id)
    if error:
        return error

    return jsonify({'message': 'Subordinado criado com sucesso', 'user_id': subordinate.id}), 201


@user_bp.route('/', methods=['GET'])
@jwt_required()
@logged_user_id
@swag_from('../docs/users/hierarchy_without_id.yml')
def list_hierarchy(user_id):
    """Return a flat list of every descendant of the logged-in user.

    The logged-in user is not part of the list. Responds 404 when the
    logged-in user cannot be found.
    """
    user = User.query.filter_by(id=user_id).first()
    if not user:
        return jsonify({"error": "User not found"}), 404

    flat_hierarchy = [_user_summary(descendant) for descendant in _iter_descendants(user)]
    return jsonify(flat_hierarchy), 200


@user_bp.route('/hierarchy_by_user_id', methods=['GET'])
@jwt_required()
@swag_from('../docs/users/hierarchy_by_id.yml')
def list_hierarchy_by_id():
    """Return the nested subordinate tree below ``responsible_id`` (query string).

    NOTE(review): when ``responsible_id`` is missing or not an integer it is
    ``None``, so the query matches every user without a parent. Confirm
    whether that is intended or whether a 400 should be returned.
    """
    def build_hierarchy(user):
        node = _user_summary(user)
        node['subordinates'] = [build_hierarchy(sub) for sub in user.children]
        return node

    responsible_id = request.args.get('responsible_id', type=int)
    root_users = User.query.filter_by(parent_id=responsible_id).all()

    hierarchy = [build_hierarchy(user) for user in root_users]
    return jsonify(hierarchy), 200


# The view takes ``user_id``, so the rule must declare it; on a bare '/'
# Flask would call the view without arguments.
@user_bp.route('/<int:user_id>', methods=['PUT'])
@jwt_required()
@swag_from('../docs/users/update.yml')
def update_user(user_id):
    """Update the fields present in the JSON body for the given user.

    Responds 404 if the user does not exist and 400 if the body is invalid
    or the requested username already belongs to another user.
    """
    data = _json_body()
    if data is None:
        return _invalid_body_response()

    user = User.query.get_or_404(user_id)

    # Only check uniqueness when the username actually changes.
    if 'username' in data and data['username'] != user.username:
        if User.query.filter_by(username=data['username']).first():
            return jsonify({'error': 'Usuário com esse nome já existe'}), 400
        user.username = data['username']

    for field in _SIMPLE_UPDATE_FIELDS:
        if field in data:
            setattr(user, field, data[field])

    if 'password' in data:
        user.set_password(data['password'])

    db.session.commit()
    return jsonify({'message': 'Usuário atualizado com sucesso'}), 200


# Declares ``user_id`` in the rule; this also stops the view from sharing
# GET '/' with ``list_hierarchy``.
@user_bp.route('/<int:user_id>', methods=['GET'])
@jwt_required()
@swag_from('../docs/users/get_by_id.yml')
def get_user(user_id):
    """Return the user's profile with licenses, permissions, contacts and addresses.

    Contacts, addresses and licenses are read from the user's parent when the
    user has one that exists, otherwise from the user itself. Permissions are
    always the user's own. Responds 404 if the user does not exist.
    """
    user = User.query.get_or_404(user_id)

    # The original test was ``if not user.parent_id`` which made the parent
    # lookup unreachable in practice; the parent is consulted only when set.
    parent_user = None
    if user.parent_id:
        parent_user = User.query.filter_by(id=user.parent_id).first()
    owner_id = parent_user.id if parent_user else user.id

    contacts = Contact.query.filter_by(user_id=owner_id).all()
    addresses = Address.query.filter_by(user_id=owner_id).all()
    licenses = License.query.filter_by(user_id=owner_id).all()

    user_data = {
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'parent_id': user.parent_id,
        'birth_date': user.birth_date,
        'phone': user.phone,
        'licenses': [
            {
                'license_key': lic.license_key,
                'service_id': lic.service.id,
                'service_name': lic.service.name,
                'start_date': lic.start_date,
                'end_date': lic.end_date,
            }
            for lic in licenses
        ],
        'permissions': _group_permissions(user.id),
        'contacts': [
            {
                'phone': contact.phone,
                'contact_type': contact.contact_type,
                'description': contact.description,
            }
            for contact in contacts
        ],
        'addresses': [
            {
                'street': address.street,
                'city': address.city,
                'state': address.state,
                'zip_code': address.zip_code,
                'country': address.country,
                'type': address.address_type.name,
            }
            for address in addresses
        ],
    }

    return jsonify({'user': user_data}), 200


# Declares ``user_id`` in the rule so Flask can supply the view's argument.
@user_bp.route('/<int:user_id>', methods=['DELETE'])
@jwt_required()
@swag_from('../docs/users/delete.yml')
def delete_user(user_id):
    """Delete the given user; responds 404 if the user does not exist."""
    user = User.query.get_or_404(user_id)

    db.session.delete(user)
    db.session.commit()

    return jsonify({'message': 'Usuário excluído com sucesso'}), 200