12 Commits

Author SHA1 Message Date
Jalin
ad83fd261a Add verify of query result 2019-05-16 13:17:40 +08:00
Jalin
8b795d3217 Remove config.toml file 2019-05-15 21:47:57 +08:00
Jalin
f8a73ff455 Add query train ticket 2019-05-15 21:42:55 +08:00
Jalin
368bded297 add retry and exception support 2019-05-14 19:51:19 +08:00
Jalin
0913772ec6 add task logger 2019-05-14 18:07:06 +08:00
Jalin
f49b081bc5 rename old 2019-05-14 13:21:25 +08:00
Jalin
8d4ea3066a update browser id algorithm 2019-04-05 14:01:52 +08:00
Jalin
c0d28e6744 fix login fail #151 2019-04-04 15:36:44 +08:00
Jalin
4935582b19 fix hooks may not been assignment #148 2019-04-02 16:47:45 +08:00
Jalin
656dbe87d9 修复登录失效问题 #144 2019-04-02 16:33:16 +08:00
Jalin
80fdd4db61 修复登录失效 #144 2019-03-26 23:18:36 +08:00
Jalin
1e25b9079c 修复获取查询接口正则匹配问题 #142 2019-03-15 10:23:28 +08:00
95 changed files with 870 additions and 267 deletions

5
.gitignore vendored
View File

@@ -4,6 +4,5 @@
venv
__pycache__
env.py
env.slave.py
env.docker.py
docker-compose.yml
docker-compose.yml
config.toml

View File

@@ -1,6 +0,0 @@
ports:
- port: 8008
onOpen: open-preview
tasks:
- init: pip install -r requirements.txt && cp env.py.example env.py
command: python main.py -t

View File

@@ -1,4 +1,4 @@
version: "2"
version: "3"
services:
py12306:
build: .

View File

@@ -1,179 +0,0 @@
# -*- coding: utf-8 -*-
# 12306 账号
USER_ACCOUNTS = [
# 目前已支持仅查询,不下单,屏蔽掉下面的账号即可
{
'key': 0, # 如使用多个账号 key 不能重复
'user_name': 'your user name',
'password': 'your password'
},
# {
# 'key': 'wangwu',
# 'user_name': 'wangwu@qq.com',
# 'password': 'wangwu'
# }
]
# 查询间隔(指每一个任务中每一个日期的间隔 / 单位秒)
# 默认取间隔/2 到 间隔之间的随机数 如设置为 1 间隔则为 0.5 ~ 1 之间的随机数
# 接受字典形式 格式: {'min': 0.5, 'max': 1}
QUERY_INTERVAL = 1
# 用户心跳检测间隔 格式同上
USER_HEARTBEAT_INTERVAL = 120
# 多线程查询
QUERY_JOB_THREAD_ENABLED = 0 # 是否开启多线程查询,开启后第个任务会单独分配线程处理
# 打码平台账号
# 目前只支持免费打码接口 和 若快打码注册地址http://www.ruokuai.com/login
AUTO_CODE_PLATFORM = 'free' # 免费填写 free 若快 ruokuai # 免费打码无法保证持续可用,如失效请手动切换
AUTO_CODE_ACCOUNT = {
'user': 'your user name',
'pwd': 'your password'
}
# 语音验证码
# 没找到比较好用的,现在用的这个是阿里云 API 市场上的,基本满足要求,价格也便宜
# 购买成功后到控制台找到 APPCODE 放在下面就可以了
# 地址:易源 https://market.aliyun.com/products/57126001/cmapi019902.html
# 2019-01-18 更新
# 增加新的服务商 鼎信 https://market.aliyun.com/products/56928004/cmapi026600.html?spm=5176.2020520132.101.2.e27e7218KQttQS
NOTIFICATION_BY_VOICE_CODE = 1 # 开启语音通知
NOTIFICATION_VOICE_CODE_TYPE = 'dingxin' # 语音验证码服务商 可用项 dingxin yiyuan
NOTIFICATION_API_APP_CODE = 'your app code'
NOTIFICATION_VOICE_CODE_PHONE = 'your phone' # 接受通知的手机号
# 钉钉通知
DINGTALK_ENABLED = 0
DINGTALK_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token=your token'
# Telegram消息推送
# 目前共有两个Bot
# 1https://t.me/notificationme_bot
# 2https://t.me/RE_Link_Push_bot
# 任选一个Bot关注获取URL链接如果没有回复则发送给Bot这条信息: /start
# 将获取的URL填入下面对应位置
# 注意因为以上Bot都由他人公益提供无法保证随时可用如以上Bot都无法使用请使用其他消息推送方式
# Bot1来源https://github.com/Fndroid/tg_push_bot
# Bot2来源https://szc.me/post/2.html
TELEGRAM_ENABLED = 0
TELEGRAM_BOT_API_URL = 'https://tgbot.lbyczf.com/sendMessage/:your_token'
# ServerChan 和 PushBear 微信消息推送
# 使用说明
# ServerChan http://sc.ftqq.com
# PushBear http://pushbear.ftqq.com
SERVERCHAN_ENABLED = 0
SERVERCHAN_KEY = ''
PUSHBEAR_ENABLED = 0
PUSHBEAR_KEY = ''
# Bark 推送到ios设备
# 参考 https://www.v2ex.com/t/467407
BARK_ENABLED = 0
BARK_PUSH_URL = 'https://api.day.app/:your_token'
# 输出日志到文件 (Docker 中不建议修改此组配置项)
OUT_PUT_LOG_TO_FILE_ENABLED = 1
OUT_PUT_LOG_TO_FILE_PATH = '/config/12306.log' # 日志目录
RUNTIME_DIR = '/data/'
QUERY_DATA_DIR = '/data/query/'
USER_DATA_DIR = '/data/user/'
# 分布式集群配置
CLUSTER_ENABLED = 0 # 集群状态
NODE_IS_MASTER = 1 # 是否是主节点 同时只能启用 1 个主节点
NODE_SLAVE_CAN_BE_MASTER = 1 # 主节点宕机后,子节点是否可以自动提升为主节点(建议打开)
NODE_NAME = 'master' # 节点名称,不能重复
REDIS_HOST = 'localhost' # Redis host
REDIS_PORT = '6379' # Redis port
REDIS_PASSWORD = '' # Redis 密码 没有可以留空
# 邮箱配置
EMAIL_ENABLED = 0 # 是否开启邮件通知
EMAIL_SENDER = 'sender@example.com' # 邮件发送者
EMAIL_RECEIVER = 'receiver@example.com' # 邮件接受者 # 可以多个 [email1@gmail.com, email2@gmail.com]
EMAIL_SERVER_HOST = 'localhost' # 邮件服务 host
EMAIL_SERVER_USER = '' # 邮件服务登录用户名
EMAIL_SERVER_PASSWORD = '' # 邮件服务登录密码
# Web 管理
WEB_ENABLE = 1 # 是否打开 Web 管理
WEB_USER = { # 登录信息
'username': 'admin',
'password': 'password'
}
WEB_PORT = 8008 # 监听端口
# 是否开启 CDN 查询
CDN_ENABLED = 0
CDN_CHECK_TIME_OUT = 1 # 检测单个 cdn 是否可用超时时间
# 查询任务
QUERY_JOBS = [
{
# 'job_name': 'bj -> sz', # 任务名称,不填默认会以车站名命名,不可重复
'account_key': 0, # 将会使用指定账号下单
'left_dates': [ # 出发日期 :Array
"2019-01-25",
"2019-01-26",
],
'stations': { # 车站 支持多个车站同时查询 :Dict or :List
'left': '北京',
'arrive': '深圳',
},
# # 多个车站示例 (建议添加多个,有时多买几站成功率会高一点)
# 'stations': [{
# 'left': '北京',
# 'arrive': '深圳',
# },{ # 多个车站示例
# 'left': '北京',
# 'arrive': '广州',
# }],
'members': [ # 乘客姓名,会根据当前账号自动识别乘客类型 购买儿童票 设置两个相同的姓名即可,程序会自动识别 如 ['张三', '张三']
"张三",
"王五",
# 7, # 支持通过序号确定唯一乘客,序号查看可通过 python main.py -t 登录成功之后在 runtime/user/ 下找到对应的 用户名_passengers.json 文件,找到对应的 code 填入
],
'allow_less_member': 0, # 是否允许余票不足时提交部分乘客
'seats': [ # 筛选座位 有先后顺序 :Array
# 可用值: 特等座, 商务座, 一等座, 二等座, 软卧, 硬卧, 动卧, 软座, 硬座, 无座
'硬卧',
'硬座'
],
'train_numbers': [ # 筛选车次 可以为空,为空则所有车次都可以提交 如 [] 注意大小写需要保持一致
"K356",
"K1172",
"K4184"
],
'except_train_numbers': [ # 筛选车次,排除车次 train_numbers 和 except_train_numbers 不可同时存在
],
'period': { # 筛选时间
'from': '00:00',
'to': '24:00'
}
},
# {
# 'job_name': 'cd -> gz', # 任务名称,不填默认会以车站名命名,不可重复
# 'account_key': 0, # 将会使用指定账号下单
# 'left_dates': [
# "2019-01-27",
# "2019-01-28"
# ],
# 'stations': {
# 'left': '成都',
# 'arrive': '广州',
# },
# 'members': [
# "小王",
# ],
# 'allow_less_member': 0,
# 'seats': [
# '硬卧',
# ],
# 'train_numbers': []
# }
]

