完成用户保持

This commit is contained in:
Jalin
2019-01-09 21:01:12 +08:00
parent 7d5b8e2b80
commit 668c4ae8ce
20 changed files with 438 additions and 106 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@
venv venv
__pycache__ __pycache__
env.py env.py
env.slave.py

View File

@@ -70,7 +70,7 @@ QUERY_JOBS = [
], ],
'allow_less_member': 0, # 是否允许余票不足时提交部分乘客 'allow_less_member': 0, # 是否允许余票不足时提交部分乘客
'seats': [ # 筛选座位 有先后顺序 :Array 'seats': [ # 筛选座位 有先后顺序 :Array
# 可用值: 特等座, 商务座, 一等座, 二等座, 软卧, 硬卧, 硬座, 无座 # 可用值: 特等座, 商务座, 一等座, 二等座, 软卧, 硬卧, 动卧, 硬座, 无座
'硬卧', '硬卧',
'硬座' '硬座'
], ],

21
main.py
View File

@@ -4,15 +4,18 @@ import sys
from py12306.app import * from py12306.app import *
from py12306.log.common_log import CommonLog from py12306.log.common_log import CommonLog
from py12306.query.query import Query from py12306.query.query import Query
from py12306.user.user import User
def main(): def main():
if '--test' in sys.argv or '-t' in sys.argv: test() load_argvs()
CommonLog.print_welcome()
App.run() App.run()
CommonLog.print_welcome().print_configs() CommonLog.print_configs()
App.did_start() App.did_start()
# App.run_check()
# User.run() App.run_check()
User.run()
Query.run() Query.run()
if not Const.IS_TEST: if not Const.IS_TEST:
while True: while True:
@@ -38,5 +41,15 @@ def test():
pass pass
def load_argvs():
if '--test' in sys.argv or '-t' in sys.argv: test()
config_index = None
if '--config' in sys.argv: config_index = sys.argv.index('--config')
if '-c' in sys.argv: config_index = sys.argv.index('-c')
if config_index:
Config.CONFIG_FILE = sys.argv[config_index + 1:config_index + 2].pop()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -40,13 +40,14 @@ class App:
@classmethod @classmethod
def did_start(cls): def did_start(cls):
self = cls() self = cls()
if Config.is_cluster_enabled(): # if Config.is_cluster_enabled():
from py12306.cluster.cluster import Distributed # from py12306.cluster.cluster import Cluster
Distributed().join_cluster() # Cluster().run()
def init_class(self): def init_class(self):
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
if Config.is_cluster_enabled(): Distributed() if Config.is_cluster_enabled():
Cluster().run()
def handler_exit(self, *args, **kwargs): def handler_exit(self, *args, **kwargs):
""" """
@@ -56,8 +57,8 @@ class App:
:return: :return:
""" """
if Config.is_cluster_enabled(): if Config.is_cluster_enabled():
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
Distributed().left_cluster() Cluster().left_cluster()
sys.exit() sys.exit()

View File

