import os import logging from datetime import datetime, date, time as dtime, timedelta from typing import Any, Dict, List, Set from collections import defaultdict import pandas as pd import pymysql from dotenv import load_dotenv # ----------------- Logging ----------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s" ) logger = logging.getLogger("db.pipeline") load_dotenv() # ----------------- ENV / Config ----------------- DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = int(os.getenv("DB_PORT", "3306")) DB_USER = os.getenv("DB_USER", "user") DB_PASSWORD = os.getenv("DB_PASSWORD", "pass") DB_NAME = os.getenv("DB_NAME", "database") DB_CHARSET = os.getenv("DB_CHARSET", "utf8mb4") SERVICE_INSTANCE = os.getenv("SERVICE_INSTANCE", "") # Nomes de tabelas (ajuste conforme seu esquema) TBL_HOLIDAY = os.getenv("TBL_HOLIDAY", "holiday") TBL_SHIFT = os.getenv("TBL_SHIFT", "shift") TBL_TIME_RECORDS = os.getenv("TBL_TIME_RECORDS", "time_records") TBL_USER = os.getenv("TBL_USER", "user") # Colunas-chave (ajuste conforme seu esquema) COL_HOLI_DATE = os.getenv("COL_HOLI_DATE", "data") # holiday.data (DATE/DATETIME) COL_HOLI_SVC = os.getenv("COL_HOLI_SVC", "service_instance") # holiday.service_instance (opcional) COL_SHIFT_ID = os.getenv("COL_SHIFT_ID", "id") COL_SHIFT_SVC = os.getenv("COL_SHIFT_SVC", "service_instance") COL_SHIFT_START = os.getenv("COL_SHIFT_START", "start_time") COL_SHIFT_END = os.getenv("COL_SHIFT_END", "end_time") COL_SHIFT_INT_S = os.getenv("COL_SHIFT_INT_S", "interval_start") COL_SHIFT_INT_E = os.getenv("COL_SHIFT_INT_E", "interval_end") COL_TR_ID = os.getenv("COL_TR_ID", "id") COL_TR_DATE = os.getenv("COL_TR_DATE", "data") COL_TR_USER_ID = os.getenv("COL_TR_USER_ID", "user_id") COL_TR_IN = os.getenv("COL_TR_IN", "hora_entrada") COL_TR_OUT = os.getenv("COL_TR_OUT", "hora_saida") COL_TR_INT_IN = os.getenv("COL_TR_INT_IN", "hora_entrada_intervalo") COL_TR_INT_OUT = os.getenv("COL_TR_INT_OUT", "hora_retorno_intervalo") COL_TR_LOCAL = os.getenv("COL_TR_LOCAL", "local") COL_TR_STATUS = os.getenv("COL_TR_STATUS", "status") COL_TR_HEXTRA = os.getenv("COL_TR_HEXTRA", "horas_extras") COL_TR_HNOTURNA = os.getenv("COL_TR_HORAS_NOTURNAS", "horas_noturnas") COL_TR_TIPO_CALC = os.getenv("COL_TR_TIPO_CALC", "tipo_calculo") # pode não existir no seu schema COL_USR_ID = os.getenv("COL_USR_ID", "id") COL_USR_SHIFT_ID = os.getenv("COL_USR_SHIFT_ID", "shift_id") COL_USR_SALDO = os.getenv("COL_USR_SALDO", "saldo_atual_horas") COL_USR_SALDO100 = os.getenv("COL_USR_SALDO100", "saldo_atual_horas_100") # Janela noturna NIGHT_START_HH = int(os.getenv("NIGHT_START_HH", "18")) NIGHT_END_HH = int(os.getenv("NIGHT_END_HH", "22")) # ========= HELPERS PARA TIME ========= def _to_time(val) -> dtime | None: """Converte TIME do MySQL (timedelta/time/str/Timestamp) para datetime.time.""" if val is None: return None if isinstance(val, dtime): return val if isinstance(val, timedelta): total = int(val.total_seconds()) % (24*3600) return (datetime.min + timedelta(seconds=total)).time() if isinstance(val, pd.Timestamp): return val.time() if isinstance(val, datetime): return val.time() if isinstance(val, str): for fmt in ("%H:%M:%S", "%H:%M"): try: return datetime.strptime(val, fmt).time() except ValueError: pass return pd.to_datetime(val).time() return pd.to_datetime(val).time() def _sec_since_midnight(t: dtime) -> int: return t.hour*3600 + t.minute*60 + t.second def _hours_between_times(start_t: dtime, end_t: dtime) -> float: """Horas entre duas horas no mesmo dia; suporta cruzar meia-noite.""" s, e = _sec_since_midnight(start_t), _sec_since_midnight(end_t) if e < s: e += 24*3600 # cruzou meia-noite return (e - s) / 3600.0 def _overlap_hours(a_start: dtime, a_end: dtime, b_start: dtime, b_end: dtime) -> float: """Horas de sobreposição; suporta cruzar meia-noite em ambos os intervalos.""" def expand(start, end): s, e = _sec_since_midnight(start), _sec_since_midnight(end) if e < s: # cruza meia-noite return [(s, 24*3600), (0, e)] return [(s, e)] A, B = expand(a_start, a_end), expand(b_start, b_end) overlap = 0 for s1, e1 in A: for s2, e2 in B: overlap += max(0, min(e1, e2) - max(s1, s2)) return overlap / 3600.0 def _to_iso_time(val) -> str | None: """Formata para 'HH:MM:SS'.""" if val is None: return None t = _to_time(val) return t.isoformat() if t else None # ========= DB ========= def get_conn(): return pymysql.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, database=DB_NAME, charset=DB_CHARSET, cursorclass=pymysql.cursors.DictCursor, autocommit=False, connect_timeout=15, read_timeout=30, write_timeout=30, ) def _table_columns(conn, table: str) -> Set[str]: with conn.cursor() as cur: cur.execute(f"DESCRIBE `{table}`;") cols = {row["Field"] for row in cur.fetchall()} return cols def fetch_feriados(conn) -> Set[date]: """Retorna set de datas de feriados. Se existir coluna de service_instance, filtra por ela.""" cols = _table_columns(conn, TBL_HOLIDAY) params = [] where = "" if COL_HOLI_SVC in cols and SERVICE_INSTANCE: where = f"WHERE `{COL_HOLI_SVC}`=%s" params = [SERVICE_INSTANCE] sql = f"SELECT `{COL_HOLI_DATE}` AS data FROM `{TBL_HOLIDAY}` {where};" with conn.cursor() as cur: cur.execute(sql, params) rows = cur.fetchall() feriados = set() for r in rows: if r["data"] is None: continue feriados.add(pd.to_datetime(r["data"]).date()) logger.info(f"Feriados carregados: {len(feriados)}") return feriados def fetch_shifts(conn) -> Dict[int, Dict[str, Any]]: """Busca escalas, converte horários e calcula horas previstas. Retorna dict por id.""" cols = _table_columns(conn, TBL_SHIFT) params = [] where = "" if COL_SHIFT_SVC in cols and SERVICE_INSTANCE: where = f"WHERE `{COL_SHIFT_SVC}`=%s" params = [SERVICE_INSTANCE] sql = f""" SELECT `{COL_SHIFT_ID}` AS id, `{COL_SHIFT_START}` AS start_time, `{COL_SHIFT_END}` AS end_time, `{COL_SHIFT_INT_S}` AS interval_start, `{COL_SHIFT_INT_E}` AS interval_end FROM `{TBL_SHIFT}` {where}; """ with conn.cursor() as cur: cur.execute(sql, params) rows = cur.fetchall() shifts = {} for s in rows: st = _to_time(s.get("start_time")) en = _to_time(s.get("end_time")) is_ = _to_time(s.get("interval_start")) ie_ = _to_time(s.get("interval_end")) dur_int_h = _hours_between_times(is_, ie_) if (is_ and ie_) else 0.0 work_h = _hours_between_times(st, en) horas_previstas = work_h - dur_int_h # float (horas) s["start_time"] = st s["end_time"] = en s["interval_start"] = is_ s["interval_end"] = ie_ s["duracao_intervalo"] = timedelta(hours=dur_int_h) s["horas_trabalhadas_previstas"] = horas_previstas shifts[int(s["id"])] = s logger.info(f"Escalas carregadas: {len(shifts)}") return shifts # cache simples do schema _SCHEMA_CACHE = {} def get_table_schema(conn, table: str) -> Dict[str, str]: """Retorna {coluna: tipo} em minúsculas, ex.: {'horas_extras': 'decimal(6,2)'}.""" global _SCHEMA_CACHE if table in _SCHEMA_CACHE: return _SCHEMA_CACHE[table] with conn.cursor() as cur: cur.execute(f"SHOW COLUMNS FROM `{table}`;") schema = {row["Field"]: row["Type"].lower() for row in cur.fetchall()} _SCHEMA_CACHE[table] = schema return schema def _hours_to_hhmmss(hours: float | int | None) -> str | None: """Converte horas (float) em 'HH:MM:SS'. Suporta negativos.""" if hours is None: return None hours = float(hours) total_seconds = int(round(hours * 3600)) sign = "-" if total_seconds < 0 else "" total_seconds = abs(total_seconds) h = total_seconds // 3600 m = (total_seconds % 3600) // 60 s = total_seconds % 60 return f"{sign}{h:02d}:{m:02d}:{s:02d}" def _fit_value_for_column(coltype: str, value): """Adapta 'value' (horas float) para o tipo da coluna do MySQL.""" if value is None: return None t = (coltype or "").lower() # TIME -> HH:MM:SS if t.startswith("time"): return _hours_to_hhmmss(value) # DECIMAL/DOUBLE/FLOAT -> arredonda if t.startswith("decimal") or t.startswith("double") or t.startswith("float"): return round(float(value), 2) # INT -> segundos inteiros if t.startswith("int"): return int(round(float(value) * 3600)) # fallback: manda como está return value def fetch_time_records_range(conn) -> List[Dict[str, Any]]: start_str = os.getenv("START_DATE") end_str = os.getenv("END_DATE") if not start_str or not end_str: raise ValueError("Defina START_DATE e END_DATE no .env (YYYY-MM-DD).") # janela: [start 00:00:00, end+1 00:00:00) start_dt = datetime.strptime(start_str, "%Y-%m-%d") end_dt = datetime.strptime(end_str, "%Y-%m-%d") + timedelta(days=1) sql = f""" SELECT * FROM `{TBL_TIME_RECORDS}` WHERE `{COL_TR_DATE}` >= %s AND `{COL_TR_DATE}` < %s ORDER BY `{COL_TR_DATE}`, `{COL_TR_USER_ID}`, `{COL_TR_ID}`; """ with conn.cursor() as cur: cur.execute(sql, (start_dt, end_dt)) rows = cur.fetchall() logger.info(f"Registros no intervalo [{start_str} .. {end_str}]: {len(rows)} (sem filtro de service_instance)") return rows def fetch_user(conn, user_id: int) -> Dict[str, Any]: sql = f"SELECT * FROM `{TBL_USER}` WHERE `{COL_USR_ID}`=%s;" with conn.cursor() as cur: cur.execute(sql, (user_id,)) row = cur.fetchone() if not row: raise ValueError(f"Usuário {user_id} não encontrado.") return row def update_time_record(conn, record_id: int, payload: Dict[str, Any]) -> None: """ Atualiza o time_record. Converte horas_extras / horas_noturnas para o tipo real da coluna (TIME/DECIMAL/INT etc.) e loga o rowcount. """ cols = _table_columns(conn, TBL_TIME_RECORDS) schema = get_table_schema(conn, TBL_TIME_RECORDS) # Campos base candidates = [ (COL_TR_IN, payload.get("hora_entrada")), # já vai 'HH:MM:SS' (COL_TR_OUT, payload.get("hora_saida")), (COL_TR_INT_IN, payload.get("hora_entrada_intervalo")), (COL_TR_INT_OUT, payload.get("hora_retorno_intervalo")), (COL_TR_STATUS, payload.get("status")), (COL_TR_LOCAL, payload.get("local")), ] # horas_extras (converter se existir) if COL_TR_HEXTRA in cols: he_val = payload.get("horas_extras") he_val = _fit_value_for_column(schema.get(COL_TR_HEXTRA, ""), he_val) candidates.append((COL_TR_HEXTRA, he_val)) else: logger.warning(f"Coluna '{COL_TR_HEXTRA}' não existe em `{TBL_TIME_RECORDS}`; não será atualizada.") # tipo_calculo (opcional) if COL_TR_TIPO_CALC in cols: candidates.append((COL_TR_TIPO_CALC, payload.get("tipo_calculo"))) # horas_noturnas (converter se existir) col_hnot = os.getenv("COL_TR_HNOTURNA", "horas_noturnas") if col_hnot in cols: hn_val = payload.get("horas_noturnas") hn_val = _fit_value_for_column(schema.get(col_hnot, ""), hn_val) candidates.append((col_hnot, hn_val)) else: logger.warning(f"Coluna '{col_hnot}' não existe em `{TBL_TIME_RECORDS}`; não será atualizada.") # Mantém apenas colunas existentes fields = [(c, v) for (c, v) in candidates if c in cols] if not fields: logger.error( f"Nenhum campo válido para atualizar em `{TBL_TIME_RECORDS}` (id={record_id}). " f"Verifique nomes no .env. Colunas existentes: {sorted(cols)}" ) return set_clause = ", ".join([f"`{k}`=%s" for k, _ in fields]) params = [v for _, v in fields] + [record_id] sql = f"UPDATE `{TBL_TIME_RECORDS}` SET {set_clause} WHERE `{COL_TR_ID}`=%s;" with conn.cursor() as cur: cur.execute(sql, params) rc = cur.rowcount enviados = ", ".join([f"{k}={repr(v)}" for k, v in fields]) logger.info(f"UPDATE {TBL_TIME_RECORDS} WHERE {COL_TR_ID}={record_id} " f"→ ({enviados}) | linhas_afetadas={rc}") if rc == 0: logger.warning( f"UPDATE não alterou linhas (id={record_id}). Motivos comuns: " f"PK/WHERE não bate, ou valores já eram iguais, ou triggers revertendo." ) def update_user(conn, user_id: int, user_updates: Dict[str, Any]) -> None: """Atualiza colunas de saldo do usuário (somente as que existem na tabela).""" cols = _table_columns(conn, TBL_USER) fields = [] params = [] if user_updates.get("saldo_horas") is not None and COL_USR_SALDO in cols: fields.append(f"`{COL_USR_SALDO}`=%s") params.append(user_updates["saldo_horas"]) if user_updates.get("saldo_atual_horas_100") is not None and COL_USR_SALDO100 in cols: fields.append(f"`{COL_USR_SALDO100}`=%s") params.append(user_updates["saldo_atual_horas_100"]) if user_updates.get("saldo_atual_horas") is not None and COL_USR_SALDO in cols: fields.append(f"`{COL_USR_SALDO}`=%s") params.append(user_updates["saldo_atual_horas"]) if not fields: logger.warning(f"Nenhum campo válido para atualizar em `{TBL_USER}` para user_id={user_id}. Colunas: {sorted(cols)}") return params.append(user_id) sql = f"UPDATE `{TBL_USER}` SET {', '.join(fields)} WHERE `{COL_USR_ID}`=%s;" with conn.cursor() as cur: cur.execute(sql, params) def processar_registros_db() -> None: conn = get_conn() try: feriados = fetch_feriados(conn) shifts = fetch_shifts(conn) registros = fetch_time_records_range(conn) # usa START_DATE/END_DATE do .env # acumula horas extras positivas do período por usuário extras_periodo = defaultdict(float) user_cache: Dict[int, Dict[str, Any]] = {} for tr in registros: try: # --- data do registro data_reg = pd.to_datetime(tr.get(COL_TR_DATE)).date() # --- horários do registro h_in = _to_time(tr.get(COL_TR_IN)) h_out = _to_time(tr.get(COL_TR_OUT)) h_i_in = _to_time(tr.get(COL_TR_INT_IN)) h_i_out = _to_time(tr.get(COL_TR_INT_OUT)) if not all([h_in, h_out, h_i_in, h_i_out]): logger.warning(f"Registro incompleto (ignorado) id={tr.get(COL_TR_ID)} user={tr.get(COL_TR_USER_ID)}") continue # --- horas trabalhadas do dia dur_intervalo_h = _hours_between_times(h_i_in, h_i_out) work_h = _hours_between_times(h_in, h_out) horas_trab = work_h - dur_intervalo_h # --- escala do usuário user_id = int(tr.get(COL_TR_USER_ID)) user = user_cache.get(user_id) or fetch_user(conn, user_id) user_cache[user_id] = user shift_id = user.get(COL_USR_SHIFT_ID) if not shift_id or int(shift_id) not in shifts: logger.warning(f"Escala não encontrada (user_id={user_id}, shift_id={shift_id})") continue esc = shifts[int(shift_id)] horas_previstas = esc.get("horas_trabalhadas_previstas") if horas_previstas is None: st = esc.get("start_time"); en = esc.get("end_time") is_ = esc.get("interval_start"); ie_ = esc.get("interval_end") work_prev = _hours_between_times(st, en) dur_prev = _hours_between_times(is_, ie_) if (is_ and ie_) else 0.0 horas_previstas = work_prev - dur_prev # --- HORA EXTRA DIÁRIA (positivo = trabalhou a mais) extras = round(horas_trab - horas_previstas, 2) extras_pos = max(0.0, extras) # só grava positivo na coluna horas_extras # --- HORAS NOTURNAS (22->05 por padrão, configure NIGHT_START_HH/NIGHT_END_HH no .env) n_start = dtime(NIGHT_START_HH, 0) n_end = dtime(NIGHT_END_HH, 0) horas_noturnas = round(_overlap_hours(h_in, h_out, n_start, n_end), 2) # --- UPDATE time_records (grava a diária) payload = { "hora_entrada": _to_iso_time(h_in), "hora_saida": _to_iso_time(h_out), "hora_entrada_intervalo": _to_iso_time(h_i_in), "hora_retorno_intervalo": _to_iso_time(h_i_out), "horas_extras": extras_pos, # diária (sempre >= 0) "horas_noturnas": horas_noturnas, # diária "status": tr.get(COL_TR_STATUS), "local": tr.get(COL_TR_LOCAL), # tipo_calculo é opcional; deixe None se não quiser marcar: "tipo_calculo": ("Feriado" if (extras_pos > 0 and data_reg in feriados) else "Horas Extras" if extras_pos > 0 else None), } update_time_record(conn, int(tr.get(COL_TR_ID)), payload) # --- acumula extras do período p/ este usuário if extras_pos > 0: extras_periodo[user_id] += extras_pos except Exception as e_inner: logger.error(f"Erro ao processar registro id={tr.get(COL_TR_ID)}: {e_inner}") # --- PÓS-LOOP: adiciona a soma do período ao saldo acumulado do usuário for uid, total_extras in extras_periodo.items(): try: u = user_cache.get(uid) or fetch_user(conn, uid) base = u.get(COL_USR_SALDO, 0) or 0.0 novo = round(float(base) + float(total_extras), 2) update_user(conn, uid, {"saldo_atual_horas": novo}) logger.info(f"[Aggregate] user={uid} {COL_USR_SALDO}: {base} + {total_extras:.2f} = {novo:.2f}") except Exception as agg_err: logger.error(f"Falha ao agregar extras do período para user={uid}: {agg_err}") conn.commit() logger.info("✅ Processamento finalizado com sucesso.") except Exception as e: conn.rollback() logger.error(f"Erro geral no processamento: {e}") finally: conn.close() if __name__ == "__main__": processar_registros_db()