View File

@@ -1,12 +0,0 @@
# -*- coding: utf-8 -*-
# 分布式子节点配置文件示例
# 分布式集群配置
CLUSTER_ENABLED = 1 # 集群状态
NODE_IS_MASTER = 0 # 是否是主节点
NODE_NAME = 'slave 1' # 节点名称,不能重复
REDIS_HOST = 'localhost' # Redis host
REDIS_PORT = '6379' # Redis port
REDIS_PASSWORD = '' # Redis 密码 没有可以留空
# 没了,其它配置会自动从主节点同步

61
main.py
View File

@@ -1,63 +1,18 @@
# -*- coding: utf-8 -*-
import sys
from py12306.app import *
from py12306.helpers.cdn import Cdn
from py12306.log.common_log import CommonLog
from py12306.query.query import Query
from py12306.user.user import User
from py12306.web.web import Web
def main():
load_argvs()
CommonLog.print_welcome()
App.run()
CommonLog.print_configs()
App.did_start()
App.run_check()
Query.check_before_run()
####### 运行任务
Web.run()
Cdn.run()
User.run()
Query.run()
if not Const.IS_TEST:
while True:
sleep(10000)
else:
if Config().is_cluster_enabled(): stay_second(5) # 等待接受完通知
CommonLog.print_test_complete()
def test():
"""
功能检查
包含:
账号密码验证 (打码)
座位验证
乘客验证
语音验证码验证
通知验证
:return:
"""
Const.IS_TEST = True
Config.OUT_PUT_LOG_TO_FILE_ENABLED = False
if '--test-notification' in sys.argv or '-n' in sys.argv:
Const.IS_TEST_NOTIFICATION = True
pass
def load_argvs():
if '--test' in sys.argv or '-t' in sys.argv: test()
config_index = None
if '--config' in sys.argv: config_index = sys.argv.index('--config')
if '-c' in sys.argv: config_index = sys.argv.index('-c')
if config_index:
Config.CONFIG_FILE = sys.argv[config_index + 1:config_index + 2].pop()
# def load_argvs():
# if '--test' in sys.argv or '-t' in sys.argv: test()
# config_index = None
#
# if '--config' in sys.argv: config_index = sys.argv.index('--config')
# if '-c' in sys.argv: config_index = sys.argv.index('-c')
# if config_index:
# Config.CONFIG_FILE = sys.argv[config_index + 1:config_index + 2].pop()
if __name__ == '__main__':

View File

View File

View File

@@ -40,6 +40,8 @@ API_GET_QUEUE_COUNT = BASE_URL_OF_12306 + '/otn/confirmPassenger/getQueueCount'
API_CONFIRM_SINGLE_FOR_QUEUE = BASE_URL_OF_12306 + '/otn/confirmPassenger/confirmSingleForQueue'
API_QUERY_ORDER_WAIT_TIME = BASE_URL_OF_12306 + '/otn/confirmPassenger/queryOrderWaitTime?{}' # 排队查询
API_QUERY_INIT_PAGE = BASE_URL_OF_12306 + '/otn/leftTicket/init'
API_GET_BROWSER_DEVICE_ID = BASE_URL_OF_12306 + '/otn/HttpZF/logdevice'
API_NOTIFICATION_BY_VOICE_CODE = 'http://ali-voice.showapi.com/sendVoice?'
API_NOTIFICATION_BY_VOICE_CODE_DINGXIN = 'http://yuyin2.market.alicloudapi.com/dx/voice_notice'

View File

@@ -33,8 +33,9 @@ class Request(HTMLSession):
return response
def add_response_hook(self, hook):
exist_hooks = self.hooks['response']
if not isinstance(exist_hooks, list): hooks = [exist_hooks]
hooks = self.hooks['response']
if not isinstance(hooks, list):
hooks = [hooks]
hooks.append(hook)
self.hooks['response'] = hooks
return self