@@ -1,3 +1,8 @@
import os
import pickle
import sys
import time
import redis import redis
from redis.client import PubSub from redis.client import PubSub
@@ -8,60 +13,210 @@ from py12306.log.cluster_log import ClusterLog
@singleton @singleton
class Distributed(): class Cluster():
KEY_QUERY_COUNT = 'query_count' KEY_QUERY_COUNT = 'query_count'
KEY_QUERY_LAST_TIME = 'query_last_time' KEY_QUERY_LAST_TIME = 'query_last_time'
KEY_CONFIGS = 'configs' KEY_CONFIGS = 'configs'
KEY_NODES = 'nodes' KEY_NODES = 'nodes'
KEY_CHANNEL_LOG = 'channel_log' KEY_CHANNEL_LOG = 'channel_log'
KEY_USER_COOKIES = 'user_cookies'
KEY_USER_LAST_HEARTBEAT = 'user_last_heartbeat'
KEY_NODES_ALIVE = 'nodes_alive'
KEY_LOCK_INIT_USER = 'lock_init_user'
# if self.cluster.get_lock(Cluster.KEY_LOCK_INIT_USER, self.lock_init_user_time): # TODO 未判断 失败重试
KEY_MASTER = 1
KEY_SLAVE = 0
session: Redis = None session: Redis = None
pubsub: PubSub = None pubsub: PubSub = None
refresh_channel_time = 0.5 refresh_channel_time = 0.5
retry_time = 2 retry_time = 2
keep_alive_time = 3 # 报告存活间隔
lost_alive_time = keep_alive_time * 2
locks = []
nodes = {} nodes = {}
node_name = None
is_ready = False
is_master = False
def __init__(self, *args): def __init__(self, *args):
if Config.is_cluster_enabled():
self.session = Redis() self.session = Redis()
self.pubsub = self.session.pubsub()
self.pubsub.subscribe(self.KEY_CHANNEL_LOG)
create_thread_and_run(self, 'refresh_data', wait=False)
create_thread_and_run(self, 'subscribe', wait=False)
return self return self
@classmethod
def run(cls):
self = cls()
self.start()
def start(self):
self.pubsub = self.session.pubsub()
self.pubsub.subscribe(self.KEY_CHANNEL_LOG)
create_thread_and_run(self, 'subscribe', wait=False)
self.is_ready = True
self.get_nodes() # 提前获取节点列表
self.check_nodes() # 防止 节点列表未清空
self.join_cluster()
create_thread_and_run(self, 'keep_alive', wait=False)
create_thread_and_run(self, 'refresh_data', wait=False)
def join_cluster(self): def join_cluster(self):
node_name = Config().NODE_NAME """
加入到集群
:return:
"""
self.node_name = node_name = Config().NODE_NAME
if Config().NODE_IS_MASTER:
if self.node_name in self.nodes: # 重复运行主节点
ClusterLog.add_quick_log(ClusterLog.MESSAGE_MASTER_NODE_ALREADY_RUN.format(node_name)).flush(
publish=False)
os._exit(1)
if self.have_master(): # 子节点提升为主节点情况,交回控制
message = ClusterLog.MESSAGE_NODE_BECOME_MASTER_AGAIN.format(node_name)
self.publish_log_message(message)
self.make_nodes_as_slave()
elif not self.have_master(): # 只能通过主节点启动
ClusterLog.add_quick_log(ClusterLog.MESSAGE_MASTER_NODE_NOT_FOUND).flush(publish=False)
os._exit(1)
if node_name in self.nodes: if node_name in self.nodes:
node_name = node_name + '_' + str(dict_count_key_num(self.nodes, node_name)) self.node_name = node_name = node_name + '_' + str(dict_count_key_num(self.nodes, node_name))
ClusterLog.add_quick_log(ClusterLog.MESSAGE_NODE_ALREADY_IN_CLUSTER.format(node_name)).flush() ClusterLog.add_quick_log(ClusterLog.MESSAGE_NODE_ALREADY_IN_CLUSTER.format(node_name)).flush()
self.session.hset(self.KEY_NODES, node_name, Config().NODE_IS_MASTER) self.session.hset(self.KEY_NODES, node_name, Config().NODE_IS_MASTER)
message = ClusterLog.MESSAGE_JOIN_CLUSTER_SUCCESS.format(Config().NODE_NAME, list(self.get_nodes())) message = ClusterLog.MESSAGE_JOIN_CLUSTER_SUCCESS.format(self.node_name, ClusterLog.get_print_nodes(
# ClusterLog.add_quick_log(message).flush() self.get_nodes())) # 手动 get nodes
self.publish_log_message(message)
def left_cluster(self, node_name=None):
node_name = node_name if node_name else self.node_name
self.session.hdel(self.KEY_NODES, node_name)
message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(node_name, ClusterLog.get_print_nodes(self.get_nodes()))
self.publish_log_message(message)
def make_nodes_as_slave(self):
"""
将所有节点设为主节点
:return:
"""
for node in self.nodes:
self.session.hset(self.KEY_NODES, node, self.KEY_SLAVE)
def publish_log_message(self, message):
"""
发布订阅消息
:return:
"""
message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(self.node_name, message)
self.session.publish(self.KEY_CHANNEL_LOG, message) self.session.publish(self.KEY_CHANNEL_LOG, message)
def left_cluster(self): def get_nodes(self) -> dict:
self.session.hdel(self.KEY_NODES, Config().NODE_NAME)
message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(Config().NODE_NAME, list(self.get_nodes()))
# ClusterLog.add_quick_log(message).flush()
self.session.publish(self.KEY_CHANNEL_LOG, message)
def get_nodes(self):
res = self.session.hgetall(self.KEY_NODES) res = self.session.hgetall(self.KEY_NODES)
res = res if res else {} res = res if res else {}
self.nodes = res self.nodes = res
return res return res
def refresh_data(self): def refresh_data(self):
"""
单独进程处理数据同步
:return:
"""
while True: while True:
self.get_nodes() self.get_nodes()
self.check_locks()
self.check_nodes()
self.check_master()
stay_second(self.retry_time) stay_second(self.retry_time)
def check_master(self):
"""
检测主节点是否可用
:return:
"""
master = self.have_master()
if master == self.node_name: # 动态提升
self.is_master = True
else:
self.is_master = False
if not master:
if Config().NODE_SLAVE_CAN_BE_MASTER:
# 提升子节点为主节点
slave = list(self.nodes)[-1]
self.session.hset(self.KEY_NODES, slave, self.KEY_MASTER)
self.publish_log_message(ClusterLog.MESSAGE_ASCENDING_MASTER_NODE.format(slave,
ClusterLog.get_print_nodes(
self.get_nodes())))
return True
else:
self.publish_log_message(ClusterLog.MESSAGE_MASTER_DID_LOST.format(self.retry_time))
stay_second(self.retry_time)
os._exit(1) # 退出整个程序
def have_master(self):
return dict_find_key_by_value(self.nodes, str(self.KEY_MASTER), False)
def check_nodes(self):
"""
检查节点是否存活
:return:
"""
alive = self.session.hgetall(self.KEY_NODES_ALIVE)
for node in self.nodes:
if node not in alive or (time_int() - int(alive[node])) > self.lost_alive_time:
self.left_cluster(node)
# def kick_out_from_nodes(self, node_name):
# pass
def keep_alive(self):
while True:
if self.node_name not in self.get_nodes(): # 已经被 kict out 重新加下
self.join_cluster()
self.session.hset(self.KEY_NODES_ALIVE, self.node_name, str(time_int()))
stay_second(self.keep_alive_time)
def subscribe(self): def subscribe(self):
while True: while True:
message = self.pubsub.get_message() message = self.pubsub.get_message()
if message: if message:
if message.get('type') == 'message' and message.get('data'): if message.get('type') == 'message' and message.get('data'):
ClusterLog.add_quick_log(message.get('data')).flush() msg = message.get('data')
if self.node_name:
msg = msg.replace(ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX.format(self.node_name), '')
ClusterLog.add_quick_log(msg).flush(publish=False)
stay_second(self.refresh_channel_time) stay_second(self.refresh_channel_time)
def get_lock(self, key, timeout=1):
timeout = int(time.time()) + timeout
res = self.session.setnx(key, timeout)
if res:
self.locks.append((key, timeout))
return True
return False
def release_lock(self, key):
self.session.delete(key)
def check_locks(self):
index = 0
for key, timeout in self.locks:
if timeout >= int(time.time()):
del self.locks[index]
self.release_lock(key)
index += 1
@classmethod
def get_user_cookie(cls, key, default=None):
self = cls()
res = self.session.hget(Cluster.KEY_USER_COOKIES, key)
return pickle.loads(res.encode()) if res else default
@classmethod
def set_user_cookie(cls, key, value):
self = cls()
return self.session.hset(Cluster.KEY_USER_COOKIES, key, pickle.dumps(value, 0).decode())

View File

@@ -44,6 +44,7 @@ class Config:
# 集群配置 # 集群配置
CLUSTER_ENABLED = 1 CLUSTER_ENABLED = 1
NODE_SLAVE_CAN_BE_MASTER = 1
NODE_IS_MASTER = 1 NODE_IS_MASTER = 1
NODE_NAME = '' NODE_NAME = ''
REDIS_HOST = '' REDIS_HOST = ''
@@ -53,6 +54,20 @@ class Config:
envs = [] envs = []
retry_time = 5 retry_time = 5
disallow_update_cofigs = [
'CLUSTER_ENABLED',
'NODE_IS_MASTER',
'NODE_NAME',
'REDIS_HOST',
'REDIS_PORT',
'REDIS_PASSWORD',
]
def __init__(self):
self.init_envs()
if Config().is_slave():
self.refresh_configs(True)
@classmethod @classmethod
def run(cls): def run(cls):
self = cls() self = cls()
@@ -63,27 +78,26 @@ class Config:
# self = cls() # self = cls()
def start(self): def start(self):
self.init_envs()
self.save_to_remote() self.save_to_remote()
# self.refresh_configs()
create_thread_and_run(self, 'refresh_configs', wait=False) create_thread_and_run(self, 'refresh_configs', wait=False)
def refresh_configs(self): def refresh_configs(self, once=False):
if not self.is_cluster_enabled(): return if not self.is_cluster_enabled(): return
while True: while True:
remote_configs = self.get_remote_config() remote_configs = self.get_remote_config()
self.update_configs_from_remote(remote_configs) self.update_configs_from_remote(remote_configs, once)
if once: break
stay_second(self.retry_time) stay_second(self.retry_time)
def get_remote_config(self): def get_remote_config(self):
if not self.is_cluster_enabled(): return if not self.is_cluster_enabled(): return
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
return Distributed().session.get_pickle(Distributed().KEY_CONFIGS, {}) return Cluster().session.get_pickle(Cluster().KEY_CONFIGS, {})
def save_to_remote(self): def save_to_remote(self):
if not self.is_master(): return if not self.is_master(): return
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
Distributed().session.set_pickle(Distributed().KEY_CONFIGS, self.envs) Cluster().session.set_pickle(Cluster().KEY_CONFIGS, self.envs)
def init_envs(self): def init_envs(self):
self.envs = EnvLoader.load_with_file(self.CONFIG_FILE) self.envs = EnvLoader.load_with_file(self.CONFIG_FILE)
@@ -93,31 +107,36 @@ class Config:
for key, value in envs: for key, value in envs:
setattr(self, key, value) setattr(self, key, value)
def update_configs_from_remote(self, envs): def update_configs_from_remote(self, envs, first=False):
if envs == self.envs: return if envs == self.envs: return
from py12306.query.query import Query from py12306.query.query import Query
from py12306.user.user import User
for key, value in envs: for key, value in envs:
if key == 'USER_ACCOUNTS' and value != self.USER_ACCOUNTS: # 用户修改 if key in self.disallow_update_cofigs: continue
setattr(self, key, value)
print('用户修改了') # TODO
elif key == 'QUERY_JOBS' and value != self.QUERY_JOBS: # 任务修改
setattr(self, key, value) and Query().update_query_jobs(auto=True)
elif key == 'QUERY_INTERVAL' and value != self.QUERY_INTERVAL: # 任务修改
setattr(self, key, value) and Query().update_query_interval(auto=True)
if value != -1: if value != -1:
old = getattr(self, key)
setattr(self, key, value) setattr(self, key, value)
if not first:
if key == 'USER_ACCOUNTS' and old != value:
# 用户修改 print('用户修改了')
User.update_user_accounts(auto=True, old=old)
elif key == 'QUERY_JOBS' and old != value:
Query().update_query_jobs(auto=True) # 任务修改
elif key == 'QUERY_INTERVAL' and old != value:
Query().update_query_interval(auto=True)
@staticmethod @staticmethod
def is_master(): # 是不是 主 def is_master(): # 是不是 主
return Config.CLUSTER_ENABLED and Config.NODE_IS_MASTER from py12306.cluster.cluster import Cluster
return Config().CLUSTER_ENABLED and (Config().NODE_IS_MASTER or Cluster().is_master)
@staticmethod @staticmethod
def is_slave(): # 是不是 从 def is_slave(): # 是不是 从
return Config.CLUSTER_ENABLED and not Config.NODE_IS_MASTER return Config().CLUSTER_ENABLED and not Config.is_master()
@staticmethod @staticmethod
def is_cluster_enabled(): def is_cluster_enabled():
return Config.CLUSTER_ENABLED return Config().CLUSTER_ENABLED
# @staticmethod # @staticmethod
# def get_members(): # def get_members():

View File

@@ -1,7 +1,7 @@
import math import math
import random import random
from py12306 import config from py12306.config import Config
from py12306.log.common_log import CommonLog from py12306.log.common_log import CommonLog
from py12306.vender.ruokuai.main import RKClient from py12306.vender.ruokuai.main import RKClient
@@ -22,7 +22,7 @@ class OCR:
return self.get_img_position_by_ruokuai(img_path) return self.get_img_position_by_ruokuai(img_path)
def get_img_position_by_ruokuai(self, img_path): def get_img_position_by_ruokuai(self, img_path):
ruokuai_account = config.AUTO_CODE_ACCOUNT ruokuai_account = Config().AUTO_CODE_ACCOUNT
soft_id = '119671' soft_id = '119671'
soft_key = '6839cbaca1f942f58d2760baba5ed987' soft_key = '6839cbaca1f942f58d2760baba5ed987'
rc = RKClient(ruokuai_account.get('user'), ruokuai_account.get('pwd'), soft_id, soft_key) rc = RKClient(ruokuai_account.get('user'), ruokuai_account.get('pwd'), soft_id, soft_key)
@@ -37,7 +37,7 @@ class OCR:
positions = [] positions = []
width = 70 width = 70
height = 70 height = 70
random_num = random.randint(0, 10) random_num = random.randint(0, 8)
for offset in offsets: for offset in offsets:
offset = int(offset) offset = int(offset)
x = width * (offset % 5) - width / 2 + random_num x = width * (offset % 5) - width / 2 + random_num

View File

@@ -3,6 +3,7 @@ import time
from requests.exceptions import SSLError from requests.exceptions import SSLError
from py12306.config import Config
from py12306.helpers.OCR import OCR from py12306.helpers.OCR import OCR
from py12306.helpers.api import API_AUTH_CODE_DOWNLOAD, API_AUTH_CODE_CHECK from py12306.helpers.api import API_AUTH_CODE_DOWNLOAD, API_AUTH_CODE_CHECK
from py12306.helpers.request import Request from py12306.helpers.request import Request
@@ -20,7 +21,7 @@ class AuthCode:
retry_time = 5 retry_time = 5
def __init__(self, session): def __init__(self, session):
self.data_path = config.RUNTIME_DIR self.data_path = Config().RUNTIME_DIR
self.session = session self.session = session
@classmethod @classmethod
@@ -66,6 +67,7 @@ class AuthCode:
UserLog.add_quick_log(UserLog.MESSAGE_CODE_AUTH_SUCCESS).flush() UserLog.add_quick_log(UserLog.MESSAGE_CODE_AUTH_SUCCESS).flush()
return True return True
else: else:
# {'result_message': '验证码校验失败', 'result_code': '5'}
UserLog.add_quick_log( UserLog.add_quick_log(
UserLog.MESSAGE_CODE_AUTH_FAIL.format(result.get('result_message'))).flush() UserLog.MESSAGE_CODE_AUTH_FAIL.format(result.get('result_message'))).flush()
self.session.cookies.clear_session_cookies() self.session.cookies.clear_session_cookies()

View File

@@ -2,12 +2,12 @@ import datetime
import random import random
import threading import threading
import functools import functools
import time
from time import sleep from time import sleep
from types import MethodType from types import MethodType
# from py12306 import config
def singleton(cls): def singleton(cls):
@@ -88,11 +88,16 @@ def current_thread_id():
def time_now(): def time_now():
return datetime.datetime.now() return datetime.datetime.now()
def str_to_time(str):
return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f')
def time_int():
return int(time.time())
def create_thread_and_run(jobs, callback_name, wait=True, daemon=True): def create_thread_and_run(jobs, callback_name, wait=True, daemon=True):
threads = [] threads = []
if not isinstance(jobs, list): if not isinstance(jobs, list): jobs = [jobs]
jobs = [jobs]
for job in jobs: for job in jobs:
thread = threading.Thread(target=getattr(job, callback_name)) thread = threading.Thread(target=getattr(job, callback_name))
thread.setDaemon(daemon) thread.setDaemon(daemon)
@@ -102,6 +107,12 @@ def create_thread_and_run(jobs, callback_name, wait=True, daemon=True):
for thread in threads: thread.join() for thread in threads: thread.join()
def jobs_do(jobs, do):
if not isinstance(jobs, list): jobs = [jobs]
for job in jobs:
getattr(job, do)()
def dict_find_key_by_value(data, value, default=None): def dict_find_key_by_value(data, value, default=None):
result = [k for k, v in data.items() if v == value] result = [k for k, v in data.items() if v == value]
return result.pop() if len(result) else default return result.pop() if len(result) else default

View File

@@ -1,6 +1,5 @@
import urllib import urllib
from py12306 import config
from py12306.helpers.api import * from py12306.helpers.api import *
from py12306.helpers.request import Request from py12306.helpers.request import Request
from py12306.log.common_log import CommonLog from py12306.log.common_log import CommonLog

View File

@@ -25,6 +25,7 @@ class OrderSeatType:
'二等座': 'O', '二等座': 'O',
'软卧': 4, '软卧': 4,
'硬卧': 3, '硬卧': 3,
'动卧': 1,
'硬座': 1, '硬座': 1,
'无座': 1, '无座': 1,
} }
@@ -39,6 +40,7 @@ class SeatType:
'二等座': 30, '二等座': 30,
'软卧': 23, '软卧': 23,
'硬卧': 28, '硬卧': 28,
'动卧': 33,
'硬座': 29, '硬座': 29,
'无座': 26, '无座': 26,
} }

