优化代码,增加测试模式

This commit is contained in:
Jalin
2019-01-08 01:39:20 +08:00
parent 27a5a13c0a
commit 6e6da93e4c
17 changed files with 314 additions and 58 deletions

35
main.py
View File

@@ -1,20 +1,41 @@
# encoding=utf8
import os
from threading import Thread
import sys
from time import sleep
from py12306.helpers.func import *
from py12306.helpers.app import *
from py12306.log.common_log import CommonLog
from py12306.query.query import Query
from py12306.user.user import User
def main():
# Thread(target=Query.run).start() # 余票查询
# create_thread_and_run(User, 'run', wait=False)
if '--test' in sys.argv or '-t' in sys.argv: test()
CommonLog.print_welcome().print_configs()
App.run_check()
User.run()
Query.run()
# Query.run()
while True:
sleep(1)
if not Const.IS_TEST:
while True:
sleep(1)
CommonLog.test_complete()
def test():
"""
功能检查
包含:
账号密码验证 (打码)
座位验证
乘客验证
语音验证码验证
:return:
"""
Const.IS_TEST = True
if '--test-notification' in sys.argv or '-n' in sys.argv:
Const.IS_TEST_NOTIFICATION = True
pass

View File

@@ -52,8 +52,13 @@ RUNTIME_DIR = PROJECT_DIR + 'runtime/'
QUERY_DATA_DIR = RUNTIME_DIR + 'query/'
USER_DATA_DIR = RUNTIME_DIR + 'user/'
STATION_FILE = 'data/stations.txt'
CONFIG_FILE = 'env.py'
STATION_FILE = PROJECT_DIR + 'data/stations.txt'
CONFIG_FILE = PROJECT_DIR + 'env.py'
# 语音验证码
NOTIFICATION_BY_VOICE_CODE = 0
NOTIFICATION_VOICE_CODE_PHONE = ''
NOTIFICATION_API_APP_CODE = ''
if path.exists(CONFIG_FILE):
exec(open(CONFIG_FILE, encoding='utf8').read())
@@ -71,7 +76,3 @@ class UserType:
'学生': STUDENT,
'残疾军人、伤残人民警察': SOLDIER,
}
def get(key, default=None):
return eval(key)

View File

@@ -1,4 +0,0 @@
class MemberInvalidException(Exception):
pass

View File

@@ -50,6 +50,8 @@ API_GET_QUEUE_COUNT = BASE_URL_OF_12306 + '/otn/confirmPassenger/getQueueCount'
API_CONFIRM_SINGLE_FOR_QUEUE = BASE_URL_OF_12306 + '/otn/confirmPassenger/confirmSingleForQueue'
API_QUERY_ORDER_WAIT_TIME = BASE_URL_OF_12306 + '/otn/confirmPassenger/queryOrderWaitTime?{}' # 排队查询
API_NOTIFICATION_BY_VOICE_CODE = 'http://ali-voice.showapi.com/sendVoice?'
urls = {
"auth": { # 登录接口
"req_url": "/passport/web/auth/uamtk",

View File

@@ -1,8 +1,12 @@
from py12306.helpers.func import *
from py12306.config import *
from py12306.helpers.notification import Notification
from py12306.log.common_log import CommonLog
from py12306.log.order_log import OrderLog
def app_available_check():
# return True # Debug
now = time_now()
if now.hour >= 23 or now.hour < 6:
CommonLog.add_quick_log(CommonLog.MESSAGE_12306_IS_CLOSED.format(time_now())).flush()
@@ -11,3 +15,44 @@ def app_available_check():
open_time += datetime.timedelta(1)
sleep((open_time - now).seconds)
return True
class App:
"""
程序主类
TODO 需要完善
"""
@classmethod
def check_auto_code(cls):
if not config.AUTO_CODE_ACCOUNT.get('user') or not config.AUTO_CODE_ACCOUNT.get('pwd'):
return False
return True
@classmethod
def check_user_account_is_empty(cls):
if config.USER_ACCOUNTS:
for account in config.USER_ACCOUNTS:
if account:
return True
return False
@classmethod
def test_send_notifications(cls):
if config.NOTIFICATION_BY_VOICE_CODE: # 语音通知
CommonLog.add_quick_log(CommonLog.MESSAGE_TEST_SEND_VOICE_CODE).flush()
Notification.voice_code(config.NOTIFICATION_VOICE_CODE_PHONE, '张三',
OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_CONTENT.format('北京',
'深圳'))
@classmethod
def run_check(cls):
"""
待优化
:return:
"""
if not cls.check_auto_code():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True)
if not cls.check_user_account_is_empty():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True)
if Const.IS_TEST_NOTIFICATION: cls.test_send_notifications()