View File

@@ -94,15 +94,24 @@ class CommonLog(BaseLog):
self.add_quick_log('多线程查询: {}'.format(get_true_false_text(Config().QUERY_JOB_THREAD_ENABLED, enable, disable)))
self.add_quick_log('CDN 状态: {}'.format(get_true_false_text(Config().CDN_ENABLED, enable, disable))).flush()
self.add_quick_log('通知状态:')
self.add_quick_log(
'语音验证码: {}'.format(get_true_false_text(Config().NOTIFICATION_BY_VOICE_CODE, enable, disable)))
self.add_quick_log('邮件通知: {}'.format(get_true_false_text(Config().EMAIL_ENABLED, enable, disable)))
self.add_quick_log('钉钉通知: {}'.format(get_true_false_text(Config().DINGTALK_ENABLED, enable, disable)))
self.add_quick_log('Telegram通知: {}'.format(get_true_false_text(Config().TELEGRAM_ENABLED, enable, disable)))
self.add_quick_log('ServerChan通知: {}'.format(get_true_false_text(Config().SERVERCHAN_ENABLED, enable, disable)))
self.add_quick_log('Bark通知: {}'.format(get_true_false_text(Config().BARK_ENABLED, enable, disable)))
self.add_quick_log(
'PushBear通知: {}'.format(get_true_false_text(Config().PUSHBEAR_ENABLED, enable, disable))).flush(sep='\t\t')
if Config().NOTIFICATION_BY_VOICE_CODE:
self.add_quick_log(
'语音验证码: {}'.format(get_true_false_text(Config().NOTIFICATION_BY_VOICE_CODE, enable, disable)))
if Config().EMAIL_ENABLED:
self.add_quick_log('邮件通知: {}'.format(get_true_false_text(Config().EMAIL_ENABLED, enable, disable)))
if Config().DINGTALK_ENABLED:
self.add_quick_log('钉钉通知: {}'.format(get_true_false_text(Config().DINGTALK_ENABLED, enable, disable)))
if Config().TELEGRAM_ENABLED:
self.add_quick_log('Telegram通知: {}'.format(get_true_false_text(Config().TELEGRAM_ENABLED, enable, disable)))
if Config().SERVERCHAN_ENABLED:
self.add_quick_log(
'ServerChan通知: {}'.format(get_true_false_text(Config().SERVERCHAN_ENABLED, enable, disable)))
if Config().BARK_ENABLED:
self.add_quick_log('Bark通知: {}'.format(get_true_false_text(Config().BARK_ENABLED, enable, disable)))
if Config().PUSHBEAR_ENABLED:
self.add_quick_log(
'PushBear通知: {}'.format(get_true_false_text(Config().PUSHBEAR_ENABLED, enable, disable)))
self.add_quick_log().flush(sep='\t\t')
self.add_quick_log('查询间隔: {}'.format(Config().QUERY_INTERVAL))
self.add_quick_log('用户心跳检测间隔: {}'.format(Config().USER_HEARTBEAT_INTERVAL))
self.add_quick_log('WEB 管理页面: {}'.format(get_true_false_text(Config().WEB_ENABLE, enable, disable)))

View File

@@ -149,7 +149,7 @@ class Query:
return self.api_type
response = self.session.get(API_QUERY_INIT_PAGE)
if response.status_code == 200:
res = re.search(r'var CLeftTicketUrl = \'(leftTicket/queryX)\';', response.text)
res = re.search(r'var CLeftTicketUrl = \'(.*)\';', response.text)
try:
self.api_type = res.group(1)
except IndexError:

View File

