添加扫码登录功能

Signed-off-by: Gardel <sunxinao@hotmail.com>
This commit is contained in:
Gardel
2021-09-24 21:36:37 +08:00
parent 49d35aabdc
commit 68a508e30a
9 changed files with 222 additions and 16 deletions

View File

@@ -6,12 +6,14 @@ USER_ACCOUNTS = [
{
'key': 0, # 如使用多个账号 key 不能重复
'user_name': 'your user name',
'password': 'your password'
'password': '忽略',
'type': 'qr' # qr 为扫码登录,填写其他为密码登录
},
# {
# 'key': 'wangwu',
# 'user_name': 'wangwu@qq.com',
# 'password': 'wangwu'
# 'password': 'wangwu',
# 'type': ''
# }
]

View File

@@ -6,12 +6,14 @@ USER_ACCOUNTS = [
{
'key': 0, # 如使用多个账号 key 不能重复
'user_name': 'your user name',
'password': 'your password'
'password': '忽略',
'type': 'qr' # qr 为扫码登录,填写其他为密码登录
},
# {
# 'key': 'wangwu',
# 'user_name': 'wangwu@qq.com',
# 'password': 'wangwu'
# 'password': 'wangwu',
# 'type': ''
# }
]

View File

@@ -1,6 +1,4 @@
# coding=utf-8
# 查询余票
import time
HOST_URL_OF_12306 = 'kyfw.12306.cn'
BASE_URL_OF_12306 = 'https://' + HOST_URL_OF_12306
@@ -15,6 +13,18 @@ API_BASE_LOGIN = {
API_USER_LOGIN_CHECK = BASE_URL_OF_12306 + '/otn/login/conf'
API_AUTH_QRCODE_BASE64_DOWNLOAD = {
'url': BASE_URL_OF_12306 + '/passport/web/create-qr64'
}
API_AUTH_QRCODE_CHECK = {
'url': BASE_URL_OF_12306 + '/passport/web/checkqr'
}
API_USER_LOGIN = {
'url': BASE_URL_OF_12306 + '/otn/login/userLogin'
}
API_AUTH_CODE_DOWNLOAD = {
'url': BASE_URL_OF_12306 + '/passport/captcha/captcha-image?login_site=E&module=login&rand=sjrand&_={random}'
}

93
py12306/helpers/qrcode.py Normal file
View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
import os
import png
def print_qrcode(path):
"""
将二维码输出到控制台
需要终端尺寸足够大才能显示
:param path: 二维码图片路径 (PNG 格式)
:return: None
"""
reader = png.Reader(path)
width, height, rows, info = reader.read()
lines = list(rows)
# 识别二维码尺寸
x_flag = -1 # x 边距标志
y_flag = -1 # y 边距标志
x_white = -1 # 定位图案白块 x 坐标
y_white = -1 # 定位图案白块 y 坐标
i = y_flag
while i < height:
if y_white > 0 and x_white > 0:
break
j = x_flag
while j < width:
total = 0
for k in range(info['planes']):
px = lines[i][j * info['planes'] + k]
total += px
avg = total / info['planes']
mid = (2 ** info['bitdepth']) / 2
black = avg < mid
if y_white > 0 and x_white > 0:
break
if x_flag > 0 > x_white and not black:
x_white = j
if x_flag == -1 and black:
x_flag = j
if y_flag > 0 > y_white and not black:
y_white = i
if y_flag == -1 and black:
y_flag = i
if x_flag > 0 and y_flag > 0:
i += 1
j += 1
i += 1
assert y_white - y_flag == x_white - x_flag
scale = y_white - y_flag
assert width - x_flag == height - y_flag
module_count = int((width - x_flag * 2) / scale)
if os.name == 'nt':
white_block = '▇▇'
black_block = ' '
new_line = '\n'
else:
white_block = '\033[0;37;47m '
black_block = '\033[0;37;40m '
new_line = '\033[0m\n'
print('', flush=False)
for i in range(module_count + 2):
print(white_block, end='', flush=False)
print('', end=new_line, flush=False)
i = y_flag
while i < height - y_flag:
print(white_block, end='', flush=False)
j = x_flag
while j < width - x_flag:
total = 0
for k in range(info['planes']):
px = lines[i][j * info['planes'] + k]
total += px
avg = total / info['planes']
mid = (2 ** info['bitdepth']) / 2
black = avg < mid
if black:
print(black_block, end='', flush=False)
else:
print(white_block, end='', flush=False)
j += scale
print(white_block, end=new_line, flush=False)
i += scale
for i in range(module_count + 2):
print(white_block, end='', flush=False)
print('', end=new_line, flush=True)

View File

@@ -13,6 +13,9 @@ class UserLog(BaseLog):
MESSAGE_DOWNLAODING_THE_CODE = '正在下载验证码...'
MESSAGE_CODE_AUTH_FAIL = '验证码验证失败 错误原因: {}'
MESSAGE_CODE_AUTH_SUCCESS = '验证码验证成功 开始登录...'
MESSAGE_QRCODE_DOWNLOADING = '正在下载二维码...'
MESSAGE_QRCODE_DOWNLOADED = '二维码保存在: {},请使用手机客户端扫描'
MESSAGE_QRCODE_FAIL = '二维码获取失败: {}, {} 秒后重试'
MESSAGE_LOGIN_FAIL = '登录失败 错误原因: {}'
MESSAGE_LOADED_USER = '正在尝试恢复用户: {}'
MESSAGE_LOADED_USER_SUCCESS = '用户恢复成功: {}'

View File

@@ -1,4 +1,3 @@
import sys
from datetime import timedelta
from datetime import datetime
@@ -154,14 +153,14 @@ class Job:
QueryLog.add_quick_log(msg).flush(publish=False)
raise RuntimeError(msg)
else:
pass
return date_query.strftime("%Y-%m-%d")
def query_by_date(self, date):
"""
通过日期进行查询
:return:
"""
self.judge_date_legal(date)
date = self.judge_date_legal(date)
from py12306.helpers.cdn import Cdn
QueryLog.add_log(('\n' if not is_main_thread() else '') + QueryLog.MESSAGE_QUERY_START_BY_DATE.format(date,
self.left_station,

View File

@@ -1,3 +1,4 @@
from base64 import b64decode
from py12306.config import Config
from py12306.cluster.cluster import Cluster
from py12306.app import app_available_check
@@ -5,7 +6,7 @@ from py12306.helpers.func import *
from py12306.helpers.request import Request
from py12306.log.query_log import QueryLog
from py12306.query.job import Job
from py12306.helpers.api import API_QUERY_INIT_PAGE
from py12306.helpers.api import API_QUERY_INIT_PAGE, API_GET_BROWSER_DEVICE_ID
@singleton
@@ -29,6 +30,7 @@ class Query:
def __init__(self):
self.session = Request()
self.request_device_id()
self.cluster = Cluster()
self.update_query_interval()
self.update_query_jobs()
@@ -117,6 +119,32 @@ class Query:
self.jobs.append(job)
return job
def request_device_id(self):
"""
获取加密后的浏览器特征 ID
:return:
"""
response = self.session.get(API_GET_BROWSER_DEVICE_ID)
if response.status_code == 200:
try:
result = json.loads(response.text)
response = self.session.get(b64decode(result['id']).decode())
if response.text.find('callbackFunction') >= 0:
result = response.text[18:-2]
result = json.loads(result)
if not Config().is_cache_rail_id_enabled():
self.session.cookies.update({
'RAIL_EXPIRATION': result.get('exp'),
'RAIL_DEVICEID': result.get('dfp'),
})
else:
self.session.cookies.update({
'RAIL_EXPIRATION': Config().RAIL_EXPIRATION,
'RAIL_DEVICEID': Config().RAIL_DEVICEID,
})
except:
return False
@classmethod
def wait_for_ready(cls):
self = cls()

View File

@@ -1,4 +1,4 @@
import json
import base64
import pickle
import re
from os import path
@@ -11,6 +11,7 @@ from py12306.helpers.event import Event
from py12306.helpers.func import *
from py12306.helpers.request import Request
from py12306.helpers.type import UserType
from py12306.helpers.qrcode import print_qrcode
from py12306.log.order_log import OrderLog
from py12306.log.user_log import UserLog
from py12306.log.common_log import CommonLog
@@ -23,6 +24,7 @@ class UserJob:
key = None
user_name = ''
password = ''
type = 'qr'
user = None
info = {} # 用户信息
last_heartbeat = None
@@ -51,6 +53,7 @@ class UserJob:
self.key = str(info.get('key'))
self.user_name = info.get('user_name')
self.password = info.get('password')
self.type = info.get('type')
def update_user(self):
from py12306.user.user import User
@@ -111,7 +114,10 @@ class UserJob:
if expire: UserLog.print_user_expired()
self.is_ready = False
UserLog.print_start_login(user=self)
return self.login()
if self.type == 'qr':
return self.qr_login()
else:
return self.login()
def login(self):
"""
@@ -150,6 +156,69 @@ class UserJob:
return False
def qr_login(self):
self.request_device_id()
image_uuid, png_path = self.download_code()
while True:
data = {
'RAIL_DEVICEID': self.session.cookies.get('RAIL_DEVICEID'),
'RAIL_EXPIRATION': self.session.cookies.get('RAIL_EXPIRATION'),
'uuid': image_uuid,
'appid': 'otn'
}
response = self.session.post(API_AUTH_QRCODE_CHECK.get('url'), data)
result = response.json()
result_code = int(result.get('result_code'))
if result_code == 0:
time.sleep(2)
elif result_code == 1:
UserLog.add_quick_log('请确认登录').flush()
time.sleep(2)
elif result_code == 2:
break
elif result_code == 3:
image_uuid = self.download_code()
try:
os.remove(png_path)
except BaseException as e:
UserLog.add_quick_log('无法删除文件: {}'.format(e)).flush()
self.session.get(API_USER_LOGIN, allow_redirects=True)
new_tk = self.auth_uamtk()
user_name = self.auth_uamauthclient(new_tk)
self.update_user_info({'user_name': user_name})
self.session.get(API_USER_LOGIN, allow_redirects=True)
self.login_did_success()
return True
def download_code(self):
try:
UserLog.add_quick_log(UserLog.MESSAGE_QRCODE_DOWNLOADING).flush()
response = self.session.post(API_AUTH_QRCODE_BASE64_DOWNLOAD.get('url'), data={'appid': 'otn'})
result = response.json()
if result.get('result_code') == '0':
img_bytes = base64.b64decode(result.get('image'))
try:
os.mkdir(Config().USER_DATA_DIR + '/qrcode')
except FileExistsError:
pass
png_path = path.normpath(Config().USER_DATA_DIR + '/qrcode/%d.png' % time.time())
with open(png_path, 'wb') as file:
file.write(img_bytes)
file.close()
if os.name == 'nt':
os.startfile(png_path)
else:
print_qrcode(png_path)
UserLog.add_log(UserLog.MESSAGE_QRCODE_DOWNLOADED.format(png_path)).flush()
return result.get('uuid'), png_path
raise KeyError('获取二维码失败: {}'.format(result.get('result_message')))
except BaseException as e:
UserLog.add_quick_log(
UserLog.MESSAGE_QRCODE_FAIL.format(e, self.retry_time)).flush()
time.sleep(self.retry_time)
return self.download_code()
def check_user_is_login(self):
response = self.session.get(API_USER_LOGIN_CHECK)
is_login = response.json().get('data.is_login', False) == 'Y'
@@ -191,9 +260,8 @@ class UserJob:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
}
from base64 import b64decode
self.session.headers.update(headers)
response = self.session.get(b64decode(result['id']).decode())
response = self.session.get(base64.b64decode(result['id']).decode())
if response.text.find('callbackFunction') >= 0:
result = response.text[18:-2]
result = json.loads(result)

View File

@@ -12,7 +12,7 @@ Flask-JWT-Extended==3.15.0
idna==2.8
itsdangerous==1.1.0
Jinja2==2.10
lxml==4.3.0
lxml==4.6.3
MarkupSafe==1.1.0
parse==1.9.0
pyee==5.0.0
@@ -28,6 +28,7 @@ tqdm==4.28.1
urllib3==1.24.2
w3lib==1.19.0
websockets==7.0
Werkzeug==0.15.3
Werkzeug==0.15.5
DingtalkChatbot==1.3.0
lightpush==0.1.3
pypng