增加多线程查询

This commit is contained in:
Jalin
2019-01-06 17:02:36 +08:00
parent 85fb384b0a
commit 0553dc936b
12 changed files with 253 additions and 27 deletions

View File

@@ -1,5 +1,5 @@
# encoding=utf8
import os
from threading import Thread
from py12306.log.query_log import QueryLog

View File

@@ -7,9 +7,15 @@ USER_ACCOUNTS = []
# 查询任务
QUERY_JOBS = []
# 查询间隔
QUERY_INTERVAL = 1
# 用户心跳检测间隔
USER_HEARTBEAT_INTERVAL = 120
# 多线程查询
QUERY_JOB_THREAD_ENABLED = 0
SEAT_TYPES = {
'商务座': 32,
@@ -24,6 +30,7 @@ SEAT_TYPES = {
# Query
QUERY_DATA_DIR = 'runtime/query'
USER_DATA_DIR = 'runtime/user'
STATION_FILE = 'data/stations.txt'

View File

@@ -14,6 +14,21 @@ LEFT_TICKETS = {
"is_cdn": True,
}
API_BASE_LOGIN = {
"url": BASE_URL_OF_12306 + '/passport/web/login',
"method": "post",
"is_cdn": True,
}
API_USER_CHECK = {
"url": BASE_URL_OF_12306 + '/otn/login/checkUser',
"method": "post",
"is_cdn": True,
}
urls = {
"auth": { # 登录接口
"req_url": "/passport/web/auth/uamtk",

View File

@@ -1,4 +1,5 @@
import random
import threading
from time import sleep
from py12306 import config
@@ -60,4 +61,11 @@ def get_interval_num(interval, decimal=2):
def stay_second(second):
sleep(second)
def is_main_thread():
return threading.current_thread() == threading.main_thread()
def current_thread_id():
return threading.current_thread().ident
# def test:

View File

@@ -22,9 +22,13 @@ class Station:
@classmethod
def get_station_by_name(cls, name):
return cls.get_station_by(name, 'name')
@classmethod
def get_station_by(cls, value, field):
self = cls()
for station in self.stations:
if station.get('name') == name:
if station.get(field) == value:
return station
return None
@@ -32,3 +36,7 @@ class Station:
def get_station_key_by_name(cls, name):
return cls.get_station_by_name(name).get('key')
@classmethod
def get_station_name_by_key(cls, key):
return cls.get_station_by(key, 'key').get('name')

View File

@@ -1,26 +1,45 @@
import os
import sys
from py12306.helpers.func import *
class BaseLog:
logs = []
thread_logs = {}
quick_log = []
@classmethod
def add_log(cls, content):
self = cls()
self.logs.append(content)
# print('添加 Log 主进程{} 进程ID{}'.format(is_main_thread(), current_thread_id()))
if is_main_thread():
self.logs.append(content)
else:
tmp_log = self.thread_logs.get(current_thread_id(), [])
tmp_log.append(content)
self.thread_logs[current_thread_id()] = tmp_log
return self
@classmethod
def flush(cls, sep='\n', end='\n', file=None):
self = cls()
logs = self.quick_log if self.quick_log else self.logs
if self.quick_log:
logs = self.quick_log
else:
if is_main_thread():
logs = self.logs
else:
logs = self.thread_logs[current_thread_id()]
# for i in logs:
print(*logs, sep=sep, end=end, file=file)
if self.quick_log:
self.quick_log = []
else:
self.logs = []
if is_main_thread():
self.logs = []
else:
del self.thread_logs[current_thread_id()]
# print(self.logs)
@classmethod
@@ -28,3 +47,9 @@ class BaseLog:
self = cls()
self.quick_log.append(content)
return self
def notification(self, title, content=''):
if sys.platform == 'darwin':
os.system(
'osascript -e \'tell app "System Events" to display notification "{content}" with title "{title}"\''.format(
title=title, content=content))

View File

@@ -1,5 +1,6 @@
import datetime
import json
import sys
from os import path
from py12306.log.base import BaseLog
from py12306.helpers.func import *
@@ -17,7 +18,8 @@ class QueryLog(BaseLog):
MESSAGE_GIVE_UP_CHANCE_CAUSE_TICKET_NUM_LESS_THAN_SPECIFIED = '余票数小于乘车人数,放弃此次提交机会'
MESSAGE_QUERY_LOG_OF_EVERY_TRAIN = '{}-{}'
MESSAGE_QUERY_START_BY_DATE = '出发日期 {}: {} - {}'
MESSAGE_QUERY_LOG_OF_TRAIN_INFO = '{} {}'
MESSAGE_QUERY_START_BY_DATE = '出发日期 {}: {} - {}'
def __init__(self):
super().__init__()
@@ -35,11 +37,11 @@ class QueryLog(BaseLog):
@classmethod
def print_init_jobs(cls, jobs):
self = cls()
"""
输出初始化信息
:return:
"""
self = cls()
self.add_log('# 发现 {} 个任务 #'.format(len(jobs)))
index = 1
for job in jobs:
@@ -76,6 +78,16 @@ class QueryLog(BaseLog):
self.flush()
return self
@classmethod
def print_ticket_available(cls, left_date, train_number, rest_num):
self = cls()
self.add_quick_log('检查完成 开始提交订单 '.format())
self.notification('查询到可用车票', '时间 {left_date} 车次 {train_number} 余票数量 {rest_num}'.format(left_date=left_date,
train_number=train_number,
rest_num=rest_num))
self.flush()
return self
@classmethod
def print_query_error(cls, reason, code=None):
self = cls()
@@ -90,9 +102,13 @@ class QueryLog(BaseLog):
@classmethod
def print_job_start(cls):
self = cls()
self.add_quick_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'), time=datetime.datetime.now()))
self.add_log('=== 正在进行第 {query_count} 次查询 === {time}'.format(query_count=self.data.get('query_count'),
time=datetime.datetime.now()))
self.refresh_data()
self.flush()
if is_main_thread():
self.flush()
else:
self.add_log('\n')
return self
@classmethod

25
py12306/log/user_log.py Normal file
View File

@@ -0,0 +1,25 @@
from py12306.log.base import BaseLog
from py12306.helpers.func import *
@singleton
class UserLog(BaseLog):
def __init__(self):
super().__init__()
self.init_data()
def init_data(self):
print('User Log 初始化')
@classmethod
def print_init_users(cls, users):
"""
输出初始化信息
:return:
"""
self = cls()
self.add_log('================== 发现 {} 个用户 =================='.format(len(users)))
self.add_log('')
self.flush()
return self

View File

@@ -25,10 +25,13 @@ class Job:
interval = {}
query = None
ticket_info = {}
INDEX_TICKET_NUM = 11
INDEX_TRAIN_NUMBER = 3
INDEX_LEFT_DATE = 13
INDEX_LEFT_STATION = 6 # 4 5 始发 终点
INDEX_ARRIVE_STATION = 7
INDEX_ORDER_TEXT = 1 # 下单文字
def __init__(self, info, query):
self.left_dates = info.get('left_dates')
@@ -64,8 +67,14 @@ class Job:
response = self.query_by_date(date)
self.handle_response(response)
self.safe_stay()
if is_main_thread():
QueryLog.flush(sep='\t\t')
else:
QueryLog.add_log('\n')
if is_main_thread():
QueryLog.add_quick_log('').flush()
else:
QueryLog.flush(sep='\t\t')
QueryLog.add_quick_log('').flush()
def query_by_date(self, date):
"""
@@ -92,10 +101,11 @@ class Job:
if not results:
return False
for result in results:
ticket_info = result.split('|')
self.ticket_info = ticket_info = result.split('|')
if not self.is_trains_number_valid(ticket_info): # 车次是否有效
continue
QueryLog.add_log(QueryLog.MESSAGE_QUERY_LOG_OF_EVERY_TRAIN.format(ticket_info[self.INDEX_TRAIN_NUMBER], ticket_info[self.INDEX_TICKET_NUM]))
QueryLog.add_log(QueryLog.MESSAGE_QUERY_LOG_OF_EVERY_TRAIN.format(self.get_info_of_train_number(),
self.get_info_of_ticket_num()))
if not self.is_has_ticket(ticket_info):
continue
allow_seats = self.allow_seats if self.allow_seats else list(config.SEAT_TYPES.values()) # 未设置 则所有可用
@@ -103,15 +113,21 @@ class Job:
ticket_of_seat = ticket_info[get_seat_number_by_name(seat)]
if not self.is_has_ticket_by_seat(ticket_of_seat): # 座位是否有效
continue
QueryLog.print_ticket_seat_available(left_date=ticket_info[self.INDEX_LEFT_DATE], train_number=ticket_info[self.INDEX_TRAIN_NUMBER], seat_type=seat, rest_num=ticket_of_seat)
QueryLog.print_ticket_seat_available(left_date=self.get_info_of_left_date(),
train_number=self.get_info_of_train_number(), seat_type=seat,
rest_num=ticket_of_seat)
if not self.is_member_number_valid(ticket_of_seat): # 乘车人数是否有效
if self.allow_less_member:
self.member_num_take = int(ticket_of_seat)
QueryLog.print_ticket_num_less_than_specified(ticket_of_seat, self)
else:
QueryLog.add_quick_log( QueryLog.MESSAGE_GIVE_UP_CHANCE_CAUSE_TICKET_NUM_LESS_THAN_SPECIFIED).flush()
QueryLog.add_quick_log(
QueryLog.MESSAGE_GIVE_UP_CHANCE_CAUSE_TICKET_NUM_LESS_THAN_SPECIFIED).flush()
continue
# 检查完成 开始提交订单
QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(),
train_number=self.get_info_of_train_number(),
rest_num=ticket_of_seat)
print('检查完成 开始提交订单')
def get_results(self, response):
@@ -130,21 +146,39 @@ class Job:
return result if result else False
def is_has_ticket(self, ticket_info):
return ticket_info[11] == 'Y' and ticket_info[1] == '预订'
return self.get_info_of_ticket_num() == 'Y' and self.get_info_of_order_text() == '预订'
def is_has_ticket_by_seat(self, seat):
return seat != '' and seat != '' and seat != '*'
def is_trains_number_valid(self, ticket_info):
if self.allow_train_numbers:
return ticket_info[3] in self.allow_train_numbers
return self.get_info_of_train_number() in self.allow_train_numbers
return True
def is_member_number_valid(self, seat):
return seat == '' or self.member_num <= int(seat)
def safe_stay(self):
interval = get_interval_num(self.interval)
QueryLog.add_stay_log(interval)
stay_second(interval)
# 提供一些便利方法
def get_info_of_left_date(self):
return self.ticket_info[self.INDEX_LEFT_DATE]
def get_info_of_ticket_num(self):
return self.ticket_info[self.INDEX_TICKET_NUM]
def get_info_of_train_number(self):
return self.ticket_info[self.INDEX_TRAIN_NUMBER]
def get_info_of_left_station(self):
return Station.get_station_name_by_key(self.ticket_info[self.INDEX_LEFT_STATION])
def get_info_of_arrive_station(self):
return Station.get_station_name_by_key(self.ticket_info[self.INDEX_ARRIVE_STATION])
def get_info_of_order_text(self):
return self.ticket_info[self.INDEX_ORDER_TEXT]

View File

@@ -1,3 +1,5 @@
import threading
from requests_html import HTMLSession
from py12306.helpers.func import *
@@ -30,8 +32,17 @@ class Query:
self.init_jobs()
QueryLog.print_init_jobs(jobs=self.jobs)
while True:
for job in self.jobs:
job.run()
threads = []
if config.QUERY_JOB_THREAD_ENABLED: # 多线程
for job in self.jobs:
thread = threading.Thread(target=job.run)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
else:
for job in self.jobs:
job.run()
def init_jobs(self):
jobs = config.QUERY_JOBS

68
py12306/user/job.py Normal file
View File

@@ -0,0 +1,68 @@
from os import path
from requests_html import HTMLSession
from py12306.helpers.api import API_USER_CHECK, API_BASE_LOGIN
from py12306.helpers.func import *
class UserJob:
heartbeat = 60 * 2
key = None
user_name: ''
password: ''
user: None
def __init__(self, info, user):
self.session = HTMLSession()
# cookie TODO
self.heartbeat = user.heartbeat
self.key = info.get('key')
self.user_name = info.get('user_name')
self.password = info.get('password')
self.user = user
def run(self):
self.start()
def start(self):
self.check_heartbeat()
def check_heartbeat(self):
if self.is_first_time() or not self.check_user_is_login():
self.handle_login()
pass
# def init_cookies
def is_first_time(self):
return not self.get_user_cookie()
def handle_login(self):
self.base_login()
def base_login(self):
"""
获取验证码结果
:return:
"""
data = {
'username': self.user_name,
'password': self.password,
'appid': 'otn'
}
response = self.session.post(API_BASE_LOGIN.get('url'), data)
pass
def check_user_is_login(self):
response = self.session.get(API_USER_CHECK.get('url'))
is_login = response.json().get('status')
def get_user_cookie(self):
path = self.get_cookie_path()
if path.exists(path):
return open(path, encoding='utf-8').read()
return None
def get_cookie_path(self):
return config.USER_DATA_DIR + '/' + self.user_name + '.cookie'

View File

@@ -1,16 +1,15 @@
from py12306.helpers.func import *
from py12306.log.user_log import UserLog
from py12306.user.job import UserJob
@singleton
class User:
heartbeat = 60 * 2
users = []
def __init__(self):
"""
初始化用户
恢复
登录
"""
pass
self.interval = config.USER_HEARTBEAT_INTERVAL
@classmethod
def run(cls):
@@ -19,4 +18,14 @@ class User:
pass
def start(self):
pass
self.init_users()
UserLog.print_init_users(jobs=self.users)
while True:
for user in self.users:
user.run()
def init_users(self):
accounts = config.USER_ACCOUNTS
for account in accounts:
user = UserJob(info=account, user=self)
self.users.append(user)