diff --git a/py12306/app.py b/py12306/app.py index bfa356b..df3450e 100644 --- a/py12306/app.py +++ b/py12306/app.py @@ -24,7 +24,7 @@ def app_available_check(): class App: """ 程序主类 - TODO 需要完善 + TODO 代码需要优化 """ @classmethod diff --git a/py12306/cluster/cluster.py b/py12306/cluster/cluster.py index 6bf5a96..469737a 100644 --- a/py12306/cluster/cluster.py +++ b/py12306/cluster/cluster.py @@ -1,3 +1,4 @@ +import json import os import pickle import sys @@ -19,12 +20,19 @@ class Cluster(): KEY_CONFIGS = 'configs' KEY_NODES = 'nodes' KEY_CHANNEL_LOG = 'channel_log' + KEY_CHANNEL_EVENT = 'channel_even' KEY_USER_COOKIES = 'user_cookies' KEY_USER_LAST_HEARTBEAT = 'user_last_heartbeat' KEY_NODES_ALIVE = 'nodes_alive' - KEY_LOCK_INIT_USER = 'lock_init_user' - # if self.cluster.get_lock(Cluster.KEY_LOCK_INIT_USER, self.lock_init_user_time): # TODO 未判断 失败重试 + # 锁 + KEY_LOCK_INIT_USER = 'lock_init_user' # 暂未使用 + KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁 + lock_do_order_time = 60 * 1 # 订单锁超时时间 + + # 事件 + KEY_EVENT_JOB_DESTROY = 'job_destroy' + KEY_EVENT_USER_LOADED = 'user_loaded' KEY_MASTER = 1 KEY_SLAVE = 0 @@ -54,7 +62,7 @@ class Cluster(): def start(self): self.pubsub = self.session.pubsub() - self.pubsub.subscribe(self.KEY_CHANNEL_LOG) + self.pubsub.subscribe(self.KEY_CHANNEL_LOG, self.KEY_CHANNEL_EVENT) create_thread_and_run(self, 'subscribe', wait=False) self.is_ready = True self.get_nodes() # 提前获取节点列表 @@ -114,6 +122,14 @@ class Cluster(): message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(self.node_name, message) self.session.publish(self.KEY_CHANNEL_LOG, message) + def publish_event(self, name, data={}): + """ + 发布事件消息 + :return: + """ + data = {'event': name, 'data': data} + self.session.publish(self.KEY_CHANNEL_EVENT, json.dumps(data)) + def get_nodes(self) -> dict: res = self.session.hgetall(self.KEY_NODES) res = res if res else {} @@ -138,7 +154,7 @@ class Cluster(): :return: """ master = self.have_master() - if master == self.node_name: # 动态提升 + if master == self.node_name: # 动态提升 self.is_master = True else: self.is_master = False @@ -184,23 +200,48 @@ class Cluster(): while True: message = self.pubsub.get_message() if message: - if message.get('type') == 'message' and message.get('data'): + if message.get('type') == 'message' and message.get('channel') == self.KEY_CHANNEL_LOG and message.get( + 'data'): msg = message.get('data') if self.node_name: msg = msg.replace(ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX.format(self.node_name), '') ClusterLog.add_quick_log(msg).flush(publish=False) + elif message.get('channel') == self.KEY_CHANNEL_EVENT: + create_thread_and_run(self, 'handle_events', args=(message,)) stay_second(self.refresh_channel_time) - def get_lock(self, key, timeout=1): + def handle_events(self, message): + # 这里应该分开处理,先都在这处理了 + if message.get('type') != 'message': return + result = json.loads(message.get('data', {})) + event_name = result.get('event') + data = result.get('data') + + from py12306.query.query import Query + from py12306.user.user import User + if event_name == self.KEY_EVENT_JOB_DESTROY: # 停止查询任务 + job = Query.job_by_name(data['name']) + if job: job.destroy() + elif event_name == self.KEY_EVENT_USER_LOADED: # 用户初始化完成 + query_job = Query.job_by_account_id(data['key']) + if query_job: + create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 + + def get_lock(self, key, timeout=1, info={}): timeout = int(time.time()) + timeout res = self.session.setnx(key, timeout) if res: self.locks.append((key, timeout)) + if info: self.session.set_dict(key + '_info', info) # 存储额外信息 return True return False + def get_lock_info(self, key, default={}): + return self.session.get_dict(key + '_info', default=default) + def release_lock(self, key): self.session.delete(key) + self.session.delete(key + '_info') def check_locks(self): index = 0 diff --git a/py12306/cluster/redis.py b/py12306/cluster/redis.py index e449e48..2327e5d 100644 --- a/py12306/cluster/redis.py +++ b/py12306/cluster/redis.py @@ -51,8 +51,8 @@ class Redis(PyRedis): return self.set(name, pickle.dumps(value, 0).decode()) def get_pickle(self, name, default=None): - res = self.get(name).encode() - return pickle.loads(res) if res else default + res = self.get(name) + return pickle.loads(res.encode()) if res else default # def smembers(self, name, default=[]): # res = super().smembers(name) diff --git a/py12306/helpers/func.py b/py12306/helpers/func.py index 3cd7276..460e06e 100644 --- a/py12306/helpers/func.py +++ b/py12306/helpers/func.py @@ -8,8 +8,6 @@ from time import sleep from types import MethodType - - def singleton(cls): """ 将一个类作为单例 @@ -88,18 +86,20 @@ def current_thread_id(): def time_now(): return datetime.datetime.now() + def str_to_time(str): return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f') + def time_int(): return int(time.time()) -def create_thread_and_run(jobs, callback_name, wait=True, daemon=True): +def create_thread_and_run(jobs, callback_name, wait=True, daemon=True, args=()): threads = [] if not isinstance(jobs, list): jobs = [jobs] for job in jobs: - thread = threading.Thread(target=getattr(job, callback_name)) + thread = threading.Thread(target=getattr(job, callback_name), args=args) thread.setDaemon(daemon) thread.start() threads.append(thread) @@ -118,6 +118,11 @@ def dict_find_key_by_value(data, value, default=None): return result.pop() if len(result) else default +def objects_find_object_by_key_value(objects, key, value, default=None): + result = [obj for obj in objects if getattr(obj, key) == value] + return result.pop() if len(result) else default + + def dict_count_key_num(data: dict, key, like=False): count = 0 for k in data.keys(): diff --git a/py12306/helpers/notification.py b/py12306/helpers/notification.py index c4e7c4d..f8189fa 100644 --- a/py12306/helpers/notification.py +++ b/py12306/helpers/notification.py @@ -1,5 +1,6 @@ import urllib +from py12306.config import Config from py12306.helpers.api import * from py12306.helpers.request import Request from py12306.log.common_log import CommonLog @@ -25,7 +26,7 @@ class Notification(): 购买地址 https://market.aliyun.com/products/57126001/cmapi019902.html?spm=5176.2020520132.101.5.37857218O6iJ3n :return: """ - appcode = config.NOTIFICATION_API_APP_CODE + appcode = Config().NOTIFICATION_API_APP_CODE if not appcode: CommonLog.add_quick_log(CommonLog.MESSAGE_EMPTY_APP_CODE).flush() return False @@ -60,4 +61,4 @@ class Notification(): if __name__ == '__main__': - Notification.voice_code('13800138000', '张三', '你的车票 广州 到 深圳 购买成功,请登录 12306 进行支付') + Notification.voice_code('13065667742', '张三', '你的车票 广州 到 深圳 购买成功,请登录 12306 进行支付') diff --git a/py12306/log/cluster_log.py b/py12306/log/cluster_log.py index 0f2000c..73690d5 100644 --- a/py12306/log/cluster_log.py +++ b/py12306/log/cluster_log.py @@ -25,7 +25,7 @@ class ClusterLog(BaseLog): MESSAGE_MASTER_NODE_ALREADY_RUN = '# 启动失败,主节点 {} 已经在运行中 #' MESSAGE_MASTER_NODE_NOT_FOUND = '# 启动失败,请先启动主节点 #' - MESSAGE_NODE_BECOME_MASTER_AGAIN = '# 节点 {} 已启动,已自动成功主节点 #' + MESSAGE_NODE_BECOME_MASTER_AGAIN = '# 节点 {} 已启动,已自动成为主节点 #' diff --git a/py12306/log/common_log.py b/py12306/log/common_log.py index acd2ddd..88f60e5 100644 --- a/py12306/log/common_log.py +++ b/py12306/log/common_log.py @@ -53,9 +53,17 @@ class CommonLog(BaseLog): disable = '未开启' self.add_quick_log('**** 当前配置 ****') self.add_quick_log('多线程查询: {}'.format(get_true_false_text(Config().QUERY_JOB_THREAD_ENABLED, enable, disable))) - self.add_quick_log('语音验证码: {}'.format(get_true_false_text(Config().NOTIFICATION_BY_VOICE_CODE, enable, disable))) + self.add_quick_log( + '语音验证码: {}'.format(get_true_false_text(Config().NOTIFICATION_BY_VOICE_CODE, enable, disable))) self.add_quick_log('查询间隔: {} 秒'.format(Config().QUERY_INTERVAL)) self.add_quick_log('用户心跳检测间隔: {} 秒'.format(Config().USER_HEARTBEAT_INTERVAL)) + if Config().is_cluster_enabled(): + from py12306.cluster.cluster import Cluster + self.add_quick_log('分布式查询: {}'.format(get_true_false_text(Config().is_cluster_enabled(), enable, enable))) + self.add_quick_log('节点名称: {}'.format(Cluster().node_name)) + self.add_quick_log('节点是否主节点: {}'.format(get_true_false_text(Config().is_master(), '是', '否'))) + self.add_quick_log( + '子节点提升为主节点: {}'.format(get_true_false_text(Config().NODE_SLAVE_CAN_BE_MASTER, enable, disable))) self.add_quick_log() self.flush() return self @@ -64,7 +72,7 @@ class CommonLog(BaseLog): def print_test_complete(cls): self = cls() self.add_quick_log('# 测试完成,请检查输出是否正确 #') - self.flush() + self.flush(publish=False) return self @classmethod diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index 6e3fa38..56e7c35 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -31,6 +31,12 @@ class QueryLog(BaseLog): MESSAGE_JOBS_DID_CHANGED = '\n任务已更新,正在重新加载...' + MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n' + + MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束' + + MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功' + cluster = None def __init__(self): @@ -88,6 +94,7 @@ class QueryLog(BaseLog): self.add_log('坐席:{}'.format(','.join(job.allow_seats))) self.add_log('乘车人:{}'.format(','.join(job.members))) self.add_log('筛选车次:{}'.format(','.join(job.allow_train_numbers if job.allow_train_numbers else ['不筛选']))) + self.add_log('任务名称:{}'.format(job.job_name)) # 乘车日期:['2019-01-24', '2019-01-25', '2019-01-26', '2019-01-27'] self.add_log('') index += 1 @@ -139,11 +146,12 @@ class QueryLog(BaseLog): return self @classmethod - def print_job_start(cls): + def print_job_start(cls, job_name): self = cls() self.refresh_data() - self.add_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'), - time=datetime.datetime.now())) + self.add_log( + '=== 正在进行第 {query_count} 次查询 {job_name} === {time}'.format(query_count=self.data.get('query_count'), + job_name=job_name, time=datetime.datetime.now())) if is_main_thread(): self.flush(publish=False) return self diff --git a/py12306/log/user_log.py b/py12306/log/user_log.py index b95b1ce..67d4c4e 100644 --- a/py12306/log/user_log.py +++ b/py12306/log/user_log.py @@ -22,13 +22,15 @@ class UserLog(BaseLog): MESSAGE_GET_USER_PASSENGERS_FAIL = '获取用户乘客列表失败,错误原因: {} {} 秒后重试' MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}' - MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' + # MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' MESSAGE_USERS_DID_CHANGED = '\n用户信息已更新,正在重新加载...' MESSAGE_USER_BEING_DESTROY = '用户 {} 已退出' MESSAGE_USER_COOKIE_NOT_FOUND_FROM_REMOTE = '用户 {} 状态加载中...' + MESSAGE_WAIT_USER_INIT_COMPLETE = '账号正在初始化,{} 秒后自动重试' + def __init__(self): super().__init__() self.init_data() diff --git a/py12306/order/order.py b/py12306/order/order.py index de6fdbc..40b29b1 100644 --- a/py12306/order/order.py +++ b/py12306/order/order.py @@ -1,14 +1,14 @@ import urllib # from py12306.config import UserType +from py12306.config import Config from py12306.helpers.api import * from py12306.helpers.func import * from py12306.helpers.notification import Notification +from py12306.helpers.type import UserType from py12306.log.order_log import OrderLog -# from py12306.query.job import Job -# from py12306.user.job import UserJob class Order: @@ -36,8 +36,10 @@ class Order: def __init__(self, query, user): self.session = user.session - # assert isinstance(query, Job) # 循环引用 - # assert isinstance(user, UserJob) + from py12306.query.job import Job + from py12306.user.job import UserJob + assert isinstance(query, Job) + assert isinstance(user, UserJob) self.query_ins = query self.user_ins = user @@ -49,8 +51,7 @@ class Order: 下单模式 暂时不清楚,使用正常步骤下单 :return: """ - self.normal_order() - pass + return self.normal_order() def normal_order(self): if not self.submit_order_request(): return @@ -62,6 +63,8 @@ class Order: if order_id: # 发送通知 self.order_id = order_id self.order_did_success() + return True + return False def order_did_success(self): OrderLog.print_ticket_did_ordered(self.order_id) @@ -74,17 +77,15 @@ class Order: sustain_time = self.notification_sustain_time while sustain_time: # TODO 后面直接查询有没有待支付的订单就可以 num += 1 - if config.NOTIFICATION_BY_VOICE_CODE: # 语音通知 + if Config().NOTIFICATION_BY_VOICE_CODE: # 语音通知 OrderLog.add_quick_log(OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_START_SEND.format(num)) - Notification.voice_code(config.NOTIFICATION_VOICE_CODE_PHONE, self.user_ins.get_name(), + Notification.voice_code(Config().NOTIFICATION_VOICE_CODE_PHONE, self.user_ins.get_name(), OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_CONTENT.format( self.query_ins.left_station, self.query_ins.arrive_station)) sustain_time -= self.notification_interval sleep(self.notification_interval) OrderLog.add_quick_log(OrderLog.MESSAGE_JOB_CLOSED) - # 结束运行 - while True: sleep(self.retry_time) def submit_order_request(self): data = { diff --git a/py12306/query/job.py b/py12306/query/job.py index 2db2460..c174c74 100644 --- a/py12306/query/job.py +++ b/py12306/query/job.py @@ -1,3 +1,7 @@ +import sys + +from py12306.app import app_available_check +from py12306.cluster.cluster import Cluster from py12306.config import Config from py12306.helpers.api import LEFT_TICKETS from py12306.helpers.station import Station @@ -13,7 +17,8 @@ class Job: """ 查询任务 """ - + is_alive = True + job_name = None left_dates = [] left_date = None stations = [] @@ -32,10 +37,12 @@ class Job: member_num_take = 0 # 最终提交的人数 passengers = [] allow_less_member = False + retry_time = 3 interval = {} query = None + cluster = None ticket_info = {} INDEX_TICKET_NUM = 11 INDEX_TRAIN_NUMBER = 3 @@ -47,6 +54,7 @@ class Job: INDEX_SECRET_STR = 0 def __init__(self, info, query): + self.cluster = Cluster() self.left_dates = info.get('left_dates') # 多车站已放在下面处理 # self.left_station = info.get('stations').get('left') @@ -55,8 +63,9 @@ class Job: # self.arrive_station_code = Station.get_station_key_by_name(self.arrive_station) self.stations = info.get('stations') self.stations = [self.stations] if isinstance(self.stations, dict) else self.stations + self.job_name = info.get('job_name', '{} -> {}'.format(self.stations[0]['left'], self.stations[0]['arrive'])) - self.account_key = info.get('account_key') + self.account_key = str(info.get('account_key')) self.allow_seats = info.get('seats') self.allow_train_numbers = info.get('train_numbers') self.members = info.get('members') @@ -73,26 +82,28 @@ class Job: def start(self): """ 处理单个任务 - 根据日期循环查询 - - 展示处理时间 + 根据日期循环查询, 展示处理时间 :param job: :return: """ - QueryLog.print_job_start() - for station in self.stations: - self.refresh_station(station) - for date in self.left_dates: - self.left_date = date - response = self.query_by_date(date) - self.handle_response(response) - self.safe_stay() - if is_main_thread(): - QueryLog.flush(sep='\t\t', publish=False) - if is_main_thread(): - QueryLog.add_quick_log('').flush(publish=False) - else: - QueryLog.add_log('\n').flush(sep='\t\t',publish=False) + while True and self.is_alive: + app_available_check() + QueryLog.print_job_start(self.job_name) + for station in self.stations: + self.refresh_station(station) + for date in self.left_dates: + self.left_date = date + response = self.query_by_date(date) + self.handle_response(response) + self.safe_stay() + if is_main_thread(): + QueryLog.flush(sep='\t\t', publish=False) + if is_main_thread(): + QueryLog.add_quick_log('').flush(publish=False) + break + else: + QueryLog.add_log('\n').flush(sep='\t\t', publish=False) + if Const.IS_TEST: return def query_by_date(self, date): """ @@ -154,9 +165,32 @@ class Job: QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(), train_number=self.get_info_of_train_number(), rest_num=ticket_of_seat) - self.check_passengers() - order = Order(user=self.get_user(), query=self) - order.order() + order_result = False + user = self.get_user() + lock_id = Cluster.KEY_LOCK_DO_ORDER + '_' + user.key + if Config().is_cluster_enabled(): + if self.cluster.get_lock(lock_id, Cluster.lock_do_order_time, + {'node': self.cluster.node_name}): # 获得下单锁 + order_result = self.do_order(user) + if not order_result: # 下单失败,解锁 + self.cluster.release_lock(lock_id) + else: + QueryLog.add_quick_log( + QueryLog.MESSAGE_SKIP_ORDER.format(self.cluster.get_lock_info(lock_id).get('node'), + user.user_name)) + stay_second(self.retry_time) # 防止过多重复 + else: + order_result = self.do_order(user) + + # 任务已成功 通知集群停止任务 + if order_result: + self.cluster.publish_event(Cluster.KEY_EVENT_JOB_DESTROY, {'name': self.job_name}) + self.destroy() + + def do_order(self, user): + self.check_passengers() + order = Order(user=user, query=self) + return order.order() def get_results(self, response): """ @@ -183,6 +217,15 @@ class Job: def is_member_number_valid(self, seat): return seat == '有' or self.member_num <= int(seat) + def destroy(self): + """ + 退出任务 + :return: + """ + QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush() + # sys.exit(1) # 无法退出线程... + self.is_alive = False + def safe_stay(self): interval = get_interval_num(self.interval) QueryLog.add_stay_log(interval) @@ -205,7 +248,8 @@ class Job: def check_passengers(self): if not self.passengers: - User.check_members(self.members, self.account_key, call_back=self.set_passengers) + self.set_passengers(User.get_passenger_for_members(self.members, self.account_key)) + QueryLog.add_quick_log(QueryLog.MESSAGE_INIT_PASSENGERS_SUCCESS) return True def refresh_station(self, station): diff --git a/py12306/query/query.py b/py12306/query/query.py index 6cfc1cc..cc10f04 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -48,15 +48,21 @@ class Query: # return # DEBUG self.init_jobs() QueryLog.init_data() - stay_second(1) - while True: - app_available_check() - if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 - create_thread_and_run(jobs=self.jobs, callback_name='run') - else: - for job in self.jobs: job.run() - if Const.IS_TEST: return - # self.refresh_jobs() # 刷新任务 + app_available_check() + stay_second(3) + # 多线程 + if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 + create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) + else: + while True: jobs_do(self.jobs, 'run') + # while True: + # app_available_check() + # if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 + # create_thread_and_run(jobs=self.jobs, callback_name='run') + # else: + # for job in self.jobs: job.run() + # if Const.IS_TEST: return + # self.refresh_jobs() # 刷新任务 def init_jobs(self): for job in self.query_jobs: @@ -64,19 +70,37 @@ class Query: self.jobs.append(job) QueryLog.print_init_jobs(jobs=self.jobs) - # def get_jobs_from_cluster(self): - # jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS) - # return jobs - # - # def update_jobs_of_cluster(self): - # if config.CLUSTER_ENABLED and config.NODE_IS_MASTER: - # return self.cluster.session.set_dict(Cluster.KEY_JOBS, self.query_jobs) - # - # def refresh_jobs(self): - # if not config.CLUSTER_ENABLED: return - # jobs = self.get_jobs_from_cluster() - # if jobs != self.query_jobs: - # self.jobs = [] - # self.query_jobs = jobs - # QueryLog.add_quick_log(QueryLog.MESSAGE_JOBS_DID_CHANGED).flush() - # self.init_jobs() + @classmethod + def job_by_name(cls, name) -> Job: + self = cls() + for job in self.jobs: + if job.job_name == name: return job + return None + + @classmethod + def job_by_name(cls, name) -> Job: + self = cls() + return objects_find_object_by_key_value(self.jobs, 'job_name', name) + + @classmethod + def job_by_account_id(cls, account_id) -> Job: + self = cls() + return objects_find_object_by_key_value(self.jobs, 'account_id', account_id) + + +# def get_jobs_from_cluster(self): +# jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS) +# return jobs +# +# def update_jobs_of_cluster(self): +# if config.CLUSTER_ENABLED and config.NODE_IS_MASTER: +# return self.cluster.session.set_dict(Cluster.KEY_JOBS, self.query_jobs) +# +# def refresh_jobs(self): +# if not config.CLUSTER_ENABLED: return +# jobs = self.get_jobs_from_cluster() +# if jobs != self.query_jobs: +# self.jobs = [] +# self.query_jobs = jobs +# QueryLog.add_quick_log(QueryLog.MESSAGE_JOBS_DID_CHANGED).flush() +# self.init_jobs() diff --git a/py12306/user/job.py b/py12306/user/job.py index 61be884..6d56e6c 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -1,4 +1,6 @@ +import json import pickle +import re from os import path from py12306.cluster.cluster import Cluster @@ -7,12 +9,14 @@ from py12306.app import * from py12306.helpers.auth_code import AuthCode from py12306.helpers.func import * from py12306.helpers.request import Request +from py12306.helpers.type import UserType from py12306.log.order_log import OrderLog from py12306.log.user_log import UserLog class UserJob: # heartbeat = 60 * 2 # 心跳保持时长 + is_alive = True heartbeat_interval = 60 * 2 check_interval = 5 key = None @@ -23,7 +27,7 @@ class UserJob: last_heartbeat = None is_ready = False passengers = [] - retry_time = 5 + retry_time = 3 # Init page global_repeat_submit_token = None @@ -40,7 +44,7 @@ class UserJob: def init_data(self, info): self.session = Request() - self.key = info.get('key') + self.key = str(info.get('key')) self.user_name = info.get('user_name') self.password = info.get('password') self.update_user() @@ -49,7 +53,8 @@ class UserJob: from py12306.user.user import User self.user = User() self.heartbeat_interval = self.user.heartbeat - if not Const.IS_TEST: self.load_user() + # if not Const.IS_TEST: 测试模块下也可以从文件中加载用户 + self.load_user() def run(self): # load user @@ -60,15 +65,16 @@ class UserJob: 检测心跳 :return: """ - while True: + while True and self.is_alive: app_available_check() if Config().is_slave(): self.load_user_from_remote() + pass # 虽然同一个 cookie,同时请求之后会导致失效,暂时不在子节点中加载用户 else: if Config().is_master() and not self.cookie: self.load_user_from_remote() # 主节点加载一次 Cookie self.check_heartbeat() if Const.IS_TEST: return - sleep(self.check_interval) + stay_second(self.check_interval) def check_heartbeat(self): # 心跳检测 @@ -76,6 +82,7 @@ class UserJob: return True # 只有主节点才能走到这 if self.is_first_time() or not self.check_user_is_login(): + self.is_ready = False if not self.handle_login(): return self.is_ready = True @@ -149,7 +156,7 @@ class UserJob: is_login = response.json().get('data.flag', False) if is_login: self.save_user() - self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有有 + # self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有用,01-10 看来应该是没用 return is_login @@ -206,6 +213,7 @@ class UserJob: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() if self.check_user_is_login() and self.get_user_info(): UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() + self.cluster.publish_event(Cluster.KEY_EVENT_USER_LOADED, {'key': self.key}) # 发布通知 UserLog.print_welcome_user(self) else: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush() @@ -214,6 +222,7 @@ class UserJob: response = self.session.get(API_USER_INFO.get('url')) result = response.json() user_data = result.get('data.userDTO.loginUserDTO') + # 子节点访问会导致主节点登录失效 TODO 可快考虑实时同步 cookie if user_data: self.update_user_info({**user_data, **{'user_name': user_data.get('name')}}) return True @@ -250,13 +259,19 @@ class UserJob: def check_is_ready(self): return self.is_ready + def wait_for_ready(self): + if self.is_ready: return self + UserLog.add_quick_log(UserLog.MESSAGE_WAIT_USER_INIT_COMPLETE.format(self.retry_time)).flush() + stay_second(self.retry_time) + return self.wait_for_ready() + def destroy(self): """ 退出用户 :return: """ UserLog.add_quick_log(UserLog.MESSAGE_USER_BEING_DESTROY.format(self.user_name)).flush() - sys.exit() + self.is_alive = False def get_user_passengers(self): if self.passengers: return self.passengers diff --git a/py12306/user/user.py b/py12306/user/user.py index f69ce7c..6a80894 100644 --- a/py12306/user/user.py +++ b/py12306/user/user.py @@ -59,7 +59,7 @@ class User: if old_account and account != old_account: user = self.get_user(key) user.init_data(account) - elif not old_account: # 新用户 添加到 多线程 + elif not old_account: # 新用户 添加到 多线程 new_user = self.init_user(account) create_thread_and_run(jobs=new_user, callback_name='run', wait=Const.IS_TEST) @@ -76,20 +76,15 @@ class User: return None @classmethod - def check_members(cls, members, key, call_back): + def get_passenger_for_members(cls, members, key): """ 检测乘客信息 - :param passengers: + :param passengers :return: """ self = cls() for user in self.users: assert isinstance(user, UserJob) - if user.key == key and user.check_is_ready(): - passengers = user.get_passengers_by_members(members) - return call_back(passengers) - - UserLog.add_quick_log(UserLog.MESSAGE_WAIT_USER_INIT_COMPLETE.format(self.retry_time)).flush() - stay_second(self.retry_time) - return self.check_members(members, key, call_back) + if user.key == key and user.wait_for_ready(): + return user.get_passengers_by_members(members)