222wdadsad

222wdadsad-PENX
222wdadsad
此内容为付费阅读,请付费后查看
98765
限时特惠
立即购买
您当前未登录!建议登陆后购买,可保存购买订单
付费阅读
# 标准库导入
import hashlib
import json
import logging
import os
import queue
import random
import re
import sys
import threading
import time as t
from datetime import datetime, time, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 第三方库导入
import requests
import schedule
import customtkinter as ctk
from tkinter import LabelFrame, Listbox, messagebox
from tkinter import font as tkfont

# 本地模块导入(如果有的话)
# import customtkinter  # 注释掉未使用的导入

# =============================================================================
# 全局配置和常量
# =============================================================================

# 日志配置
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_LEVEL = logging.INFO

# 运行目录:支持 PyInstaller 冻结后的可执行文件目录
if getattr(sys, 'frozen', False):
    BASE_DIR = os.path.dirname(sys.executable)
else:
    BASE_DIR = os.getcwd()

# 配置全局日志记录(写入到运行目录)
logging.basicConfig(
    filename=os.path.join(BASE_DIR, 'application.log'),
    level=LOG_LEVEL,
    format=LOG_FORMAT,
    encoding='utf-8',  # 支持中文日志
    filemode='a'  # 追加模式
)

# 创建根日志记录器
logger = logging.getLogger(__name__)

# API相关常量
BASE_URL = 'https://seat.gsupl.edu.cn'
API_ENDPOINTS = {
    'config': '/interface/readingroom/singleAppConfig',
    'reservation_list': '/interface/readingroom/getdespeakdata',
    'cancel_reservation': '/interface/readingroom/deletedeskdata',
    'check_time': '/interface/readingroom/checktimecanbesk',
    'make_booking': '/interface/readingroom/beskdata',
    'signin': '/interface/readingroom/beskreadersigin',
    'signout': '/interface/readingroom/returntable'
}

# 房间号映射
ROOM_MAPPING = {
    1: '23',
    2: '26',
    3: '28',
    4: '24',
    5: '27'
}

# 预约时间段配置
BOOKING_TIMES = {
    'morning': ('07:00:00', '13:50:00'),
    'afternoon': ('13:50:01', '22:30:00')
}

# 请求头模板
DEFAULT_HEADERS = {
    "Content-Length": "0",
    'Host': 'seat.gsupl.edu.cn',
    'Connection': 'Keep-Alive',
    'Cookie2': '$Version=1',
    'Accept-Encoding': 'gzip'
}

# 网络请求默认超时时间(秒)
REQUEST_TIMEOUT = 10

# 全局文件路径(在main函数中设置)
sigin_file_path = ""
table_file_path = ""
baskid_file_path = ""

# =============================================================================
# 预约系统管理类
# =============================================================================

class ReservationSystem:
    """
    预约系统管理类

    负责处理图书馆座位预约相关的操作,包括获取预约列表和取消预约等功能。
    """

    def __init__(self, util: 'Util'):
        """初始化预约系统

        使用共享的工具实例以复用网络会话与通用工具方法。
        """
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.util = util

    def get_reservation_list(self, jsessionid: str, reader_no: str) -> Optional[List[Dict[str, Any]]]:
        """
        获取用户的预约列表

        Args:
            jsessionid (str): 会话ID,用于身份验证
            reader_no (str): 读者编号(学号)

        Returns:
            Optional[List[Dict[str, Any]]]: 预约列表数据,如果获取失败则返回None

        Raises:
            requests.exceptions.RequestException: 网络请求异常
        """
        self.logger.info(f"开始获取预约列表,读者编号: {reader_no}")

        # 构建请求URL
        reservation_list_url = f"{BASE_URL}{API_ENDPOINTS['reservation_list']}?readerno={reader_no}"

        try:
            # 发送GET请求获取预约列表
            response = self.util.http_get(reservation_list_url, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)

            if response.status_code != 200:
                self.logger.error(f"获取预约列表失败,HTTP状态码: {response.status_code}")
                return None

            # 解析响应数据
            data = response.json()

            # 检查API返回状态
            if data.get("ReturnValue") != 0:
                self.logger.warning(f"API返回错误: {data.get('Msg', '未知错误')}")
                return None

            # 提取预约数据
            reservation_data = data.get('Data')
            if not reservation_data:
                self.logger.info("当前没有预约记录")
                return None

            self.logger.info(f"成功获取预约列表,共{len(reservation_data)}条记录")
            return reservation_data

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"获取预约列表时发生未知错误: {e}")
            return None

    def cancel_reservation(self, jsessionid: str, reader_no: str, password: str) -> Union[bool, str, None]:
        """
        取消用户的预约

        Args:
            jsessionid (str): 会话ID,用于身份验证
            reader_no (str): 读者编号(学号)
            password (str): 用户密码

        Returns:
            Union[bool, str, None]: 
                - True: 取消成功
                - str: 错误消息
                - None: 网络错误或其他异常
        """
        self.logger.info(f"开始取消预约,读者编号: {reader_no}")

        # 首先获取预约列表
        reservation_data = self.get_reservation_list(jsessionid, reader_no)

        if not reservation_data:
            self.logger.error("无法获取预约列表,取消预约失败")
            return "无法获取预约列表"

        # 获取第一个预约的ID(假设用户只有一个预约)
        try:
            strid = reservation_data[0]['ID']
        except (KeyError, IndexError) as e:
            self.logger.error(f"预约数据格式错误: {e}")
            return "预约数据格式错误"

        # 构建取消预约的请求参数
        params = {
            "password": password,
            "readerno": reader_no,
            "strid": strid
        }

        # 构建请求URL
        cancel_url = f"{BASE_URL}{API_ENDPOINTS['cancel_reservation']}"

        try:
            # 发送取消预约请求
            response = self.util.http_get(cancel_url, params=params, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)

            if response.status_code != 200:
                self.logger.error(f"取消预约请求失败,HTTP状态码: {response.status_code}")
                return False

            # 解析响应数据
            cancel_data = response.json()

            # 检查取消结果
            if cancel_data.get("ReturnValue") == 0:
                self.logger.info("预约取消成功")
                return True
            else:
                error_msg = cancel_data.get('Msg', '取消预约失败')
                self.logger.warning(f"取消预约失败: {error_msg}")
                return error_msg

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"取消预约时发生未知错误: {e}")
            return None

# =============================================================================
# 预约管理类
# =============================================================================

