diff --git a/main.py b/main.py index a7918cd..73e2ecf 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,8 @@ def main(): if not Const.IS_TEST: while True: sleep(10000) - + else: + if Config().is_cluster_enabled(): stay_second(5) # 等待接受完通知 CommonLog.print_test_complete() diff --git a/py12306/cluster/cluster.py b/py12306/cluster/cluster.py index 469737a..b2947af 100644 --- a/py12306/cluster/cluster.py +++ b/py12306/cluster/cluster.py @@ -30,10 +30,6 @@ class Cluster(): KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁 lock_do_order_time = 60 * 1 # 订单锁超时时间 - # 事件 - KEY_EVENT_JOB_DESTROY = 'job_destroy' - KEY_EVENT_USER_LOADED = 'user_loaded' - KEY_MASTER = 1 KEY_SLAVE = 0 @@ -104,7 +100,7 @@ class Cluster(): node_name = node_name if node_name else self.node_name self.session.hdel(self.KEY_NODES, node_name) message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(node_name, ClusterLog.get_print_nodes(self.get_nodes())) - self.publish_log_message(message) + self.publish_log_message(message, node_name) def make_nodes_as_slave(self): """ @@ -114,12 +110,13 @@ class Cluster(): for node in self.nodes: self.session.hset(self.KEY_NODES, node, self.KEY_SLAVE) - def publish_log_message(self, message): + def publish_log_message(self, message, node_name=None): """ 发布订阅消息 :return: """ - message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(self.node_name, message) + node_name = node_name if node_name else self.node_name + message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(node_name, message) self.session.publish(self.KEY_CHANNEL_LOG, message) def publish_event(self, name, data={}): @@ -216,16 +213,10 @@ class Cluster(): result = json.loads(message.get('data', {})) event_name = result.get('event') data = result.get('data') - - from py12306.query.query import Query - from py12306.user.user import User - if event_name == self.KEY_EVENT_JOB_DESTROY: # 停止查询任务 - job = Query.job_by_name(data['name']) - if job: job.destroy() - elif event_name == self.KEY_EVENT_USER_LOADED: # 用户初始化完成 - query_job = Query.job_by_account_id(data['key']) - if query_job: - create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 + from py12306.helpers.event import Event + method = getattr(Event(), event_name) + if method: + create_thread_and_run(Event(), event_name, Const.IS_TEST, kwargs={'data': data, 'callback': True}) def get_lock(self, key, timeout=1, info={}): timeout = int(time.time()) + timeout diff --git a/py12306/config.py b/py12306/config.py index 103c7fb..cb38dc8 100644 --- a/py12306/config.py +++ b/py12306/config.py @@ -79,14 +79,14 @@ class Config: def start(self): self.save_to_remote() - create_thread_and_run(self, 'refresh_configs', wait=False) + create_thread_and_run(self, 'refresh_configs', wait=Const.IS_TEST) def refresh_configs(self, once=False): if not self.is_cluster_enabled(): return while True: remote_configs = self.get_remote_config() self.update_configs_from_remote(remote_configs, once) - if once: break + if once or Const.IS_TEST: return stay_second(self.retry_time) def get_remote_config(self): diff --git a/py12306/helpers/event.py b/py12306/helpers/event.py new file mode 100644 index 0000000..595ced7 --- /dev/null +++ b/py12306/helpers/event.py @@ -0,0 +1,45 @@ +from py12306.helpers.func import * +from py12306.config import Config + + +@singleton +class Event(): + """ + 处理事件 + """ + # 事件 + KEY_JOB_DESTROY = 'job_destroy' + KEY_USER_JOB_DESTROY = 'user_job_destroy' + KEY_USER_LOADED = 'user_loaded' + cluster = None + + def __init__(self): + from py12306.cluster.cluster import Cluster + self.cluster = Cluster() + + def job_destroy(self, data={}, callback=False): # 停止查询任务 + from py12306.query.query import Query + if Config().is_cluster_enabled() and not callback: + return self.cluster.publish_event(self.KEY_JOB_DESTROY, data) # 通知其它节点退出 + + job = Query.job_by_name(data.get('name')) + if job: + job.destroy() + + def user_loaded(self, data={}, callback=False): # 用户初始化完成 + from py12306.query.query import Query + if Config().is_cluster_enabled() and not callback: + return self.cluster.publish_event(self.KEY_USER_LOADED, data) # 通知其它节点退出 + + query_job = Query.job_by_account_key(data.get('key')) + if query_job and Config().is_master(): + create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 + + def user_job_destroy(self, data={}, callback=False): + from py12306.user.user import User + if Config().is_cluster_enabled() and not callback: + return self.cluster.publish_event(self.KEY_JOB_DESTROY, data) # 通知其它节点退出 + + user = User.get_user(data.get('key')) + if user: + user.destroy() diff --git a/py12306/helpers/func.py b/py12306/helpers/func.py index 460e06e..42c05cf 100644 --- a/py12306/helpers/func.py +++ b/py12306/helpers/func.py @@ -95,11 +95,11 @@ def time_int(): return int(time.time()) -def create_thread_and_run(jobs, callback_name, wait=True, daemon=True, args=()): +def create_thread_and_run(jobs, callback_name, wait=True, daemon=True, args=(), kwargs={}): threads = [] if not isinstance(jobs, list): jobs = [jobs] for job in jobs: - thread = threading.Thread(target=getattr(job, callback_name), args=args) + thread = threading.Thread(target=getattr(job, callback_name), args=args, kwargs=kwargs) thread.setDaemon(daemon) thread.start() threads.append(thread) diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index 56e7c35..a73634c 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -36,6 +36,7 @@ class QueryLog(BaseLog): MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束' MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功' + MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息' cluster = None diff --git a/py12306/query/job.py b/py12306/query/job.py index c174c74..9aa19b2 100644 --- a/py12306/query/job.py +++ b/py12306/query/job.py @@ -11,6 +11,7 @@ from py12306.helpers.func import * from py12306.log.user_log import UserLog from py12306.order.order import Order from py12306.user.user import User +from py12306.helpers.event import Event class Job: @@ -184,8 +185,7 @@ class Job: # 任务已成功 通知集群停止任务 if order_result: - self.cluster.publish_event(Cluster.KEY_EVENT_JOB_DESTROY, {'name': self.job_name}) - self.destroy() + Event().job_destroy({'name': self.job_name}) def do_order(self, user): self.check_passengers() @@ -248,8 +248,8 @@ class Job: def check_passengers(self): if not self.passengers: + QueryLog.add_quick_log(QueryLog.MESSAGE_CHECK_PASSENGERS).flush() self.set_passengers(User.get_passenger_for_members(self.members, self.account_key)) - QueryLog.add_quick_log(QueryLog.MESSAGE_INIT_PASSENGERS_SUCCESS) return True def refresh_station(self, station): diff --git a/py12306/query/query.py b/py12306/query/query.py index cc10f04..42d3a3e 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -54,7 +54,10 @@ class Query: if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) else: - while True: jobs_do(self.jobs, 'run') + while True: + jobs_do(self.jobs, 'run') + if Const.IS_TEST: return + # while True: # app_available_check() # if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 @@ -83,9 +86,9 @@ class Query: return objects_find_object_by_key_value(self.jobs, 'job_name', name) @classmethod - def job_by_account_id(cls, account_id) -> Job: + def job_by_account_key(cls, account_key) -> Job: self = cls() - return objects_find_object_by_key_value(self.jobs, 'account_id', account_id) + return objects_find_object_by_key_value(self.jobs, 'account_key', account_key) # def get_jobs_from_cluster(self): diff --git a/py12306/user/job.py b/py12306/user/job.py index 6d56e6c..1850ef0 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -7,6 +7,7 @@ from py12306.cluster.cluster import Cluster from py12306.helpers.api import * from py12306.app import * from py12306.helpers.auth_code import AuthCode +from py12306.helpers.event import Event from py12306.helpers.func import * from py12306.helpers.request import Request from py12306.helpers.type import UserType @@ -213,7 +214,8 @@ class UserJob: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() if self.check_user_is_login() and self.get_user_info(): UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() - self.cluster.publish_event(Cluster.KEY_EVENT_USER_LOADED, {'key': self.key}) # 发布通知 + Event().user_loaded({'key': self.key}) # 发布通知 + self.is_ready = True UserLog.print_welcome_user(self) else: UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush() diff --git a/py12306/user/user.py b/py12306/user/user.py index 6a80894..33667b9 100644 --- a/py12306/user/user.py +++ b/py12306/user/user.py @@ -1,5 +1,6 @@ from py12306.app import * from py12306.cluster.cluster import Cluster +from py12306.helpers.event import Event from py12306.helpers.func import * from py12306.log.user_log import UserLog from py12306.user.job import UserJob @@ -65,8 +66,7 @@ class User: for account in old: # 退出已删除的用户 if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')): - user = self.get_user(account.get('key')) - user.destroy() + Event().user_job_destroy({'key': account.get('key')}) @classmethod def get_user(cls, key) -> UserJob: