diff --git a/main.py b/main.py index 73e2ecf..de96647 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,9 @@ def main(): App.did_start() App.run_check() + Query.check_before_fun() + + ####### 运行任务 User.run() Query.run() if not Const.IS_TEST: diff --git a/py12306/helpers/event.py b/py12306/helpers/event.py index 595ced7..0e43e56 100644 --- a/py12306/helpers/event.py +++ b/py12306/helpers/event.py @@ -27,13 +27,15 @@ class Event(): job.destroy() def user_loaded(self, data={}, callback=False): # 用户初始化完成 - from py12306.query.query import Query if Config().is_cluster_enabled() and not callback: return self.cluster.publish_event(self.KEY_USER_LOADED, data) # 通知其它节点退出 + from py12306.query.query import Query - query_job = Query.job_by_account_key(data.get('key')) - if query_job and Config().is_master(): - create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 + if not Config().is_cluster_enabled() or Config().is_master(): + query = Query.wait_for_ready() + for job in query.jobs: + if job.account_key == data.get('key'): + create_thread_and_run(job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 def user_job_destroy(self, data={}, callback=False): from py12306.user.user import User diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index 5bf96ec..bb64882 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -29,7 +29,7 @@ class QueryLog(BaseLog): MESSAGE_QUERY_LOG_OF_TRAIN_INFO = '{} {}' MESSAGE_QUERY_START_BY_DATE = '出发日期 {}: {} - {}' - MESSAGE_JOBS_DID_CHANGED = '\n任务已更新,正在重新加载...' + MESSAGE_JOBS_DID_CHANGED = '任务已更新,正在重新加载...\n' MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n' diff --git a/py12306/log/user_log.py b/py12306/log/user_log.py index 67d4c4e..9234725 100644 --- a/py12306/log/user_log.py +++ b/py12306/log/user_log.py @@ -20,7 +20,7 @@ class UserLog(BaseLog): MESSAGE_USER_HEARTBEAT_NORMAL = '用户 {} 心跳正常,下次检测 {} 秒后' MESSAGE_GET_USER_PASSENGERS_FAIL = '获取用户乘客列表失败,错误原因: {} {} 秒后重试' - MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}' + MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}\n' # MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' @@ -45,14 +45,14 @@ class UserLog(BaseLog): :return: """ self = cls() - self.add_quick_log('# 发现 {} 个用户 #'.format(len(users))) + self.add_quick_log('# 发现 {} 个用户 #\n'.format(len(users))) self.flush() return self @classmethod def print_welcome_user(cls, user): self = cls() - self.add_quick_log('# 欢迎回来,{} #'.format(user.get_name())) + self.add_quick_log('# 欢迎回来,{} #\n'.format(user.get_name())) self.flush() return self @@ -67,6 +67,6 @@ class UserLog(BaseLog): def print_user_passenger_init_success(cls, passengers): self = cls() result = [passenger.get('name') + '(' + passenger.get('type_text') + ')' for passenger in passengers] - self.add_quick_log('# 乘客验证成功 {} #'.format(', '.join(result))) + self.add_quick_log('# 乘客验证成功 {} #\n'.format(', '.join(result))) self.flush() return self diff --git a/py12306/query/query.py b/py12306/query/query.py index 9039f28..0b473da 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -23,6 +23,7 @@ class Query: is_in_thread = False retry_time = 3 + is_ready = False def __init__(self): self.session = Request() @@ -48,9 +49,14 @@ class Query: self.start() pass + @classmethod + def check_before_fun(cls): + self = cls() + self.init_jobs() + self.is_ready = True + def start(self): # return # DEBUG - self.init_jobs() QueryLog.init_data() stay_second(3) # 多线程 @@ -59,6 +65,7 @@ class Query: if not self.is_in_thread: self.is_in_thread = True create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) + if Const.IS_TEST: return stay_second(self.retry_time) else: if not self.jobs: break @@ -105,6 +112,13 @@ class Query: self.jobs.append(job) return job + @classmethod + def wait_for_ready(cls): + self = cls() + if self.is_ready: return self + stay_second(self.retry_time) + return self.wait_for_ready() + @classmethod def job_by_name(cls, name) -> Job: self = cls() diff --git a/py12306/user/job.py b/py12306/user/job.py index e692dcf..e24cc28 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -26,6 +26,7 @@ class UserJob: info = {} # 用户信息 last_heartbeat = None is_ready = False + user_loaded = False # 用户是否已加载成功 passengers = [] retry_time = 3 @@ -47,7 +48,6 @@ class UserJob: self.key = str(info.get('key')) self.user_name = info.get('user_name') self.password = info.get('password') - self.update_user() def update_user(self): from py12306.user.user import User @@ -57,6 +57,7 @@ class UserJob: def run(self): # load user + self.update_user() self.start() def start(self): @@ -85,6 +86,7 @@ class UserJob: if not self.handle_login(): return self.is_ready = True + self.user_did_load() message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), Config().USER_HEARTBEAT_INTERVAL) if not Config.is_cluster_enabled(): UserLog.add_quick_log(message).flush() @@ -212,12 +214,21 @@ class UserJob: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() if self.check_user_is_login() and self.get_user_info(): UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() - Event().user_loaded({'key': self.key}) # 发布通知 self.is_ready = True UserLog.print_welcome_user(self) + self.user_did_load() else: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush() + def user_did_load(self): + """ + 用户已经加载成功 + :return: + """ + if self.user_loaded: return + self.user_loaded = True + Event().user_loaded({'key': self.key}) # 发布通知 + def get_user_info(self): response = self.session.get(API_USER_INFO.get('url')) result = response.json()