class Booking:
    """
    预约管理类

    负责处理图书馆座位预约相关的操作,包括获取预约ID、预约座位、取消预约等功能。
    """

    def __init__(self, util: 'Util'):
        """初始化预约管理类

        使用共享的工具实例以复用网络会话与通用工具方法。
        """
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.util = util

    def get_can_bask_id(self, jsessionid: str, room_no: str) -> Union[str, None]:
        """
        获取可预约时间段的ID

        Args:
            jsessionid (str): 会话ID,用于身份验证
            room_no (str): 房间号

        Returns:
            Union[str, None]: 可预约ID或错误消息,如果获取失败则返回None
        """
        self.logger.info(f"获取可预约时间段ID,房间号: {room_no}")

        # 构建请求URL
        url = f"{BASE_URL}{API_ENDPOINTS['check_time']}?roomno={room_no}"

        try:
            # 发送GET请求
            response = self.util.http_get(url, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)
            self.logger.debug(f"获取可预约ID请求状态码: {response.status_code}")

            if response.status_code == 200:
                data = response.json()

                if data.get("ReturnValue") == 0:
                    can_bask_id = data.get('CanBaskID')
                    self.logger.info(f"成功获取可预约ID: {can_bask_id}")
                    return can_bask_id
                else:
                    error_msg = data.get('Msg', '未知错误')
                    self.logger.warning(f"获取可预约ID失败: {error_msg}")
                    return error_msg
            else:
                self.logger.error(f"获取可预约ID请求失败,HTTP状态码: {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"获取可预约ID时发生未知错误: {e}")
            return None

    def get_bask_id(self, filepath: str, room_no: str) -> Optional[str]:
        """
        从配置文件中获取预约ID

        Args:
            filepath (str): 配置文件路径
            room_no (str): 房间号

        Returns:
            Optional[str]: 预约ID,如果获取失败则返回None
        """
        self.logger.info(f"从配置文件获取预约ID,房间号: {room_no}")

        try:
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)

                # 获取morning时段的预约ID
                baskid = data.get("morning", {}).get(room_no)

                if baskid is not None:
                    self.logger.info(f"成功获取预约ID: {baskid}")
                    return str(baskid)
                else:
                    self.logger.warning(f"房间号 {room_no} 在配置文件中不存在")
                    return None

        except FileNotFoundError:
            self.logger.error(f"配置文件未找到: {filepath}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON文件格式错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"读取配置文件时发生未知错误: {e}")
            return None

    def cancel_reservation(self, jsessionid: str, reader_no: str, password: str) -> Union[bool, str, None]:
        """
        取消用户的预约(Booking类的实现)

        Args:
            jsessionid (str): 会话ID,用于身份验证
            reader_no (str): 读者编号(学号)
            password (str): 用户密码

        Returns:
            Union[bool, str, None]: 
                - True: 取消成功
                - str: 错误消息
                - None: 网络错误或其他异常
        """
        self.logger.info(f"开始取消预约,读者编号: {reader_no}")

        # 构建获取预约列表的URL
        get_str_id_url = f"{BASE_URL}{API_ENDPOINTS['reservation_list']}?readerno={reader_no}"

        try:
            # 获取预约列表
            response = self.util.http_get(get_str_id_url, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)

            if response.status_code != 200:
                self.logger.error(f"获取预约列表失败,HTTP状态码: {response.status_code}")
                return False

            # 解析预约列表数据
            strid_data = response.json()

            if strid_data.get("ReturnValue") != 0:
                error_msg = strid_data.get('Msg', '获取预约列表失败')
                self.logger.warning(f"获取预约列表失败: {error_msg}")
                return error_msg

            # 提取预约ID
            reservation_data = strid_data.get('Data')
            if not reservation_data:
                self.logger.info("当前没有预约记录")
                return "当前没有预约记录"

            strid = reservation_data[0].get('ID')
            if not strid:
                self.logger.error("预约数据中缺少ID字段")
                return False

            # 构建取消预约的请求参数
            params = {
                "password": password,
                "readerno": reader_no,
                "strid": strid
            }

            # 发送取消预约请求
            cancel_url = f"{BASE_URL}{API_ENDPOINTS['cancel_reservation']}"
            cancel_response = self.util.http_get(cancel_url, params=params, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)

            if cancel_response.status_code != 200:
                self.logger.error(f"取消预约请求失败,HTTP状态码: {cancel_response.status_code}")
                return False

            # 解析取消结果
            cancel_data = cancel_response.json()

            if cancel_data.get("ReturnValue") == 0:
                self.logger.info("预约取消成功")
                return True
            else:
                error_msg = cancel_data.get('Msg', '取消预约失败')
                self.logger.warning(f"取消预约失败: {error_msg}")
                return error_msg

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"取消预约时发生未知错误: {e}")
            return None

    def _parse_booking_result(self, data: Dict[str, Any]) -> Tuple[bool, Union[str, bool]]:
        """
        解析预约结果

        Args:
            data (Dict[str, Any]): API返回的数据

        Returns:
            Tuple[bool, Union[str, bool]]: (是否成功, 消息或状态)
        """
        return_value = data.get("ReturnValue")
        msg = data.get('Msg', '未知消息')

        if return_value == 0:
            self.logger.info(f"预约成功: {msg}")
            return True, msg
        else:
            self.logger.info(f"预约返回信息: {msg}")

            # 根据不同的错误消息返回不同的结果
            if msg == "您已经预约过该时间段的座位了!无需重复预约":
                return True, msg  # 已经预约过也算成功
            elif msg == "预约失败":
                return False, msg
            elif msg == "此座位已被预约,不能重复预约":
                return False, msg
            elif "未到可预约时间" in msg:
                return False, msg
            elif msg == "读者没有预约权限":
                return False, False  # 特殊标记:没有权限
            else:
                self.logger.warning(f"预约失败,未知错误信息: {msg}")
                return False, msg

    def make_booking(self, jsessionid: str, begin_time: str, besk_id: str, check_str: str,
                     end_time: str, reader_no: str, room_no: str, table_id: str,
                     table_no: str) -> Union[Tuple[bool, Union[str, bool]], Tuple[bool, int], None]:
        """
        执行座位预约

        Args:
            jsessionid (str): 会话ID,用于身份验证
            begin_time (str): 开始时间
            besk_id (str): 预约ID
            check_str (str): 校验字符串
            end_time (str): 结束时间
            reader_no (str): 读者编号(学号)
            room_no (str): 房间号
            table_id (str): 桌子ID
            table_no (str): 桌子编号

        Returns:
            Union[Tuple[bool, Union[str, bool]], Tuple[bool, int], None]: 
                - (True, msg): 预约成功
                - (False, msg): 预约失败
                - (False, status_code): 服务器错误
                - None: 网络异常
        """
        self.logger.info(f"开始执行预约请求,座位: {table_no}")

        # 构建请求参数
        params = {
            "begintime": begin_time,
            "beskid": besk_id,
            "checkstr": check_str,
            "endtime": end_time,
            "readerno": reader_no,
            "roomno": room_no,
            "tableid": table_id,
            "tableno": table_no
        }

        # 构建请求URL
        url = f"{BASE_URL}{API_ENDPOINTS['make_booking']}"

        try:
            # 发送预约请求
            response = self.util.http_get(url, params=params, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)
            self.logger.debug(f"预约请求状态码: {response.status_code}")

            if response.status_code == 200:
                # 成功响应,解析结果
                data = response.json()
                return self._parse_booking_result(data)

            elif response.status_code in (502, 504):
                # 网关错误,返回状态码
                self.logger.warning(f"网关错误: {response.status_code}")
                return False, response.status_code

            elif response.status_code == 500:
                # 服务器内部错误,等待后重试
                self.logger.error("服务器内部错误500,等待60秒后重试...")
                t.sleep(60)
                return None

            else:
                # 其他HTTP错误
                self.logger.error(f"预约请求失败,HTTP状态码: {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"预约时发生未知错误: {e}")
            return None

# =============================================================================
# 签到签退管理类
# =============================================================================

class SignInOut:
    """
    签到签退管理类

    负责处理图书馆座位的签到和签退操作。
    """

    def __init__(self, util: 'Util'):
        """初始化签到签退管理类

        使用共享的工具实例以复用网络会话与通用工具方法。
        """
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.util = util

    def send_signin_req(self, readerno: str, password: str, tableno: str,
                        jsessionid: str, checkstr: str) -> Optional[str]:
        """
        发送签到请求

        Args:
            readerno (str): 读者编号(学号)
            password (str): 用户密码
            tableno (str): 桌子编号
            jsessionid (str): 会话ID,用于身份验证
            checkstr (str): 校验字符串

        Returns:
            Optional[str]: 签到结果消息,如果请求失败则返回None
        """
        self.logger.info(f"开始执行签到请求,座位: {tableno}")

        # 构建请求URL
        url = f"{BASE_URL}{API_ENDPOINTS['signin']}"

        # 构建请求参数
        params = {
            'checkstr': checkstr,
            'password': password,
            'readerno': readerno,
            'tableno': tableno
        }

        try:
            # 发送签到请求
            response = self.util.http_get(url, params=params, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)
            self.logger.debug(f"签到请求状态码: {response.status_code}")

            if response.status_code == 200:
                # 解析响应数据
                data = response.json()
                msg = data.get('Msg', '未知消息')
                self.logger.info(f"签到请求返回信息: {msg}")
                return msg
            else:
                self.logger.error(f"签到请求失败,HTTP状态码: {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"签到时发生未知错误: {e}")
            return None

    def send_signout_req(self, readerno: str, password: str, jsessionid: str) -> Optional[str]:
        """
        发送签退请求

        Args:
            readerno (str): 读者编号(学号)
            password (str): 用户密码
            jsessionid (str): 会话ID,用于身份验证

        Returns:
            Optional[str]: 签退结果消息,如果请求失败则返回None
        """
        self.logger.info(f"开始执行签退请求,读者编号: {readerno}")

        # 构建请求URL
        url = f"{BASE_URL}{API_ENDPOINTS['signout']}"

        # 构建请求参数
        params = {
            'password': password,
            'readerno': readerno
        }

        try:
            # 发送签退请求
            response = self.util.http_get(url, params=params, jsessionid=jsessionid, timeout=REQUEST_TIMEOUT)
            self.logger.debug(f"签退请求状态码: {response.status_code}")

            if response.status_code == 200:
                # 解析响应数据
                data = response.json()
                msg = data.get('Msg', '未知消息')
                self.logger.info(f"签退成功,返回信息: {msg}")
                return msg
            else:
                self.logger.error(f"签退请求失败,HTTP状态码: {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            self.logger.error(f"签退时发生未知错误: {e}")
            return None

# =============================================================================
# 工具类
# =============================================================================

class Util:
    """
    工具类

    提供各种实用功能,包括房间号映射、文件操作、数据验证等。
    """

    def __init__(self):
        """初始化工具类"""
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        # 缓存机制
        self._file_cache: Dict[str, Dict[str, Any]] = {}
        self._cache_timestamps: Dict[str, float] = {}
        self._cache_timeout = 300  # 5分钟缓存超时
        # 共享的 HTTP 会话以复用连接、降低握手开销
        self.session = requests.Session()

    def build_headers(self, jsessionid: Optional[str] = None) -> Dict[str, str]:
        """构建请求头,如果提供 jsessionid 则自动附带 Cookie。"""
        headers = DEFAULT_HEADERS.copy()
        if jsessionid:
            headers['Cookie'] = jsessionid
        return headers

    def http_get(self, url: str, params: Optional[Dict[str, Any]] = None,
                 jsessionid: Optional[str] = None, timeout: int = REQUEST_TIMEOUT) -> requests.Response:
        """统一的 GET 调用入口,使用共享会话与超时设置。"""
        headers = self.build_headers(jsessionid)
        return self.session.get(url, headers=headers, params=params, timeout=timeout)

    def http_post(self, url: str, data: Optional[Dict[str, Any]] = None,
                  jsessionid: Optional[str] = None, timeout: int = REQUEST_TIMEOUT) -> requests.Response:
        """统一的 POST 调用入口,使用共享会话与超时设置。"""
        headers = self.build_headers(jsessionid)
        return self.session.post(url, headers=headers, data=data, timeout=timeout)

    def get_room_no(self, room_no: int) -> str:
        """
        获取房间号映射

        Args:
            room_no (int): 输入的房间号

        Returns:
            str: 映射后的房间号
        """
        self.logger.debug(f"获取房间号映射,输入: {room_no}")

        # 使用预定义的映射表
        mapped_room = ROOM_MAPPING.get(room_no, '27')  # 默认为27

        self.logger.debug(f"房间号映射结果: {room_no} -> {mapped_room}")
        return mapped_room

    def get_table_info(self, table_no: str, filepath: str) -> Optional[int]:
        """
        获取桌子信息

        Args:
            table_no (str): 桌子编号
            filepath (str): 配置文件路径

        Returns:
            Optional[int]: 桌子ID,如果不存在则返回None
        """
        self.logger.debug(f"获取桌子信息,桌子编号: {table_no}")

        # 读取配置文件数据
        data = self.read_json_file(filepath)
        if not data:
            return None

        # 获取桌子信息
        table_info = data.get(table_no)
        if table_info is not None:
            self.logger.debug(f"找到桌子信息: {table_no} -> {table_info}")
            return table_info
        else:
            self.logger.warning(f"桌子编号 {table_no} 在配置文件中不存在")
            return None

    def read_json_file(self, filepath: str) -> Dict[str, Any]:
        """
        读取JSON文件(带缓存机制)

        Args:
            filepath (str): 文件路径

        Returns:
            Dict[str, Any]: JSON数据,如果读取失败则返回空字典
        """
        self.logger.debug(f"读取JSON文件: {filepath}")

        # 检查缓存
        current_time = t.time()
        if (filepath in self._file_cache and
                filepath in self._cache_timestamps and
                current_time - self._cache_timestamps[filepath] < self._cache_timeout):
            self.logger.debug(f"从缓存读取文件: {filepath}")
            return self._file_cache[filepath]

        try:
            with open(filepath, 'r', encoding='utf-8') as file:
                data = json.load(file)

            # 更新缓存
            self._file_cache[filepath] = data
            self._cache_timestamps[filepath] = current_time

            self.logger.debug(f"成功读取JSON文件: {filepath}")
            return data

        except FileNotFoundError:
            self.logger.error(f"文件未找到: {filepath}")
            return {}
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON解码错误: {filepath}, 错误: {e}")
            return {}
        except Exception as e:
            self.logger.error(f"读取文件时发生未知错误: {e}")
            return {}

    def get_jsessionid(self) -> str:
        """
        获取JSESSIONID

        Returns:
            str: JSESSIONID字符串,如果获取失败则返回默认值
        """
        self.logger.info("开始获取JSESSIONID")

        # 构建请求URL
        url = f"{BASE_URL}{API_ENDPOINTS['config']}"

        try:
            # 发送POST请求获取配置
            response = self.http_post(url, timeout=REQUEST_TIMEOUT)

            if response.status_code == 200:
                # 从响应头中提取Set-Cookie
                set_cookie = response.headers.get('Set-Cookie')
                if set_cookie:
                    jsessionid = set_cookie.split(';')[0]
                    self.logger.info(f"成功获取JSESSIONID: {jsessionid}")
                    return jsessionid
                else:
                    self.logger.warning("响应头中未找到Set-Cookie")
            else:
                self.logger.error(f"获取JSESSIONID失败,HTTP状态码: {response.status_code}")

        except requests.exceptions.RequestException as e:
            self.logger.error(f"网络请求异常: {e}")
        except Exception as e:
            self.logger.error(f"获取JSESSIONID时发生未知错误: {e}")

        # 返回默认的JSESSIONID作为备用
        default_jsessionid = "JSESSIONID=71533C0C9E6F10B43BE710E33837648C"
        self.logger.warning(f"使用默认JSESSIONID: {default_jsessionid}")
        return default_jsessionid

    def get_checkstr(self, reader_no: str, table_no: str) -> str:
        """
        生成校验字符串

        Args:
            reader_no (str): 读者编号(学号)
            table_no (str): 桌子编号

        Returns:
            str: MD5校验字符串

        Raises:
            RuntimeError: 生成校验字符串时发生错误
        """
        self.logger.debug(f"生成校验字符串,读者编号: {reader_no}, 桌子编号: {table_no}")

        try:
            # 获取当前时间并格式化
            current_time = datetime.now().strftime('%Y%m%d%H%M')

            # 构建参数字符串
            param_string = f"{reader_no}GOLDLIBGDLIS{table_no}CHECKTABLE{current_time}"

            # 生成MD5哈希
            checkstr = hashlib.md5(param_string.encode('utf-8')).hexdigest()

            self.logger.debug(f"成功生成校验字符串: {checkstr}")
            return checkstr

        except Exception as e:
            self.logger.error(f"生成校验字符串时发生错误: {e}")
            raise RuntimeError(f"生成校验字符串失败: {e}")

    def update_json_file(self, filepath: str, data: Dict[str, Any]) -> bool:
        """
        更新JSON文件

        Args:
            filepath (str): 文件路径
            data (Dict[str, Any]): 要写入的数据

        Returns:
            bool: 更新是否成功
        """
        self.logger.info(f"更新JSON文件: {filepath}")

        try:
            with open(filepath, 'w', encoding='utf-8') as file:
                json.dump(data, file, ensure_ascii=False, indent=4)

            # 清除缓存
            if filepath in self._file_cache:
                del self._file_cache[filepath]
            if filepath in self._cache_timestamps:
                del self._cache_timestamps[filepath]

            self.logger.info(f"成功更新JSON文件: {filepath}")
            return True

        except Exception as e:
            self.logger.error(f"更新JSON文件时发生错误: {e}")
            return False

    def clear_cache(self) -> None:
        """
        清除文件缓存
        """
        self._file_cache.clear()
        self._cache_timestamps.clear()
        self.logger.info("已清除文件缓存")

# =============================================================================
# 线程管理类
# =============================================================================

class ThreadManager:
    """
    线程管理器

    负责管理后台线程的创建、执行和结果处理,提供线程安全的操作接口。
    """

    def __init__(self, max_threads: int = 10):
        """
        初始化线程管理器

        Args:
            max_threads (int): 最大线程数量限制
        """
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.result_queue: queue.Queue = queue.Queue()  # 用于存储线程的返回结果
        self.threads: List[threading.Thread] = []  # 存储活动线程
        self.lock = threading.RLock()  # 使用可重入锁
        self.max_threads = max_threads
        self._shutdown = False

        self.logger.info(f"线程管理器初始化完成,最大线程数: {max_threads}")

    def start_thread(self, target: Callable, args: Tuple = (),
                     callback: Optional[Callable[[Any], None]] = None) -> bool:
        """
        启动线程并将结果放入队列,线程结束后执行回调

        Args:
            target (Callable): 要在线程中执行的函数
            args (Tuple): 传递给目标函数的参数
            callback (Optional[Callable]): 线程完成后的回调函数

        Returns:
            bool: 线程是否成功启动
        """
        if self._shutdown:
            self.logger.warning("线程管理器已关闭,无法启动新线程")
            return False

        # 检查线程数量限制
        with self.lock:
            if len(self.threads) >= self.max_threads:
                self.logger.warning(f"已达到最大线程数限制: {self.max_threads}")
                return False

        def thread_target():
            """线程目标函数"""
            try:
                self.logger.debug(f"线程开始执行: {target.__name__}")
                result = target(*args)  # 执行任务
                self.logger.debug(f"线程执行完成: {target.__name__}")
                self.result_queue.put((result, callback))  # 放入队列并带回调函数
            except Exception as e:
                self.logger.error(f"线程执行异常: {target.__name__}, 错误: {e}")
                self.result_queue.put((None, callback))  # 异常时也放入队列

        try:
            # 创建并启动线程
            thread = threading.Thread(target=thread_target, daemon=True,
                                      name=f"Worker-{target.__name__}")
            thread.start()

            # 线程安全地添加到线程列表
            with self.lock:
                self.threads.append(thread)

            self.logger.debug(f"成功启动线程: {target.__name__}")
            return True

        except Exception as e:
            self.logger.error(f"启动线程失败: {target.__name__}, 错误: {e}")
            return False

    def check_results(self) -> int:
        """
        从队列中取出结果并执行回调

        Returns:
            int: 处理的结果数量
        """
        processed_count = 0

        # 处理队列中的所有结果
        while not self.result_queue.empty():
            try:
                result, callback = self.result_queue.get_nowait()

                if callback:
                    try:
                        callback(result)  # 执行回调处理结果
                    except Exception as e:
                        self.logger.error(f"回调函数执行异常: {e}")

                processed_count += 1

            except queue.Empty:
                break
            except Exception as e:
                self.logger.error(f"处理结果时发生异常: {e}")
                break

        # 清理已结束的线程
        with self.lock:
            alive_threads = []
            for thread in self.threads:
                if thread.is_alive():
                    alive_threads.append(thread)
                else:
                    self.logger.debug(f"线程已结束: {thread.name}")

            self.threads = alive_threads

        return processed_count

    def get_thread_count(self) -> int:
        """
        获取当前活跃线程数量

        Returns:
            int: 活跃线程数量
        """
        with self.lock:
            return len([t for t in self.threads if t.is_alive()])

    def shutdown(self, timeout: float = 5.0) -> None:
        """
        关闭线程管理器

        Args:
            timeout (float): 等待线程结束的超时时间(秒)
        """
        self.logger.info("开始关闭线程管理器")
        self._shutdown = True

        # 等待所有线程结束
        with self.lock:
            for thread in self.threads:
                if thread.is_alive():
                    thread.join(timeout=timeout)
                    if thread.is_alive():
                        self.logger.warning(f"线程未在超时时间内结束: {thread.name}")

        self.logger.info("线程管理器已关闭")

# =============================================================================
# 主界面类
# =============================================================================

class MainInterface(ctk.CTk):
    """
    主界面类

    图书馆座位预约系统的主界面,提供用户交互界面和功能控制。
    """

    def __init__(self):
        """初始化主界面"""
        super().__init__()

        # 初始化日志记录器
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

        # 初始化业务类对象
        self.util = Util()
        self.signInOut = SignInOut(self.util)
        self.booking = Booking(self.util)
        self.thread_manager = ThreadManager(max_threads=5)  # 限制最大线程数
        self.reservation = ReservationSystem(self.util)

        # 界面状态变量
        self.tabel_pool: List[str] = []  # 座位池
        self.tabel_point: int = 0  # 当前座位指针
        self.is_running: bool = False  # 定时任务运行状态
        self.jsessionid: str = ""  # 会话ID

        # 线程锁
        self.lock = threading.Lock()
        self.rlock = threading.RLock()

        # 字体设置
        self.font_title = ctk.CTkFont(size=16, weight="bold")
        self.font_label = ctk.CTkFont(size=16)
        self.font_entry = ctk.CTkFont(size=16)
        self.font_button = ctk.CTkFont(size=16)
        self.font_text = ctk.CTkFont(size=16)
        self.tk_list_font = tkfont.Font(size=16)

        # 初始化界面
        self._initialize_ui()

        # 加载配置数据
        self._load_configuration()

        # 启动线程结果检查
        self.after(100, self.process_thread_results)

        self.logger.info("主界面初始化完成")

    def _initialize_ui(self) -> None:
        """初始化用户界面"""
        self.logger.debug("开始初始化用户界面")
        self.set_widget()
        self.logger.debug("用户界面初始化完成")

    def ask_string(self, title: str, prompt: str) -> str:
        """使用customtkinter弹出输入框"""
        dialog = ctk.CTkInputDialog(title=title, text=prompt)
        return dialog.get_input()

    def show_warning(self, title: str, message: str) -> None:
        """使用customtkinter显示警告对话框"""
        win = ctk.CTkToplevel(self)
        win.title(title)
        win.transient(self)
        win.grab_set()
        ctk.CTkLabel(win, text=message, wraplength=260, justify="left").pack(padx=20, pady=20)
        ctk.CTkButton(win, text="确定", width=80, command=win.destroy).pack(pady=10)
        self.wait_window(win)

    def _load_configuration(self) -> None:
        """加载配置数据"""
        self.logger.debug("开始加载配置数据")

        # 加载配置文件
        config_data = self.util.read_json_file(sigin_file_path)
        self.fill_controls(config_data)

        # 生成并显示校验字符串
        if self.tabel_pool:
            checkstr = self.util.get_checkstr(
                self.readerno_entry.get(),
                self.tabel_pool[self.tabel_point]
            )
            self.write_checkstr(checkstr)

        # 获取会话ID
        self.jsessionid = self.util.get_jsessionid()
        self.write_log(f"初始化获取jsessionid:{self.jsessionid}")

        self.logger.debug("配置数据加载完成")

    def set_widget(self):
        self.title("图书馆座位预约")
        self.resizable(True, True)
        # self.wm_attributes("-toolwindow", True)
        self.geometry(self.set_screen(650, 500))
        # Root grid weights for responsiveness
        for c in range(2):
            self.grid_columnconfigure(c, weight=1)
        for r in range(8):
            self.grid_rowconfigure(r, weight=0)
        self.grid_rowconfigure(1, weight=1)
        self.grid_rowconfigure(2, weight=1)
        self.grid_rowconfigure(7, weight=1)

        info_frame = LabelFrame(self, width=400, height=200, text="基本信息", font=self.font_label)
        info_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
        info_frame.grid_columnconfigure(0, weight=0)
        info_frame.grid_columnconfigure(1, weight=1)
        ctk.CTkLabel(info_frame, text="学号:", font=self.font_label).grid(row=0, column=0, padx=10, pady=5, sticky="e")
        self.readerno_entry = ctk.CTkEntry(info_frame, font=self.font_entry)
        self.readerno_entry.grid(row=0, column=1, padx=10, pady=5, sticky="ew")

        ctk.CTkLabel(info_frame, text="密码:", font=self.font_label).grid(row=1, column=0, padx=10, pady=5, sticky="e")
        self.password_entry = ctk.CTkEntry(info_frame, font=self.font_entry)
        self.password_entry.grid(row=1, column=1, padx=10, pady=5, sticky="ew")

        time_frame = LabelFrame(self, width=400, height=200, text="签到信息", font=self.font_label)
        time_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
        for c in range(4):
            time_frame.grid_columnconfigure(c, weight=1)
        ctk.CTkLabel(time_frame, text="签到1:", font=self.font_label).grid(row=0, column=0, pady=5, sticky="e")
        self.time1_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
        self.time1_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")

        ctk.CTkLabel(time_frame, text="签到2:", font=self.font_label).grid(row=1, column=0, pady=5, sticky="e")
        self.time2_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
        self.time2_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")

        ctk.CTkLabel(time_frame, text="签退1:", font=self.font_label).grid(row=0, column=2, pady=5, sticky="e")
        self.time3_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
        self.time3_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")

        ctk.CTkLabel(time_frame, text="签退2:", font=self.font_label).grid(row=1, column=2, pady=5, sticky="e")
        self.time4_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
        self.time4_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew")

        checkstr_frame = LabelFrame(self, width=400, height=200, text="校验码", font=self.font_label)
        checkstr_frame.grid(row=1, column=0, rowspan=2, padx=5, pady=5, sticky="nsew")
        checkstr_frame.grid_rowconfigure(0, weight=1)
        checkstr_frame.grid_columnconfigure(0, weight=1)
        self.checkstr_entry = ctk.CTkTextbox(checkstr_frame, state='disabled', font=self.font_text)
        self.checkstr_entry.grid(row=0, column=0, padx=10, pady=6, sticky="nsew")

        res_frame = LabelFrame(self, width=400, height=300, text="座位池信息", font=self.font_label)
        res_frame.grid(row=1, column=1, rowspan=2, columnspan=2, padx=5, pady=5, sticky="nsew")
        for c in range(3):
            res_frame.grid_columnconfigure(c, weight=1)
        for r in range(3):
            res_frame.grid_rowconfigure(r, weight=1)
        self.tabel_pool_list = Listbox(res_frame, bd=0, highlightthickness=0, relief='flat')
        try:
            self.tabel_pool_list.configure(font=self.tk_list_font)
        except Exception:
            pass
        self.tabel_pool_list.grid(row=0, column=0, rowspan=3, padx=10, pady=5, sticky="nsew")

        def add_tabel():
            tabel_no = self.ask_string("获取座位号", "请输入新的座位号:")
            if tabel_no == "" or tabel_no is None:
                return
            if self.util.get_table_info(tabel_no, table_file_path) is None:
                self.show_warning("警告", "该座位不存在!")
                return
            if tabel_no in self.tabel_pool_list.get(0, 'end'):
                self.show_warning("警告", "该座位号已存在!")
                return
            self.tabel_pool_list.insert(self.tabel_pool_list.size(), tabel_no)

        add_button = ctk.CTkButton(res_frame, text="添加", command=add_tabel, font=self.font_button)
        add_button.grid(row=0, column=1, padx=6, pady=5, sticky="ew")

        def insert_tabel():
            tabel_no = self.ask_string("获取座位号", "请输入新的座位号:")
            if tabel_no == "" or tabel_no is None:
                return
            if self.util.get_table_info(tabel_no, table_file_path) is None:
                self.show_warning("警告", "该座位不存在!")
                return
            if tabel_no in self.tabel_pool_list.get(0, 'end'):
                self.show_warning("警告", "该座位号已存在!")
                return
            if self.tabel_pool_list.curselection() != ():
                self.tabel_pool_list.insert(self.tabel_pool_list.curselection(), tabel_no)
            else:
                self.tabel_pool_list.insert(0, tabel_no)

        insert_button = ctk.CTkButton(res_frame, text="插入", command=insert_tabel, font=self.font_button)
        insert_button.grid(row=1, column=1, padx=6, pady=5, sticky="ew")

        def del_tabel():
            if self.tabel_pool_list.curselection() != ():
                self.tabel_pool_list.delete(self.tabel_pool_list.curselection())

        del_button = ctk.CTkButton(res_frame, text="删除", command=del_tabel, font=self.font_button)
        del_button.grid(row=0, column=2, padx=6, pady=5, sticky="ew")

        clear_button = ctk.CTkButton(res_frame, text="清空",
                                      command=lambda: self.tabel_pool_list.delete(0, 'end'), font=self.font_button)
        clear_button.grid(row=1, column=2, padx=6, pady=5, sticky="ew")

        btn_frame = LabelFrame(self, width=400, height=200, text="执行专区", font=self.font_button)
        btn_frame.grid(row=6, column=0, columnspan=2, padx=5, pady=5, sticky="ew")
        for c in range(7):
            btn_frame.grid_columnconfigure(c, weight=1)

        self.start_button = ctk.CTkButton(btn_frame, text="定时运行", height=36,
                                          command=self.toggle_task, font=self.font_button)
        self.start_button.grid(row=0, column=0, padx=4, pady=11, sticky="ew")
        self.start_button.configure(text_color="#33CC33")

        self.signin_button = ctk.CTkButton(btn_frame, text="立即签到", height=36,
                                           command=self.signin_now, font=self.font_button)
        self.signin_button.grid(row=0, column=1, padx=4, pady=11, sticky="ew")
        self.signin_button = ctk.CTkButton(btn_frame, text="立即签退", height=36,
                                           command=self.signout_now, font=self.font_button)
        self.signin_button.grid(row=0, column=2, padx=4, pady=11, sticky="ew")

        def booking_morning():
            if self.is_running:
                self.write_log("请先点击停止···")
                return None
            self.update_all_info(sigin_file_path)
            self.thread_manager.start_thread(self.execute_booking_plus, args=(False, True))

        self.booking_morning_button = ctk.CTkButton(btn_frame, text="预约早上", height=36,
                                                    command=booking_morning, font=self.font_button)
        self.booking_morning_button.grid(row=0, column=3, padx=4, pady=11, sticky="ew")

        def booking_afternoon():
            if self.is_running:
                self.write_log("请先点击停止···")
                return None
            self.update_all_info(sigin_file_path)
            self.thread_manager.start_thread(self.execute_booking_plus, args=(True, True))

        self.booking_afternoon_button = ctk.CTkButton(btn_frame, text="预约下午", height=36,
                                                      command=booking_afternoon, font=self.font_button)
        self.booking_afternoon_button.grid(row=0, column=4, padx=4, pady=11, sticky="ew")

        self.cancel_booking_button = ctk.CTkButton(btn_frame, text="取消预约", height=36,
                                                   command=self.booking_cancel, font=self.font_button)
        self.cancel_booking_button.grid(row=0, column=5, padx=4, pady=11, sticky="ew")

        self.clear_outbox_button = ctk.CTkButton(btn_frame, text="清空输出", height=36,
                                                 command=self.clear_output_box, font=self.font_button)
        self.clear_outbox_button.grid(row=0, column=6, padx=4, pady=11, sticky="ew")

        self.output_box = ctk.CTkTextbox(self, state='disabled', font=self.font_text)
        self.output_box.grid(row=7, column=0, columnspan=2, padx=10, pady=10, sticky="nsew")

    def clear_output_box(self):
        self.output_box.configure(state='normal')  # 允许编辑
        self.output_box.delete('1.0', 'end')  # 清空内容
        self.output_box.configure(state='disabled')  # 重新禁用编辑

    # 设置界面大小与位置
    def set_screen(self, width, height):
        screen_width = self.winfo_screenwidth()
        screen_height = self.winfo_screenheight()
        window_size = '%dx%d+%d+%d' % (width, height, (screen_width - width) / 2, (screen_height - height) / 2)
        return window_size

    # 分别填充到各个 Entry 控件中
    def fill_controls(self, data):
        # 先清空再填充,避免重复插入
        self.readerno_entry.delete(0, 'end')
        self.password_entry.delete(0, 'end')
        self.time1_entry.delete(0, 'end')
        self.time2_entry.delete(0, 'end')
        self.time3_entry.delete(0, 'end')
        self.time4_entry.delete(0, 'end')

        self.readerno_entry.insert(0, data.get("readerno", ""))
        self.password_entry.insert(0, data.get("password", ""))
        self.time1_entry.insert(0, data.get("sigin_time1", ""))
        self.time2_entry.insert(0, data.get("sigin_time2", ""))
        self.time3_entry.insert(0, data.get("signout_time1", ""))
        self.time4_entry.insert(0, data.get("signout_time2", ""))
        self.tabel_pool = data.get("tables", [])
        for item in self.tabel_pool:
            self.tabel_pool_list.insert('end', item)

    def update_all_info(self, filepath):
        logging.info("更新所有信息至文件: %s", filepath)
        data = self.util.read_json_file(filepath)
        if data:
            data["readerno"] = self.readerno_entry.get()
            data["password"] = self.password_entry.get()
            data["sigin_time1"] = self.time1_entry.get()
            data["sigin_time2"] = self.time2_entry.get()
            data["signout_time1"] = self.time3_entry.get()
            data["signout_time2"] = self.time4_entry.get()
            self.tabel_pool.clear()
            self.tabel_pool = list(self.tabel_pool_list.get(0, 'end'))
            data["tables"] = self.tabel_pool

            self.util.update_json_file(filepath, data)
            logging.info("成功更新所有信息至文件")
            self.write_log("成功更新所有信息至文件!")
        else:
            logging.warning("未能从文件中读取到数据")
            self.write_log("未能从文件中读取到数据!")

    def write_checkstr(self, checkstr) -> str:
        self.checkstr_entry.configure(state='normal')

        self.checkstr_entry.delete('1.0', 'end')
        self.checkstr_entry.insert('end', checkstr)

        self.checkstr_entry.configure(state='disabled')

        return checkstr

    # 在文本框中输出日志
    def write_log(self, message):
        self.output_box.configure(state='normal')
        self.output_box.insert('end', message + "\n")
        self.output_box.see('end')
        self.output_box.configure(state='disabled')

    def process_thread_results(self):
        """定时检查线程管理器的队列,处理回调"""
        self.thread_manager.check_results()
        self.after(100, self.process_thread_results)  # 循环检查

    def booking_cancel(self):
        if self.is_running:
            self.write_log("请先点击停止···")
            return None
        self.update_all_info(sigin_file_path)
        self.thread_manager.start_thread(self.reservation.cancel_reservation,
                                         args=(self.jsessionid,
                                               self.readerno_entry.get(),
                                               self.password_entry.get()),
                                         callback=self.booking_cancel_callback
                                         )

    def booking_cancel_callback(self, result):
        if isinstance(result, str):
            self.write_log(result)
        elif result:
            self.write_log("已将预约信息取消···")
        else:
            self.write_log("具体问题请查看日志···")

    def signin_now(self):
        if self.is_running:
            self.write_log("请先点击停止···")
            return None

        self.update_all_info(sigin_file_path)
        readerno = self.readerno_entry.get()
        password = self.password_entry.get()
        self.jsessionid = self.util.get_jsessionid()
        if self.tabel_pool_list.curselection() != ():
            tableno = self.tabel_pool[self.tabel_pool_list.curselection()[0]]
        else:
            tableno = self.tabel_pool[self.tabel_point]
        checkstr = self.util.get_checkstr(readerno, tableno)

        self.write_log(f"开始签到,座位: {tableno}")

        # 启动签到线程,并传递结果回调方法
        self.thread_manager.start_thread(
            target=self.signInOut.send_signin_req,
            args=(readerno, password, tableno, self.jsessionid, checkstr),
            callback=self.signin_callback  # 签到结束后的回调
        )

    def signin_callback(self, result):
        """签到回调函数,处理签到结果"""
        self.write_log(f"签到结果: {result}")

    def signout_now(self):
        if self.is_running:
            self.write_log("请先点击停止···")
            return None

        self.update_all_info(sigin_file_path)
        readerno = self.readerno_entry.get()
        password = self.password_entry.get()

        self.write_log(f"开始签退,学号: {readerno}")
        # 启动签退线程,并传递结果回调方法
        self.thread_manager.start_thread(
            target=self.signInOut.send_signout_req,
            args=(readerno, password, self.jsessionid),
            callback=self.signout_callback  # 签退结束后的回调
        )

    def signout_callback(self, result):
        """签退回调函数,处理签退结果"""
        self.write_log(f"签退结果: {result}")

    def toggle_task(self):
        # 切换任务状态
        if not self.is_running:
            self.clear_output_box()
            self.start_scheduled_tasks()
            self.start_button.configure(text="停止运行", text_color="#ff0000")  # 更新按钮文本为"停止"
            self.is_running = True
        else:
            self.is_running = False
            self.start_button.configure(text="定时运行", text_color="#33CC33")  # 恢复按钮文本为"开始"

    # 开始多线程任务
    def start_scheduled_tasks(self):
        self.update_all_info(sigin_file_path)

        # 读取输入的基本信息
        readerno = self.readerno_entry.get()
        password = self.password_entry.get()
        time1 = self.time1_entry.get()
        time2 = self.time2_entry.get()
        time3 = self.time3_entry.get()
        time4 = self.time4_entry.get()

        # 启动自动运行线程,并传递结果回调方法
        self.thread_manager.start_thread(
            target=self.schedule_tasks,
            args=(readerno, password, time1, time2, time3, time4),
        )

    def shuffle_seat(self):
        random.shuffle(self.tabel_pool)
        self.tabel_pool_list.delete(0, 'end')
        for item in self.tabel_pool:
            self.tabel_pool_list.insert('end', item)
        self.update_all_info(sigin_file_path)
        self.write_log(f"座位打乱:{self.tabel_pool}")

    def schedule_tasks(self, readerno, password, time1, time2, time3, time4):
        with self.lock:
            schedule.clear()
            # 定时任务调度
            schedule.every().day.at(time1).do(self.schedule_task, readerno, password, time1)
            schedule.every().day.at(time2).do(self.schedule_task, readerno, password, time2)
            schedule.every().day.at(time3).do(self.schedule_task, readerno, password, time3, signout=True)
            schedule.every().day.at(time4).do(self.schedule_task, readerno, password, time4, signout=True)

            schedule.every().day.at("17:00:00").do(self.execute_booking_plus, False)
            today_booking = "20:" + readerno[-2:] + ":00"
            # today_booking = "17:46:30"      #测试项
            schedule.every().day.at(today_booking).do(self.execute_booking_plus, True)

            schedule.every().day.at("23:00:00").do(self.clear_output_box)
            schedule.every().day.at("23:30:00").do(self.shuffle_seat)

            self.write_log("任务调度正在运行...")
            while self.is_running:
                try:
                    schedule.run_pending()
                except Exception as e:
                    logging.error(f"调度器运行时出现错误: {e}")
                t.sleep(1)
            self.write_log("任务调度结束运行...")

    def schedule_task(self, readerno, password, time, signout=False):
        """辅助函数用于签到或签退的定时任务执行"""
        if signout:
            self.write_log(f"定时签退任务执行中: {time}")
            signout_info = self.signInOut.send_signout_req(readerno, password, self.jsessionid)
            self.write_log(f"签退任务执行结果: {signout_info}")
        else:
            self.write_log(f"定时签到任务执行中: {time}")

            sigin_info = ""
            reservationData = self.reservation.get_reservation_list(self.jsessionid, readerno)
            if not reservationData:
                self.write_log("当前无可签到座位信息...")
                logging.info("未获取到预约信息列表...")
                return None
            for item in reservationData:
                checkstr = self.util.get_checkstr(readerno, item['TableNo'])
                sigin_info = self.signInOut.send_signin_req(readerno, password, item['TableNo'], self.jsessionid,
                                                            checkstr)
                if sigin_info == "签到成功":
                    break
            self.write_log(f"签到任务执行结果: {sigin_info}")

        logging.info("任务 %s 执行完毕: %s", "签到" if not signout else "签退", time)

    # 刷新座位信息
    def refresh_seat_info(self, point):
        self.tabel_pool[0], self.tabel_pool[point] = self.tabel_pool[point], self.tabel_pool[0]
        self.tabel_point = 0
        self.tabel_pool_list.delete(0, 'end')
        for item in self.tabel_pool:
            self.tabel_pool_list.insert('end', item)

    def update_seat_info(self, table_no, now_table_no):
        if table_no == now_table_no:
            self.write_log(f"已预约成功:{table_no}")
            logging.info(f"已预约成功:{table_no}")
            if self.tabel_point != 0:
                self.refresh_seat_info(self.tabel_point)
        else:
            self.write_log(f"已有座位信息: {now_table_no}")
            logging.info(f"已有座位信息: {now_table_no}")
            table_pool_now = list(self.tabel_pool_list.get(0, 'end'))
            if now_table_no in table_pool_now:
                if self.tabel_point != 0:
                    self.refresh_seat_info(table_pool_now.index(now_table_no))
            else:
                self.tabel_pool_list.insert(0, now_table_no)
                self.update_all_info(sigin_file_path)

    def execute_booking_plus(self, e_time, manual_operation=False):
        # 获取信息
        reader_no = self.readerno_entry.get()

        begin_time, end_time = self.get_booking_times(e_time)

        time_count = 1
        max_retries = 10

        # 获取预约时间段
        c_time = datetime.now().strftime("%H:%M:%S")
        self.write_log(f"执行预约操作,时间:{c_time}")

        # 进行预约
        while True:
            table_no = self.tabel_pool[self.tabel_point]
            table_id = self.util.get_table_info(table_no, table_file_path)
            room_no = int(table_no[0])
            room_id = self.util.get_room_no(room_no)
            check_str = self.util.get_checkstr(reader_no, table_no)
            bask_id = self.booking.get_bask_id(baskid_file_path, "27" if e_time else room_id)
            result = self.booking.make_booking(
                self.jsessionid,
                begin_time,
                int(bask_id),
                check_str,
                end_time,
                reader_no,
                int(room_id),
                int(table_id),
                table_no
            )
            if not result:
                self.write_log("预约请求出现异常,重新执行...")
                logging.info("预约请求出现异常,重新执行...")
                if time_count > max_retries:
                    self.write_log("预约请求达到上限···")
                    return None
                time_count += 1
                continue
            if result[0]:
                if "预约成功" not in result[1] and self.tabel_point != 0:
                    self.tabel_point -= 1
                if self.tabel_point != 0:
                    self.refresh_seat_info(self.tabel_point)
                self.write_log(f"{result[1]}:{table_no}")
                return True
            else:
                # False, False
                if not result[0] and not result[1]:
                    self.write_log("读者没有预约权限, 等待30s重新发送请求···")
                    t.sleep(30)
                    continue

                # False, 502, 504
                if result[1] in (502, 504):
                    data = self.reservation.get_reservation_list(self.jsessionid, reader_no)
                    if not data:
                        self.tabel_point += 1
                        if self.tabel_point >= len(self.tabel_pool):
                            self.tabel_point = 0
                            self.write_log("新一轮预约···")
                            logging.info("新一轮预约···")
                        logging.info(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
                        self.write_log(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
                        continue
                    self.update_seat_info(table_no, data[0]['TableNo'])
                    return True

                # 获取返回时间判断
                sleep_time_m = self.get_remaining_seconds(result[1])

                # False, msg
                if sleep_time_m is None:
                    if not e_time:
                        data = self.reservation.get_reservation_list(self.jsessionid, reader_no)
                        if data:
                            self.update_seat_info(table_no, data[0]['TableNo'])
                            return True
                    self.write_log(f"{result[1]}:{table_no}")
                    self.tabel_point += 1
                    if self.tabel_point >= len(self.tabel_pool):
                        self.tabel_point = 0
                        self.write_log("无可预约座位···")
                        logging.error("无可预约座位···")
                        return None
                    logging.info(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
                    self.write_log(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
                else:
                    if manual_operation:  # 手动只显示不执行
                        self.write_log(result[1])
                        return None
                    if time_count > max_retries:
                        self.write_log("运行达到上限···")
                        return None
                    time_count += 1
                    self.write_log(f"等待{sleep_time_m}秒后重新执行")
                    logging.info(f"等待{sleep_time_m}秒后重新执行")
                    t.sleep(sleep_time_m)
                    t.sleep(1)  # 容错时间

    def get_remaining_seconds(self, text):
        # 使用正则表达式提取时间
        match = re.search(r"\d{2}:\d{2}:\d{2}", text)

        # 如果找不到时间格式,则返回提示
        if not match:
            return None

        # 提取时间字符串并转换为 datetime 对象
        time_str = match.group()
        current_time = datetime.strptime(time_str, "%H:%M:%S")

        # 目标时间 17:00:00
        target_time = current_time.replace(hour=17, minute=0, second=0)

        # 计算剩余时间
        time_difference = target_time - current_time
        remaining_seconds = int(time_difference.total_seconds())

        # 如果已超过17点,返回0秒
        return max(remaining_seconds, 0)

    def get_booking_times(self, e_time):
        """返回开始和结束时间"""
        if e_time:
            return "13:50:01", "22:30:00"  # 示例下午时段
        else:
            return "07:00:00", "13:50:00"  # 示例上午时段

# =============================================================================
# 配置验证和主程序入口
# =============================================================================

def validate_config_file(file_path: str, required_fields: Optional[List[str]] = None) -> bool:
    """
    验证配置文件

    Args:
        file_path (str): 配置文件路径
        required_fields (Optional[List[str]]): 必需的字段列表

    Returns:
        bool: 配置文件是否有效
    """
    logger = logging.getLogger(__name__)

    # 检查文件是否存在
    if not os.path.isfile(file_path):
        logger.error(f"配置文件不存在: {file_path}")
        return False

    try:
        # 读取并验证JSON格式
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # 验证必需字段
        if required_fields:
            missing_fields = [field for field in required_fields if field not in data]
            if missing_fields:
                logger.error(f"配置文件 {file_path} 缺少必需字段: {missing_fields}")
                return False

        logger.info(f"配置文件验证通过: {file_path}")
        return True

    except json.JSONDecodeError as e:
        logger.error(f"配置文件JSON格式错误: {file_path}, 错误: {e}")
        return False
    except Exception as e:
        logger.error(f"验证配置文件时发生错误: {file_path}, 错误: {e}")
        return False

def validate_all_configs() -> bool:
    """
    验证所有配置文件

    Returns:
        bool: 所有配置文件是否都有效
    """
    logger = logging.getLogger(__name__)
    logger.info("开始验证配置文件")

    # 获取当前目录(兼容打包后的可执行文件目录)
    current_directory = BASE_DIR

    # 配置文件路径和必需字段
    config_files = {
        os.path.join(current_directory, "sigin_info.json"): [
            "readerno", "password", "sigin_time1", "sigin_time2",
            "signout_time1", "signout_time2", "tables"
        ],
        os.path.join(current_directory, 'table_info.json'): [],
        os.path.join(current_directory, 'bask_id.json'): ["morning", "afternoon"]
    }

    all_valid = True

    for file_path, required_fields in config_files.items():
        if not validate_config_file(file_path, required_fields):
            all_valid = False
            # 显示错误对话框
            messagebox.showerror(
                "配置错误",
                f"配置文件验证失败:\n{file_path}\n\n请检查配置文件格式和必需字段。"
            )

    if all_valid:
        logger.info("所有配置文件验证通过")
    else:
        logger.error("配置文件验证失败")

    return all_valid

def create_default_configs() -> None:
    """
    创建默认配置文件(如果不存在)
    """
    logger = logging.getLogger(__name__)
    current_directory = BASE_DIR

    # 默认配置
    default_configs = {
        "sigin_info.json": {
            "readerno": "",
            "password": "",
            "sigin_time1": "15:06:00",
            "sigin_time2": "14:00:00",
            "signout_time1": "13:40:00",
            "signout_time2": "22:20:00",
            "tables": ["2-126", "2-128"]
        }
    }

    for filename, config in default_configs.items():
        file_path = os.path.join(current_directory, filename)
        if not os.path.exists(file_path):
            try:
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(config, f, ensure_ascii=False, indent=4)
                logger.info(f"创建默认配置文件: {file_path}")
            except Exception as e:
                logger.error(f"创建默认配置文件失败: {file_path}, 错误: {e}")

def main():
    """
    主程序入口
    """
    global sigin_file_path, table_file_path, baskid_file_path

    logger = logging.getLogger(__name__)
    logger.info("程序启动")

    try:
        # 设置全局文件路径
        current_directory = BASE_DIR
        sigin_file_path = os.path.join(current_directory, "sigin_info.json")
        table_file_path = os.path.join(current_directory, 'table_info.json')
        baskid_file_path = os.path.join(current_directory, 'bask_id.json')

        # 创建默认配置文件(如果需要)
        create_default_configs()

        # 验证配置文件
        if not validate_all_configs():
            logger.error("配置文件验证失败,程序退出")
            sys.exit(1)

        # 启动主界面
        logger.info("启动主界面")
        app = MainInterface()
        app.mainloop()

    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序运行时发生错误: {e}")
        messagebox.showerror("程序错误", f"程序运行时发生错误:\n{e}")
    finally:
        logger.info("程序结束")

if __name__ == '__main__':
    main()
© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容