View File

@@ -1,10 +1,12 @@
import datetime
import random
import threading
import functools
from time import sleep
from py12306 import config
import functools
def singleton(cls):
@@ -65,6 +67,15 @@ def stay_second(second, call_back=None):
return call_back()
def sleep_forever():
"""
当不是主线程时,假象停止
:return:
"""
if not is_main_thread():
while True: sleep(10000000)
def is_main_thread():
return threading.current_thread() == threading.main_thread()
@@ -99,4 +110,17 @@ def array_dict_find_by_key_value(data, key, value, default=None):
result = [v for k, v in enumerate(data) if key in v and v[key] == value]
return result.pop() if len(result) else default
# def test:
def get_true_false_text(value, true='', false=''):
if value: return true
return false
def sleep_forever_when_in_test():
if Const.IS_TEST: sleep_forever()
@singleton
class Const:
IS_TEST = False
IS_TEST_NOTIFICATION = False

View File

@@ -0,0 +1,64 @@
import urllib
from py12306 import config
from py12306.helpers.api import *
from py12306.helpers.request import Request
from py12306.log.common_log import CommonLog
class Notification():
"""
通知类
"""
session = None
def __init__(self):
self.session = Request()
@classmethod
def voice_code(cls, phone, name='', content=''):
self = cls()
self.send_voice_code_of_yiyuan(phone, name=name, content=content)
def send_voice_code_of_yiyuan(self, phone, name='', content=''):
"""
发送语音验证码
购买地址 https://market.aliyun.com/products/57126001/cmapi019902.html?spm=5176.2020520132.101.5.37857218O6iJ3n
:return:
"""
appcode = config.NOTIFICATION_API_APP_CODE
if not appcode:
CommonLog.add_quick_log(CommonLog.MESSAGE_EMPTY_APP_CODE).flush()
return False
body = {
'userName': name,
'mailNo': content
}
params = {
'content': body,
'mobile': phone,
'sex': 2,
'tNum': 'T170701001056'
}
response = self.session.request(url=API_NOTIFICATION_BY_VOICE_CODE + urllib.parse.urlencode(params),
method='GET', headers={
'Authorization': 'APPCODE {}'.format(appcode)
})
response_message = '-'
result = {}
try:
result = response.json()
response_message = result['showapi_res_body']['remark']
except:
pass
if response.status_code == 401 or response.status_code == 403:
return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_FORBID).flush()
if response.status_code == 200 and 'showapi_res_body' in result and result['showapi_res_body'].get('flag'):
CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_SUCCESS.format(response_message)).flush()
return True
else:
return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_FAIL.format(response_message)).flush()
if __name__ == '__main__':
Notification.voice_code('13800138000', '张三', '你的车票 广州 到 深圳 购买成功,请登录 12306 进行支付')

View File

@@ -7,7 +7,6 @@ class Station:
stations = []
def __init__(self):
print('Station 初始化')
if path.exists(config.STATION_FILE):
result = open(config.STATION_FILE, encoding='utf-8').read()
result = result.lstrip('@').split('@')

View File

