diff --git a/.gitignore b/.gitignore index 9f9225e..7292e9d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ .DS_Store venv __pycache__ -env.py \ No newline at end of file +env.py +env.slave.py \ No newline at end of file diff --git a/env.py.example b/env.py.example index c04495c..3b586c2 100644 --- a/env.py.example +++ b/env.py.example @@ -70,7 +70,7 @@ QUERY_JOBS = [ ], 'allow_less_member': 0, # 是否允许余票不足时提交部分乘客 'seats': [ # 筛选座位 有先后顺序 :Array - # 可用值: 特等座, 商务座, 一等座, 二等座, 软卧, 硬卧, 硬座, 无座 + # 可用值: 特等座, 商务座, 一等座, 二等座, 软卧, 硬卧, 动卧, 硬座, 无座 '硬卧', '硬座' ], diff --git a/main.py b/main.py index 92e6df6..a7918cd 100644 --- a/main.py +++ b/main.py @@ -4,15 +4,18 @@ import sys from py12306.app import * from py12306.log.common_log import CommonLog from py12306.query.query import Query +from py12306.user.user import User def main(): - if '--test' in sys.argv or '-t' in sys.argv: test() + load_argvs() + CommonLog.print_welcome() App.run() - CommonLog.print_welcome().print_configs() + CommonLog.print_configs() App.did_start() - # App.run_check() - # User.run() + + App.run_check() + User.run() Query.run() if not Const.IS_TEST: while True: @@ -38,5 +41,15 @@ def test(): pass +def load_argvs(): + if '--test' in sys.argv or '-t' in sys.argv: test() + config_index = None + + if '--config' in sys.argv: config_index = sys.argv.index('--config') + if '-c' in sys.argv: config_index = sys.argv.index('-c') + if config_index: + Config.CONFIG_FILE = sys.argv[config_index + 1:config_index + 2].pop() + + if __name__ == '__main__': main() diff --git a/py12306/app.py b/py12306/app.py index bf9eff7..bfa356b 100644 --- a/py12306/app.py +++ b/py12306/app.py @@ -40,13 +40,14 @@ class App: @classmethod def did_start(cls): self = cls() - if Config.is_cluster_enabled(): - from py12306.cluster.cluster import Distributed - Distributed().join_cluster() + # if Config.is_cluster_enabled(): + # from py12306.cluster.cluster import Cluster + # Cluster().run() def init_class(self): - from py12306.cluster.cluster import Distributed - if Config.is_cluster_enabled(): Distributed() + from py12306.cluster.cluster import Cluster + if Config.is_cluster_enabled(): + Cluster().run() def handler_exit(self, *args, **kwargs): """ @@ -56,8 +57,8 @@ class App: :return: """ if Config.is_cluster_enabled(): - from py12306.cluster.cluster import Distributed - Distributed().left_cluster() + from py12306.cluster.cluster import Cluster + Cluster().left_cluster() sys.exit() diff --git a/py12306/cluster/cluster.py b/py12306/cluster/cluster.py index c92792f..78f5d9a 100644 --- a/py12306/cluster/cluster.py +++ b/py12306/cluster/cluster.py @@ -1,3 +1,8 @@ +import os +import pickle +import sys +import time + import redis from redis.client import PubSub @@ -8,60 +13,210 @@ from py12306.log.cluster_log import ClusterLog @singleton -class Distributed(): +class Cluster(): KEY_QUERY_COUNT = 'query_count' KEY_QUERY_LAST_TIME = 'query_last_time' KEY_CONFIGS = 'configs' KEY_NODES = 'nodes' KEY_CHANNEL_LOG = 'channel_log' + KEY_USER_COOKIES = 'user_cookies' + KEY_USER_LAST_HEARTBEAT = 'user_last_heartbeat' + KEY_NODES_ALIVE = 'nodes_alive' + + KEY_LOCK_INIT_USER = 'lock_init_user' + # if self.cluster.get_lock(Cluster.KEY_LOCK_INIT_USER, self.lock_init_user_time): # TODO 未判断 失败重试 + + KEY_MASTER = 1 + KEY_SLAVE = 0 session: Redis = None pubsub: PubSub = None refresh_channel_time = 0.5 retry_time = 2 + keep_alive_time = 3 # 报告存活间隔 + lost_alive_time = keep_alive_time * 2 + locks = [] nodes = {} + node_name = None + is_ready = False + is_master = False def __init__(self, *args): - self.session = Redis() - self.pubsub = self.session.pubsub() - self.pubsub.subscribe(self.KEY_CHANNEL_LOG) - create_thread_and_run(self, 'refresh_data', wait=False) - create_thread_and_run(self, 'subscribe', wait=False) + if Config.is_cluster_enabled(): + self.session = Redis() return self + @classmethod + def run(cls): + self = cls() + self.start() + + def start(self): + self.pubsub = self.session.pubsub() + self.pubsub.subscribe(self.KEY_CHANNEL_LOG) + create_thread_and_run(self, 'subscribe', wait=False) + self.is_ready = True + self.get_nodes() # 提前获取节点列表 + self.check_nodes() # 防止 节点列表未清空 + self.join_cluster() + create_thread_and_run(self, 'keep_alive', wait=False) + create_thread_and_run(self, 'refresh_data', wait=False) + def join_cluster(self): - node_name = Config().NODE_NAME + """ + 加入到集群 + :return: + """ + self.node_name = node_name = Config().NODE_NAME + + if Config().NODE_IS_MASTER: + if self.node_name in self.nodes: # 重复运行主节点 + ClusterLog.add_quick_log(ClusterLog.MESSAGE_MASTER_NODE_ALREADY_RUN.format(node_name)).flush( + publish=False) + os._exit(1) + if self.have_master(): # 子节点提升为主节点情况,交回控制 + message = ClusterLog.MESSAGE_NODE_BECOME_MASTER_AGAIN.format(node_name) + self.publish_log_message(message) + self.make_nodes_as_slave() + elif not self.have_master(): # 只能通过主节点启动 + ClusterLog.add_quick_log(ClusterLog.MESSAGE_MASTER_NODE_NOT_FOUND).flush(publish=False) + os._exit(1) + if node_name in self.nodes: - node_name = node_name + '_' + str(dict_count_key_num(self.nodes, node_name)) + self.node_name = node_name = node_name + '_' + str(dict_count_key_num(self.nodes, node_name)) ClusterLog.add_quick_log(ClusterLog.MESSAGE_NODE_ALREADY_IN_CLUSTER.format(node_name)).flush() self.session.hset(self.KEY_NODES, node_name, Config().NODE_IS_MASTER) - message = ClusterLog.MESSAGE_JOIN_CLUSTER_SUCCESS.format(Config().NODE_NAME, list(self.get_nodes())) - # ClusterLog.add_quick_log(message).flush() + message = ClusterLog.MESSAGE_JOIN_CLUSTER_SUCCESS.format(self.node_name, ClusterLog.get_print_nodes( + self.get_nodes())) # 手动 get nodes + self.publish_log_message(message) + + def left_cluster(self, node_name=None): + node_name = node_name if node_name else self.node_name + self.session.hdel(self.KEY_NODES, node_name) + message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(node_name, ClusterLog.get_print_nodes(self.get_nodes())) + self.publish_log_message(message) + + def make_nodes_as_slave(self): + """ + 将所有节点设为主节点 + :return: + """ + for node in self.nodes: + self.session.hset(self.KEY_NODES, node, self.KEY_SLAVE) + + def publish_log_message(self, message): + """ + 发布订阅消息 + :return: + """ + message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(self.node_name, message) self.session.publish(self.KEY_CHANNEL_LOG, message) - def left_cluster(self): - self.session.hdel(self.KEY_NODES, Config().NODE_NAME) - message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(Config().NODE_NAME, list(self.get_nodes())) - # ClusterLog.add_quick_log(message).flush() - self.session.publish(self.KEY_CHANNEL_LOG, message) - - def get_nodes(self): + def get_nodes(self) -> dict: res = self.session.hgetall(self.KEY_NODES) res = res if res else {} self.nodes = res return res def refresh_data(self): + """ + 单独进程处理数据同步 + :return: + """ while True: self.get_nodes() + self.check_locks() + self.check_nodes() + self.check_master() stay_second(self.retry_time) + def check_master(self): + """ + 检测主节点是否可用 + :return: + """ + master = self.have_master() + if master == self.node_name: # 动态提升 + self.is_master = True + else: + self.is_master = False + + if not master: + if Config().NODE_SLAVE_CAN_BE_MASTER: + # 提升子节点为主节点 + slave = list(self.nodes)[-1] + self.session.hset(self.KEY_NODES, slave, self.KEY_MASTER) + self.publish_log_message(ClusterLog.MESSAGE_ASCENDING_MASTER_NODE.format(slave, + ClusterLog.get_print_nodes( + self.get_nodes()))) + return True + else: + self.publish_log_message(ClusterLog.MESSAGE_MASTER_DID_LOST.format(self.retry_time)) + stay_second(self.retry_time) + os._exit(1) # 退出整个程序 + + def have_master(self): + return dict_find_key_by_value(self.nodes, str(self.KEY_MASTER), False) + + def check_nodes(self): + """ + 检查节点是否存活 + :return: + """ + alive = self.session.hgetall(self.KEY_NODES_ALIVE) + for node in self.nodes: + if node not in alive or (time_int() - int(alive[node])) > self.lost_alive_time: + self.left_cluster(node) + + # def kick_out_from_nodes(self, node_name): + # pass + + def keep_alive(self): + while True: + if self.node_name not in self.get_nodes(): # 已经被 kict out 重新加下 + self.join_cluster() + self.session.hset(self.KEY_NODES_ALIVE, self.node_name, str(time_int())) + stay_second(self.keep_alive_time) + def subscribe(self): while True: message = self.pubsub.get_message() if message: if message.get('type') == 'message' and message.get('data'): - ClusterLog.add_quick_log(message.get('data')).flush() + msg = message.get('data') + if self.node_name: + msg = msg.replace(ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX.format(self.node_name), '') + ClusterLog.add_quick_log(msg).flush(publish=False) stay_second(self.refresh_channel_time) + + def get_lock(self, key, timeout=1): + timeout = int(time.time()) + timeout + res = self.session.setnx(key, timeout) + if res: + self.locks.append((key, timeout)) + return True + return False + + def release_lock(self, key): + self.session.delete(key) + + def check_locks(self): + index = 0 + for key, timeout in self.locks: + if timeout >= int(time.time()): + del self.locks[index] + self.release_lock(key) + index += 1 + + @classmethod + def get_user_cookie(cls, key, default=None): + self = cls() + res = self.session.hget(Cluster.KEY_USER_COOKIES, key) + return pickle.loads(res.encode()) if res else default + + @classmethod + def set_user_cookie(cls, key, value): + self = cls() + return self.session.hset(Cluster.KEY_USER_COOKIES, key, pickle.dumps(value, 0).decode()) diff --git a/py12306/config.py b/py12306/config.py index bfe9d3d..3b018b5 100644 --- a/py12306/config.py +++ b/py12306/config.py @@ -44,6 +44,7 @@ class Config: # 集群配置 CLUSTER_ENABLED = 1 + NODE_SLAVE_CAN_BE_MASTER = 1 NODE_IS_MASTER = 1 NODE_NAME = '' REDIS_HOST = '' @@ -53,6 +54,20 @@ class Config: envs = [] retry_time = 5 + disallow_update_cofigs = [ + 'CLUSTER_ENABLED', + 'NODE_IS_MASTER', + 'NODE_NAME', + 'REDIS_HOST', + 'REDIS_PORT', + 'REDIS_PASSWORD', + ] + + def __init__(self): + self.init_envs() + if Config().is_slave(): + self.refresh_configs(True) + @classmethod def run(cls): self = cls() @@ -63,27 +78,26 @@ class Config: # self = cls() def start(self): - self.init_envs() self.save_to_remote() - # self.refresh_configs() create_thread_and_run(self, 'refresh_configs', wait=False) - def refresh_configs(self): + def refresh_configs(self, once=False): if not self.is_cluster_enabled(): return while True: remote_configs = self.get_remote_config() - self.update_configs_from_remote(remote_configs) + self.update_configs_from_remote(remote_configs, once) + if once: break stay_second(self.retry_time) def get_remote_config(self): if not self.is_cluster_enabled(): return - from py12306.cluster.cluster import Distributed - return Distributed().session.get_pickle(Distributed().KEY_CONFIGS, {}) + from py12306.cluster.cluster import Cluster + return Cluster().session.get_pickle(Cluster().KEY_CONFIGS, {}) def save_to_remote(self): if not self.is_master(): return - from py12306.cluster.cluster import Distributed - Distributed().session.set_pickle(Distributed().KEY_CONFIGS, self.envs) + from py12306.cluster.cluster import Cluster + Cluster().session.set_pickle(Cluster().KEY_CONFIGS, self.envs) def init_envs(self): self.envs = EnvLoader.load_with_file(self.CONFIG_FILE) @@ -93,31 +107,36 @@ class Config: for key, value in envs: setattr(self, key, value) - def update_configs_from_remote(self, envs): + def update_configs_from_remote(self, envs, first=False): if envs == self.envs: return from py12306.query.query import Query + from py12306.user.user import User for key, value in envs: - if key == 'USER_ACCOUNTS' and value != self.USER_ACCOUNTS: # 用户修改 - setattr(self, key, value) - print('用户修改了') # TODO - elif key == 'QUERY_JOBS' and value != self.QUERY_JOBS: # 任务修改 - setattr(self, key, value) and Query().update_query_jobs(auto=True) - elif key == 'QUERY_INTERVAL' and value != self.QUERY_INTERVAL: # 任务修改 - setattr(self, key, value) and Query().update_query_interval(auto=True) + if key in self.disallow_update_cofigs: continue if value != -1: + old = getattr(self, key) setattr(self, key, value) + if not first: + if key == 'USER_ACCOUNTS' and old != value: + # 用户修改 print('用户修改了') + User.update_user_accounts(auto=True, old=old) + elif key == 'QUERY_JOBS' and old != value: + Query().update_query_jobs(auto=True) # 任务修改 + elif key == 'QUERY_INTERVAL' and old != value: + Query().update_query_interval(auto=True) @staticmethod def is_master(): # 是不是 主 - return Config.CLUSTER_ENABLED and Config.NODE_IS_MASTER + from py12306.cluster.cluster import Cluster + return Config().CLUSTER_ENABLED and (Config().NODE_IS_MASTER or Cluster().is_master) @staticmethod def is_slave(): # 是不是 从 - return Config.CLUSTER_ENABLED and not Config.NODE_IS_MASTER + return Config().CLUSTER_ENABLED and not Config.is_master() @staticmethod def is_cluster_enabled(): - return Config.CLUSTER_ENABLED + return Config().CLUSTER_ENABLED # @staticmethod # def get_members(): diff --git a/py12306/helpers/OCR.py b/py12306/helpers/OCR.py index 862da08..cef1ff5 100644 --- a/py12306/helpers/OCR.py +++ b/py12306/helpers/OCR.py @@ -1,7 +1,7 @@ import math import random -from py12306 import config +from py12306.config import Config from py12306.log.common_log import CommonLog from py12306.vender.ruokuai.main import RKClient @@ -22,7 +22,7 @@ class OCR: return self.get_img_position_by_ruokuai(img_path) def get_img_position_by_ruokuai(self, img_path): - ruokuai_account = config.AUTO_CODE_ACCOUNT + ruokuai_account = Config().AUTO_CODE_ACCOUNT soft_id = '119671' soft_key = '6839cbaca1f942f58d2760baba5ed987' rc = RKClient(ruokuai_account.get('user'), ruokuai_account.get('pwd'), soft_id, soft_key) @@ -37,7 +37,7 @@ class OCR: positions = [] width = 70 height = 70 - random_num = random.randint(0, 10) + random_num = random.randint(0, 8) for offset in offsets: offset = int(offset) x = width * (offset % 5) - width / 2 + random_num diff --git a/py12306/helpers/auth_code.py b/py12306/helpers/auth_code.py index ef705fb..89753f0 100644 --- a/py12306/helpers/auth_code.py +++ b/py12306/helpers/auth_code.py @@ -3,6 +3,7 @@ import time from requests.exceptions import SSLError +from py12306.config import Config from py12306.helpers.OCR import OCR from py12306.helpers.api import API_AUTH_CODE_DOWNLOAD, API_AUTH_CODE_CHECK from py12306.helpers.request import Request @@ -20,7 +21,7 @@ class AuthCode: retry_time = 5 def __init__(self, session): - self.data_path = config.RUNTIME_DIR + self.data_path = Config().RUNTIME_DIR self.session = session @classmethod @@ -66,6 +67,7 @@ class AuthCode: UserLog.add_quick_log(UserLog.MESSAGE_CODE_AUTH_SUCCESS).flush() return True else: + # {'result_message': '验证码校验失败', 'result_code': '5'} UserLog.add_quick_log( UserLog.MESSAGE_CODE_AUTH_FAIL.format(result.get('result_message'))).flush() self.session.cookies.clear_session_cookies() diff --git a/py12306/helpers/func.py b/py12306/helpers/func.py index 403db6d..3cd7276 100644 --- a/py12306/helpers/func.py +++ b/py12306/helpers/func.py @@ -2,12 +2,12 @@ import datetime import random import threading import functools +import time from time import sleep from types import MethodType -# from py12306 import config def singleton(cls): @@ -88,11 +88,16 @@ def current_thread_id(): def time_now(): return datetime.datetime.now() +def str_to_time(str): + return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f') + +def time_int(): + return int(time.time()) + def create_thread_and_run(jobs, callback_name, wait=True, daemon=True): threads = [] - if not isinstance(jobs, list): - jobs = [jobs] + if not isinstance(jobs, list): jobs = [jobs] for job in jobs: thread = threading.Thread(target=getattr(job, callback_name)) thread.setDaemon(daemon) @@ -102,6 +107,12 @@ def create_thread_and_run(jobs, callback_name, wait=True, daemon=True): for thread in threads: thread.join() +def jobs_do(jobs, do): + if not isinstance(jobs, list): jobs = [jobs] + for job in jobs: + getattr(job, do)() + + def dict_find_key_by_value(data, value, default=None): result = [k for k, v in data.items() if v == value] return result.pop() if len(result) else default diff --git a/py12306/helpers/notification.py b/py12306/helpers/notification.py index f62ed43..c4e7c4d 100644 --- a/py12306/helpers/notification.py +++ b/py12306/helpers/notification.py @@ -1,6 +1,5 @@ import urllib -from py12306 import config from py12306.helpers.api import * from py12306.helpers.request import Request from py12306.log.common_log import CommonLog diff --git a/py12306/helpers/type.py b/py12306/helpers/type.py index 2cccc81..c480939 100644 --- a/py12306/helpers/type.py +++ b/py12306/helpers/type.py @@ -25,6 +25,7 @@ class OrderSeatType: '二等座': 'O', '软卧': 4, '硬卧': 3, + '动卧': 1, '硬座': 1, '无座': 1, } @@ -39,6 +40,7 @@ class SeatType: '二等座': 30, '软卧': 23, '硬卧': 28, + '动卧': 33, '硬座': 29, '无座': 26, } diff --git a/py12306/log/base.py b/py12306/log/base.py index d430562..58c4ace 100644 --- a/py12306/log/base.py +++ b/py12306/log/base.py @@ -1,5 +1,7 @@ import os import sys +import io +from contextlib import redirect_stdout from py12306.config import Config from py12306.helpers.func import * @@ -23,14 +25,23 @@ class BaseLog: return self @classmethod - def flush(cls, sep='\n', end='\n', file=None, exit=False): + def flush(cls, sep='\n', end='\n', file=None, exit=False, publish=True): + from py12306.cluster.cluster import Cluster self = cls() logs = self.get_logs() # 输出到文件 if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED: # TODO 文件无法写入友好提示 - file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a') + file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8') if not file: file = None - print(*logs, sep=sep, end=end, file=file) + # 输出日志到各个节点 + if publish and self.quick_log and Config().is_cluster_enabled() and Cluster().is_ready: # + f = io.StringIO() + with redirect_stdout(f): + print(*logs, sep=sep, end='' if end == '\n' else end) + out = f.getvalue() + Cluster().publish_log_message(out) + else: + print(*logs, sep=sep, end=end, file=file) self.empty_logs(logs) if exit: sys.exit() diff --git a/py12306/log/cluster_log.py b/py12306/log/cluster_log.py index 95e0596..0f2000c 100644 --- a/py12306/log/cluster_log.py +++ b/py12306/log/cluster_log.py @@ -10,6 +10,26 @@ class ClusterLog(BaseLog): quick_log = [] MESSAGE_JOIN_CLUSTER_SUCCESS = '# 节点 {} 成功加入到集群,当前节点列表 {} #' + MESSAGE_LEFT_CLUSTER = '# 节点 {} 已离开集群,当前节点列表 {} #' MESSAGE_NODE_ALREADY_IN_CLUSTER = '# 当前节点已存在于集群中,自动分配新的节点名称 {} #' + + MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX = '{} )' + MESSAGE_SUBSCRIBE_NOTIFICATION = MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX + '{}' + + MESSAGE_ASCENDING_MASTER_NODE = '# 已将 {} 提升为主节点,当前节点列表 {} #' + + MESSAGE_MASTER_DID_LOST = '# 主节点已退出,{} 秒后程序将自动退出 #' + + MESSAGE_MASTER_NODE_ALREADY_RUN = '# 启动失败,主节点 {} 已经在运行中 #' + MESSAGE_MASTER_NODE_NOT_FOUND = '# 启动失败,请先启动主节点 #' + + MESSAGE_NODE_BECOME_MASTER_AGAIN = '# 节点 {} 已启动,已自动成功主节点 #' + + + + @staticmethod + def get_print_nodes(nodes): + message = ['{}{}'.format('*' if val == '1' else '', key) for key, val in nodes.items()] + return '[ {} ]'.format(', '.join(message)) diff --git a/py12306/log/common_log.py b/py12306/log/common_log.py index 0819644..acd2ddd 100644 --- a/py12306/log/common_log.py +++ b/py12306/log/common_log.py @@ -42,7 +42,7 @@ class CommonLog(BaseLog): self.add_quick_log('日志已输出到文件中: {}'.format(Config().OUT_PUT_LOG_TO_FILE_PATH)) self.add_quick_log() - self.flush(file=False) + self.flush(file=False, publish=False) return self @classmethod diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index ff682a6..6e3fa38 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -4,7 +4,7 @@ import sys from os import path from py12306.config import Config -from py12306.cluster.cluster import Distributed +from py12306.cluster.cluster import Cluster from py12306.log.base import BaseLog from py12306.helpers.func import * @@ -36,10 +36,11 @@ class QueryLog(BaseLog): def __init__(self): super().__init__() self.data_path = Config().QUERY_DATA_DIR + 'status.json' - self.cluster = Distributed() - self.init_data() + self.cluster = Cluster() - def init_data(self): + @classmethod + def init_data(cls): + self = cls() # 获取上次记录 if Const.IS_TEST: return result = False @@ -57,16 +58,16 @@ class QueryLog(BaseLog): self.print_data_restored() def get_data_from_cluster(self): - query_count = self.cluster.session.get(Distributed.KEY_QUERY_COUNT, 0) - last_time = self.cluster.session.get(Distributed.KEY_QUERY_LAST_TIME, '') + query_count = self.cluster.session.get(Cluster.KEY_QUERY_COUNT, 0) + last_time = self.cluster.session.get(Cluster.KEY_QUERY_LAST_TIME, '') if query_count and last_time: return {'query_count': query_count, 'last_time': last_time} return False def refresh_data_of_cluster(self): return { - 'query_count': self.cluster.session.incr(Distributed.KEY_QUERY_COUNT), - 'last_time': self.cluster.session.set(Distributed.KEY_QUERY_LAST_TIME, time_now()), + 'query_count': self.cluster.session.incr(Cluster.KEY_QUERY_COUNT), + 'last_time': self.cluster.session.set(Cluster.KEY_QUERY_LAST_TIME, time_now()), } @classmethod @@ -144,7 +145,7 @@ class QueryLog(BaseLog): self.add_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'), time=datetime.datetime.now())) if is_main_thread(): - self.flush() + self.flush(publish=False) return self @classmethod @@ -158,7 +159,7 @@ class QueryLog(BaseLog): self.add_quick_log('|=== 查询记录恢复成功 上次查询 {last_date} ===|'.format(last_date=self.data.get('last_time'))) self.add_quick_log('============================================================') self.add_quick_log('') - self.flush() + self.flush(publish=False) return self def refresh_data(self): diff --git a/py12306/log/user_log.py b/py12306/log/user_log.py index e289a15..479754d 100644 --- a/py12306/log/user_log.py +++ b/py12306/log/user_log.py @@ -24,6 +24,11 @@ class UserLog(BaseLog): MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' + MESSAGE_USERS_DID_CHANGED = '\n用户信息已更新,正在重新加载...' + + MESSAGE_USER_BEING_DESTROY = '用户 {} 已退出' + MESSAGE_USER_COOKIE_NOT_FOUND_FROM_REMOTE = '用户 {} 状态加载中...' + def __init__(self): super().__init__() self.init_data() @@ -38,14 +43,14 @@ class UserLog(BaseLog): :return: """ self = cls() - self.add_log('# 发现 {} 个用户 #'.format(len(users))) + self.add_quick_log('# 发现 {} 个用户 #'.format(len(users))) self.flush() return self @classmethod def print_welcome_user(cls, user): self = cls() - self.add_log('# 欢迎回来,{} #'.format(user.get_name())) + self.add_quick_log('# 欢迎回来,{} #'.format(user.get_name())) self.flush() return self diff --git a/py12306/query/job.py b/py12306/query/job.py index 14583e0..2db2460 100644 --- a/py12306/query/job.py +++ b/py12306/query/job.py @@ -88,11 +88,11 @@ class Job: self.handle_response(response) self.safe_stay() if is_main_thread(): - QueryLog.flush(sep='\t\t') + QueryLog.flush(sep='\t\t', publish=False) if is_main_thread(): - QueryLog.add_quick_log('').flush() + QueryLog.add_quick_log('').flush(publish=False) else: - QueryLog.add_log('\n').flush(sep='\t\t') + QueryLog.add_log('\n').flush(sep='\t\t',publish=False) def query_by_date(self, date): """ diff --git a/py12306/query/query.py b/py12306/query/query.py index 22ff78c..6cfc1cc 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -1,5 +1,5 @@ from py12306.config import Config -from py12306.cluster.cluster import Distributed +from py12306.cluster.cluster import Cluster from py12306.app import app_available_check from py12306.helpers.func import * from py12306.helpers.request import Request @@ -23,7 +23,7 @@ class Query: def __init__(self): self.session = Request() - self.cluster = Distributed() + self.cluster = Cluster() self.update_query_interval() self.update_query_jobs() @@ -47,6 +47,7 @@ class Query: def start(self): # return # DEBUG self.init_jobs() + QueryLog.init_data() stay_second(1) while True: app_available_check() @@ -64,12 +65,12 @@ class Query: QueryLog.print_init_jobs(jobs=self.jobs) # def get_jobs_from_cluster(self): - # jobs = self.cluster.session.get_dict(Distributed.KEY_JOBS) + # jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS) # return jobs # # def update_jobs_of_cluster(self): # if config.CLUSTER_ENABLED and config.NODE_IS_MASTER: - # return self.cluster.session.set_dict(Distributed.KEY_JOBS, self.query_jobs) + # return self.cluster.session.set_dict(Cluster.KEY_JOBS, self.query_jobs) # # def refresh_jobs(self): # if not config.CLUSTER_ENABLED: return diff --git a/py12306/user/job.py b/py12306/user/job.py index d596835..853acb1 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -1,5 +1,7 @@ import pickle +from os import path +from py12306.cluster.cluster import Cluster from py12306.helpers.api import * from py12306.app import * from py12306.helpers.auth_code import AuthCode @@ -10,8 +12,8 @@ from py12306.log.user_log import UserLog class UserJob: - heartbeat = 60 * 2 # 心跳保持时长 - heartbeat_interval = 5 + # heartbeat = 60 * 2 # 心跳保持时长 + heartbeat_interval = 60 * 2 key = None user_name = '' password = '' @@ -27,19 +29,29 @@ class UserJob: ticket_info_for_passenger_form = None order_request_dto = None - def __init__(self, info, user): - self.session = Request() - self.heartbeat = user.heartbeat + cluster = None + lock_init_user_time = 3 * 60 + cookie = False + def __init__(self, info): + self.cluster = Cluster() + self.init_data(info) + + def init_data(self, info): + self.session = Request() self.key = info.get('key') self.user_name = info.get('user_name') self.password = info.get('password') - self.user = user + self.update_user() + + def update_user(self): + from py12306.user.user import User + self.user = User() + self.heartbeat_interval = self.user.heartbeat + if not Const.IS_TEST: self.load_user() def run(self): # load user - if not Const.IS_TEST: - self.load_user() self.start() def start(self): @@ -49,23 +61,45 @@ class UserJob: """ while True: app_available_check() - self.check_heartbeat() + if Config().is_slave(): + self.load_user_from_remote() + else: + if Config().is_master() and not self.cookie: self.load_user_from_remote() # 主节点加载一次 Cookie + self.check_heartbeat() if Const.IS_TEST: return sleep(self.heartbeat_interval) def check_heartbeat(self): # 心跳检测 - if self.last_heartbeat and (time_now() - self.last_heartbeat).seconds < self.heartbeat: + if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < self.heartbeat_interval: return True + # 只有主节点才能走到这 if self.is_first_time() or not self.check_user_is_login(): self.handle_login() self.is_ready = True - UserLog.add_quick_log(UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat)).flush() + message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat_interval) + if not Config.is_cluster_enabled(): + UserLog.add_quick_log(message).flush() + else: + self.cluster.publish_log_message(message) + self.set_last_heartbeat() + + def get_last_heartbeat(self): + if Config().is_cluster_enabled(): + return int(self.cluster.session.get(Cluster.KEY_USER_LAST_HEARTBEAT, 0)) + + return self.last_heartbeat + + def set_last_heartbeat(self): + if Config().is_cluster_enabled(): + return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int()) self.last_heartbeat = time_now() # def init_cookies def is_first_time(self): + if Config().is_cluster_enabled(): + return not self.cluster.get_user_cookie(self.key) return not path.exists(self.get_cookie_path()) def handle_login(self): @@ -110,11 +144,10 @@ class UserJob: def check_user_is_login(self): response = self.session.get(API_USER_CHECK.get('url')) - is_login = response.json().get('data').get('flag', False) + is_login = response.json().get('data.flag', False) if is_login: self.save_user() - self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有有 - + self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有有 return is_login @@ -149,7 +182,7 @@ class UserJob: pass def get_cookie_path(self): - return config.USER_DATA_DIR + self.user_name + '.cookie' + return Config().USER_DATA_DIR + self.user_name + '.cookie' def update_user_info(self, info): self.info = {**self.info, **info} @@ -158,6 +191,8 @@ class UserJob: return self.info.get('user_name') def save_user(self): + if Config().is_cluster_enabled(): + return self.cluster.set_user_cookie(self.key, self.session.cookies) with open(self.get_cookie_path(), 'wb') as f: pickle.dump(self.session.cookies, f) @@ -166,7 +201,7 @@ class UserJob: 恢复用户成功 :return: """ - UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)) + UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() if self.check_user_is_login() and self.get_user_info(): UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() UserLog.print_welcome_user(self) @@ -183,17 +218,42 @@ class UserJob: return None def load_user(self): + if Config().is_cluster_enabled(): return cookie_path = self.get_cookie_path() + if path.exists(cookie_path): with open(self.get_cookie_path(), 'rb') as f: - self.session.cookies.update(pickle.load(f)) + cookie = pickle.load(f) + self.cookie = True + self.session.cookies.update(cookie) self.did_loaded_user() return True return None + def load_user_from_remote(self): + cookie = self.cluster.get_user_cookie(self.key) + if not cookie and Config().is_slave(): + while True: # 子节点只能取 + UserLog.add_quick_log(UserLog.MESSAGE_USER_COOKIE_NOT_FOUND_FROM_REMOTE.format(self.user_name)).flush() + stay_second(self.retry_time) + return self.load_user_from_remote() + self.session.cookies.update(cookie) + if not self.cookie: # 第一次加载 + self.cookie = True + self.did_loaded_user() + return True + def check_is_ready(self): return self.is_ready + def destroy(self): + """ + 退出用户 + :return: + """ + UserLog.add_quick_log(UserLog.MESSAGE_USER_BEING_DESTROY.format(self.user_name)).flush() + sys.exit() + def get_user_passengers(self): if self.passengers: return self.passengers response = self.session.post(API_USER_PASSENGERS) diff --git a/py12306/user/user.py b/py12306/user/user.py index 57b5b1f..89aca10 100644 --- a/py12306/user/user.py +++ b/py12306/user/user.py @@ -1,4 +1,5 @@ from py12306.app import * +from py12306.cluster.cluster import Cluster from py12306.helpers.func import * from py12306.log.user_log import UserLog from py12306.user.job import UserJob @@ -8,11 +9,26 @@ from py12306.user.job import UserJob class User: heartbeat = 60 * 2 users = [] + user_accounts = [] retry_time = 3 + cluster = None def __init__(self): - self.interval = config.USER_HEARTBEAT_INTERVAL + self.cluster = Cluster() + self.heartbeat = Config().USER_HEARTBEAT_INTERVAL + self.update_interval() + self.update_user_accounts() + + def update_user_accounts(self, auto=False, old=None): + self.user_accounts = Config().USER_ACCOUNTS + if auto: + UserLog.add_quick_log(UserLog.MESSAGE_USERS_DID_CHANGED).flush() + self.refresh_users(old) + + def update_interval(self, auto=False): + self.interval = Config().USER_HEARTBEAT_INTERVAL + if auto: jobs_do(self.users, 'update_user') @classmethod def run(cls): @@ -28,17 +44,32 @@ class User: create_thread_and_run(jobs=self.users, callback_name='run', wait=Const.IS_TEST) def init_users(self): - accounts = config.USER_ACCOUNTS - for account in accounts: - user = UserJob(info=account, user=self) - self.users.append(user) + for account in self.user_accounts: + self.init_user(account) + + def init_user(self, info): + user = UserJob(info=info) + self.users.append(user) + + def refresh_users(self, old): + for account in self.user_accounts: + key = account.get('key') + old_account = array_dict_find_by_key_value(old, 'key', key) + if old_account and account != old_account: + user = self.get_user(key) + user.init_data(account) + elif not old_account: + self.init_user(account) + for account in old: # 退出已删除的用户 + if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')): + user = self.get_user(account.get('key')) + user.destroy() @classmethod - def get_user(cls, key): + def get_user(cls, key) -> UserJob: self = cls() for user in self.users: - if user.key == key: - return user + if user.key == key: return user return None @classmethod