增加 cdn 查询

This commit is contained in:
Jalin
2019-01-18 17:08:39 +08:00
parent 8032422f7d
commit 98e5d8f95a
11 changed files with 2518 additions and 23 deletions

2220
data/cdn.txt Normal file
View File

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@
import sys
from py12306.app import *
from py12306.helpers.cdn import Cdn
from py12306.log.common_log import CommonLog
from py12306.query.query import Query
from py12306.user.user import User
@@ -20,6 +21,7 @@ def main():
####### 运行任务
Web.run()
Cdn.run()
User.run()
Query.run()
if not Const.IS_TEST:

View File

@@ -27,6 +27,9 @@ class Cluster():
KEY_USER_LAST_HEARTBEAT = KEY_PREFIX + 'user_last_heartbeat'
KEY_NODES_ALIVE = KEY_PREFIX + 'nodes_alive'
KEY_CDN_AVAILABLE_ITEMS = KEY_PREFIX + 'cdn_available_items'
KEY_CDN_LAST_CHECK_AT = KEY_PREFIX + 'cdn_last_check_at'
# 锁
KEY_LOCK_INIT_USER = KEY_PREFIX + 'lock_init_user' # 暂未使用
KEY_LOCK_DO_ORDER = KEY_PREFIX + 'lock_do_order' # 订单锁

View File

@@ -82,6 +82,11 @@ class Config:
WEB_PORT = 8080
WEB_ENTER_HTML_PATH = PROJECT_DIR + 'py12306/web/static/index.html'
# CDN
CDN_ENABLED = 0
CDN_ITEM_FILE = PROJECT_DIR + 'data/cdn.txt'
CDN_ENABLED_AVAILABLE_ITEM_FILE = QUERY_DATA_DIR + 'available.json'
envs = []
retry_time = 5
last_modify_time = 0
@@ -164,18 +169,22 @@ class Config:
if envs == self.envs: return
from py12306.query.query import Query
from py12306.user.user import User
from py12306.helpers.cdn import Cdn
self.envs = envs
for key, value in envs:
if key in self.disallow_update_configs: continue
if value != -1:
old = getattr(self, key)
setattr(self, key, value)
if not first:
if key == 'USER_ACCOUNTS' and old != value:
if not first and old != value:
if key == 'USER_ACCOUNTS':
User().update_user_accounts(auto=True, old=old)
elif key == 'QUERY_JOBS' and old != value:
elif key == 'QUERY_JOBS':
Query().update_query_jobs(auto=True) # 任务修改
elif key == 'QUERY_INTERVAL' and old != value:
elif key == 'QUERY_INTERVAL':
Query().update_query_interval(auto=True)
elif key == 'CDN_ENABLED':
Cdn().update_cdn_status(auto=True)
@staticmethod
def is_master(): # 是不是 主
@@ -190,20 +199,16 @@ class Config:
def is_cluster_enabled():
return Config().CLUSTER_ENABLED
# @staticmethod
# def get_members():
# members = []
# for name, value in vars(Config).items():
# if name.isupper():
# members.append(([name, value]))
# return members
@staticmethod
def is_cdn_enabled():
    """Return the current value of the ``CDN_ENABLED`` setting."""
    config = Config()
    return config.CDN_ENABLED
class EnvLoader:
envs = []
def __init__(self):
self.envs = [] # 不是单例不初始化怎么还会有值
self.envs = []
@classmethod
def load_with_file(cls, file):

View File