@@ -3,14 +3,13 @@ import sys
from py12306.helpers.func import *
class BaseLog:
logs = []
thread_logs = {}
quick_log = []
@classmethod
def add_log(cls, content):
def add_log(cls, content=''):
self = cls()
# print('添加 Log 主进程{} 进程ID{}'.format(is_main_thread(), current_thread_id()))
if is_main_thread():
@@ -31,7 +30,6 @@ class BaseLog:
logs = self.logs
else:
logs = self.thread_logs.get(current_thread_id())
# for i in logs:
print(*logs, sep=sep, end=end, file=file)
if self.quick_log:
self.quick_log = []
@@ -40,12 +38,11 @@ class BaseLog:
self.logs = []
else:
if logs: del self.thread_logs[current_thread_id()]
# print(self.logs)
if exit:
sys.exit()
@classmethod
def add_quick_log(cls, content):
def add_quick_log(cls, content = ''):
self = cls()
self.quick_log.append(content)
return self

View File

@@ -1,18 +1,67 @@
from py12306.log.base import BaseLog
from py12306.config import *
from py12306.helpers.func import *
@singleton
class CommonLog(BaseLog):
# 这里如果不声明,会出现重复打印,目前不知道什么原因
logs = []
thread_logs = {}
quick_log = []
MESSAGE_12306_IS_CLOSED = '当前时间: {} | 12306 休息时间,程序将在明天早上 6 点自动运行'
MESSAGE_RETRY_AUTH_CODE = '{} 秒后重新获取验证码'
MESSAGE_EMPTY_APP_CODE = '无法发送语音消息,未填写验证码接口 appcode'
MESSAGE_VOICE_API_FORBID = '语音消息发送失败,请检查 appcode 是否填写正确或 套餐余额是否充足'
MESSAGE_VOICE_API_SEND_FAIL = '语音消息发送失败,错误原因 {}'
MESSAGE_VOICE_API_SEND_SUCCESS = '语音消息发送成功! 接口返回信息 {} '
MESSAGE_CHECK_AUTO_CODE_FAIL = '请配置打码账号的账号密码'
MESSAGE_CHECK_EMPTY_USER_ACCOUNT = '请配置 12306 账号密码'
MESSAGE_TEST_SEND_VOICE_CODE = '正在测试发送语音验证码...'
def __init__(self):
super().__init__()
self.init_data()
def init_data(self):
print('Common Log 初始化')
pass
@classmethod
def print_welcome(cls):
self = cls()
self.add_quick_log('######## py12306 购票助手,本程序为开源工具,请勿用于商业用途 ########')
if Const.IS_TEST:
self.add_quick_log()
self.add_quick_log('当前为测试模式,程序运行完成后自动结束')
self.add_quick_log()
self.flush()
return self
@classmethod
def print_configs(cls):
# 打印配置
self = cls()
enable = '已开启'
disable = '未开启'
self.add_quick_log('**** 当前配置 ****')
self.add_quick_log('多线程查询: {}'.format(get_true_false_text(config.QUERY_JOB_THREAD_ENABLED, enable, disable)))
self.add_quick_log('语音验证码: {}'.format(get_true_false_text(config.QUERY_JOB_THREAD_ENABLED, enable, disable)))
self.add_quick_log('查询间隔: {}'.format(config.QUERY_INTERVAL))
self.add_quick_log('用户心跳检测间隔: {}'.format(config.USER_HEARTBEAT_INTERVAL))
self.add_quick_log()
self.flush()
return self
@classmethod
def test_complete(cls):
self = cls()
self.add_quick_log('# 测试完成,请检查输出是否正确 #')
self.flush()
return self
@classmethod
def print_auto_code_fail(cls, reason):

View File