View File

@@ -1,5 +1,7 @@
import os import os
import sys import sys
import io
from contextlib import redirect_stdout
from py12306.config import Config from py12306.config import Config
from py12306.helpers.func import * from py12306.helpers.func import *
@@ -23,13 +25,22 @@ class BaseLog:
return self return self
@classmethod @classmethod
def flush(cls, sep='\n', end='\n', file=None, exit=False): def flush(cls, sep='\n', end='\n', file=None, exit=False, publish=True):
from py12306.cluster.cluster import Cluster
self = cls() self = cls()
logs = self.get_logs() logs = self.get_logs()
# 输出到文件 # 输出到文件
if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED: # TODO 文件无法写入友好提示 if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED: # TODO 文件无法写入友好提示
file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a') file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8')
if not file: file = None if not file: file = None
# 输出日志到各个节点
if publish and self.quick_log and Config().is_cluster_enabled() and Cluster().is_ready: #
f = io.StringIO()
with redirect_stdout(f):
print(*logs, sep=sep, end='' if end == '\n' else end)
out = f.getvalue()
Cluster().publish_log_message(out)
else:
print(*logs, sep=sep, end=end, file=file) print(*logs, sep=sep, end=end, file=file)
self.empty_logs(logs) self.empty_logs(logs)
if exit: if exit:

