from collections import defaultdict from flask import Blueprint, request, jsonify from flask_jwt_extended import jwt_required from flasgger.utils import swag_from from models.address import Address from models.contact import Contact from models.license import License from models.service_role_permission import ServiceRolePermission from models.service_roles import ServiceRole from models.user import User, db from utils import logged_user_id user_bp = Blueprint('user', __name__, url_prefix='/users') @user_bp.route('/', methods=['POST']) @jwt_required() @swag_from('../docs/users/create.yml') # Adiciona a documentação do Swagger def create_user(): # Obtém os dados do corpo da requisição data = request.get_json() username = data.get('username') password = data.get('password') profile_image = data.get('profile_image') birth_date = data.get('birth_date') parent_id = data.get('parent_id') service_instance_id = data.get('service_instance_id') status = data.get('status') cargo = data.get('cargo') cpf = data.get('cpf') pis = data.get('pis') rg = data.get('rg') cod_interno = data.get('cod_interno') # Contato email = data.get('email') phone = data.get('phone') email_secondary = data.get('email_secondary') phone_secondary = data.get('phone_secondary') # Verifica se o usuário já existe existing_user = User.query.filter_by(username=username).first() if existing_user: return jsonify({'error': 'Usuário já existe'}), 400 # Cria um novo usuário user = User( username=username, email=email, phone=phone, profile_image=profile_image, birth_date=birth_date, parent_id=parent_id, service_instance_id=service_instance_id, status=status, cpf=cpf, pis=pis, rg=rg, cod_interno=cod_interno, email_secondary=email_secondary, phone_secondary=phone_secondary, cargo = cargo, ) user.set_password(password) # Adiciona e comita o usuário no banco de dados db.session.add(user) db.session.commit() # Responde com sucesso e inclui o ID do usuário return jsonify({'message': 'Usuário criado com sucesso', 'user_id': user.id}), 201 @user_bp.route('/subordinate', methods=['POST']) @jwt_required() @logged_user_id # Ensure this is a valid decorator, or it must handle the function correctly @swag_from('../docs/users/create_subordinate.yml') def create_subordinate(logged_user_id): # ensure logged_user_id can still return the correct user_id. data = request.get_json() parent_id = logged_user_id # assuming logged_user_id is a function that gets this directly or modifies it. parent = User.query.get_or_404(parent_id) username = data.get('username') password = data.get('password') email = data.get('email') phone = data.get('phone') profile_image = data.get('profile_image') birth_date = data.get('birth_date') service_instance_id = data.get('service_instance_id') status = data.get('status') cpf = data.get('cpf') pis = data.get('pis') rg = data.get('rg') cod_interno = data.get('cod_interno') email_secondary = data.get('email_secondary') phone_secondary = data.get('phone_secondary') cargo = data.get('cargo') existing_user = User.query.filter_by(username=username).first() if existing_user: return jsonify({'error': 'Usuário já existe'}), 400 subordinate = User( username=username, email=email, phone=phone, profile_image=profile_image, birth_date=birth_date, parent_id=parent_id, service_instance_id=service_instance_id, status=status, cpf=cpf, pis=pis, rg=rg, cod_interno=cod_interno, email_secondary=email_secondary, phone_secondary=phone_secondary, cargo=cargo, ) subordinate.set_password(password) db.session.add(subordinate) db.session.commit() return jsonify({'message': 'Subordinado criado com sucesso', 'user_id': subordinate.id}), 201 @user_bp.route('/', methods=['GET']) @jwt_required() @logged_user_id @swag_from('../docs/users/hierarchy_without_id.yml') def list_hierarchy(user_id): def flatten_hierarchy(user): """ Transforma a hierarquia de usuários em uma lista plana, excluindo o usuário logado. """ flat_list = [] # Adiciona os subordinados do usuário, mas não o próprio usuário logado. for child in user.children: # Exclui o usuário logado da listagem (não inclui na hierarquia) flat_list.append({ 'id': child.id, 'username': child.username, 'email': child.email, 'birth_date':child.birth_date, 'phone':child.phone, 'service_instance_id':child.service_instance_id, 'status':child.status, 'cpf': child.cpf, 'pis': child.pis, 'rg': child.rg, 'cod_interno': child.cod_interno, 'email_secondary': child.email_secondary, 'phone_secondary': child.phone_secondary, 'cargo': child.cargo, }) # Verifica recursivamente os filhos (subordinados dos subordinados) flat_list.extend(flatten_hierarchy(child)) return flat_list # Busca o usuário logado user = User.query.filter_by(id=user_id).first() if not user: return jsonify({"error": "User not found"}), 404 # Monta a lista plana a partir do usuário logado, excluindo o próprio flat_hierarchy = flatten_hierarchy(user) return jsonify(flat_hierarchy), 200 @user_bp.route('/hierarchy_by_user_id', methods=['GET']) @jwt_required() @swag_from('../docs/users/hierarchy_by_id.yml') def list_hierarchy_by_id(): def build_hierarchy(user): return { 'id': user.id, 'username': user.username, 'email': user.email, 'birth_date':user.birth_date, 'phone':user.phone, 'subordinates': [build_hierarchy(sub) for sub in user.children], 'service_instance_id':user.service_instance_id, 'status':user.status, 'cpf': user.cpf, 'pis': user.pis, 'rg': user.rg, 'cod_interno': user.cod_interno, 'email_secondary': user.email_secondary, 'phone_secondary': user.phone_secondary, 'cargo': user.cargo, } responsible_id = request.args.get('responsible_id', type=int) # Get o responsible_id da query string # Filtra apenas os usuários que têm parent_id igual ao ID do usuário logado root_users = User.query.filter_by(parent_id=responsible_id).all() # Monta a hierarquia apenas para os subordinados ao usuário logado hierarchy = [build_hierarchy(user) for user in root_users] return jsonify(hierarchy), 200 @user_bp.route('/', methods=['PUT']) @jwt_required() @swag_from('../docs/users/update.yml') def update_user(user_id): data = request.get_json() user = User.query.get_or_404(user_id) # Atualiza os campos fornecidos if 'username' in data and data['username'] != user.username: existing_user = User.query.filter_by(username=data['username']).first() if existing_user: return jsonify({'error': 'Usuário com esse nome já existe'}), 400 user.username = data['username'] if 'email' in data: user.email = data['email'] if 'phone' in data: user.phone = data['phone'] if 'profile_image' in data: user.profile_image = data['profile_image'] if 'birth_date' in data: user.birth_date = data['birth_date'] if 'password' in data: user.set_password(data['password']) if 'status' in data: user.status = data['status'] if 'cpf' in data: user.cpf = data['cpf'] if 'pis' in data: user.pis = data['pis'] if 'rg' in data: user.rg = data['rg'] if 'cod_interno' in data: user.cod_interno = data['cod_interno'] if 'email_secondary' in data: user.email_secondary = data['email_secondary'] if 'phone_secondary' in data: user.phone_secondary = data['phone_secondary'] if 'cargo' in data: user.cargo = data['cargo'] db.session.commit() return jsonify({'message': 'Usuário atualizado com sucesso'}), 200 @user_bp.route('/', methods=['GET']) @jwt_required() @swag_from('../docs/users/get_by_id.yml') def get_user(user_id): user = User.query.get_or_404(user_id) # Se não houver parent_id, usamos o parent_id se houver para pegar dados derivados if not user.parent_id: parent_user = User.query.filter_by(id=user.parent_id).first() if parent_user: contacts = Contact.query.filter_by(user_id=parent_user.id).all() addresses = Address.query.filter_by(user_id=parent_user.id).all() licenses = License.query.filter_by(user_id=parent_user.id).all() else: contacts = Contact.query.filter_by(user_id=user.id).all() addresses = Address.query.filter_by(user_id=user.id).all() licenses = License.query.filter_by(user_id=user.id).all() else: contacts = Contact.query.filter_by(user_id=user.id).all() addresses = Address.query.filter_by(user_id=user.id).all() licenses = License.query.filter_by(user_id=user.id).all() # Recupera as permissões associadas ao usuário service_role_permissions = ServiceRolePermission.query.filter_by(user_id=user.id).all() # Agrupar permissões por service_role_id e service_instance_id permissions_by_role_and_instance = defaultdict(lambda: defaultdict(list)) # Garantir que estamos capturando também o nome do service_role e service_instance for srp in service_role_permissions: # Recuperando nome do service_instance e service_role diretamente da consulta service_instance_name = srp.service_instance.name service_role_name = srp.service_role.name permission = { 'id': srp.permission.id, 'type': srp.permission_type, # Tipo de permissão (read, write, delete) } permissions_by_role_and_instance[srp.service_role.id][srp.service_instance.id].append({ 'permission': permission, 'service_instance_name': service_instance_name, 'service_role_name': service_role_name, }) # Criar a resposta agrupada por service_role e service_instance grouped_permissions = [] for service_role_id, instances in permissions_by_role_and_instance.items(): for service_instance_id, perms in instances.items(): # Recuperar o nome do service_role e service_instance service_instance_name = perms[0]['service_instance_name'] service_role_name = perms[0]['service_role_name'] grouped_permissions.append({ "service_role_id": service_role_id, "service_role_name": service_role_name, "service_instance_id": service_instance_id, "service_instance_name": service_instance_name, "permissions": [p['permission'] for p in perms] # As permissões agrupadas aqui }) # Monta o retorno com os dados do usuário, contatos, endereços, permissões e licenças user_data = { 'id': user.id, 'username': user.username, 'email': user.email, 'parent_id': user.parent_id, 'birth_date':user.birth_date, 'phone':user.phone, 'cpf': user.cpf, 'pis': user.pis, 'rg': user.rg, 'cod_interno': user.cod_interno, 'email_secondary': user.email_secondary, 'phone_secondary': user.phone_secondary, 'cargo': user.cargo, #'service_role_id':user.service_role_id, 'status':user.status, 'licenses': [ { 'license_key': license.license_key, 'service_id': license.service.id, 'service_name': license.service.name, # Adicionando o nome do serviço 'start_date': license.start_date, 'end_date': license.end_date } for license in licenses ], 'permissions': grouped_permissions, 'contacts': [ { 'phone': contact.phone, 'contact_type': contact.contact_type, 'description': contact.description } for contact in contacts ], 'addresses': [ { 'street': address.street, 'city': address.city, 'state': address.state, 'zip_code': address.zip_code, 'country': address.country, 'type': address.address_type.name, 'number': address.number, 'complement': address.complement, 'neighborhood': address.neighborhood } for address in addresses ], } return jsonify({'user': user_data}), 200 @user_bp.route('/', methods=['DELETE']) @jwt_required() @swag_from('../docs/users/delete.yml') def delete_user(user_id): user = User.query.get_or_404(user_id) # Verifica se já está inativo if user.status.lower() == "inativo": return jsonify({'message': 'Usuário já está inativo'}), 400 # Atualiza o status para inativo user.status = "inativo" db.session.commit() return jsonify({'message': 'Usuário excluído com sucesso'}), 200