@@ -4,11 +4,17 @@ from py12306.helpers.func import *
@singleton
class OrderLog(BaseLog):
# 这里如果不声明,会出现重复打印,目前不知道什么原因
logs = []
thread_logs = {}
quick_log = []
MESSAGE_REQUEST_INIT_DC_PAGE_FAIL = '请求初始化订单页面失败'
MESSAGE_SUBMIT_ORDER_REQUEST_FAIL = '提交订单失败,错误原因 {}'
MESSAGE_SUBMIT_ORDER_REQUEST_FAIL = '提交订单失败,错误原因 {} \n'
MESSAGE_SUBMIT_ORDER_REQUEST_SUCCESS = '提交订单成功'
MESSAGE_CHECK_ORDER_INFO_FAIL = '检查订单失败,错误原因 {}'
MESSAGE_CHECK_ORDER_INFO_FAIL = '检查订单失败,错误原因 {} \n'
MESSAGE_CHECK_ORDER_INFO_SUCCESS = '检查订单成功'
MESSAGE_GET_QUEUE_COUNT_SUCCESS = '排队成功,你当前排在第 {} 位, 余票还剩余 {}'
@@ -25,6 +31,10 @@ class OrderLog(BaseLog):
MESSAGE_ORDER_SUCCESS_NOTIFICATION_TITLE = '车票购买成功!'
MESSAGE_ORDER_SUCCESS_NOTIFICATION_CONTENT = '请及时器登录12306打开 \'未完成订单\'在30分钟内完成支付!'
MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_START_SEND = '正在发送语音通知, 第 {}'
MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_CONTENT = '你的车票 {}{} 购买成功,请登录 12306 进行支付'
MESSAGE_JOB_CLOSED = '当前任务已结束'
@classmethod
def print_passenger_did_deleted(cls, passengers):
@@ -37,6 +47,6 @@ class OrderLog(BaseLog):
@classmethod
def print_ticket_did_ordered(cls, order_id):
self = cls()
self.add_quick_log('# 车票购买成功,订单号{} #'.format(order_id))
self.add_quick_log('# 车票购买成功,订单号 {} #'.format(order_id))
self.flush()
return self

View File

@@ -8,6 +8,11 @@ from py12306.helpers.func import *
@singleton
class QueryLog(BaseLog):
# 这里如果不声明,会出现重复打印,目前不知道什么原因
logs = []
thread_logs = {}
quick_log = []
data = {
'query_count': 1,
'last_time': '',
@@ -27,13 +32,14 @@ class QueryLog(BaseLog):
def init_data(self):
# 获取上次记录
print('Query Log 初始化')
if Const.IS_TEST: return
if path.exists(self.data_path):
result = open(self.data_path, encoding='utf-8').read()
if result:
result = json.loads(result)
self.data = {**self.data, **result}
self.print_data_restored()
with open(self.data_path, encoding='utf-8') as f:
result = f.read()
if result:
result = json.loads(result)
self.data = {**self.data, **result}
self.print_data_restored()
@classmethod
def print_init_jobs(cls, jobs):
@@ -50,10 +56,11 @@ class QueryLog(BaseLog):
self.add_log('乘车日期:{}'.format(job.left_dates))
self.add_log('坐席:{}'.format(''.join(job.allow_seats)))
self.add_log('乘车人:{}'.format(''.join(job.members)))
self.add_log('筛选车次:{}'.format(''.join(job.allow_train_numbers)))
self.add_log('筛选车次:{}'.format(''.join(job.allow_train_numbers if job.allow_train_numbers else ['不筛选'])))
# 乘车日期:['2019-01-24', '2019-01-25', '2019-01-26', '2019-01-27']
self.add_log('')
index += 1
self.add_log('')
self.flush()
return self
@@ -71,10 +78,11 @@ class QueryLog(BaseLog):
def print_ticket_seat_available(cls, left_date, train_number, seat_type, rest_num):
self = cls()
self.add_quick_log(
'查询到座位可用 出发时间 {left_date} 车次 {train_number} 座位类型 {seat_type} 余票数量 {rest_num}'.format(left_date=left_date,
train_number=train_number,
seat_type=seat_type,
rest_num=rest_num))
'[ 查询到座位可用 出发时间 {left_date} 车次 {train_number} 座位类型 {seat_type} 余票数量 {rest_num} ]'.format(
left_date=left_date,
train_number=train_number,
seat_type=seat_type,
rest_num=rest_num))
self.flush()
return self
@@ -107,8 +115,6 @@ class QueryLog(BaseLog):
self.refresh_data()
if is_main_thread():
self.flush()
else:
self.add_log('\n')
return self
@classmethod
@@ -121,7 +127,7 @@ class QueryLog(BaseLog):
self.add_quick_log('============================================================')
self.add_quick_log('|=== 查询记录恢复成功 上次查询 {last_date} ===|'.format(last_date=self.data.get('last_time')))
self.add_quick_log('============================================================')
self.add_log('')
self.add_quick_log('')
self.flush()
return self