@@ -123,6 +123,7 @@ class UserJob:
'password': self.password,
'appid': 'otn'
}
self.request_device_id()
answer = AuthCode.get_auth_code(self.session)
data['answer'] = answer
response = self.session.post(API_BASE_LOGIN.get('url'), data)
@@ -175,6 +176,142 @@ class UserJob:
# TODO 处理获取失败情况
return False
def request_device_id(self):
"""
获取加密后的浏览器特征 ID
:return:
"""
params = {"algID": self.request_alg_id(), "timestamp": int(time.time() * 1000)}
params = dict(params, **self._get_hash_code_params())
response = self.session.get(API_GET_BROWSER_DEVICE_ID, params=params)
if response.text.find('callbackFunction') >= 0:
result = response.text[18:-2]
try:
result = json.loads(result)
self.session.cookies.update({
'RAIL_EXPIRATION': result.get('exp'),
'RAIL_DEVICEID': result.get('dfp'),
})
except:
return False
def request_alg_id(self):
response = self.session.get("https://kyfw.12306.cn/otn/HttpZF/GetJS")
result = re.search(r'algID\\x3d(.*?)\\x26', response.text)
try:
return result.group(1)
except (IndexError, AttributeError) as e:
pass
return ""
def _get_hash_code_params(self):
from collections import OrderedDict
data = {
'adblock': '0',
'browserLanguage': 'en-US',
'cookieEnabled': '1',
'custID': '133',
'doNotTrack': 'unknown',
'flashVersion': '0',
'javaEnabled': '0',
'jsFonts': 'c227b88b01f5c513710d4b9f16a5ce52',
'localCode': '3232236206',
'mimeTypes': '52d67b2a5aa5e031084733d5006cc664',
'os': 'MacIntel',
'platform': 'WEB',
'plugins': 'd22ca0b81584fbea62237b14bd04c866',
'scrAvailSize': str(random.randint(500, 1000)) + 'x1920',
'srcScreenSize': '24xx1080x1920',
'storeDb': 'i1l1o1s1',
'timeZone': '-8',
'touchSupport': '99115dfb07133750ba677d055874de87',
'userAgent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.' + str(
random.randint(
5000, 7000)) + '.0 Safari/537.36',
'webSmartID': 'f4e3b7b14cc647e30a6267028ad54c56',
}
data_trans = {
'browserVersion': 'd435',
'touchSupport': 'wNLf',
'systemLanguage': 'e6OK',
'scrWidth': 'ssI5',
'openDatabase': 'V8vl',
'scrAvailSize': 'TeRS',
'hasLiedResolution': '3neK',
'hasLiedOs': 'ci5c',
'timeZone': 'q5aJ',
'userAgent': '0aew',
'userLanguage': 'hLzX',
'jsFonts': 'EOQP',
'scrAvailHeight': '88tV',
'browserName': '-UVA',
'cookieCode': 'VySQ',
'online': '9vyE',
'scrAvailWidth': 'E-lJ',
'flashVersion': 'dzuS',
'scrDeviceXDPI': '3jCe',
'srcScreenSize': 'tOHY',
'storeDb': 'Fvje',
'doNotTrack': 'VEek',
'mimeTypes': 'jp76',
'sessionStorage': 'HVia',
'cookieEnabled': 'VPIf',
'os': 'hAqN',
'hasLiedLanguages': 'j5po',
'hasLiedBrowser': '2xC5',
'webSmartID': 'E3gR',
'appcodeName': 'qT7b',
'javaEnabled': 'yD16',
'plugins': 'ks0Q',
'appMinorVersion': 'qBVW',
'cpuClass': 'Md7A',
'indexedDb': '3sw-',
'adblock': 'FMQw',
'localCode': 'lEnu',
'browserLanguage': 'q4f3',
'scrHeight': '5Jwy',
'localStorage': 'XM7l',
'historyList': 'kU5z',
'scrColorDepth': "qmyu"
}
data = OrderedDict(data)
data_str = ''
params = {}
for key, item in data.items():
data_str += key + item
key = data_trans[key] if key in data_trans else key
params[key] = item
data_str = self._encode_data_str(data_str)
data_str_len = len(data_str)
data_str_f = int(data_str_len / 3) if data_str_len % 3 == 0 else int(data_str_len / 3) + 1
if data_str_len >= 3:
data_str = data_str[data_str_f:2*data_str_f] + data_str[2*data_str_f:data_str_len] + data_str[0: data_str_f]
data_str = data_str[::-1]
data_str_tmp = ""
for e in range(0, len(data_str)):
data_str_code = ord(data_str[e])
data_str_tmp += chr(0) if data_str_code == 127 else chr(data_str_code + 1)
data_str = self._encode_data_str(data_str_tmp)
data_str = self._encode_string(data_str)
params['hashCode'] = data_str
return params
def _encode_data_str(self, data_str):
data_str_len = len(data_str)
data_str_len_tmp = int(data_str_len / 3) if data_str_len % 3 == 0 else int(data_str_len / 3) + 1
if data_str_len >= 3:
data_str_e = data_str[0:data_str_len_tmp]
data_str_f = data_str[data_str_len_tmp:2 * data_str_len_tmp]
return data_str[2 * data_str_len_tmp:data_str_len] + data_str_e + data_str_f
return data_str
def _encode_string(self, str):
import hashlib
import base64
result = base64.b64encode(hashlib.sha256(str.encode()).digest()).decode()
return result.replace('+', '-').replace('/', '_').replace('=', '')
def login_did_success(self):
"""
用户登录成功
@@ -312,7 +449,7 @@ class UserJob:
UserLog.MESSAGE_GET_USER_PASSENGERS_FAIL.format(
result.get('messages', CommonLog.MESSAGE_RESPONSE_EMPTY_ERROR), self.retry_time)).flush()
if Config().is_slave():
self.load_user_from_remote() # 加载最新 cookie
self.load_user_from_remote() # 加载最新 cookie
stay_second(self.retry_time)
return self.get_user_passengers()

View File

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

Before

Width:  |  Height:  |  Size: 729 KiB

After

Width:  |  Height:  |  Size: 729 KiB

View File

Before

Width:  |  Height:  |  Size: 136 KiB

After

Width:  |  Height:  |  Size: 136 KiB

View File

Before

Width:  |  Height:  |  Size: 775 KiB

After

Width:  |  Height:  |  Size: 775 KiB

0
py12306/app/__init__.py Normal file
View File

61
py12306/app/app.py Normal file
View File

@@ -0,0 +1,61 @@
import logging
import os
class Config:
class AppEnvType:
DEV = 'dev'
PRODUCTION = 'production'
APP_NAME = 'py12306'
APP_ENV = AppEnvType.PRODUCTION
LOADED = False
TEST_MODE = False
PROJECT_DIR = os.path.abspath(__file__ + '/../../../') + '/'
CONFIG_FILE = PROJECT_DIR + 'config.toml'
REQUEST_TIME_OUT = 5
# Config
REDIS = {
'host': '127.0.0.1',
'port': 6379,
'db': 0,
'password': None,
'decode_responses': True
}
# Redis keys
REDIS_PREFIX_KEY_TASKS = APP_NAME + ':tasks:'
# REDIS_KEY_USER_TASKS = 'user_jobs'
@classmethod
def load(cls):
"""
Load configs from toml file
:return:
"""
import toml
configs = toml.load(cls.CONFIG_FILE)
redis = configs.get('redis')
if redis:
cls.REDIS.update(redis)
app = configs.get('app')
if app:
cls.APP_ENV = app.get('env', cls.APP_ENV)
if not Config.LOADED:
Config.load()
# Logger
Logger = logging.getLogger(Config.APP_NAME)
Logger.setLevel('DEBUG' if Config.APP_ENV == Config.AppEnvType.DEV else 'ERROR')
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
handler.setFormatter(formatter)
Logger.addHandler(handler)

201
py12306/app/query.py Normal file
View File

@@ -0,0 +1,201 @@
import re
from typing import List
from py12306.app.app import Logger
from py12306.lib.api import API_QUERY_INIT_PAGE, API_LEFT_TICKETS
from py12306.lib.exceptions import RetryException
from py12306.lib.func import retry, number_of_time_period
from py12306.lib.helper import DataHelper, TrainSeat
from py12306.lib.request import Request
class TicketSeatData(DataHelper):
name: str
num: str
raw: str
class QueryTicketData(DataHelper):
left_date: str
left_station: str
arrive_station: str
left_periods: List[tuple] = []
allow_train_numbers: List[str] = []
execpt_train_numbers: List[str] = []
allow_seats: List[str] = []
available_seat: TicketSeatData
members: list
member_num: int
less_member: bool = False
def _after(self):
self.member_num = len(self.members)
class TicketData(DataHelper):
left_date: str = 'key:13'
ticket_num: str = 'key:11'
train_number: str = 'key:3'
train_no: str = 'key:2'
train_no: str = 'key:2'
left_station: str = 'key:6'
arrive_station: str = 'key:7'
order_text: str = 'key:1'
secret_str: str = 'key:0'
left_time: str = 'key:8'
arrive_time: str = 'key:9'
class Query:
@classmethod
def task_train_ticket(cls, task: dict):
QueryTicket().query_with_info(task)
class QueryTicket:
"""
车票查询
"""
api_type: str = None
time_out: int = 5
session: Request
def __init__(self):
self.session = Request()
def query_with_info(self, info: dict):
pass
@retry()
def get_query_api_type(self) -> str:
"""
动态获取查询的接口, 如 leftTicket/query
:return:
"""
if QueryTicket.api_type:
return QueryTicket.api_type
response = self.session.get(API_QUERY_INIT_PAGE)
if response.status_code == 200:
res = re.search(r'var CLeftTicketUrl = \'(.*)\';', response.text)
try:
QueryTicket.api_type = res.group(1)
except IndexError:
raise RetryException('获取车票查询地址失败')
return self.get_query_api_type()
@retry()
def get_ticket(self, data: dict):
data = QueryTicketData(data)
url = API_LEFT_TICKETS.format(left_date=data.left_date, left_station=data.left_station,
arrive_station=data.arrive_station, type=self.get_query_api_type())
resp = self.session.get(url, timeout=self.time_out, allow_redirects=False)
result = resp.json().get('data.result')
if not result:
Logger.error('车票查询失败, %s' % resp.reason)
tickets = QueryParser.parse_ticket(result)
for ticket in tickets:
self.is_ticket_valid(ticket, data)
if not data:
continue
# 验证完成,准备下单
Logger.info('[ 查询到座位可用 出发时间 {left_date} 车次 {train_number} 座位类型 {seat_type} 余票数量 {rest_num} ]'.format(
left_date=data.left_date, train_number=ticket.train_number, seat_type=data.available_seat.name,
rest_num=data.available_seat.raw))
def is_ticket_valid(self, ticket: TicketData, query: QueryTicketData) -> bool:
"""
验证 Ticket 信息是否可用
) 出发日期验证
) 车票数量验证
) 时间点验证(00:00 - 24:00)
) 车次验证
) 座位验证
) 乘车人数验证
:param ticket: 车票信息
:param query: 查询条件
:return:
"""
if not self.verify_ticket_num(ticket):
return False
if not self.verify_period(ticket.left_time, query.left_periods):
return False
if query.allow_train_numbers and ticket.train_no.upper() not in map(str.upper, query.allow_train_numbers):
return False
if query.execpt_train_numbers and ticket.train_no.upper() in map(str.upper, query.execpt_train_numbers):
return False
if not self.verify_seat(ticket, query):
return False
if not self.verify_member_count(query):
return False
return True
@staticmethod
def verify_period(period: str, available_periods: List[tuple]):
if not available_periods:
return True
period = number_of_time_period(period)
for available_period in available_periods:
if period < number_of_time_period(available_period[0]) or period > number_of_time_period(
available_period[1]):
return False
return True
@staticmethod
def verify_ticket_num(ticket: TicketData):
return ticket.ticket_num == 'Y' and ticket.order_text == '预订'
def verify_seat(self, ticket: TicketData, query: QueryTicketData) -> bool:
"""
检查座位是否可用
TODO 小黑屋判断 通过 车次 + 座位
:param ticket:
:param query:
:return:
"""
allow_seats = query.allow_seats
for seat in allow_seats:
seat_num = TrainSeat.types[seat]
raw = ticket.get_origin()[seat_num]
if self.verify_seat_text(raw):
query.available_seat = TicketSeatData({
'name': seat,
'num': seat_num,
'raw': raw
})
return True
return False
@staticmethod
def verify_seat_text(seat: str) -> bool:
return seat != '' and seat != '' and seat != '*'
@staticmethod
def verify_member_count(query: QueryTicketData) -> bool:
seat = query.available_seat
if not (seat.raw == '' or query.member_num <= int(seat.raw)):
rest_num = int(seat.raw)
if query.less_member:
query.member_num = rest_num
Logger.info(
'余票数小于乘车人数,当前余票数: %d, 实际人数 %d, 删减人车人数到: %d' % (rest_num, query.member_num, query.member_num))
else:
Logger.info('余票数 %d 小于乘车人数 %d,放弃此次提交机会' % (rest_num, query.member_num))
return False
return True
class QueryParser:
@classmethod
def parse_ticket(cls, items: List[dict]) -> List[TicketData]:
res = []
for item in items:
info = item.split('|')
info = {i: info[i] for i in range(0, len(info))} # conver to dict
res.append(TicketData(info))
return res

35
py12306/app/task.py Normal file
View File

@@ -0,0 +1,35 @@
from py12306.app.app import Logger, Config
from py12306.lib.func import new_thread_with_jobs
from py12306.lib.redis_lib import Redis
def get_routes() -> dict:
from py12306.app.user import User
from py12306.app.query import Query
return {
'user_login': User.task_user_login,
'train_ticket': Query.task_train_ticket,
}
class Task:
routes: dict = None
@classmethod
def listen(cls):
routes = get_routes()
keys = [Config.REDIS_PREFIX_KEY_TASKS + key for key, _ in routes.items()]
while True:
key, job = Redis.share().get_task_sync(keys)
Logger.info('获得新任务 %s' % key)
if Config.TEST_MODE: # ignore when in test env
return job
self = cls()
self.routes = routes
self.deal_job(key, job)
def deal_job(self, key: str, task: dict):
handler = self.routes.get(key)
if not handler:
return
new_thread_with_jobs(handler, wait=True, kwargs={'task': task})

11
py12306/app/user.py Normal file
View File

@@ -0,0 +1,11 @@
from py12306.lib.helper import ShareInstance
class User(ShareInstance):
@classmethod
def task_user_login(cls, task: dict):
pass
pass

0
py12306/lib/__init__.py Normal file
View File

11
py12306/lib/api.py Normal file
View File

@@ -0,0 +1,11 @@
HOST_API = 'kyfw.12306.cn'
BASE_API = 'https://' + HOST_API
# LEFT_TICKETS = {
# "url": BASE_URL_OF_12306 + "/otn/{type}?leftTicketDTO.train_date={left_date}&leftTicketDTO.from_station={left_station}&leftTicketDTO.to_station={arrive_station}&purpose_codes=ADULT",
# }
API_QUERY_INIT_PAGE = BASE_API + '/otn/leftTicket/init'
API_LEFT_TICKETS = BASE_API + '/otn/{type}?leftTicketDTO.train_date={left_date}&leftTicketDTO.from_station={' \
'left_station}&leftTicketDTO.to_station={arrive_station}&purpose_codes=ADULT'

View File

@@ -0,0 +1,6 @@
class RetryException(Exception):
pass
class MaxRetryException(Exception):
pass

83
py12306/lib/func.py Normal file
View File

@@ -0,0 +1,83 @@
import threading
def new_thread_with_jobs(jobs, wait=True, daemon=True, args=(), kwargs={}):
"""
Run each job with a new thread
:param jobs:
:param wait:
:param daemon:
:param args:
:param kwargs:
:return:
"""
threads = []
if not isinstance(jobs, list):
jobs = [jobs]
for job in jobs:
thread = threading.Thread(target=job, args=args, kwargs=kwargs)
thread.setDaemon(daemon)
thread.start()
threads.append(thread)
if wait:
for thread in threads:
thread.join()
def expand_class(cls, key, value, keep_old=True):
"""
Expand class method
:param cls:
:param key:
:param value:
:param keep_old:
:return:
"""
from types import MethodType
if keep_old:
setattr(cls, 'old_' + key, getattr(cls, key))
setattr(cls, key, MethodType(value, cls))
return cls
def retry(num: int = 3):
"""
Retry a func
:param num:
:return:
"""
from py12306.lib.exceptions import RetryException, MaxRetryException
retry_num_key = '_retry_num'
def decorator(func):
def wrapper(*args, **kwargs):
retry_num = num
if retry_num_key in kwargs:
retry_num = kwargs.get(retry_num_key)
kwargs.pop('_retry_num')
try:
res = func(*args, **kwargs)
except RetryException as err:
retry_num -= 1
from py12306.app.app import Logger
Logger.warning('重试 %s, 剩余次数 %s' % (func.__name__, retry_num))
if retry_num > 0:
kwargs[retry_num_key] = retry_num
return wrapper(*args, **kwargs)
raise MaxRetryException(*err.args) from None
return res
return wrapper
return decorator
def number_of_time_period(period: str) -> int:
"""
Example: 23:00 -> 2300
:param period:
:return:
"""
return int(period.replace(':', ''))

70
py12306/lib/helper.py Normal file
View File

@@ -0,0 +1,70 @@
class ShareInstance():
__session = None
@classmethod
def share(cls):
if not cls.__session:
cls.__session = cls()
return cls.__session
# Expand dict
class Dict(dict):
def get(self, key, default=None, sep='.'):
keys = key.split(sep)
for i, key in enumerate(keys):
try:
value = self[key]
if len(keys[i + 1:]) and isinstance(value, Dict):
return value.get(sep.join(keys[i + 1:]), default=default, sep=sep)
return value
except KeyError:
return self.dict_to_dict(default)
def __getitem__(self, k):
return self.dict_to_dict(super().__getitem__(k))
@staticmethod
def dict_to_dict(value):
return Dict(value) if isinstance(value, dict) else value
class DataHelper:
__origin: dict
__mappers: dict = {}
def __init__(self, data: dict):
self.__generate_mappers()
self.__origin = data
for key, val in data.items():
if str(key) in self.__mappers:
self.__dict__[self.__mappers[str(key)]] = val
elif key in self.__annotations__:
self.__dict__[key] = val
if getattr(self, '_after', None):
self._after()
def __generate_mappers(self):
for key, val in self.__annotations__.items():
try:
val = self.__getattribute__(key)
if isinstance(val, str) and val.startswith('key:'):
tags = val.split(';')
self.__dict__[key] = None
for tag in tags:
tag_info = tag.split(':')
if tag_info[0] == 'key':
self.__mappers[tag_info[1]] = key
elif tag_info[0] == 'default':
self.__dict__[key] = tag_info[1]
except (KeyError, AttributeError):
pass
def get_origin(self) -> dict:
return self.__origin
class TrainSeat:
types = {'特等座': 25, '商务座': 32, '一等座': 31, '二等座': 30, '软卧': 23, '硬卧': 28, '硬座': 29, '无座': 26, }
order_types = {'特等座': 'P', '商务座': 9, '一等座': 'M', '二等座': 'O', '软卧': 4, '硬卧': 3, '硬座': 1, '无座': 1}

27
py12306/lib/redis_lib.py Normal file
View File

@@ -0,0 +1,27 @@
from redis import Redis as PyRedis
from py12306.app.app import Config
from py12306.lib.helper import ShareInstance
import json
class Redis(PyRedis, ShareInstance):
def __init__(self, **kwargs):
if not kwargs:
kwargs = Config.REDIS
super().__init__(**kwargs)
def push_task(self, key: str, tasks: dict):
return self.rpush(key, json.dumps(tasks))
def get_task_sync(self, keys: list) -> tuple:
tasks = self.brpop(keys)
return tasks[0][len(Config.REDIS_PREFIX_KEY_TASKS):], json.loads(tasks[1])
if __name__ == '__main__':
res = Redis.share().keys('*')
print(res)
pass

68
py12306/lib/request.py Normal file
View File

@@ -0,0 +1,68 @@
import requests
from requests.exceptions import *
from requests_html import HTMLSession, HTMLResponse
from py12306.lib.func import expand_class
requests.packages.urllib3.disable_warnings()
class Request(HTMLSession):
"""
请求处理类
"""
def save_to_file(self, url, path):
response = self.get(url, stream=True)
with open(path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
return response
@staticmethod
def _handle_response(response, **kwargs) -> HTMLResponse:
"""
扩充 response
:param response:
:param kwargs:
:return:
"""
response = HTMLSession._handle_response(response, **kwargs)
expand_class(response, 'json', Request.json)
return response
def add_response_hook(self, hook):
hooks = self.hooks['response']
if not isinstance(hooks, list):
hooks = [hooks]
hooks.append(hook)
self.hooks['response'] = hooks
return self
def json(self, default={}):
"""
重写 json 方法,拦截错误
:return:
"""
from py12306.lib.helper import Dict
try:
result = self.old_json()
return Dict(result)
except:
return Dict(default)
def request(self, *args, **kwargs):
try:
if 'timeout' not in kwargs:
from py12306.app.app import Config
kwargs['timeout'] = Config.REQUEST_TIME_OUT
response = super().request(*args, **kwargs)
return response
except RequestException as e:
if e.response:
response = e.response
else:
response = HTMLResponse(HTMLSession)
# response.status_code = 500
expand_class(response, 'json', Request.json)
return response

View File

@@ -1,3 +1,4 @@
toml
-i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
appdirs==1.4.3
beautifulsoup4==4.7.0

0
tests/__init__.py Normal file
View File

15
tests/helper.py Normal file
View File

@@ -0,0 +1,15 @@
from unittest import TestCase
from py12306.app.app import Config
from py12306.lib.redis_lib import Redis
class BaseTest(TestCase):
redis: Redis = None
config: Config = None
def setUp(self) -> None:
super().setUp()
Config.TEST_MODE = True
self.config = Config
self.redis = Redis.share()

74
tests/test_query.py Normal file
View File

@@ -0,0 +1,74 @@
from unittest import TestCase
from tests.helper import BaseTest
from py12306.app.query import *
class TestQueryTicket(BaseTest):
task = {
'name': 'admin',
}
query_dict = {
'left_date': '2019-05-18',
'left_station': 'BJP',
'arrive_station': 'LZJ',
'allow_seats': ['二等座'],
'members': ['test']
}
query: QueryTicketData
ticket_str = 'iV6uPpzX3CcwqhHe4yzrJHp9hFVCouaXtS01wlUB8f%2BuA%2BKD%2FTV5KLu37w1aKHO2zlAlwMDa%2FDYY%0A2xykUxU964zvkfI3qZZ6uGEKWi0tXCT8fhkQTVvnRI43%2FAinVozab2W1Cq%2FMzJtGBv3D1Q3CscAj%0ANA1XmfNzd6Carglhzvyyy63MkbLIRvxrngx9F01W9jhKXnQupQNTOM3Pw4UIxbesBWkmQfYNj%2Fj%2F%0A3mU33kluoI5vbGVsm115Ec%2BS29KPeaM%2B4%2F2h2UZsiCb%2F5hKfew8Hijodr2VuFftbkge1meSTRRvz%0A|预订|240000G42704|G427|BXP|LAJ|BXP|LAJ|06:21|13:44|07:23|Y|nhsXhn1BbGmb4MI%2BEto43zoslKFQlIY8c356nXAHAEk9Zb2G|20190518|3|P3|01|05|0|0|||||||||||有|无|5||O090M0|O9M|0|0|null'
ticket: TicketData
def setUp(self) -> None:
super().setUp()
self.ticket = QueryParser.parse_ticket([self.ticket_str])[0]
self.query = QueryTicketData(self.query_dict)
def test_get_query_api_type(self):
res = QueryTicket().get_query_api_type()
self.assertEqual('leftTicket/query', res)
def test_get_ticket(self):
res = QueryTicket().get_ticket(self.query_dict)
def test_is_ticket_valid(self):
res = QueryTicket().is_ticket_valid(self.ticket, self.query)
self.assertEqual(res, True)
def test_verify_period(self):
self.query.left_periods = [('08:00', '16:00')]
res = QueryTicket.verify_period('12:00', self.query.left_periods)
self.assertEqual(res, True)
res = QueryTicket.verify_period('16:00', self.query.left_periods)
self.assertEqual(res, True)
res = QueryTicket.verify_period('16:01', self.query.left_periods)
self.assertEqual(res, False)
def test_verify_ticket_num(self):
self.ticket.ticket_num = 'Y'
res = QueryTicket.verify_ticket_num(self.ticket)
self.assertEqual(res, True)
def test_verify_seat(self):
self.query.allow_seats = ['硬座', '二等座']
res = QueryTicket().verify_seat(self.ticket, self.query)
self.assertEqual(res, True)
self.assertEqual(self.query.available_seat.num, 30)
class TestQueryParser(TestCase):
tickets = [
'iV6uPpzX3CcwqhHe4yzrJHp9hFVCouaXtS01wlUB8f%2BuA%2BKD%2FTV5KLu37w1aKHO2zlAlwMDa%2FDYY%0A2xykUxU964zvkfI3qZZ6uGEKWi0tXCT8fhkQTVvnRI43%2FAinVozab2W1Cq%2FMzJtGBv3D1Q3CscAj%0ANA1XmfNzd6Carglhzvyyy63MkbLIRvxrngx9F01W9jhKXnQupQNTOM3Pw4UIxbesBWkmQfYNj%2Fj%2F%0A3mU33kluoI5vbGVsm115Ec%2BS29KPeaM%2B4%2F2h2UZsiCb%2F5hKfew8Hijodr2VuFftbkge1meSTRRvz%0A|预订|240000G42704|G427|BXP|LAJ|BXP|LAJ|06:21|13:44|07:23|Y|nhsXhn1BbGmb4MI%2BEto43zoslKFQlIY8c356nXAHAEk9Zb2G|20190518|3|P3|01|05|0|0|||||||||||有|无|5||O090M0|O9M|0|0|null',
'UY8SmgFA1grdsKcN7%2B4133%2FSWTQqk8wVKcdLNsk6EAiuPIaE5aPPzUr9f%2FepLG0hLchNAKjlOl71%0AbMcW3HypGxckM8L3Hz1rg3ds77qPxXXDFxMITHRQfZzSoM8uqSKPdVwT4mEs6ynZ2Niw7M3iAHbq%0A0qjpuj%2FaAc5yiWsvHxAGc3UQPqchrXjcabyp9%2Bnmf7z84Ep74XirfcRmAmZopq%2B9ySctz9lnwule%0A%2FaSdcAWypluKLPobkAdSpxwndKk8bV2U%2Bq%2BbGPaNEzy2i9ixRdaBBLkg3OAqHaCBetr9gHFEiYXu%0A|预订|240000G4290C|G429|BXP|LAJ|BXP|LAJ|10:45|19:45|09:00|Y|4lti%2FihSxlRgd8xN4SFzPvGmpcT90cvJFfy4V5IGfmyCNl6r|20190518|3|P4|01|16|0|0|||||||||||有|有|5||O0M090|OM9|0|0|null',
'VUb2s9O%2BqvddST8Tk%2BT8PzHNjzrMsp301eZv9ukz3jw55DHXLMpQ3ZK92ystqCe9atpD7DFlHiHD%0AB9q%2F4EoAaoU3OwacLHAEMtr9fX%2FYXwuCMhHmQHw%2BL8eejS9QgR5ZQM8oV6%2FeaJ5x5KqCIutwZBtz%0AgzuRZ%2FpOHSGdg03WWOXdWHVpJrBUleLGpQZ%2BQJMz0YGrl1Md%2BpNu5ypNdyKg6AyYjmZs4fRz6Slj%0AwCbQlhkclS2mvxpAE5gSJZ3nY8IjFelQTAqt6XTEHPsZ7Rd%2FNHwOM7UtlbQy7NyBCHTLgIAjuB58%0ADEpzVw%3D%3D|预订|240000T1750J|T175|BXP|XNO|BXP|LZJ|13:05|07:44|18:39|Y|g%2F3wSCFH0UvzDmFPO8NuyXGeIMI26cl93Qzex2RLyufZ8M5i2%2FvdylS8zKM%3D|20190518|3|P4|01|13|0|0||||无|||有||有|有|||||10401030|1413|0|0|null',
'dPVMZOEQT2rtEi5BNTY2h1nNhp2H%2BA%2BKZaZINqEQ2RUbeKK%2BFeC1y%2Bm8NdO%2BlS4Ag8r6hsWfWHdL%0AX7DrJJMRMuEXnCwqcc%2Bnwd%2BfvdaeozWFuGE08OFZzJbGnnL%2F54VMSdUnapJ4jWVvsYLG2RUqopiX%0AjDavL7dBULGrfNZN4EMTBFqUz%2BzqnmDGvf3RaXr7EHrztAJSNEQc09PqlGHs65B3VaFhN%2Fa0%2BgVQ%0AXCIAP1YysdgqDXMndNNq4nkMX21Jruvi8ToQWsGnYCf%2F7OIzS5HwOu3PElDZ9bMfempLAFk%3D|预订|2400000Z550M|Z55|BXP|LZJ|BXP|LZJ|14:58|07:30|16:32|Y|nNj9EIzgMtJaVlhUo0gt3HKZi820vP3HktntoPUe%2FFW%2BDfiv|20190518|3|P4|01|06|0|0||11||有|||||有||||||604030|643|0|0|null',
'V%2F6N%2BhZhuqxSnfkLwZHsgPQBDsGMcJkhZXyuWQLCKlzv7T%2BMvJerzW2u2TBoM8aRbqVkjywXT99K%0AdGcCUHmNOqXqngnHnvg1yj0jvsfQHPRHKIPa6hl0QeX%2BgM%2F%2Ffyj8opU919pW4YT3HViE0hQ8vNRT%0AUQOdJmbSFo1b3xI05cuzh4j21RuP9sdgaA%2BnheYMTvyMoYiEUvN1%2BClGrlrbXnhHgSWFUMxu88sG%0AQGpnqTLtVx27AAe58c9qy5oq35lOnf5OV6%2BUebB7n4YYy7ZpugZ1gyPndGGhvQdg8j58HFo%2FY%2BC1%0AzbdwhA%3D%3D|预订|2400000Z7508|Z75|BXP|LZJ|BXP|LZJ|15:57|10:30|18:33|Y|bIdN7uqCXyxKnuLinwN1naNDcYioI7Xuk535Xl1xm6Wn8CRtk2knPYx5MW4%3D|20190518|3|P4|01|07|0|0||||有|||有||有|有|||||10401030|1413|0|0|null',
'2WQo8Fm2OT6Y016qIB5vRQNikHMVarIhB9YUu7sDFKMTC3RFxmi7Y%2BE9S%2BjdYxUoEfUiqhj%2F8ZX1%0A1GpE8Vikd5urQLbp5%2FjkES9798ohE3dQwZ0ffKHX%2FQiIl4maKmdVKebWTyV8IMgTThm5C1l%2B8csY%0ApM0kaFEsQtERyf8Mh9FH9vQDxn2Vtb%2FoOPY2UvNS%2F8Tf%2BNWni21Dh8tRZ0ZL9CBYl6%2BRbNphYSZy%0AhQASZ9fG%2BjJe96bZL%2FsuMvFa%2FTNG51k07G8mggtoqgREp0zP0cdBHjkOm%2BTmMuK7uqLS9gUodYds%0ApEj%2B%2Bg%3D%3D|预订|240000Z1510B|Z151|BXP|XNO|BXP|LZJ|16:03|11:19|19:16|Y|TM4VyMprWgU1m%2ByEJxolE3Hutch%2FGYoyOMLhudWSaubKi5OeWcwS8XZJvkU%3D|20190518|3|P2|01|09|0|0||||无|||有||有|有|||||10401030|1413|0|0|null',
'i4IZi6FeuPVecIlQ7MMdptQ77XQ6DEH14WRtbCN1%2FJViWj7liJ3qUEJ9ml3aC9%2B8cBPbKsVHycxa%0ApoLgwMxEcxJ8LdFDeWHSJ%2FbRPyw0Ygs3tYGz%2FTYv4Ys03Oc6NGJsXlt76XQ6Lmm6fDVs%2FKnsWARg%0Ar2NxqMn0ecRGgiDAcVRF4CApE3cdE2GW0%2Blt7xcbPTDc3R2vawIAk8zKlMWKaMReXfqgeeln%2BAIV%0Aa9KSTBxgR9pC85I%2BVMJe4mYVLeUaSa%2FI7fYrXfJyVu%2BDDdiQaWEwLsTlmh7cxkGZlosHLtJh14Ym%0A6xjrqg%3D%3D|预订|2400000Z210D|Z21|BXP|LSO|BXP|LZJ|20:00|12:17|16:17|Y|et1f50q%2B5c5I%2B9WjMAG7QRRd%2F5lr5LzzS%2Bijw0HrPjMGTPoFY0BytCT68Ho%3D|20190518|3|P2|01|05|0|0||||无|||有||无|有|||||10401030|1413|0|0|null',
'XyBvey3WmmF82TTYlMRIMGTG9tntgOjqf7d9Y7YgdZTP16T3Ts1loq5oqe9XUOrKNJxRGUmv4Q9h%0AkbnGYxvHA4LgWlDsyqO%2B%2B6SoX%2BW%2BtCH%2BC5JXvabJcaN%2BfZjQa8aBYvHHNx4li28D4tlCfrKnkB%2BU%0AzHfTSG6ekFF5K53clwbEyaljpJDdCi6uSQqMPUkslA1RQ4KAQPnXEbDbz4oC9IdjGiiTTPuC7QJU%0A0r2VW5TnKXvJr6toDWogGW8icGjeuDVcNKn%2B5OltBdNJD5bheKSE4hjzv8HauF8H%2Bb3c77jzHqSk%0ANtV%2Bgw%3D%3D|预订|250000K8880P|K885|TJP|LAJ|BJP|LZJ|23:43|05:07|29:24|Y|A1VIUe1w8dwrEhQacQ1O8SQntd7wRO0M%2Bck0TjWwuZgB%2Fi%2BVT2cxShZVqzQ%3D|20190518|3|P4|03|15|0|0||||无|||无||无|有|||||10403010|1431|1|0|null'
]
def test_parse_ticket(self):
res = QueryParser.parse_ticket(self.tickets)
self.assertEqual(res[0].left_station, 'BXP')
self.assertEqual(res[0].train_number, 'G427')

9
tests/test_redis.py Normal file
View File

@@ -0,0 +1,9 @@
from py12306.lib.redis_lib import Redis
from tests.helper import BaseTest
class TestRedis(BaseTest):
def test_connection(self):
res = Redis.share().info()
self.assertIsInstance(res, dict)

20
tests/test_task.py Normal file
View File

@@ -0,0 +1,20 @@
from tests.helper import BaseTest
from py12306.app.task import Task
class TestTask(BaseTest):
def test_push_task(self):
tasks = {
'query': {
'name': 'admin',
},
'user': {
'name': 'admin',
'password': 'password'
}
}
for key, task in tasks.items():
self.redis.push_task(self.config.REDIS_PREFIX_KEY_TASKS + key, task)
res = Task.listen()
self.assertIsInstance(res, dict)

5
tests/test_user.py Normal file
View File

@@ -0,0 +1,5 @@
from tests.helper import BaseTest
class TestUser(BaseTest):
pass