代码
[content_hide]
# 标准库导入
import hashlib
import json
import logging
import os
import queue
import random
import re
import sys
import threading
import time as t
from datetime import datetime, time, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 第三方库导入
import requests
import schedule
import customtkinter as ctk
from tkinter import LabelFrame, Listbox, messagebox
from tkinter import font as tkfont
# 本地模块导入(如果有的话)
# import customtkinter # 注释掉未使用的导入
# =============================================================================
# 全局配置和常量
# =============================================================================
# 日志配置
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_LEVEL = logging.INFO
# 配置全局日志记录
logging.basicConfig(
filename=os.path.join(os.getcwd(), 'application.log'),
level=LOG_LEVEL,
format=LOG_FORMAT,
encoding='utf-8', # 支持中文日志
filemode='a' # 追加模式
)
# 创建根日志记录器
logger = logging.getLogger(__name__)
# API相关常量
BASE_URL = 'https://seat.gsupl.edu.cn'
API_ENDPOINTS = {
'config': '/interface/readingroom/singleAppConfig',
'reservation_list': '/interface/readingroom/getdespeakdata',
'cancel_reservation': '/interface/readingroom/deletedeskdata',
'check_time': '/interface/readingroom/checktimecanbesk',
'make_booking': '/interface/readingroom/beskdata',
'signin': '/interface/readingroom/beskreadersigin',
'signout': '/interface/readingroom/returntable'
}
# 房间号映射
ROOM_MAPPING = {
1: '23',
2: '26',
3: '28',
4: '24',
5: '27'
}
# 预约时间段配置
BOOKING_TIMES = {
'morning': ('07:00:00', '13:50:00'),
'afternoon': ('13:50:01', '22:30:00')
}
# 请求头模板
DEFAULT_HEADERS = {
"Content-Length": "0",
'Host': 'seat.gsupl.edu.cn',
'Connection': 'Keep-Alive',
'Cookie2': '$Version=1',
'Accept-Encoding': 'gzip'
}
# 全局文件路径(在main函数中设置)
sigin_file_path = ""
table_file_path = ""
baskid_file_path = ""
# =============================================================================
# 预约系统管理类
# =============================================================================
class ReservationSystem:
"""
预约系统管理类
负责处理图书馆座位预约相关的操作,包括获取预约列表和取消预约等功能。
"""
def __init__(self):
"""初始化预约系统"""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
def get_reservation_list(self, jsessionid: str, reader_no: str) -> Optional[List[Dict[str, Any]]]:
"""
获取用户的预约列表
Args:
jsessionid (str): 会话ID,用于身份验证
reader_no (str): 读者编号(学号)
Returns:
Optional[List[Dict[str, Any]]]: 预约列表数据,如果获取失败则返回None
Raises:
requests.exceptions.RequestException: 网络请求异常
"""
self.logger.info(f"开始获取预约列表,读者编号: {reader_no}")
# 构建请求URL
reservation_list_url = f"{BASE_URL}{API_ENDPOINTS['reservation_list']}?readerno={reader_no}"
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 发送GET请求获取预约列表
response = requests.get(reservation_list_url, headers=headers, timeout=10)
if response.status_code != 200:
self.logger.error(f"获取预约列表失败,HTTP状态码: {response.status_code}")
return None
# 解析响应数据
data = response.json()
# 检查API返回状态
if data.get("ReturnValue") != 0:
self.logger.warning(f"API返回错误: {data.get('Msg', '未知错误')}")
return None
# 提取预约数据
reservation_data = data.get('Data')
if not reservation_data:
self.logger.info("当前没有预约记录")
return None
self.logger.info(f"成功获取预约列表,共{len(reservation_data)}条记录")
return reservation_data
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"获取预约列表时发生未知错误: {e}")
return None
def cancel_reservation(self, jsessionid: str, reader_no: str, password: str) -> Union[bool, str, None]:
"""
取消用户的预约
Args:
jsessionid (str): 会话ID,用于身份验证
reader_no (str): 读者编号(学号)
password (str): 用户密码
Returns:
Union[bool, str, None]:
- True: 取消成功
- str: 错误消息
- None: 网络错误或其他异常
"""
self.logger.info(f"开始取消预约,读者编号: {reader_no}")
# 首先获取预约列表
reservation_data = self.get_reservation_list(jsessionid, reader_no)
if not reservation_data:
self.logger.error("无法获取预约列表,取消预约失败")
return "无法获取预约列表"
# 获取第一个预约的ID(假设用户只有一个预约)
try:
strid = reservation_data[0]['ID']
except (KeyError, IndexError) as e:
self.logger.error(f"预约数据格式错误: {e}")
return "预约数据格式错误"
# 构建取消预约的请求参数
params = {
"password": password,
"readerno": reader_no,
"strid": strid
}
# 构建请求URL和请求头
cancel_url = f"{BASE_URL}{API_ENDPOINTS['cancel_reservation']}"
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 发送取消预约请求
response = requests.get(cancel_url, headers=headers, params=params, timeout=10)
if response.status_code != 200:
self.logger.error(f"取消预约请求失败,HTTP状态码: {response.status_code}")
return False
# 解析响应数据
cancel_data = response.json()
# 检查取消结果
if cancel_data.get("ReturnValue") == 0:
self.logger.info("预约取消成功")
return True
else:
error_msg = cancel_data.get('Msg', '取消预约失败')
self.logger.warning(f"取消预约失败: {error_msg}")
return error_msg
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"取消预约时发生未知错误: {e}")
return None
# =============================================================================
# 预约管理类
# =============================================================================
class Booking:
"""
预约管理类
负责处理图书馆座位预约相关的操作,包括获取预约ID、预约座位、取消预约等功能。
"""
def __init__(self):
"""初始化预约管理类"""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
def get_can_bask_id(self, jsessionid: str, room_no: str) -> Union[str, None]:
"""
获取可预约时间段的ID
Args:
jsessionid (str): 会话ID,用于身份验证
room_no (str): 房间号
Returns:
Union[str, None]: 可预约ID或错误消息,如果获取失败则返回None
"""
self.logger.info(f"获取可预约时间段ID,房间号: {room_no}")
# 构建请求URL
url = f"{BASE_URL}{API_ENDPOINTS['check_time']}?roomno={room_no}"
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 发送GET请求
response = requests.get(url, headers=headers, timeout=10)
self.logger.debug(f"获取可预约ID请求状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
if data.get("ReturnValue") == 0:
can_bask_id = data.get('CanBaskID')
self.logger.info(f"成功获取可预约ID: {can_bask_id}")
return can_bask_id
else:
error_msg = data.get('Msg', '未知错误')
self.logger.warning(f"获取可预约ID失败: {error_msg}")
return error_msg
else:
self.logger.error(f"获取可预约ID请求失败,HTTP状态码: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"获取可预约ID时发生未知错误: {e}")
return None
def get_bask_id(self, filepath: str, room_no: str) -> Optional[str]:
"""
从配置文件中获取预约ID
Args:
filepath (str): 配置文件路径
room_no (str): 房间号
Returns:
Optional[str]: 预约ID,如果获取失败则返回None
"""
self.logger.info(f"从配置文件获取预约ID,房间号: {room_no}")
try:
with open(filepath, 'r', encoding='utf-8') as file:
data = json.load(file)
# 获取morning时段的预约ID
baskid = data.get("morning", {}).get(room_no)
if baskid is not None:
self.logger.info(f"成功获取预约ID: {baskid}")
return str(baskid)
else:
self.logger.warning(f"房间号 {room_no} 在配置文件中不存在")
return None
except FileNotFoundError:
self.logger.error(f"配置文件未找到: {filepath}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON文件格式错误: {e}")
return None
except Exception as e:
self.logger.error(f"读取配置文件时发生未知错误: {e}")
return None
def cancel_reservation(self, jsessionid: str, reader_no: str, password: str) -> Union[bool, str, None]:
"""
取消用户的预约(Booking类的实现)
Args:
jsessionid (str): 会话ID,用于身份验证
reader_no (str): 读者编号(学号)
password (str): 用户密码
Returns:
Union[bool, str, None]:
- True: 取消成功
- str: 错误消息
- None: 网络错误或其他异常
"""
self.logger.info(f"开始取消预约,读者编号: {reader_no}")
# 构建获取预约列表的URL
get_str_id_url = f"{BASE_URL}{API_ENDPOINTS['reservation_list']}?readerno={reader_no}"
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 获取预约列表
response = requests.get(get_str_id_url, headers=headers, timeout=10)
if response.status_code != 200:
self.logger.error(f"获取预约列表失败,HTTP状态码: {response.status_code}")
return False
# 解析预约列表数据
strid_data = response.json()
if strid_data.get("ReturnValue") != 0:
error_msg = strid_data.get('Msg', '获取预约列表失败')
self.logger.warning(f"获取预约列表失败: {error_msg}")
return error_msg
# 提取预约ID
reservation_data = strid_data.get('Data')
if not reservation_data:
self.logger.info("当前没有预约记录")
return "当前没有预约记录"
strid = reservation_data[0].get('ID')
if not strid:
self.logger.error("预约数据中缺少ID字段")
return False
# 构建取消预约的请求参数
params = {
"password": password,
"readerno": reader_no,
"strid": strid
}
# 发送取消预约请求
cancel_url = f"{BASE_URL}{API_ENDPOINTS['cancel_reservation']}"
cancel_response = requests.get(cancel_url, headers=headers, params=params, timeout=10)
if cancel_response.status_code != 200:
self.logger.error(f"取消预约请求失败,HTTP状态码: {cancel_response.status_code}")
return False
# 解析取消结果
cancel_data = cancel_response.json()
if cancel_data.get("ReturnValue") == 0:
self.logger.info("预约取消成功")
return True
else:
error_msg = cancel_data.get('Msg', '取消预约失败')
self.logger.warning(f"取消预约失败: {error_msg}")
return error_msg
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"取消预约时发生未知错误: {e}")
return None
def _parse_booking_result(self, data: Dict[str, Any]) -> Tuple[bool, Union[str, bool]]:
"""
解析预约结果
Args:
data (Dict[str, Any]): API返回的数据
Returns:
Tuple[bool, Union[str, bool]]: (是否成功, 消息或状态)
"""
return_value = data.get("ReturnValue")
msg = data.get('Msg', '未知消息')
if return_value == 0:
self.logger.info(f"预约成功: {msg}")
return True, msg
else:
self.logger.info(f"预约返回信息: {msg}")
# 根据不同的错误消息返回不同的结果
if msg == "您已经预约过该时间段的座位了!无需重复预约":
return True, msg # 已经预约过也算成功
elif msg == "预约失败":
return False, msg
elif msg == "此座位已被预约,不能重复预约":
return False, msg
elif "未到可预约时间" in msg:
return False, msg
elif msg == "读者没有预约权限":
return False, False # 特殊标记:没有权限
else:
self.logger.warning(f"预约失败,未知错误信息: {msg}")
return False, msg
def make_booking(self, jsessionid: str, begin_time: str, besk_id: str, check_str: str,
end_time: str, reader_no: str, room_no: str, table_id: str,
table_no: str) -> Union[Tuple[bool, Union[str, bool]], Tuple[bool, int], None]:
"""
执行座位预约
Args:
jsessionid (str): 会话ID,用于身份验证
begin_time (str): 开始时间
besk_id (str): 预约ID
check_str (str): 校验字符串
end_time (str): 结束时间
reader_no (str): 读者编号(学号)
room_no (str): 房间号
table_id (str): 桌子ID
table_no (str): 桌子编号
Returns:
Union[Tuple[bool, Union[str, bool]], Tuple[bool, int], None]:
- (True, msg): 预约成功
- (False, msg): 预约失败
- (False, status_code): 服务器错误
- None: 网络异常
"""
self.logger.info(f"开始执行预约请求,座位: {table_no}")
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
# 构建请求参数
params = {
"begintime": begin_time,
"beskid": besk_id,
"checkstr": check_str,
"endtime": end_time,
"readerno": reader_no,
"roomno": room_no,
"tableid": table_id,
"tableno": table_no
}
# 构建请求URL
url = f"{BASE_URL}{API_ENDPOINTS['make_booking']}"
try:
# 发送预约请求
response = requests.get(url, headers=headers, params=params, timeout=10)
self.logger.debug(f"预约请求状态码: {response.status_code}")
if response.status_code == 200:
# 成功响应,解析结果
data = response.json()
return self._parse_booking_result(data)
elif response.status_code in (502, 504):
# 网关错误,返回状态码
self.logger.warning(f"网关错误: {response.status_code}")
return False, response.status_code
elif response.status_code == 500:
# 服务器内部错误,等待后重试
self.logger.error("服务器内部错误500,等待60秒后重试...")
t.sleep(60)
return None
else:
# 其他HTTP错误
self.logger.error(f"预约请求失败,HTTP状态码: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"预约时发生未知错误: {e}")
return None
# =============================================================================
# 签到签退管理类
# =============================================================================
class SignInOut:
"""
签到签退管理类
负责处理图书馆座位的签到和签退操作。
"""
def __init__(self):
"""初始化签到签退管理类"""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
def send_signin_req(self, readerno: str, password: str, tableno: str,
jsessionid: str, checkstr: str) -> Optional[str]:
"""
发送签到请求
Args:
readerno (str): 读者编号(学号)
password (str): 用户密码
tableno (str): 桌子编号
jsessionid (str): 会话ID,用于身份验证
checkstr (str): 校验字符串
Returns:
Optional[str]: 签到结果消息,如果请求失败则返回None
"""
self.logger.info(f"开始执行签到请求,座位: {tableno}")
# 构建请求URL
url = f"{BASE_URL}{API_ENDPOINTS['signin']}"
# 构建请求参数
params = {
'checkstr': checkstr,
'password': password,
'readerno': readerno,
'tableno': tableno
}
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 发送签到请求
response = requests.get(url, headers=headers, params=params, timeout=10)
self.logger.debug(f"签到请求状态码: {response.status_code}")
if response.status_code == 200:
# 解析响应数据
data = response.json()
msg = data.get('Msg', '未知消息')
self.logger.info(f"签到请求返回信息: {msg}")
return msg
else:
self.logger.error(f"签到请求失败,HTTP状态码: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"签到时发生未知错误: {e}")
return None
def send_signout_req(self, readerno: str, password: str, jsessionid: str) -> Optional[str]:
"""
发送签退请求
Args:
readerno (str): 读者编号(学号)
password (str): 用户密码
jsessionid (str): 会话ID,用于身份验证
Returns:
Optional[str]: 签退结果消息,如果请求失败则返回None
"""
self.logger.info(f"开始执行签退请求,读者编号: {readerno}")
# 构建请求URL
url = f"{BASE_URL}{API_ENDPOINTS['signout']}"
# 构建请求参数
params = {
'password': password,
'readerno': readerno
}
# 构建请求头
headers = DEFAULT_HEADERS.copy()
headers['Cookie'] = jsessionid
try:
# 发送签退请求
response = requests.get(url, headers=headers, params=params, timeout=10)
self.logger.debug(f"签退请求状态码: {response.status_code}")
if response.status_code == 200:
# 解析响应数据
data = response.json()
msg = data.get('Msg', '未知消息')
self.logger.info(f"签退成功,返回信息: {msg}")
return msg
else:
self.logger.error(f"签退请求失败,HTTP状态码: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"JSON解析错误: {e}")
return None
except Exception as e:
self.logger.error(f"签退时发生未知错误: {e}")
return None
# =============================================================================
# 工具类
# =============================================================================
class Util:
"""
工具类
提供各种实用功能,包括房间号映射、文件操作、数据验证等。
"""
def __init__(self):
"""初始化工具类"""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
# 缓存机制
self._file_cache: Dict[str, Dict[str, Any]] = {}
self._cache_timestamps: Dict[str, float] = {}
self._cache_timeout = 300 # 5分钟缓存超时
def get_room_no(self, room_no: int) -> str:
"""
获取房间号映射
Args:
room_no (int): 输入的房间号
Returns:
str: 映射后的房间号
"""
self.logger.debug(f"获取房间号映射,输入: {room_no}")
# 使用预定义的映射表
mapped_room = ROOM_MAPPING.get(room_no, '27') # 默认为27
self.logger.debug(f"房间号映射结果: {room_no} -> {mapped_room}")
return mapped_room
def get_table_info(self, table_no: str, filepath: str) -> Optional[int]:
"""
获取桌子信息
Args:
table_no (str): 桌子编号
filepath (str): 配置文件路径
Returns:
Optional[int]: 桌子ID,如果不存在则返回None
"""
self.logger.debug(f"获取桌子信息,桌子编号: {table_no}")
# 读取配置文件数据
data = self.read_json_file(filepath)
if not data:
return None
# 获取桌子信息
table_info = data.get(table_no)
if table_info is not None:
self.logger.debug(f"找到桌子信息: {table_no} -> {table_info}")
return table_info
else:
self.logger.warning(f"桌子编号 {table_no} 在配置文件中不存在")
return None
def read_json_file(self, filepath: str) -> Dict[str, Any]:
"""
读取JSON文件(带缓存机制)
Args:
filepath (str): 文件路径
Returns:
Dict[str, Any]: JSON数据,如果读取失败则返回空字典
"""
self.logger.debug(f"读取JSON文件: {filepath}")
# 检查缓存
current_time = t.time()
if (filepath in self._file_cache and
filepath in self._cache_timestamps and
current_time - self._cache_timestamps[filepath] < self._cache_timeout):
self.logger.debug(f"从缓存读取文件: {filepath}")
return self._file_cache[filepath]
try:
with open(filepath, 'r', encoding='utf-8') as file:
data = json.load(file)
# 更新缓存
self._file_cache[filepath] = data
self._cache_timestamps[filepath] = current_time
self.logger.debug(f"成功读取JSON文件: {filepath}")
return data
except FileNotFoundError:
self.logger.error(f"文件未找到: {filepath}")
return {}
except json.JSONDecodeError as e:
self.logger.error(f"JSON解码错误: {filepath}, 错误: {e}")
return {}
except Exception as e:
self.logger.error(f"读取文件时发生未知错误: {e}")
return {}
def get_jsessionid(self) -> str:
"""
获取JSESSIONID
Returns:
str: JSESSIONID字符串,如果获取失败则返回默认值
"""
self.logger.info("开始获取JSESSIONID")
# 构建请求URL
url = f"{BASE_URL}{API_ENDPOINTS['config']}"
# 构建请求头
headers = DEFAULT_HEADERS.copy()
try:
# 发送POST请求获取配置
response = requests.post(url, headers=headers, timeout=10)
if response.status_code == 200:
# 从响应头中提取Set-Cookie
set_cookie = response.headers.get('Set-Cookie')
if set_cookie:
jsessionid = set_cookie.split(';')[0]
self.logger.info(f"成功获取JSESSIONID: {jsessionid}")
return jsessionid
else:
self.logger.warning("响应头中未找到Set-Cookie")
else:
self.logger.error(f"获取JSESSIONID失败,HTTP状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
self.logger.error(f"网络请求异常: {e}")
except Exception as e:
self.logger.error(f"获取JSESSIONID时发生未知错误: {e}")
# 返回默认的JSESSIONID作为备用
default_jsessionid = "JSESSIONID=71533C0C9E6F10B43BE710E33837648C"
self.logger.warning(f"使用默认JSESSIONID: {default_jsessionid}")
return default_jsessionid
def get_checkstr(self, reader_no: str, table_no: str) -> str:
"""
生成校验字符串
Args:
reader_no (str): 读者编号(学号)
table_no (str): 桌子编号
Returns:
str: MD5校验字符串
Raises:
RuntimeError: 生成校验字符串时发生错误
"""
self.logger.debug(f"生成校验字符串,读者编号: {reader_no}, 桌子编号: {table_no}")
try:
# 获取当前时间并格式化
current_time = datetime.now().strftime('%Y%m%d%H%M')
# 构建参数字符串
param_string = f"{reader_no}GOLDLIBGDLIS{table_no}CHECKTABLE{current_time}"
# 生成MD5哈希
checkstr = hashlib.md5(param_string.encode('utf-8')).hexdigest()
self.logger.debug(f"成功生成校验字符串: {checkstr}")
return checkstr
except Exception as e:
self.logger.error(f"生成校验字符串时发生错误: {e}")
raise RuntimeError(f"生成校验字符串失败: {e}")
def update_json_file(self, filepath: str, data: Dict[str, Any]) -> bool:
"""
更新JSON文件
Args:
filepath (str): 文件路径
data (Dict[str, Any]): 要写入的数据
Returns:
bool: 更新是否成功
"""
self.logger.info(f"更新JSON文件: {filepath}")
try:
with open(filepath, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
# 清除缓存
if filepath in self._file_cache:
del self._file_cache[filepath]
if filepath in self._cache_timestamps:
del self._cache_timestamps[filepath]
self.logger.info(f"成功更新JSON文件: {filepath}")
return True
except Exception as e:
self.logger.error(f"更新JSON文件时发生错误: {e}")
return False
def clear_cache(self) -> None:
"""
清除文件缓存
"""
self._file_cache.clear()
self._cache_timestamps.clear()
self.logger.info("已清除文件缓存")
# =============================================================================
# 线程管理类
# =============================================================================
class ThreadManager:
"""
线程管理器
负责管理后台线程的创建、执行和结果处理,提供线程安全的操作接口。
"""
def __init__(self, max_threads: int = 10):
"""
初始化线程管理器
Args:
max_threads (int): 最大线程数量限制
"""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self.result_queue: queue.Queue = queue.Queue() # 用于存储线程的返回结果
self.threads: List[threading.Thread] = [] # 存储活动线程
self.lock = threading.RLock() # 使用可重入锁
self.max_threads = max_threads
self._shutdown = False
self.logger.info(f"线程管理器初始化完成,最大线程数: {max_threads}")
def start_thread(self, target: Callable, args: Tuple = (),
callback: Optional[Callable[[Any], None]] = None) -> bool:
"""
启动线程并将结果放入队列,线程结束后执行回调
Args:
target (Callable): 要在线程中执行的函数
args (Tuple): 传递给目标函数的参数
callback (Optional[Callable]): 线程完成后的回调函数
Returns:
bool: 线程是否成功启动
"""
if self._shutdown:
self.logger.warning("线程管理器已关闭,无法启动新线程")
return False
# 检查线程数量限制
with self.lock:
if len(self.threads) >= self.max_threads:
self.logger.warning(f"已达到最大线程数限制: {self.max_threads}")
return False
def thread_target():
"""线程目标函数"""
try:
self.logger.debug(f"线程开始执行: {target.__name__}")
result = target(*args) # 执行任务
self.logger.debug(f"线程执行完成: {target.__name__}")
self.result_queue.put((result, callback)) # 放入队列并带回调函数
except Exception as e:
self.logger.error(f"线程执行异常: {target.__name__}, 错误: {e}")
self.result_queue.put((None, callback)) # 异常时也放入队列
try:
# 创建并启动线程
thread = threading.Thread(target=thread_target, daemon=True,
name=f"Worker-{target.__name__}")
thread.start()
# 线程安全地添加到线程列表
with self.lock:
self.threads.append(thread)
self.logger.debug(f"成功启动线程: {target.__name__}")
return True
except Exception as e:
self.logger.error(f"启动线程失败: {target.__name__}, 错误: {e}")
return False
def check_results(self) -> int:
"""
从队列中取出结果并执行回调
Returns:
int: 处理的结果数量
"""
processed_count = 0
# 处理队列中的所有结果
while not self.result_queue.empty():
try:
result, callback = self.result_queue.get_nowait()
if callback:
try:
callback(result) # 执行回调处理结果
except Exception as e:
self.logger.error(f"回调函数执行异常: {e}")
processed_count += 1
except queue.Empty:
break
except Exception as e:
self.logger.error(f"处理结果时发生异常: {e}")
break
# 清理已结束的线程
with self.lock:
alive_threads = []
for thread in self.threads:
if thread.is_alive():
alive_threads.append(thread)
else:
self.logger.debug(f"线程已结束: {thread.name}")
self.threads = alive_threads
return processed_count
def get_thread_count(self) -> int:
"""
获取当前活跃线程数量
Returns:
int: 活跃线程数量
"""
with self.lock:
return len([t for t in self.threads if t.is_alive()])
def shutdown(self, timeout: float = 5.0) -> None:
"""
关闭线程管理器
Args:
timeout (float): 等待线程结束的超时时间(秒)
"""
self.logger.info("开始关闭线程管理器")
self._shutdown = True
# 等待所有线程结束
with self.lock:
for thread in self.threads:
if thread.is_alive():
thread.join(timeout=timeout)
if thread.is_alive():
self.logger.warning(f"线程未在超时时间内结束: {thread.name}")
self.logger.info("线程管理器已关闭")
# =============================================================================
# 主界面类
# =============================================================================
class MainInterface(ctk.CTk):
"""
主界面类
图书馆座位预约系统的主界面,提供用户交互界面和功能控制。
"""
def __init__(self):
"""初始化主界面"""
super().__init__()
# 初始化日志记录器
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
# 初始化业务类对象
self.util = Util()
self.signInOut = SignInOut()
self.booking = Booking()
self.thread_manager = ThreadManager(max_threads=5) # 限制最大线程数
self.reservation = ReservationSystem()
# 界面状态变量
self.tabel_pool: List[str] = [] # 座位池
self.tabel_point: int = 0 # 当前座位指针
self.is_running: bool = False # 定时任务运行状态
self.jsessionid: str = "" # 会话ID
# 线程锁
self.lock = threading.Lock()
self.rlock = threading.RLock()
# 字体设置
self.font_title = ctk.CTkFont(size=16, weight="bold")
self.font_label = ctk.CTkFont(size=16)
self.font_entry = ctk.CTkFont(size=16)
self.font_button = ctk.CTkFont(size=16)
self.font_text = ctk.CTkFont(size=16)
self.tk_list_font = tkfont.Font(size=16)
# 初始化界面
self._initialize_ui()
# 加载配置数据
self._load_configuration()
# 启动线程结果检查
self.after(100, self.process_thread_results)
self.logger.info("主界面初始化完成")
def _initialize_ui(self) -> None:
"""初始化用户界面"""
self.logger.debug("开始初始化用户界面")
self.set_widget()
self.logger.debug("用户界面初始化完成")
def ask_string(self, title: str, prompt: str) -> str:
"""使用customtkinter弹出输入框"""
dialog = ctk.CTkInputDialog(title=title, text=prompt)
return dialog.get_input()
def show_warning(self, title: str, message: str) -> None:
"""使用customtkinter显示警告对话框"""
win = ctk.CTkToplevel(self)
win.title(title)
win.transient(self)
win.grab_set()
ctk.CTkLabel(win, text=message, wraplength=260, justify="left").pack(padx=20, pady=20)
ctk.CTkButton(win, text="确定", width=80, command=win.destroy).pack(pady=10)
self.wait_window(win)
def _load_configuration(self) -> None:
"""加载配置数据"""
self.logger.debug("开始加载配置数据")
# 加载配置文件
config_data = self.util.read_json_file(sigin_file_path)
self.fill_controls(config_data)
# 生成并显示校验字符串
if self.tabel_pool:
checkstr = self.util.get_checkstr(
self.readerno_entry.get(),
self.tabel_pool[self.tabel_point]
)
self.write_checkstr(checkstr)
# 获取会话ID
self.jsessionid = self.util.get_jsessionid()
self.write_log(f"初始化获取jsessionid:{self.jsessionid}")
self.logger.debug("配置数据加载完成")
def set_widget(self):
self.title("签到签退定时预约脚本")
self.resizable(True, True)
# self.wm_attributes("-toolwindow", True)
self.geometry(self.set_screen(650, 500))
# Root grid weights for responsiveness
for c in range(2):
self.grid_columnconfigure(c, weight=1)
for r in range(8):
self.grid_rowconfigure(r, weight=0)
self.grid_rowconfigure(1, weight=1)
self.grid_rowconfigure(2, weight=1)
self.grid_rowconfigure(7, weight=1)
info_frame = LabelFrame(self, width=400, height=200, text="基本信息", font=self.font_label)
info_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
info_frame.grid_columnconfigure(0, weight=0)
info_frame.grid_columnconfigure(1, weight=1)
ctk.CTkLabel(info_frame, text="学号:", font=self.font_label).grid(row=0, column=0, padx=10, pady=5, sticky="e")
self.readerno_entry = ctk.CTkEntry(info_frame, font=self.font_entry)
self.readerno_entry.grid(row=0, column=1, padx=10, pady=5, sticky="ew")
ctk.CTkLabel(info_frame, text="密码:", font=self.font_label).grid(row=1, column=0, padx=10, pady=5, sticky="e")
self.password_entry = ctk.CTkEntry(info_frame, font=self.font_entry)
self.password_entry.grid(row=1, column=1, padx=10, pady=5, sticky="ew")
time_frame = LabelFrame(self, width=400, height=200, text="签到信息", font=self.font_label)
time_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
for c in range(4):
time_frame.grid_columnconfigure(c, weight=1)
ctk.CTkLabel(time_frame, text="签到1:", font=self.font_label).grid(row=0, column=0, pady=5, sticky="e")
self.time1_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
self.time1_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
ctk.CTkLabel(time_frame, text="签到2:", font=self.font_label).grid(row=1, column=0, pady=5, sticky="e")
self.time2_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
self.time2_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
ctk.CTkLabel(time_frame, text="签退1:", font=self.font_label).grid(row=0, column=2, pady=5, sticky="e")
self.time3_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
self.time3_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
ctk.CTkLabel(time_frame, text="签退2:", font=self.font_label).grid(row=1, column=2, pady=5, sticky="e")
self.time4_entry = ctk.CTkEntry(time_frame, font=self.font_entry)
self.time4_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew")
checkstr_frame = LabelFrame(self, width=400, height=200, text="校验码", font=self.font_label)
checkstr_frame.grid(row=1, column=0, rowspan=2, padx=5, pady=5, sticky="nsew")
checkstr_frame.grid_rowconfigure(0, weight=1)
checkstr_frame.grid_columnconfigure(0, weight=1)
self.checkstr_entry = ctk.CTkTextbox(checkstr_frame, state='disabled', font=self.font_text)
self.checkstr_entry.grid(row=0, column=0, padx=10, pady=6, sticky="nsew")
res_frame = LabelFrame(self, width=400, height=300, text="座位池信息", font=self.font_label)
res_frame.grid(row=1, column=1, rowspan=2, columnspan=2, padx=5, pady=5, sticky="nsew")
for c in range(3):
res_frame.grid_columnconfigure(c, weight=1)
for r in range(3):
res_frame.grid_rowconfigure(r, weight=1)
self.tabel_pool_list = Listbox(res_frame, bd=0, highlightthickness=0, relief='flat')
try:
self.tabel_pool_list.configure(font=self.tk_list_font)
except Exception:
pass
self.tabel_pool_list.grid(row=0, column=0, rowspan=3, padx=10, pady=5, sticky="nsew")
def add_tabel():
tabel_no = self.ask_string("获取座位号", "请输入新的座位号:")
if tabel_no == "" or tabel_no is None:
return
if self.util.get_table_info(tabel_no, table_file_path) is None:
self.show_warning("警告", "该座位不存在!")
return
if tabel_no in self.tabel_pool_list.get(0, 'end'):
self.show_warning("警告", "该座位号已存在!")
return
self.tabel_pool_list.insert(self.tabel_pool_list.size(), tabel_no)
add_button = ctk.CTkButton(res_frame, text="添加", command=add_tabel, font=self.font_button)
add_button.grid(row=0, column=1, padx=6, pady=5, sticky="ew")
def insert_tabel():
tabel_no = self.ask_string("获取座位号", "请输入新的座位号:")
if tabel_no == "" or tabel_no is None:
return
if self.util.get_table_info(tabel_no, table_file_path) is None:
self.show_warning("警告", "该座位不存在!")
return
if tabel_no in self.tabel_pool_list.get(0, 'end'):
self.show_warning("警告", "该座位号已存在!")
return
if self.tabel_pool_list.curselection() != ():
self.tabel_pool_list.insert(self.tabel_pool_list.curselection(), tabel_no)
else:
self.tabel_pool_list.insert(0, tabel_no)
insert_button = ctk.CTkButton(res_frame, text="插入", command=insert_tabel, font=self.font_button)
insert_button.grid(row=1, column=1, padx=6, pady=5, sticky="ew")
def del_tabel():
if self.tabel_pool_list.curselection() != ():
self.tabel_pool_list.delete(self.tabel_pool_list.curselection())
del_button = ctk.CTkButton(res_frame, text="删除", command=del_tabel, font=self.font_button)
del_button.grid(row=0, column=2, padx=6, pady=5, sticky="ew")
clear_button = ctk.CTkButton(res_frame, text="清空",
command=lambda: self.tabel_pool_list.delete(0, 'end'), font=self.font_button)
clear_button.grid(row=1, column=2, padx=6, pady=5, sticky="ew")
btn_frame = LabelFrame(self, width=400, height=200, text="执行专区", font=self.font_button)
btn_frame.grid(row=6, column=0, columnspan=2, padx=5, pady=5, sticky="ew")
for c in range(7):
btn_frame.grid_columnconfigure(c, weight=1)
self.start_button = ctk.CTkButton(btn_frame, text="定时运行", height=36,
command=self.toggle_task, font=self.font_button)
self.start_button.grid(row=0, column=0, padx=4, pady=11, sticky="ew")
self.start_button.configure(text_color="#33CC33")
self.signin_button = ctk.CTkButton(btn_frame, text="立即签到", height=36,
command=self.signin_now, font=self.font_button)
self.signin_button.grid(row=0, column=1, padx=4, pady=11, sticky="ew")
self.signin_button = ctk.CTkButton(btn_frame, text="立即签退", height=36,
command=self.signout_now, font=self.font_button)
self.signin_button.grid(row=0, column=2, padx=4, pady=11, sticky="ew")
def booking_morning():
if self.is_running:
self.write_log("请先点击停止···")
return None
self.update_all_info(sigin_file_path)
self.thread_manager.start_thread(self.execute_booking_plus, args=(False, True))
self.booking_morning_button = ctk.CTkButton(btn_frame, text="预约早上", height=36,
command=booking_morning, font=self.font_button)
self.booking_morning_button.grid(row=0, column=3, padx=4, pady=11, sticky="ew")
def booking_afternoon():
if self.is_running:
self.write_log("请先点击停止···")
return None
self.update_all_info(sigin_file_path)
self.thread_manager.start_thread(self.execute_booking_plus, args=(True, True))
self.booking_afternoon_button = ctk.CTkButton(btn_frame, text="预约下午", height=36,
command=booking_afternoon, font=self.font_button)
self.booking_afternoon_button.grid(row=0, column=4, padx=4, pady=11, sticky="ew")
self.cancel_booking_button = ctk.CTkButton(btn_frame, text="取消预约", height=36,
command=self.booking_cancel, font=self.font_button)
self.cancel_booking_button.grid(row=0, column=5, padx=4, pady=11, sticky="ew")
self.clear_outbox_button = ctk.CTkButton(btn_frame, text="清空输出", height=36,
command=self.clear_output_box, font=self.font_button)
self.clear_outbox_button.grid(row=0, column=6, padx=4, pady=11, sticky="ew")
self.output_box = ctk.CTkTextbox(self, state='disabled', font=self.font_text)
self.output_box.grid(row=7, column=0, columnspan=2, padx=10, pady=10, sticky="nsew")
def clear_output_box(self):
self.output_box.configure(state='normal') # 允许编辑
self.output_box.delete('1.0', 'end') # 清空内容
self.output_box.configure(state='disabled') # 重新禁用编辑
# 设置界面大小与位置
def set_screen(self, width, height):
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
window_size = '%dx%d+%d+%d' % (width, height, (screen_width - width) / 2, (screen_height - height) / 2)
return window_size
# 分别填充到各个 Entry 控件中
def fill_controls(self, data):
self.readerno_entry.delete(0, 'end')
self.readerno_entry.insert(0, data.get("readerno", "")) # readerno
self.password_entry.insert(0, data.get("password", "")) # password
self.time1_entry.insert(0, data.get("sigin_time1", "")) # signin_time1
self.time2_entry.insert(0, data.get("sigin_time2", "")) # signin_time2
self.time3_entry.insert(0, data.get("signout_time1", "")) # signout_time1
self.time4_entry.insert(0, data.get("signout_time2", "")) # signout_time2
self.tabel_pool = data.get("tables", [])
for item in self.tabel_pool:
self.tabel_pool_list.insert('end', item)
def update_all_info(self, filepath):
logging.info("更新所有信息至文件: %s", filepath)
data = self.util.read_json_file(filepath)
if data:
data["readerno"] = self.readerno_entry.get()
data["password"] = self.password_entry.get()
data["sigin_time1"] = self.time1_entry.get()
data["sigin_time2"] = self.time2_entry.get()
data["signout_time1"] = self.time3_entry.get()
data["signout_time2"] = self.time4_entry.get()
self.tabel_pool.clear()
self.tabel_pool = list(self.tabel_pool_list.get(0, 'end'))
data["tables"] = self.tabel_pool
self.util.update_json_file(filepath, data)
logging.info("成功更新所有信息至文件")
self.write_log("成功更新所有信息至文件!")
else:
logging.warning("未能从文件中读取到数据")
self.write_log("未能从文件中读取到数据!")
def write_checkstr(self, checkstr) -> str:
self.checkstr_entry.configure(state='normal')
self.checkstr_entry.delete('1.0', 'end')
self.checkstr_entry.insert('end', checkstr)
self.checkstr_entry.configure(state='disabled')
return checkstr
# 在文本框中输出日志
def write_log(self, message):
self.output_box.configure(state='normal')
self.output_box.insert('end', message + "\n")
self.output_box.see('end')
self.output_box.configure(state='disabled')
def process_thread_results(self):
"""定时检查线程管理器的队列,处理回调"""
self.thread_manager.check_results()
self.after(100, self.process_thread_results) # 循环检查
def booking_cancel(self):
if self.is_running:
self.write_log("请先点击停止···")
return None
self.update_all_info(sigin_file_path)
self.thread_manager.start_thread(self.reservation.cancel_reservation,
args=(self.jsessionid,
self.readerno_entry.get(),
self.password_entry.get()),
callback=self.booking_cancel_callback
)
def booking_cancel_callback(self, result):
if isinstance(result, str):
self.write_log(result)
elif result:
self.write_log("已将预约信息取消···")
else:
self.write_log("具体问题请查看日志···")
def signin_now(self):
if self.is_running:
self.write_log("请先点击停止···")
return None
self.update_all_info(sigin_file_path)
readerno = self.readerno_entry.get()
password = self.password_entry.get()
self.jsessionid = self.util.get_jsessionid()
if self.tabel_pool_list.curselection() != ():
tableno = self.tabel_pool[self.tabel_pool_list.curselection()[0]]
else:
tableno = self.tabel_pool[self.tabel_point]
checkstr = self.util.get_checkstr(readerno, tableno)
self.write_log(f"开始签到,座位: {tableno}")
# 启动签到线程,并传递结果回调方法
self.thread_manager.start_thread(
target=self.signInOut.send_signin_req,
args=(readerno, password, tableno, self.jsessionid, checkstr),
callback=self.signin_callback # 签到结束后的回调
)
def signin_callback(self, result):
"""签到回调函数,处理签到结果"""
self.write_log(f"签到结果: {result}")
def signout_now(self):
if self.is_running:
self.write_log("请先点击停止···")
return None
self.update_all_info(sigin_file_path)
readerno = self.readerno_entry.get()
password = self.password_entry.get()
self.write_log(f"开始签退,学号: {readerno}")
# 启动签退线程,并传递结果回调方法
self.thread_manager.start_thread(
target=self.signInOut.send_signout_req,
args=(readerno, password, self.jsessionid),
callback=self.signout_callback # 签退结束后的回调
)
def signout_callback(self, result):
"""签退回调函数,处理签退结果"""
self.write_log(f"签退结果: {result}")
def toggle_task(self):
# 切换任务状态
if not self.is_running:
self.clear_output_box()
self.start_scheduled_tasks()
self.start_button.configure(text="停止运行", text_color="#ff0000") # 更新按钮文本为"停止"
self.is_running = True
else:
self.is_running = False
self.start_button.configure(text="定时运行", text_color="#33CC33") # 恢复按钮文本为"开始"
# 开始多线程任务
def start_scheduled_tasks(self):
self.update_all_info(sigin_file_path)
# 读取输入的基本信息
readerno = self.readerno_entry.get()
password = self.password_entry.get()
time1 = self.time1_entry.get()
time2 = self.time2_entry.get()
time3 = self.time3_entry.get()
time4 = self.time4_entry.get()
# 启动自动运行线程,并传递结果回调方法
self.thread_manager.start_thread(
target=self.schedule_tasks,
args=(readerno, password, time1, time2, time3, time4),
)
def shuffle_seat(self):
random.shuffle(self.tabel_pool)
self.tabel_pool_list.delete(0, 'end')
for item in self.tabel_pool:
self.tabel_pool_list.insert('end', item)
self.update_all_info(sigin_file_path)
self.write_log(f"座位打乱:{self.tabel_pool}")
def schedule_tasks(self, readerno, password, time1, time2, time3, time4):
with self.lock:
schedule.clear()
# 定时任务调度
schedule.every().day.at(time1).do(self.schedule_task, readerno, password, time1)
schedule.every().day.at(time2).do(self.schedule_task, readerno, password, time2)
schedule.every().day.at(time3).do(self.schedule_task, readerno, password, time3, signout=True)
schedule.every().day.at(time4).do(self.schedule_task, readerno, password, time4, signout=True)
schedule.every().day.at("17:00:00").do(self.execute_booking_plus, False)
today_booking = "20:" + readerno[-2:] + ":00"
# today_booking = "17:46:30" #测试项
schedule.every().day.at(today_booking).do(self.execute_booking_plus, True)
schedule.every().day.at("23:00:00").do(self.clear_output_box)
schedule.every().day.at("23:30:00").do(self.shuffle_seat)
self.write_log("任务调度正在运行...")
while self.is_running:
try:
schedule.run_pending()
except Exception as e:
logging.error(f"调度器运行时出现错误: {e}")
t.sleep(1)
self.write_log("任务调度结束运行...")
def schedule_task(self, readerno, password, time, signout=False):
"""辅助函数用于签到或签退的定时任务执行"""
if signout:
self.write_log(f"定时签退任务执行中: {time}")
signout_info = self.signInOut.send_signout_req(readerno, password, self.jsessionid)
self.write_log(f"签退任务执行结果: {signout_info}")
else:
self.write_log(f"定时签到任务执行中: {time}")
sigin_info = ""
reservationData = self.reservation.get_reservation_list(self.jsessionid, readerno)
if not reservationData:
self.write_log("当前无可签到座位信息...")
logging.info("未获取到预约信息列表...")
return None
for item in reservationData:
checkstr = self.util.get_checkstr(readerno, item['TableNo'])
sigin_info = self.signInOut.send_signin_req(readerno, password, item['TableNo'], self.jsessionid,
checkstr)
if sigin_info == "签到成功":
break
self.write_log(f"签到任务执行结果: {sigin_info}")
logging.info("任务 %s 执行完毕: %s", "签到" if not signout else "签退", time)
# 刷新座位信息
def refresh_seat_info(self, point):
self.tabel_pool[0], self.tabel_pool[point] = self.tabel_pool[point], self.tabel_pool[0]
self.tabel_point = 0
self.tabel_pool_list.delete(0, 'end')
for item in self.tabel_pool:
self.tabel_pool_list.insert('end', item)
def update_seat_info(self, table_no, now_table_no):
if table_no == now_table_no:
self.write_log(f"已预约成功:{table_no}")
logging.info(f"已预约成功:{table_no}")
if self.tabel_point != 0:
self.refresh_seat_info(self.tabel_point)
else:
self.write_log(f"已有座位信息: {now_table_no}")
logging.info(f"已有座位信息: {now_table_no}")
table_pool_now = list(self.tabel_pool_list.get(0, 'end'))
if now_table_no in table_pool_now:
if self.tabel_point != 0:
self.refresh_seat_info(table_pool_now.index(now_table_no))
else:
self.tabel_pool_list.insert(0, now_table_no)
self.update_all_info(sigin_file_path)
def execute_booking_plus(self, e_time, manual_operation=False):
# 获取信息
reader_no = self.readerno_entry.get()
begin_time, end_time = self.get_booking_times(e_time)
time_count = 1
max_retries = 10
# 获取预约时间段
c_time = datetime.now().strftime("%H:%M:%S")
self.write_log(f"执行预约操作,时间:{c_time}")
# 进行预约
while True:
table_no = self.tabel_pool[self.tabel_point]
table_id = self.util.get_table_info(table_no, table_file_path)
room_no = int(table_no[0])
room_id = self.util.get_room_no(room_no)
check_str = self.util.get_checkstr(reader_no, table_no)
bask_id = self.booking.get_bask_id(baskid_file_path, "27" if e_time else room_id)
result = self.booking.make_booking(
self.jsessionid,
begin_time,
int(bask_id),
check_str,
end_time,
reader_no,
int(room_id),
int(table_id),
table_no
)
if not result:
self.write_log("预约请求出现异常,重新执行...")
logging.info("预约请求出现异常,重新执行...")
if time_count > max_retries:
self.write_log("预约请求达到上限···")
return None
time_count += 1
continue
if result[0]:
if "预约成功" not in result[1] and self.tabel_point != 0:
self.tabel_point -= 1
if self.tabel_point != 0:
self.refresh_seat_info(self.tabel_point)
self.write_log(f"{result[1]}:{table_no}")
return True
else:
# False, False
if not result[0] and not result[1]:
self.write_log("读者没有预约权限, 等待30s重新发送请求···")
t.sleep(30)
continue
# False, 502, 504
if result[1] in (502, 504):
data = self.reservation.get_reservation_list(self.jsessionid, reader_no)
if not data:
self.tabel_point += 1
if self.tabel_point >= len(self.tabel_pool):
self.tabel_point = 0
self.write_log("新一轮预约···")
logging.info("新一轮预约···")
logging.info(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
self.write_log(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
continue
self.update_seat_info(table_no, data[0]['TableNo'])
return True
# 获取返回时间判断
sleep_time_m = self.get_remaining_seconds(result[1])
# False, msg
if sleep_time_m is None:
if not e_time:
data = self.reservation.get_reservation_list(self.jsessionid, reader_no)
if data:
self.update_seat_info(table_no, data[0]['TableNo'])
return True
self.write_log(f"{result[1]}:{table_no}")
self.tabel_point += 1
if self.tabel_point >= len(self.tabel_pool):
self.tabel_point = 0
self.write_log("无可预约座位···")
logging.error("无可预约座位···")
return None
logging.info(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
self.write_log(f"移动到下一个座位: {self.tabel_pool[self.tabel_point]}")
else:
if manual_operation: # 手动只显示不执行
self.write_log(result[1])
return None
if time_count > max_retries:
self.write_log("运行达到上限···")
return None
time_count += 1
self.write_log(f"等待{sleep_time_m}秒后重新执行")
logging.info(f"等待{sleep_time_m}秒后重新执行")
t.sleep(sleep_time_m)
t.sleep(1) # 容错时间
def get_remaining_seconds(self, text):
# 使用正则表达式提取时间
match = re.search(r"\d{2}:\d{2}:\d{2}", text)
# 如果找不到时间格式,则返回提示
if not match:
return None
# 提取时间字符串并转换为 datetime 对象
time_str = match.group()
current_time = datetime.strptime(time_str, "%H:%M:%S")
# 目标时间 17:00:00
target_time = current_time.replace(hour=17, minute=0, second=0)
# 计算剩余时间
time_difference = target_time - current_time
remaining_seconds = int(time_difference.total_seconds())
# 如果已超过17点,返回0秒
return max(remaining_seconds, 0)
def get_booking_times(self, e_time):
"""返回开始和结束时间"""
if e_time:
return "13:50:01", "22:30:00" # 示例下午时段
else:
return "07:00:00", "13:50:00" # 示例上午时段
# =============================================================================
# 配置验证和主程序入口
# =============================================================================
def validate_config_file(file_path: str, required_fields: Optional[List[str]] = None) -> bool:
"""
验证配置文件
Args:
file_path (str): 配置文件路径
required_fields (Optional[List[str]]): 必需的字段列表
Returns:
bool: 配置文件是否有效
"""
logger = logging.getLogger(__name__)
# 检查文件是否存在
if not os.path.isfile(file_path):
logger.error(f"配置文件不存在: {file_path}")
return False
try:
# 读取并验证JSON格式
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 验证必需字段
if required_fields:
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
logger.error(f"配置文件 {file_path} 缺少必需字段: {missing_fields}")
return False
logger.info(f"配置文件验证通过: {file_path}")
return True
except json.JSONDecodeError as e:
logger.error(f"配置文件JSON格式错误: {file_path}, 错误: {e}")
return False
except Exception as e:
logger.error(f"验证配置文件时发生错误: {file_path}, 错误: {e}")
return False
def validate_all_configs() -> bool:
"""
验证所有配置文件
Returns:
bool: 所有配置文件是否都有效
"""
logger = logging.getLogger(__name__)
logger.info("开始验证配置文件")
# 获取当前目录
current_directory = os.getcwd()
# 配置文件路径和必需字段
config_files = {
os.path.join(current_directory, "sigin_info.json"): [
"readerno", "password", "sigin_time1", "sigin_time2",
"signout_time1", "signout_time2", "tables"
],
os.path.join(current_directory, 'table_info.json'): [],
os.path.join(current_directory, 'bask_id.json'): ["morning", "afternoon"]
}
all_valid = True
for file_path, required_fields in config_files.items():
if not validate_config_file(file_path, required_fields):
all_valid = False
# 显示错误对话框
messagebox.showerror(
"配置错误",
f"配置文件验证失败:\n{file_path}\n\n请检查配置文件格式和必需字段。"
)
if all_valid:
logger.info("所有配置文件验证通过")
else:
logger.error("配置文件验证失败")
return all_valid
def create_default_configs() -> None:
"""
创建默认配置文件(如果不存在)
"""
logger = logging.getLogger(__name__)
current_directory = os.getcwd()
# 默认配置
default_configs = {
"sigin_info.json": {
"readerno": "",
"password": "",
"sigin_time1": "15:06:00",
"sigin_time2": "14:00:00",
"signout_time1": "13:40:00",
"signout_time2": "22:20:00",
"tables": ["2-126", "2-128"]
}
}
for filename, config in default_configs.items():
file_path = os.path.join(current_directory, filename)
if not os.path.exists(file_path):
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=4)
logger.info(f"创建默认配置文件: {file_path}")
except Exception as e:
logger.error(f"创建默认配置文件失败: {file_path}, 错误: {e}")
def main():
"""
主程序入口
"""
global sigin_file_path, table_file_path, baskid_file_path
logger = logging.getLogger(__name__)
logger.info("程序启动")
try:
# 设置全局文件路径
current_directory = os.getcwd()
sigin_file_path = os.path.join(current_directory, "sigin_info.json")
table_file_path = os.path.join(current_directory, 'table_info.json')
baskid_file_path = os.path.join(current_directory, 'bask_id.json')
# 创建默认配置文件(如果需要)
create_default_configs()
# 验证配置文件
if not validate_all_configs():
logger.error("配置文件验证失败,程序退出")
sys.exit(1)
# 启动主界面
logger.info("启动主界面")
app = MainInterface()
app.mainloop()
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序运行时发生错误: {e}")
messagebox.showerror("程序错误", f"程序运行时发生错误:\n{e}")
finally:
logger.info("程序结束")
if __name__ == '__main__':
main()
[/content_hide]
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容