From a87f10c884bb8fbb06f37fc148c53886da88a2cb Mon Sep 17 00:00:00 2001 From: Jalin Date: Fri, 11 Jan 2019 01:42:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8A=A8=E6=80=81=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 ++ py12306/app.py | 2 ++ py12306/config.py | 33 ++++++++++++++++++++++++--- py12306/helpers/func.py | 17 ++++++++++++++ py12306/log/base.py | 4 ++-- py12306/log/common_log.py | 2 ++ py12306/log/query_log.py | 6 ++--- py12306/query/job.py | 24 ++++++++++++-------- py12306/query/query.py | 48 ++++++++++++++++++++++++++++++++------- py12306/user/job.py | 6 ++--- py12306/user/user.py | 2 -- 11 files changed, 114 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 2969967..c9ae29e 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,8 @@ docker run -d -v $(pwd):/config -v py12306:/data pjialin/py12306 ## 更新 ### 19-01-10 * 支持分布式集群 +### 19-01-11 +* 配置文件支持动态修改 ## 下单成功截图 ![下单成功图片](./data/images/order_success.png) diff --git a/py12306/app.py b/py12306/app.py index 628fea4..89986f1 100644 --- a/py12306/app.py +++ b/py12306/app.py @@ -11,6 +11,8 @@ from py12306.log.order_log import OrderLog def app_available_check(): # return True # Debug + if Config().IS_DEBUG: + return True now = time_now() if now.hour >= 23 or now.hour < 6: CommonLog.add_quick_log(CommonLog.MESSAGE_12306_IS_CLOSED.format(time_now())).flush() diff --git a/py12306/config.py b/py12306/config.py index 2fa4022..8789592 100644 --- a/py12306/config.py +++ b/py12306/config.py @@ -45,7 +45,7 @@ class Config: NOTIFICATION_API_APP_CODE = '' # 集群配置 - CLUSTER_ENABLED = 1 + CLUSTER_ENABLED = 0 NODE_SLAVE_CAN_BE_MASTER = 1 NODE_IS_MASTER = 1 NODE_NAME = '' @@ -55,6 +55,7 @@ class Config: envs = [] retry_time = 5 + last_modify_time = 0 disallow_update_cofigs = [ 'CLUSTER_ENABLED', @@ -67,8 +68,11 @@ class Config: def __init__(self): self.init_envs() + self.last_modify_time = get_file_modify_time(self.CONFIG_FILE) if Config().is_slave(): self.refresh_configs(True) + else: + create_thread_and_run(self, 'watch_file_change', False) @classmethod def run(cls): @@ -109,6 +113,24 @@ class Config: for key, value in envs: setattr(self, key, value) + def watch_file_change(self): + """ + 监听配置文件修改 + :return: + """ + if Config().is_slave(): return + from py12306.log.common_log import CommonLog + while True: + value = get_file_modify_time(self.CONFIG_FILE) + if value > self.last_modify_time: + self.last_modify_time = value + CommonLog.add_quick_log(CommonLog.MESSAGE_CONFIG_FILE_DID_CHANGED).flush() + envs = EnvLoader.load_with_file(self.CONFIG_FILE) + self.update_configs_from_remote(envs) + if Config().is_master(): # 保存配置 + self.save_to_remote() + stay_second(self.retry_time) + def update_configs_from_remote(self, envs, first=False): if envs == self.envs: return from py12306.query.query import Query @@ -148,9 +170,12 @@ class Config: # return members -class EnvLoader(): +class EnvLoader: envs = [] + def __init__(self): + self.envs = [] # 不是单例不初始化怎么还会有值 + @classmethod def load_with_file(cls, file): self = cls() @@ -161,4 +186,6 @@ class EnvLoader(): return self.envs def __setattr__(self, key, value): - self.envs.append(([key, value])) + super().__setattr__(key, value) + if re.search(r'^[A-Z]+_', key): + self.envs.append(([key, value])) diff --git a/py12306/helpers/func.py b/py12306/helpers/func.py index 42c05cf..82ae72d 100644 --- a/py12306/helpers/func.py +++ b/py12306/helpers/func.py @@ -1,4 +1,7 @@ import datetime +import hashlib +import json +import os import random import threading import functools @@ -87,6 +90,16 @@ def time_now(): return datetime.datetime.now() +def timestamp_to_time(timestamp): + time_struct = time.localtime(timestamp) + return time.strftime('%Y-%m-%d %H:%M:%S', time_struct) + + +def get_file_modify_time(filePath): + timestamp = os.path.getmtime(filePath) + return timestamp_to_time(timestamp) + + def str_to_time(str): return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f') @@ -160,6 +173,10 @@ def available_value(value): return str(value) +def md5(value): + return hashlib.md5(json.dumps(value).encode()).hexdigest() + + @singleton class Const: IS_TEST = False diff --git a/py12306/log/base.py b/py12306/log/base.py index 38ead41..1ac355d 100644 --- a/py12306/log/base.py +++ b/py12306/log/base.py @@ -30,7 +30,7 @@ class BaseLog: self = cls() logs = self.get_logs() # 输出到文件 - if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED: # TODO 文件无法写入友好提示 + if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED and not Const.IS_TEST: # TODO 文件无法写入友好提示 file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8') if not file: file = None # 输出日志到各个节点 @@ -56,7 +56,7 @@ class BaseLog: logs = self.thread_logs.get(current_thread_id()) return logs - def empty_logs(self, logs): + def empty_logs(self, logs=None): if self.quick_log: self.quick_log = [] else: diff --git a/py12306/log/common_log.py b/py12306/log/common_log.py index 88f60e5..f0fa2d1 100644 --- a/py12306/log/common_log.py +++ b/py12306/log/common_log.py @@ -23,6 +23,8 @@ class CommonLog(BaseLog): MESSAGE_TEST_SEND_VOICE_CODE = '正在测试发送语音验证码...' + MESSAGE_CONFIG_FILE_DID_CHANGED = '配置文件已修改,正在重新加载中' + def __init__(self): super().__init__() self.init_data() diff --git a/py12306/log/query_log.py b/py12306/log/query_log.py index 0aa070a..5bf96ec 100644 --- a/py12306/log/query_log.py +++ b/py12306/log/query_log.py @@ -52,7 +52,7 @@ class QueryLog(BaseLog): def init_data(cls): self = cls() # 获取上次记录 - if Const.IS_TEST: return + # if Const.IS_TEST: return result = False if not Config.is_cluster_enabled() and path.exists(self.data_path): with open(self.data_path, encoding='utf-8') as f: @@ -152,10 +152,10 @@ class QueryLog(BaseLog): @classmethod def print_job_start(cls, job_name): self = cls() - self.refresh_data() self.add_log( - '=== 正在进行第 {query_count} 次查询 {job_name} === {time}'.format(query_count=self.data.get('query_count'), + '=== 正在进行第 {query_count} 次查询 {job_name} === {time}'.format(query_count=self.data.get('query_count') + 1, job_name=job_name, time=datetime.datetime.now())) + self.refresh_data() if is_main_thread(): self.flush(publish=False) return self diff --git a/py12306/query/job.py b/py12306/query/job.py index 71bcee4..a0ef3b5 100644 --- a/py12306/query/job.py +++ b/py12306/query/job.py @@ -18,6 +18,7 @@ class Job: """ 查询任务 """ + id = 0 is_alive = True job_name = None left_dates = [] @@ -56,15 +57,18 @@ class Job: def __init__(self, info, query): self.cluster = Cluster() + self.query = query + self.init_data(info) + self.update_interval() + + def init_data(self, info): + self.id = md5(info) self.left_dates = info.get('left_dates') - # 多车站已放在下面处理 - # self.left_station = info.get('stations').get('left') - # self.arrive_station = info.get('stations').get('arrive') - # self.left_station_code = Station.get_station_key_by_name(self.left_station) - # self.arrive_station_code = Station.get_station_key_by_name(self.arrive_station) self.stations = info.get('stations') self.stations = [self.stations] if isinstance(self.stations, dict) else self.stations - self.job_name = info.get('job_name', '{} -> {}'.format(self.stations[0]['left'], self.stations[0]['arrive'])) + if not self.job_name: # name 不能被修改 + self.job_name = info.get('job_name', + '{} -> {}'.format(self.stations[0]['left'], self.stations[0]['arrive'])) self.account_key = str(info.get('account_key')) self.allow_seats = info.get('seats') @@ -74,8 +78,8 @@ class Job: self.member_num_take = self.member_num self.allow_less_member = bool(info.get('allow_less_member')) - self.interval = query.interval - self.query = query + def update_interval(self): + self.interval = self.query.interval def run(self): self.start() @@ -100,7 +104,7 @@ class Job: self.safe_stay() if is_main_thread(): QueryLog.flush(sep='\t\t', publish=False) - if is_main_thread(): + if not Config().QUERY_JOB_THREAD_ENABLED: QueryLog.add_quick_log('').flush(publish=False) break else: @@ -233,9 +237,9 @@ class Job: :return: """ from py12306.query.query import Query + self.is_alive = False QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush() # sys.exit(1) # 无法退出线程... - self.is_alive = False # 手动移出jobs 防止单线程死循环 index = Query().jobs.index(self) Query().jobs.pop(index) diff --git a/py12306/query/query.py b/py12306/query/query.py index 7660b19..9039f28 100644 --- a/py12306/query/query.py +++ b/py12306/query/query.py @@ -21,6 +21,9 @@ class Query: interval = {} cluster = None + is_in_thread = False + retry_time = 3 + def __init__(self): self.session = Request() self.cluster = Cluster() @@ -29,13 +32,14 @@ class Query: def update_query_interval(self, auto=False): self.interval = init_interval_by_number(Config().QUERY_INTERVAL) + if auto: + jobs_do(self.jobs, 'update_interval') def update_query_jobs(self, auto=False): self.query_jobs = Config().QUERY_JOBS if auto: - self.jobs = [] QueryLog.add_quick_log(QueryLog.MESSAGE_JOBS_DID_CHANGED).flush() - self.init_jobs() + self.refresh_jobs() @classmethod def run(cls): @@ -50,11 +54,15 @@ class Query: QueryLog.init_data() stay_second(3) # 多线程 - if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 - create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) - else: - while True: + while True: + if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 + if not self.is_in_thread: + self.is_in_thread = True + create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) + stay_second(self.retry_time) + else: if not self.jobs: break + self.is_in_thread = False jobs_do(self.jobs, 'run') if Const.IS_TEST: return @@ -67,12 +75,36 @@ class Query: # if Const.IS_TEST: return # self.refresh_jobs() # 刷新任务 + def refresh_jobs(self): + """ + 更新任务 + :return: + """ + allow_jobs = [] + for job in self.query_jobs: + id = md5(job) + job_ins = objects_find_object_by_key_value(self.jobs, 'id', id) # [1 ,2] + if not job_ins: + job_ins = self.init_job(job) + if Config().QUERY_JOB_THREAD_ENABLED: # 多线程重新添加 + create_thread_and_run(jobs=job_ins, callback_name='run', wait=Const.IS_TEST) + allow_jobs.append(job_ins) + + for job in self.jobs: # 退出已删除 Job + if job not in allow_jobs: job.destroy() + + QueryLog.print_init_jobs(jobs=self.jobs) + def init_jobs(self): for job in self.query_jobs: - job = Job(info=job, query=self) - self.jobs.append(job) + self.init_job(job) QueryLog.print_init_jobs(jobs=self.jobs) + def init_job(self, job): + job = Job(info=job, query=self) + self.jobs.append(job) + return job + @classmethod def job_by_name(cls, name) -> Job: self = cls() diff --git a/py12306/user/job.py b/py12306/user/job.py index 25589c1..e692dcf 100644 --- a/py12306/user/job.py +++ b/py12306/user/job.py @@ -18,7 +18,6 @@ from py12306.log.user_log import UserLog class UserJob: # heartbeat = 60 * 2 # 心跳保持时长 is_alive = True - heartbeat_interval = 60 * 2 check_interval = 5 key = None user_name = '' @@ -53,7 +52,6 @@ class UserJob: def update_user(self): from py12306.user.user import User self.user = User() - self.heartbeat_interval = self.user.heartbeat # if not Const.IS_TEST: 测试模块下也可以从文件中加载用户 self.load_user() @@ -79,7 +77,7 @@ class UserJob: def check_heartbeat(self): # 心跳检测 - if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < self.heartbeat_interval: + if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < Config().USER_HEARTBEAT_INTERVAL: return True # 只有主节点才能走到这 if self.is_first_time() or not self.check_user_is_login(): @@ -87,7 +85,7 @@ class UserJob: if not self.handle_login(): return self.is_ready = True - message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat_interval) + message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), Config().USER_HEARTBEAT_INTERVAL) if not Config.is_cluster_enabled(): UserLog.add_quick_log(message).flush() else: diff --git a/py12306/user/user.py b/py12306/user/user.py index 26cea5b..4d749f7 100644 --- a/py12306/user/user.py +++ b/py12306/user/user.py @@ -8,7 +8,6 @@ from py12306.user.job import UserJob @singleton class User: - heartbeat = 60 * 2 users = [] user_accounts = [] @@ -17,7 +16,6 @@ class User: def __init__(self): self.cluster = Cluster() - self.heartbeat = Config().USER_HEARTBEAT_INTERVAL self.update_interval() self.update_user_accounts()