View File

@@ -10,6 +10,26 @@ class ClusterLog(BaseLog):
quick_log = [] quick_log = []
MESSAGE_JOIN_CLUSTER_SUCCESS = '# 节点 {} 成功加入到集群,当前节点列表 {} #' MESSAGE_JOIN_CLUSTER_SUCCESS = '# 节点 {} 成功加入到集群,当前节点列表 {} #'
MESSAGE_LEFT_CLUSTER = '# 节点 {} 已离开集群,当前节点列表 {} #' MESSAGE_LEFT_CLUSTER = '# 节点 {} 已离开集群,当前节点列表 {} #'
MESSAGE_NODE_ALREADY_IN_CLUSTER = '# 当前节点已存在于集群中,自动分配新的节点名称 {} #' MESSAGE_NODE_ALREADY_IN_CLUSTER = '# 当前节点已存在于集群中,自动分配新的节点名称 {} #'
MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX = '{} '
MESSAGE_SUBSCRIBE_NOTIFICATION = MESSAGE_SUBSCRIBE_NOTIFICATION_PREFIX + '{}'
MESSAGE_ASCENDING_MASTER_NODE = '# 已将 {} 提升为主节点,当前节点列表 {} #'
MESSAGE_MASTER_DID_LOST = '# 主节点已退出,{} 秒后程序将自动退出 #'
MESSAGE_MASTER_NODE_ALREADY_RUN = '# 启动失败,主节点 {} 已经在运行中 #'
MESSAGE_MASTER_NODE_NOT_FOUND = '# 启动失败,请先启动主节点 #'
MESSAGE_NODE_BECOME_MASTER_AGAIN = '# 节点 {} 已启动,已自动成功主节点 #'
@staticmethod
def get_print_nodes(nodes):
message = ['{}{}'.format('*' if val == '1' else '', key) for key, val in nodes.items()]
return '[ {} ]'.format(', '.join(message))