View File

@@ -4,6 +4,11 @@ from py12306.helpers.func import *
@singleton
class UserLog(BaseLog):
# 这里如果不声明,会出现重复打印,目前不知道什么原因
logs = []
thread_logs = {}
quick_log = []
MESSAGE_DOWNLAOD_AUTH_CODE_FAIL = '验证码下载失败 错误原因: {} {} 秒后重试'
MESSAGE_DOWNLAODING_THE_CODE = '正在下载验证码...'
MESSAGE_CODE_AUTH_FAIL = '验证码验证失败 错误原因: {} {} 秒后重试'
@@ -17,14 +22,14 @@ class UserLog(BaseLog):
MESSAGE_GET_USER_PASSENGERS_FAIL = '获取用户乘客列表失败,错误原因: {} {} 秒后重试'
MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}'
MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒重试'
MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{}重试'
def __init__(self):
super().__init__()
self.init_data()
def init_data(self):
print('User Log 初始化')
pass
@classmethod
def print_init_users(cls, users):
@@ -33,7 +38,7 @@ class UserLog(BaseLog):
:return:
"""
self = cls()
self.add_log('================== 发现 {} 个用户 =================='.format(len(users)))
self.add_log('# 发现 {} 个用户 #'.format(len(users)))
self.flush()
return self

View File

@@ -5,6 +5,7 @@ from py12306.config import UserType
from py12306.helpers.api import *
from py12306.helpers.app import *
from py12306.helpers.func import *
from py12306.helpers.notification import Notification
from py12306.log.order_log import OrderLog
from py12306.log.user_log import UserLog
@@ -31,6 +32,11 @@ class Order:
retry_time = 3
wait_queue_interval = 3
order_id = 0
notification_sustain_time = 60 * 30 # 通知持续时间 30 分钟
notification_interval = 5 * 60 # 通知间隔
def __init__(self, query, user):
self.session = user.session
# assert isinstance(query, Job) # 循环引用
@@ -57,9 +63,31 @@ class Order:
if not self.confirm_single_for_queue(): return
order_id = self.query_order_wait_time()
if order_id: # 发送通知
OrderLog.print_ticket_did_ordered(order_id)
OrderLog.notification(OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_TITLE,
OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_CONTENT)
self.order_id = order_id
self.order_did_success()
def order_did_success(self):
OrderLog.print_ticket_did_ordered(self.order_id)
OrderLog.notification(OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_TITLE,
OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_CONTENT)
self.send_notification()
def send_notification(self):
num = 0 # 通知次数
sustain_time = self.notification_sustain_time
while sustain_time: # TODO 后面直接查询有没有待支付的订单就可以
num += 1
if config.NOTIFICATION_BY_VOICE_CODE: # 语音通知
OrderLog.add_quick_log(OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_START_SEND.format(num))
Notification.voice_code(config.NOTIFICATION_VOICE_CODE_PHONE, self.user_ins.get_name(),
OrderLog.MESSAGE_ORDER_SUCCESS_NOTIFICATION_OF_VOICE_CODE_CONTENT.format(
self.query_ins.left_station, self.query_ins.arrive_station))
sustain_time -= self.notification_interval
sleep(self.notification_interval)
OrderLog.add_quick_log(OrderLog.MESSAGE_JOB_CLOSED)
# 结束运行
while True: sleep(self.retry_time)
def submit_order_request(self):
data = {
@@ -277,9 +305,11 @@ class Order:
elif result_data.get('waitTime') and result_data.get('waitTime') >= 0:
OrderLog.add_quick_log(
OrderLog.MESSAGE_QUERY_ORDER_WAIT_TIME_WAITING.format(result_data.get('waitTime'))).flush()
elif result_data.get('msg'): # 失败
elif result_data.get('msg'): # 失败 对不起由于您取消次数过多今日将不能继续受理您的订票请求。1月8日您可继续使用订票功能。
# TODO 需要增加判断 直接结束
OrderLog.add_quick_log(
OrderLog.MESSAGE_QUERY_ORDER_WAIT_TIME_FAIL.format(result_data.get('msg', '-'))).flush()
stay_second(self.retry_time)
return False
elif result.get('messages') or result.get('validateMessages'):
OrderLog.add_quick_log(OrderLog.MESSAGE_QUERY_ORDER_WAIT_TIME_FAIL.format(

View File

@@ -80,20 +80,21 @@ class Job:
self.handle_response(response)
self.safe_stay()
if is_main_thread():
QueryLog.flush(sep='\t')
else:
QueryLog.add_log('\n')
QueryLog.flush(sep='\t\t')
if is_main_thread():
QueryLog.add_quick_log('').flush()
else:
QueryLog.flush(sep='\t')
QueryLog.add_log('\n').flush(sep='\t\t')
def query_by_date(self, date):
"""
通过日期进行查询
:return:
"""
QueryLog.add_log(QueryLog.MESSAGE_QUERY_START_BY_DATE.format(date, self.left_station, self.arrive_station))
QueryLog.add_log(
('\n' if not is_main_thread() else '') + QueryLog.MESSAGE_QUERY_START_BY_DATE.format(date,
self.left_station,
self.arrive_station))
url = LEFT_TICKETS.get('url').format(left_date=date, left_station=self.left_station_code,
arrive_station=self.arrive_station_code, type='leftTicket/queryZ')
@@ -141,6 +142,7 @@ class Job:
QueryLog.add_quick_log(
QueryLog.MESSAGE_GIVE_UP_CHANCE_CAUSE_TICKET_NUM_LESS_THAN_SPECIFIED).flush()
continue
if Const.IS_TEST: return
# 检查完成 开始提交订单
QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(),
train_number=self.get_info_of_train_number(),

View File

@@ -34,6 +34,8 @@ class Query:
# return # DEBUG
self.init_jobs()
QueryLog.print_init_jobs(jobs=self.jobs)
stay_second(1)
while True:
app_available_check()
if config.QUERY_JOB_THREAD_ENABLED: # 多线程
@@ -41,6 +43,7 @@ class Query:
else:
for job in self.jobs:
job.run()
if Const.IS_TEST: return
def init_jobs(self):
jobs = config.QUERY_JOBS

View File

@@ -42,7 +42,8 @@ class UserJob:
def run(self):
# load user
self.load_user()
if not Const.IS_TEST:
self.load_user()
self.start()
def start(self):
@@ -53,6 +54,7 @@ class UserJob:
while True:
app_available_check()
self.check_heartbeat()
if Const.IS_TEST: return
sleep(self.heartbeat_interval)
def check_heartbeat(self):
@@ -259,7 +261,7 @@ class UserJob:
order = re.search(r'var orderRequestDTO *= *(\{.+\})', html)
# 系统忙,请稍后重试
if html.find('系统忙,请稍后重试') != -1:
OrderLog.add_quick_log(OrderLog.MESSAGE_REQUEST_INIT_DC_PAGE_FAIL).flush() # 重试无用,直接跳过
OrderLog.add_quick_log(OrderLog.MESSAGE_REQUEST_INIT_DC_PAGE_FAIL).flush() # 重试无用,直接跳过
return False
try:
self.global_repeat_submit_token = token.groups()[0]