@@ -2,7 +2,8 @@
# 查询余票
import time
BASE_URL_OF_12306 = 'https://kyfw.12306.cn'
HOST_URL_OF_12306 = 'kyfw.12306.cn'
BASE_URL_OF_12306 = 'https://' + HOST_URL_OF_12306
LEFT_TICKETS = {
"url": BASE_URL_OF_12306 + "/otn/{type}?leftTicketDTO.train_date={left_date}&leftTicketDTO.from_station={left_station}&leftTicketDTO.to_station={arrive_station}&purpose_codes=ADULT",
@@ -46,3 +47,5 @@ API_NOTIFICATION_BY_VOICE_CODE = 'http://ali-voice.showapi.com/sendVoice?'
API_FREE_CODE_QCR_API = 'http://60.205.200.159/api'
API_FREE_CODE_QCR_API_CHECK = 'http://check.huochepiao.360.cn/img_vcode'
API_CHECK_CDN_AVAILABLE = 'https://{}/otn/dynamicJs/omseuuq'

231
py12306/helpers/cdn.py Normal file
View File

@@ -0,0 +1,231 @@
import random
import json
from datetime import timedelta
from os import path
from py12306.cluster.cluster import Cluster
from py12306.config import Config
from py12306.app import app_available_check
from py12306.helpers.api import API_CHECK_CDN_AVAILABLE, HOST_URL_OF_12306
from py12306.helpers.func import *
from py12306.helpers.request import Request
from py12306.log.common_log import CommonLog
@singleton
class Cdn:
    """
    CDN host manager.

    Reads candidate hosts from ``Config().CDN_ITEM_FILE``, probes them through
    ``check_available`` jobs started with ``create_thread_and_run``, keeps the
    hosts that answered with HTTP 200 in ``available_items`` and persists the
    result to ``Config().CDN_ENABLED_AVAILABLE_ITEM_FILE``.  A ``watch_cdn``
    job started from ``__init__`` schedules a full re-check once the last
    check is older than ``check_keep_second`` seconds.
    """
    items = []  # every candidate host loaded from the CDN list file
    available_items = []  # hosts that answered the probe with HTTP 200
    unavailable_items = []  # hosts that failed the probe after all retries
    recheck_available_items = []  # same as available_items, filled while a re-check is running
    recheck_unavailable_items = []  # same as unavailable_items, filled while a re-check is running
    retry_time = 3  # not referenced inside this class

    is_ready = False  # True once enough hosts are known to be usable
    is_finished = False  # True once every candidate host has been probed
    is_ready_num = 10  # ready as soon as MORE than this many hosts are available
    is_alive = True  # cleared by destroy() to stop the worker loops
    is_recheck = False  # True while a scheduled re-check is in progress

    safe_stay_time = 0.2  # pause (seconds) between two probes of one worker
    retry_num = 1  # extra probe attempts per host; also the watcher poll interval in seconds
    thread_num = 8  # number of probing jobs started per check
    check_time_out = 3  # probe request timeout in seconds

    last_check_at = 0  # time of the last save; 0 / falsy when nothing was saved yet
    save_second = 5  # minimum seconds between two intermediate saves
    check_keep_second = 60 * 60 * 24  # results are considered fresh for this many seconds

    def __init__(self):
        self.cluster = Cluster()
        # Start the watcher job that schedules re-checks (see watch_cdn).
        create_thread_and_run(self, 'watch_cdn', False)

    def init_data(self):
        """Reset every result list and state flag to its initial value."""
        self.items = []
        self.available_items = []
        self.unavailable_items = []
        # Also drop partial re-check results: is_recheck is reset below, so
        # leftovers would otherwise leak into the next re-check.
        self.recheck_available_items = []
        self.recheck_unavailable_items = []
        self.is_finished = False
        self.is_ready = False
        self.is_recheck = False

    def update_cdn_status(self, auto=False):
        """
        React to a change of the ``CDN_ENABLED`` setting.

        :param auto: only when true is the new setting applied (start the
                     check if enabled, shut the manager down otherwise)
        """
        if not auto:
            return
        if Config().is_cdn_enabled():
            self.run()
        else:
            self.destroy()

    @classmethod
    def run(cls):
        """Entry point: obtain the shared instance, mark it alive and start checking."""
        instance = cls()
        app_available_check()
        instance.is_alive = True
        instance.start()

    def start(self):
        """Load the candidate list, restore cached results and start the probing jobs.

        Does nothing when the CDN feature is disabled or on a slave node.
        """
        if not Config.is_cdn_enabled() or Config().is_slave():
            return
        self.load_items()
        CommonLog.add_quick_log(CommonLog.MESSAGE_CDN_START_TO_CHECK.format(len(self.items))).flush()
        self.restore_items()
        self._start_check_threads()

    def _start_check_threads(self):
        """Start ``thread_num`` jobs that each run ``check_available``."""
        for _ in range(self.thread_num):
            create_thread_and_run(jobs=self, callback_name='check_available', wait=Const.IS_TEST)

    def load_items(self):
        """Read the candidate hosts (one per line) from ``Config().CDN_ITEM_FILE``.

        The list is rebuilt from scratch so that calling ``run()`` again does
        not duplicate entries, and blank lines are skipped so that an empty
        string is never probed as a host.
        """
        with open(Config().CDN_ITEM_FILE, encoding='utf-8') as f:
            self.items = [host for host in (line.strip() for line in f) if host]

    def restore_items(self):
        """
        Restore results saved by a previous run.

        :return: bool -- True when cached data was loaded, False when the
                 cache file is missing, is not valid JSON, is not a JSON
                 object or carries no ``last_check_at`` timestamp
        """
        file_path = Config().CDN_ENABLED_AVAILABLE_ITEM_FILE
        if not path.exists(file_path):
            return False
        with open(file_path, encoding='utf-8') as f:
            content = f.read()
        try:
            result = json.loads(content)
        except json.JSONDecodeError:
            return False
        # NOTE: results are deliberately not synchronised through the cluster.
        if not isinstance(result, dict):
            return False
        last_check_at = result.get('last_check_at', '')
        if not last_check_at:
            # save_available_items() always writes the timestamp, so a cache
            # without one has an unknown age; ignore it and probe everything.
            return False
        self.last_check_at = str_to_time(last_check_at)
        self.available_items = result.get('items', [])
        self.unavailable_items = result.get('fail_items', [])
        next_check_at = self.last_check_at + timedelta(seconds=self.check_keep_second)
        CommonLog.add_quick_log(
            CommonLog.MESSAGE_CDN_RESTORE_SUCCESS.format(self.last_check_at, next_check_at)).flush()
        return True

    def is_need_to_recheck(self):
        """
        Tell whether the saved results are older than ``check_keep_second``.

        :return: bool -- always False while nothing has been saved yet
        """
        if not self.last_check_at:
            return False
        # total_seconds() is required here: timedelta.seconds wraps at one day
        # (0..86399) and could never exceed the 24 hour keep time.
        return (time_now() - self.last_check_at).total_seconds() > self.check_keep_second

    def get_unchecked_item(self):
        """Return a random host that has no result yet in the current pass, or None."""
        if self.is_recheck:
            checked = set(self.recheck_available_items) | set(self.recheck_unavailable_items)
        else:
            checked = set(self.available_items) | set(self.unavailable_items)
        remaining = list(set(self.items) - checked)
        if remaining:
            return random.choice(remaining)
        return None

    def check_available(self):
        """Worker loop: probe unchecked hosts until none is left or the manager is stopped."""
        while self.is_alive:
            item = self.get_unchecked_item()
            if not item:
                return self.check_did_finished()
            self.check_item_available(item)

    def watch_cdn(self):
        """
        Watcher loop: start a full re-check whenever the results have expired.

        Polls every ``retry_num`` seconds and never returns.
        """
        while True:
            if self.is_alive and not self.is_recheck and self.is_need_to_recheck():
                self.is_recheck = True
                self.is_finished = False
                CommonLog.add_quick_log(
                    CommonLog.MESSAGE_CDN_START_TO_RECHECK.format(len(self.items), time_now())).flush()
                self._start_check_threads()
            stay_second(self.retry_num)

    def destroy(self):
        """Turn the CDN feature off: stop the worker loops and clear all results."""
        CommonLog.add_quick_log(CommonLog.MESSAGE_CDN_CLOSED).flush()
        self.is_alive = False
        self.init_data()

    def check_item_available(self, item, try_num=0):
        """
        Probe one host and record the outcome.

        :param item: host to probe; it is substituted into ``API_CHECK_CDN_AVAILABLE``
        :param try_num: number of attempts already made for this host
        """
        session = Request()
        response = session.get(API_CHECK_CDN_AVAILABLE.format(item), headers={'Host': HOST_URL_OF_12306},
                               timeout=self.check_time_out,
                               verify=False)

        if response.status_code == 200:
            if not self.is_recheck:
                self.available_items.append(item)
            else:
                self.recheck_available_items.append(item)
            if not self.is_ready:
                self.check_is_ready()
        elif try_num < self.retry_num:  # retry after a short pause
            stay_second(self.safe_stay_time)
            return self.check_item_available(item, try_num + 1)
        else:
            if not self.is_recheck:
                self.unavailable_items.append(item)
            else:
                self.recheck_unavailable_items.append(item)

        # Intermediate save, throttled to one write per save_second seconds.
        # Skipped during a re-check so that the previous complete result stays
        # on disk until the new one is finished.
        if not self.is_recheck and (
                not self.last_check_at
                or (time_now() - self.last_check_at).total_seconds() > self.save_second):
            self.save_available_items()
        stay_second(self.safe_stay_time)

    def check_did_finished(self):
        """Finish the current pass: publish re-check results (if any), log and save once."""
        self.is_ready = True
        if self.is_finished:
            return
        self.is_finished = True
        if self.is_recheck:
            # Swap the freshly collected results in and reset the scratch lists.
            self.is_recheck = False
            self.available_items = self.recheck_available_items
            self.unavailable_items = self.recheck_unavailable_items
            self.recheck_available_items = []
            self.recheck_unavailable_items = []
        CommonLog.add_quick_log(CommonLog.MESSAGE_CDN_CHECKED_SUCCESS.format(len(self.available_items))).flush()
        self.save_available_items()

    def save_available_items(self):
        """Write the current results and the check time to the cache file as JSON."""
        self.last_check_at = time_now()
        data = {'items': self.available_items, 'fail_items': self.unavailable_items,
                'last_check_at': str(self.last_check_at)}
        # Same encoding as restore_items() uses for reading.
        with open(Config().CDN_ENABLED_AVAILABLE_ITEM_FILE, 'w', encoding='utf-8') as f:
            f.write(json.dumps(data))
        # NOTE: results are deliberately not pushed to the cluster.

    def check_is_ready(self):
        """Set ``is_ready`` when more than ``is_ready_num`` hosts are available."""
        self.is_ready = len(self.available_items) > self.is_ready_num

    @classmethod
    def get_cdn(cls):
        """
        Pick a random usable host.

        :return: a host string, or None when the manager is not ready or no
                 host passed the check (``is_ready`` is also set when a pass
                 finishes with an empty result)
        """
        instance = cls()
        candidates = instance.available_items
        if instance.is_ready and candidates:
            return random.choice(candidates)
        return None
if __name__ == '__main__':
    # Manual run: start the CDN check and block until every host was probed.
    # Const.IS_TEST = True
    Cdn.run()
    while True:
        if Cdn().is_finished:
            break
        stay_second(1)

View File

@@ -1,8 +1,11 @@
import requests
from requests.exceptions import *
from py12306.helpers.func import *
from requests_html import HTMLSession, HTMLResponse
requests.packages.urllib3.disable_warnings()
class Request(HTMLSession):
"""
@@ -57,3 +60,11 @@ class Request(HTMLSession):
response.status_code = 500
expand_class(response, 'json', Request.json)
return response
def cdn_request(self, url: str, cdn=None, method='GET', **kwargs):
    """
    Send a request to 12306 through a CDN host.

    The 12306 host name inside ``url`` is replaced by the CDN host and the
    original host name is sent in the ``Host`` header instead.

    :param url: request URL containing ``HOST_URL_OF_12306``
    :param cdn: CDN host to use; when empty one is taken from ``Cdn.get_cdn()``
    :param method: HTTP method
    :param kwargs: passed on to ``self.request``; a ``headers`` mapping is
                   merged with the ``Host`` header, ``verify`` defaults to False
    :return: whatever ``self.request`` returns
    """
    from py12306.helpers.api import HOST_URL_OF_12306
    from py12306.helpers.cdn import Cdn

    if not cdn:
        cdn = Cdn.get_cdn()
    if not cdn:
        # No usable CDN host right now: str.replace() would raise on None,
        # so send the request to the original host instead.
        return self.request(method, url, **kwargs)

    # Merge instead of passing headers=/verify= next to **kwargs, which raised
    # "got multiple values for keyword argument" when the caller supplied them.
    headers = dict(kwargs.pop('headers', None) or {})
    headers['Host'] = HOST_URL_OF_12306
    kwargs.setdefault('verify', False)
    cdn_url = url.replace(HOST_URL_OF_12306, cdn)
    return self.request(method, cdn_url, headers=headers, **kwargs)

View File

@@ -49,6 +49,12 @@ class CommonLog(BaseLog):
MESSAGE_RESPONSE_EMPTY_ERROR = '网络错误'
MESSAGE_CDN_START_TO_CHECK = '正在筛选 {} 个 CDN...'
MESSAGE_CDN_START_TO_RECHECK = '正在重新筛选 {} 个 CDN...当前时间 {}\n'
MESSAGE_CDN_RESTORE_SUCCESS = 'CDN 恢复成功,上次检测 {},下次检测 {}\n'
MESSAGE_CDN_CHECKED_SUCCESS = '# CDN 检测完成,可用 CDN {} #\n'
MESSAGE_CDN_CLOSED = '# CDN 已关闭 #'
def __init__(self):
super().__init__()
self.init_data()
@@ -82,13 +88,16 @@ class CommonLog(BaseLog):
disable = '未开启'
self.add_quick_log('**** 当前配置 ****')
self.add_quick_log('多线程查询: {}'.format(get_true_false_text(Config().QUERY_JOB_THREAD_ENABLED, enable, disable)))
self.add_quick_log('CDN 状态: {}'.format(get_true_false_text(Config().CDN_ENABLED, enable, disable))).flush()
self.add_quick_log('通知状态:')
self.add_quick_log(
'语音验证码: {}'.format(get_true_false_text(Config().NOTIFICATION_BY_VOICE_CODE, enable, disable)))
self.add_quick_log('邮件通知: {}'.format(get_true_false_text(Config().EMAIL_ENABLED, enable, disable)))
self.add_quick_log('钉钉通知: {}'.format(get_true_false_text(Config().DINGTALK_ENABLED, enable, disable)))
self.add_quick_log('Telegram通知: {}'.format(get_true_false_text(Config().TELEGRAM_ENABLED, enable, disable)))
self.add_quick_log('ServerChan通知: {}'.format(get_true_false_text(Config().SERVERCHAN_ENABLED, enable, disable)))
self.add_quick_log('PushBear通知: {}'.format(get_true_false_text(Config().PUSHBEAR_ENABLED, enable, disable)))
self.add_quick_log(
'PushBear通知: {}'.format(get_true_false_text(Config().PUSHBEAR_ENABLED, enable, disable))).flush(sep='\t\t')
self.add_quick_log('查询间隔: {}'.format(Config().QUERY_INTERVAL))
self.add_quick_log('用户心跳检测间隔: {}'.format(Config().USER_HEARTBEAT_INTERVAL))
self.add_quick_log('WEB 管理页面: {}'.format(get_true_false_text(Config().WEB_ENABLE, enable, disable)))

View File

@@ -64,7 +64,6 @@ class QueryLog(BaseLog):
# self.add_quick_log('加载status.json失败, 文件内容为: {}.'.format(repr(result)))
# self.flush() # 这里可以用不用提示
if Config.is_cluster_enabled():
result = self.get_data_from_cluster()
@@ -157,19 +156,23 @@ class QueryLog(BaseLog):
@classmethod
def print_job_start(cls, job_name):
self = cls()
message = '=== 正在进行{query_count} 次查询 {job_name} === {time}'.format(
message = '>> {query_count} 次查询 {job_name} {time}'.format(
query_count=int(self.data.get('query_count', 0)) + 1,
job_name=job_name, time=datetime.datetime.now())
job_name=job_name, time=time_now().strftime("%Y-%m-%d %H:%M:%S"))
self.add_log(message)
self.refresh_data()
if is_main_thread():
self.flush(publish=False)
return self
@classmethod
def add_query_time_log(cls, start, end, is_cdn):
    """
    Log how long one query took.

    :param start: start timestamp
    :param end: end timestamp
    :param is_cdn: when true the entry is prefixed with ``*``
    :return: the result of ``add_log``
    """
    marker = '*' if is_cdn else ''
    elapsed = end - start
    message = marker + '耗时 %.2f' % elapsed
    return cls().add_log(message)
@classmethod
def add_stay_log(cls, second):
self = cls()
self.add_log('安全停留 {}'.format(second))
self.add_log('停留 {}'.format(second))
return self
def print_data_restored(self):

View File

@@ -23,7 +23,7 @@ class Order:
is_need_auth_code = False
max_queue_wait = 120
max_queue_wait = 60 * 5 # 最大排队时长
current_queue_wait = 0
retry_time = 3
wait_queue_interval = 3

View File

@@ -46,6 +46,8 @@ class Job:
query = None
cluster = None
ticket_info = {}
is_cdn = False
query_time_out = 3
INDEX_TICKET_NUM = 11
INDEX_TRAIN_NUMBER = 3
INDEX_TRAIN_NO = 2
@@ -98,7 +100,10 @@ class Job:
self.refresh_station(station)
for date in self.left_dates:
self.left_date = date
tmp_start_time = time.time()
response = self.query_by_date(date)
tmp_end_time = time.time() # 耗时
QueryLog.add_query_time_log(tmp_start_time, tmp_end_time, is_cdn=self.is_cdn)
self.handle_response(response)
if not self.is_alive: return
self.safe_stay()
@@ -116,13 +121,17 @@ class Job:
通过日期进行查询
:return:
"""
from py12306.helpers.cdn import Cdn
QueryLog.add_log(('\n' if not is_main_thread() else '') + QueryLog.MESSAGE_QUERY_START_BY_DATE.format(date,
self.left_station,
self.arrive_station))
url = LEFT_TICKETS.get('url').format(left_date=date, left_station=self.left_station_code,
arrive_station=self.arrive_station_code, type='leftTicket/queryZ')
return self.query.session.get(url)
if Config.is_cdn_enabled() and Cdn().is_ready:
self.is_cdn = True
return self.query.session.cdn_request(url, timeout=self.query_time_out)
self.is_cdn = False
return self.query.session.get(url, timeout=self.query_time_out)
def handle_response(self, response):
"""
@@ -141,8 +150,7 @@ class Job:
self.ticket_info = ticket_info = result.split('|')
if not self.is_trains_number_valid(ticket_info): # 车次是否有效
continue
QueryLog.add_log(QueryLog.MESSAGE_QUERY_LOG_OF_EVERY_TRAIN.format(self.get_info_of_train_number(),
self.get_info_of_ticket_num()))
QueryLog.add_log(QueryLog.MESSAGE_QUERY_LOG_OF_EVERY_TRAIN.format(self.get_info_of_train_number()))
if not self.is_has_ticket(ticket_info):
continue
allow_seats = self.allow_seats if self.allow_seats else list(