View File

@@ -42,7 +42,7 @@ class CommonLog(BaseLog):
self.add_quick_log('日志已输出到文件中: {}'.format(Config().OUT_PUT_LOG_TO_FILE_PATH)) self.add_quick_log('日志已输出到文件中: {}'.format(Config().OUT_PUT_LOG_TO_FILE_PATH))
self.add_quick_log() self.add_quick_log()
self.flush(file=False) self.flush(file=False, publish=False)
return self return self
@classmethod @classmethod

View File

@@ -4,7 +4,7 @@ import sys
from os import path from os import path
from py12306.config import Config from py12306.config import Config
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
from py12306.log.base import BaseLog from py12306.log.base import BaseLog
from py12306.helpers.func import * from py12306.helpers.func import *
@@ -36,10 +36,11 @@ class QueryLog(BaseLog):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.data_path = Config().QUERY_DATA_DIR + 'status.json' self.data_path = Config().QUERY_DATA_DIR + 'status.json'
self.cluster = Distributed() self.cluster = Cluster()
self.init_data()
def init_data(self): @classmethod
def init_data(cls):
self = cls()
# 获取上次记录 # 获取上次记录
if Const.IS_TEST: return if Const.IS_TEST: return
result = False result = False
@@ -57,16 +58,16 @@ class QueryLog(BaseLog):
self.print_data_restored() self.print_data_restored()
def get_data_from_cluster(self): def get_data_from_cluster(self):
query_count = self.cluster.session.get(Distributed.KEY_QUERY_COUNT, 0) query_count = self.cluster.session.get(Cluster.KEY_QUERY_COUNT, 0)
last_time = self.cluster.session.get(Distributed.KEY_QUERY_LAST_TIME, '') last_time = self.cluster.session.get(Cluster.KEY_QUERY_LAST_TIME, '')
if query_count and last_time: if query_count and last_time:
return {'query_count': query_count, 'last_time': last_time} return {'query_count': query_count, 'last_time': last_time}
return False return False
def refresh_data_of_cluster(self): def refresh_data_of_cluster(self):
return { return {
'query_count': self.cluster.session.incr(Distributed.KEY_QUERY_COUNT), 'query_count': self.cluster.session.incr(Cluster.KEY_QUERY_COUNT),
'last_time': self.cluster.session.set(Distributed.KEY_QUERY_LAST_TIME, time_now()), 'last_time': self.cluster.session.set(Cluster.KEY_QUERY_LAST_TIME, time_now()),
} }
@classmethod @classmethod
@@ -144,7 +145,7 @@ class QueryLog(BaseLog):
self.add_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'), self.add_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'),
time=datetime.datetime.now())) time=datetime.datetime.now()))
if is_main_thread(): if is_main_thread():
self.flush() self.flush(publish=False)
return self return self
@classmethod @classmethod
@@ -158,7 +159,7 @@ class QueryLog(BaseLog):
self.add_quick_log('|=== 查询记录恢复成功 上次查询 {last_date} ===|'.format(last_date=self.data.get('last_time'))) self.add_quick_log('|=== 查询记录恢复成功 上次查询 {last_date} ===|'.format(last_date=self.data.get('last_time')))
self.add_quick_log('============================================================') self.add_quick_log('============================================================')
self.add_quick_log('') self.add_quick_log('')
self.flush() self.flush(publish=False)
return self return self
def refresh_data(self): def refresh_data(self):

