diff --git a/py12306/app.py b/py12306/app.py index df3450e..afb4976 100644 --- a/py12306/app.py +++ b/py12306/app.py @@ -40,6 +40,8 @@ class App: @classmethod def did_start(cls): self = cls() + from py12306.helpers.station import Station + Station() # 防止多线程时初始化出现问题 # if Config.is_cluster_enabled(): # from py12306.cluster.cluster import Cluster # Cluster().run() @@ -73,8 +75,8 @@ class App: if Config().USER_ACCOUNTS: for account in Config().USER_ACCOUNTS: if account: - return True - return False + return False + return True @classmethod def test_send_notifications(cls): @@ -90,10 +92,10 @@ class App: 待优化 :return: """ - if not cls.check_auto_code(): - CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True) if not cls.check_user_account_is_empty(): - CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True) + # CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True, publish=False) # 不填写用户则不自动下单 + if not cls.check_auto_code(): + CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True, publish=False) if Const.IS_TEST_NOTIFICATION: cls.test_send_notifications() diff --git a/py12306/cluster/cluster.py b/py12306/cluster/cluster.py index b2947af..1ece4c1 100644 --- a/py12306/cluster/cluster.py +++ b/py12306/cluster/cluster.py @@ -30,6 +30,9 @@ class Cluster(): KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁 lock_do_order_time = 60 * 1 # 订单锁超时时间 + lock_prefix = 'lock_' # 锁键前缀 + lock_info_prefix = 'info_' + KEY_MASTER = 1 KEY_SLAVE = 0 @@ -40,7 +43,6 @@ class Cluster(): keep_alive_time = 3 # 报告存活间隔 lost_alive_time = keep_alive_time * 2 - locks = [] nodes = {} node_name = None is_ready = False @@ -222,25 +224,23 @@ class Cluster(): timeout = int(time.time()) + timeout res = self.session.setnx(key, timeout) if res: - self.locks.append((key, timeout)) - if info: self.session.set_dict(key + '_info', info) # 存储额外信息 + if info: self.session.set_dict(self.lock_info_prefix + key, info) # 存储额外信息 return True return False def get_lock_info(self, key, default={}): - return self.session.get_dict(key + '_info', default=default) + return self.session.get_dict(self.lock_info_prefix + key, default=default) def release_lock(self, key): self.session.delete(key) - self.session.delete(key + '_info') + self.session.delete(self.lock_info_prefix + key) def check_locks(self): - index = 0 - for key, timeout in self.locks: - if timeout >= int(time.time()): - del self.locks[index] + locks = self.session.keys(self.lock_prefix + '*') + for key in locks: + val = self.session.get(key) + if val and int(val) <= time_int(): self.release_lock(key) - index += 1 @classmethod def get_user_cookie(cls, key, default=None): diff --git a/py12306/config.py b/py12306/config.py index cb38dc8..2fa4022 100644 --- a/py12306/config.py +++ b/py12306/config.py @@ -8,6 +8,8 @@ from py12306.helpers.func import * @singleton class Config: + IS_DEBUG = False + USER_ACCOUNTS = [] # 查询任务 QUERY_JOBS = [] diff --git a/py12306/helpers/notification.py b/py12306/helpers/notification.py index f8189fa..1013a84 100644 --- a/py12306/helpers/notification.py +++ b/py12306/helpers/notification.py @@ -44,16 +44,11 @@ class Notification(): method='GET', headers={ 'Authorization': 'APPCODE {}'.format(appcode) }) - response_message = '-' - result = {} - try: - result = response.json() - response_message = result['showapi_res_body']['remark'] - except: - pass - if response.status_code == 401 or response.status_code == 403: + result = response.json() + response_message = result.get('showapi_res_body.remark') + if response.status_code in [400, 401, 403]: return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_FORBID).flush() - if response.status_code == 200 and 'showapi_res_body' in result and result['showapi_res_body'].get('flag'): + if response.status_code == 200 and result.get('showapi_res_body.flag'): CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_SUCCESS.format(response_message)).flush() return True else: diff --git a/py12306/helpers/station.py b/py12306/helpers/station.py index c8b4f43..473e2b7 100644 --- a/py12306/helpers/station.py +++ b/py12306/helpers/station.py @@ -7,6 +7,7 @@ from py12306.helpers.func import * @singleton class Station: stations = [] + station_kvs = {} def __init__(self): if path.exists(Config().STATION_FILE): @@ -20,6 +21,7 @@ class Station: 'pinyin': tmp_info[3], 'id': tmp_info[5] }) + self.station_kvs[tmp_info[1]] = tmp_info[2] @classmethod def get_station_by_name(cls, name): @@ -35,7 +37,8 @@ class Station: @classmethod def get_station_key_by_name(cls, name): - return cls.get_station_by_name(name).get('key') + self = cls() + return self.station_kvs[name] @classmethod def get_station_name_by_key(cls, key): diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index a73634c..0aa070a 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -33,11 +33,14 @@ class QueryLog(BaseLog): MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n' - MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束' + MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束\n' MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功' MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息' + MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER = '未配置自动下单账号,{} 秒后继续查询\n' + MESSAGE_ORDER_USER_IS_EMPTY = '未找到下单账号,{} 秒后继续查询' + cluster = None def __init__(self): diff --git a/py12306/order/order.py b/py12306/order/order.py index 40b29b1..3bd06b1 100644 --- a/py12306/order/order.py +++ b/py12306/order/order.py @@ -9,8 +9,6 @@ from py12306.helpers.type import UserType from py12306.log.order_log import OrderLog - - class Order: """ 处理下单 @@ -51,6 +49,9 @@ class Order: 下单模式 暂时不清楚,使用正常步骤下单 :return: """ + # Debug + if Config().IS_DEBUG: + return random.randint(0, 10) > 7 return self.normal_order() def normal_order(self): diff --git a/py12306/query/job.py b/py12306/query/job.py index 9aa19b2..0e7a559 100644 --- a/py12306/query/job.py +++ b/py12306/query/job.py @@ -96,6 +96,7 @@ class Job: self.left_date = date response = self.query_by_date(date) self.handle_response(response) + if not self.is_alive: return self.safe_stay() if is_main_thread(): QueryLog.flush(sep='\t\t', publish=False) @@ -143,6 +144,7 @@ class Job: allow_seats = self.allow_seats if self.allow_seats else list( Config.SEAT_TYPES.values()) # 未设置 则所有可用 TODO 合法检测 self.handle_seats(allow_seats, ticket_info) + if not self.is_alive: return def handle_seats(self, allow_seats, ticket_info): for seat in allow_seats: # 检查座位是否有票 @@ -166,12 +168,21 @@ class Job: QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(), train_number=self.get_info_of_train_number(), rest_num=ticket_of_seat) + if User.is_empty(): + QueryLog.add_quick_log(QueryLog.MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER.format(self.retry_time)) + return stay_second(self.retry_time) + order_result = False user = self.get_user() + if not user: + QueryLog.add_quick_log(QueryLog.MESSAGE_ORDER_USER_IS_EMPTY.format(self.retry_time)) + return stay_second(self.retry_time) + lock_id = Cluster.KEY_LOCK_DO_ORDER + '_' + user.key if Config().is_cluster_enabled(): if self.cluster.get_lock(lock_id, Cluster.lock_do_order_time, {'node': self.cluster.node_name}): # 获得下单锁 + QueryLog.add_quick_log('拿到锁' + lock_id).flush() order_result = self.do_order(user) if not order_result: # 下单失败,解锁 self.cluster.release_lock(lock_id) @@ -222,9 +233,13 @@ class Job: 退出任务 :return: """ + from py12306.query.query import Query QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush() # sys.exit(1) # 无法退出线程... self.is_alive = False + # 手动移出jobs 防止单线程死循环 + index = Query().jobs.index(self) + Query().jobs.pop(index) def safe_stay(self): interval = get_interval_num(self.interval) @@ -241,9 +256,9 @@ class Job: def get_user(self): user = User.get_user(self.account_key) - if not user.check_is_ready(): - # TODO user is not ready - pass + # if not user.check_is_ready(): # 这里不需要检测了,后面获取乘客时已经检测过 + # # + # pass return user def check_passengers(self): diff --git a/py12306/query/query.py b/py12306/query/query.py index 42d3a3e..7660b19 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -48,13 +48,13 @@ class Query: # return # DEBUG self.init_jobs() QueryLog.init_data() - app_available_check() stay_second(3) # 多线程 if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) else: while True: + if not self.jobs: break jobs_do(self.jobs, 'run') if Const.IS_TEST: return @@ -90,7 +90,6 @@ class Query: self = cls() return objects_find_object_by_key_value(self.jobs, 'account_key', account_key) - # def get_jobs_from_cluster(self): # jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS) # return jobs diff --git a/py12306/user/job.py b/py12306/user/job.py index 1850ef0..3a99dfa 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -83,6 +83,7 @@ class UserJob: return True # 只有主节点才能走到这 if self.is_first_time() or not self.check_user_is_login(): + a = 1 self.is_ready = False if not self.handle_login(): return @@ -103,7 +104,7 @@ class UserJob: def set_last_heartbeat(self): if Config().is_cluster_enabled(): return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int()) - self.last_heartbeat = time_now() + self.last_heartbeat = time_int() # def init_cookies def is_first_time(self): @@ -227,6 +228,7 @@ class UserJob: # 子节点访问会导致主节点登录失效 TODO 可快考虑实时同步 cookie if user_data: self.update_user_info({**user_data, **{'user_name': user_data.get('name')}}) + self.save_user() return True return None diff --git a/py12306/user/user.py b/py12306/user/user.py index 33667b9..26cea5b 100644 --- a/py12306/user/user.py +++ b/py12306/user/user.py @@ -68,6 +68,11 @@ class User: if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')): Event().user_job_destroy({'key': account.get('key')}) + @classmethod + def is_empty(cls): + self = cls() + return not bool(self.users) + @classmethod def get_user(cls, key) -> UserJob: self = cls()