View File

@@ -24,6 +24,11 @@ class UserLog(BaseLog):
MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试'
MESSAGE_USERS_DID_CHANGED = '\n用户信息已更新,正在重新加载...'
MESSAGE_USER_BEING_DESTROY = '用户 {} 已退出'
MESSAGE_USER_COOKIE_NOT_FOUND_FROM_REMOTE = '用户 {} 状态加载中...'
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.init_data() self.init_data()
@@ -38,14 +43,14 @@ class UserLog(BaseLog):
:return: :return:
""" """
self = cls() self = cls()
self.add_log('# 发现 {} 个用户 #'.format(len(users))) self.add_quick_log('# 发现 {} 个用户 #'.format(len(users)))
self.flush() self.flush()
return self return self
@classmethod @classmethod
def print_welcome_user(cls, user): def print_welcome_user(cls, user):
self = cls() self = cls()
self.add_log('# 欢迎回来,{} #'.format(user.get_name())) self.add_quick_log('# 欢迎回来,{} #'.format(user.get_name()))
self.flush() self.flush()
return self return self

View File

@@ -88,11 +88,11 @@ class Job:
self.handle_response(response) self.handle_response(response)
self.safe_stay() self.safe_stay()
if is_main_thread(): if is_main_thread():
QueryLog.flush(sep='\t\t') QueryLog.flush(sep='\t\t', publish=False)
if is_main_thread(): if is_main_thread():
QueryLog.add_quick_log('').flush() QueryLog.add_quick_log('').flush(publish=False)
else: else:
QueryLog.add_log('\n').flush(sep='\t\t') QueryLog.add_log('\n').flush(sep='\t\t',publish=False)
def query_by_date(self, date): def query_by_date(self, date):
""" """

View File

@@ -1,5 +1,5 @@
from py12306.config import Config from py12306.config import Config
from py12306.cluster.cluster import Distributed from py12306.cluster.cluster import Cluster
from py12306.app import app_available_check from py12306.app import app_available_check
from py12306.helpers.func import * from py12306.helpers.func import *
from py12306.helpers.request import Request from py12306.helpers.request import Request
@@ -23,7 +23,7 @@ class Query:
def __init__(self): def __init__(self):
self.session = Request() self.session = Request()
self.cluster = Distributed() self.cluster = Cluster()
self.update_query_interval() self.update_query_interval()
self.update_query_jobs() self.update_query_jobs()
@@ -47,6 +47,7 @@ class Query:
def start(self): def start(self):
# return # DEBUG # return # DEBUG
self.init_jobs() self.init_jobs()
QueryLog.init_data()
stay_second(1) stay_second(1)
while True: while True:
app_available_check() app_available_check()
@@ -64,12 +65,12 @@ class Query:
QueryLog.print_init_jobs(jobs=self.jobs) QueryLog.print_init_jobs(jobs=self.jobs)
# def get_jobs_from_cluster(self): # def get_jobs_from_cluster(self):
# jobs = self.cluster.session.get_dict(Distributed.KEY_JOBS) # jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS)
# return jobs # return jobs
# #
# def update_jobs_of_cluster(self): # def update_jobs_of_cluster(self):
# if config.CLUSTER_ENABLED and config.NODE_IS_MASTER: # if config.CLUSTER_ENABLED and config.NODE_IS_MASTER:
# return self.cluster.session.set_dict(Distributed.KEY_JOBS, self.query_jobs) # return self.cluster.session.set_dict(Cluster.KEY_JOBS, self.query_jobs)
# #
# def refresh_jobs(self): # def refresh_jobs(self):
# if not config.CLUSTER_ENABLED: return # if not config.CLUSTER_ENABLED: return

View File

@@ -1,5 +1,7 @@
import pickle import pickle
from os import path
from py12306.cluster.cluster import Cluster
from py12306.helpers.api import * from py12306.helpers.api import *
from py12306.app import * from py12306.app import *
from py12306.helpers.auth_code import AuthCode from py12306.helpers.auth_code import AuthCode
@@ -10,8 +12,8 @@ from py12306.log.user_log import UserLog
class UserJob: class UserJob:
heartbeat = 60 * 2 # 心跳保持时长 # heartbeat = 60 * 2 # 心跳保持时长
heartbeat_interval = 5 heartbeat_interval = 60 * 2
key = None key = None
user_name = '' user_name = ''
password = '' password = ''
@@ -27,19 +29,29 @@ class UserJob:
ticket_info_for_passenger_form = None ticket_info_for_passenger_form = None
order_request_dto = None order_request_dto = None
def __init__(self, info, user): cluster = None
self.session = Request() lock_init_user_time = 3 * 60
self.heartbeat = user.heartbeat cookie = False
def __init__(self, info):
self.cluster = Cluster()
self.init_data(info)
def init_data(self, info):
self.session = Request()
self.key = info.get('key') self.key = info.get('key')
self.user_name = info.get('user_name') self.user_name = info.get('user_name')
self.password = info.get('password') self.password = info.get('password')
self.user = user self.update_user()
def update_user(self):
from py12306.user.user import User
self.user = User()
self.heartbeat_interval = self.user.heartbeat
if not Const.IS_TEST: self.load_user()
def run(self): def run(self):
# load user # load user
if not Const.IS_TEST:
self.load_user()
self.start() self.start()
def start(self): def start(self):
@@ -49,23 +61,45 @@ class UserJob:
""" """
while True: while True:
app_available_check() app_available_check()
if Config().is_slave():
self.load_user_from_remote()
else:
if Config().is_master() and not self.cookie: self.load_user_from_remote() # 主节点加载一次 Cookie
self.check_heartbeat() self.check_heartbeat()
if Const.IS_TEST: return if Const.IS_TEST: return
sleep(self.heartbeat_interval) sleep(self.heartbeat_interval)
def check_heartbeat(self): def check_heartbeat(self):
# 心跳检测 # 心跳检测
if self.last_heartbeat and (time_now() - self.last_heartbeat).seconds < self.heartbeat: if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < self.heartbeat_interval:
return True return True
# 只有主节点才能走到这
if self.is_first_time() or not self.check_user_is_login(): if self.is_first_time() or not self.check_user_is_login():
self.handle_login() self.handle_login()
self.is_ready = True self.is_ready = True
UserLog.add_quick_log(UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat)).flush() message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat_interval)
if not Config.is_cluster_enabled():
UserLog.add_quick_log(message).flush()
else:
self.cluster.publish_log_message(message)
self.set_last_heartbeat()
def get_last_heartbeat(self):
if Config().is_cluster_enabled():
return int(self.cluster.session.get(Cluster.KEY_USER_LAST_HEARTBEAT, 0))
return self.last_heartbeat
def set_last_heartbeat(self):
if Config().is_cluster_enabled():
return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int())
self.last_heartbeat = time_now() self.last_heartbeat = time_now()
# def init_cookies # def init_cookies
def is_first_time(self): def is_first_time(self):
if Config().is_cluster_enabled():
return not self.cluster.get_user_cookie(self.key)
return not path.exists(self.get_cookie_path()) return not path.exists(self.get_cookie_path())
def handle_login(self): def handle_login(self):
@@ -110,12 +144,11 @@ class UserJob:
def check_user_is_login(self): def check_user_is_login(self):
response = self.session.get(API_USER_CHECK.get('url')) response = self.session.get(API_USER_CHECK.get('url'))
is_login = response.json().get('data').get('flag', False) is_login = response.json().get('data.flag', False)
if is_login: if is_login:
self.save_user() self.save_user()
self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有有 self.get_user_info() # 检测应该是不会维持状态,这里再请求下个人中心看有没有有
return is_login return is_login
def auth_uamtk(self): def auth_uamtk(self):
@@ -149,7 +182,7 @@ class UserJob:
pass pass
def get_cookie_path(self): def get_cookie_path(self):
return config.USER_DATA_DIR + self.user_name + '.cookie' return Config().USER_DATA_DIR + self.user_name + '.cookie'
def update_user_info(self, info): def update_user_info(self, info):
self.info = {**self.info, **info} self.info = {**self.info, **info}
@@ -158,6 +191,8 @@ class UserJob:
return self.info.get('user_name') return self.info.get('user_name')
def save_user(self): def save_user(self):
if Config().is_cluster_enabled():
return self.cluster.set_user_cookie(self.key, self.session.cookies)
with open(self.get_cookie_path(), 'wb') as f: with open(self.get_cookie_path(), 'wb') as f:
pickle.dump(self.session.cookies, f) pickle.dump(self.session.cookies, f)
@@ -166,7 +201,7 @@ class UserJob:
恢复用户成功 恢复用户成功
:return: :return:
""" """
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)) UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush()
if self.check_user_is_login() and self.get_user_info(): if self.check_user_is_login() and self.get_user_info():
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush()
UserLog.print_welcome_user(self) UserLog.print_welcome_user(self)
@@ -183,17 +218,42 @@ class UserJob:
return None return None
def load_user(self): def load_user(self):
if Config().is_cluster_enabled(): return
cookie_path = self.get_cookie_path() cookie_path = self.get_cookie_path()
if path.exists(cookie_path): if path.exists(cookie_path):
with open(self.get_cookie_path(), 'rb') as f: with open(self.get_cookie_path(), 'rb') as f:
self.session.cookies.update(pickle.load(f)) cookie = pickle.load(f)
self.cookie = True
self.session.cookies.update(cookie)
self.did_loaded_user() self.did_loaded_user()
return True return True
return None return None
def load_user_from_remote(self):
cookie = self.cluster.get_user_cookie(self.key)
if not cookie and Config().is_slave():
while True: # 子节点只能取
UserLog.add_quick_log(UserLog.MESSAGE_USER_COOKIE_NOT_FOUND_FROM_REMOTE.format(self.user_name)).flush()
stay_second(self.retry_time)
return self.load_user_from_remote()
self.session.cookies.update(cookie)
if not self.cookie: # 第一次加载
self.cookie = True
self.did_loaded_user()
return True
def check_is_ready(self): def check_is_ready(self):
return self.is_ready return self.is_ready
def destroy(self):
"""
退出用户
:return:
"""
UserLog.add_quick_log(UserLog.MESSAGE_USER_BEING_DESTROY.format(self.user_name)).flush()
sys.exit()
def get_user_passengers(self): def get_user_passengers(self):
if self.passengers: return self.passengers if self.passengers: return self.passengers
response = self.session.post(API_USER_PASSENGERS) response = self.session.post(API_USER_PASSENGERS)

View File

@@ -1,4 +1,5 @@
from py12306.app import * from py12306.app import *
from py12306.cluster.cluster import Cluster
from py12306.helpers.func import * from py12306.helpers.func import *
from py12306.log.user_log import UserLog from py12306.log.user_log import UserLog
from py12306.user.job import UserJob from py12306.user.job import UserJob
@@ -8,11 +9,26 @@ from py12306.user.job import UserJob
class User: class User:
heartbeat = 60 * 2 heartbeat = 60 * 2
users = [] users = []
user_accounts = []
retry_time = 3 retry_time = 3
cluster = None
def __init__(self): def __init__(self):
self.interval = config.USER_HEARTBEAT_INTERVAL self.cluster = Cluster()
self.heartbeat = Config().USER_HEARTBEAT_INTERVAL
self.update_interval()
self.update_user_accounts()
def update_user_accounts(self, auto=False, old=None):
self.user_accounts = Config().USER_ACCOUNTS
if auto:
UserLog.add_quick_log(UserLog.MESSAGE_USERS_DID_CHANGED).flush()
self.refresh_users(old)
def update_interval(self, auto=False):
self.interval = Config().USER_HEARTBEAT_INTERVAL
if auto: jobs_do(self.users, 'update_user')
@classmethod @classmethod
def run(cls): def run(cls):
@@ -28,17 +44,32 @@ class User:
create_thread_and_run(jobs=self.users, callback_name='run', wait=Const.IS_TEST) create_thread_and_run(jobs=self.users, callback_name='run', wait=Const.IS_TEST)
def init_users(self): def init_users(self):
accounts = config.USER_ACCOUNTS for account in self.user_accounts:
for account in accounts: self.init_user(account)
user = UserJob(info=account, user=self)
def init_user(self, info):
user = UserJob(info=info)
self.users.append(user) self.users.append(user)
def refresh_users(self, old):
for account in self.user_accounts:
key = account.get('key')
old_account = array_dict_find_by_key_value(old, 'key', key)
if old_account and account != old_account:
user = self.get_user(key)
user.init_data(account)
elif not old_account:
self.init_user(account)
for account in old: # 退出已删除的用户
if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')):
user = self.get_user(account.get('key'))
user.destroy()
@classmethod @classmethod
def get_user(cls, key): def get_user(cls, key) -> UserJob:
self = cls() self = cls()
for user in self.users: for user in self.users:
if user.key == key: if user.key == key: return user
return user
return None return None
@classmethod @classmethod