修复并发锁问题

This commit is contained in:
Jalin
2019-01-10 13:01:05 +08:00
parent 0d7558afeb
commit c3f3ba9ffc
11 changed files with 61 additions and 34 deletions

View File

@@ -40,6 +40,8 @@ class App:
@classmethod
def did_start(cls):
self = cls()
from py12306.helpers.station import Station
Station() # 防止多线程时初始化出现问题
# if Config.is_cluster_enabled():
# from py12306.cluster.cluster import Cluster
# Cluster().run()
@@ -73,8 +75,8 @@ class App:
if Config().USER_ACCOUNTS:
for account in Config().USER_ACCOUNTS:
if account:
return True
return False
return False
return True
@classmethod
def test_send_notifications(cls):
@@ -90,10 +92,10 @@ class App:
待优化
:return:
"""
if not cls.check_auto_code():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True)
if not cls.check_user_account_is_empty():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True)
# CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True, publish=False) # 不填写用户则不自动下单
if not cls.check_auto_code():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True, publish=False)
if Const.IS_TEST_NOTIFICATION: cls.test_send_notifications()

View File

@@ -30,6 +30,9 @@ class Cluster():
KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁
lock_do_order_time = 60 * 1 # 订单锁超时时间
lock_prefix = 'lock_' # 锁键前缀
lock_info_prefix = 'info_'
KEY_MASTER = 1
KEY_SLAVE = 0
@@ -40,7 +43,6 @@ class Cluster():
keep_alive_time = 3 # 报告存活间隔
lost_alive_time = keep_alive_time * 2
locks = []
nodes = {}
node_name = None
is_ready = False
@@ -222,25 +224,23 @@ class Cluster():
timeout = int(time.time()) + timeout
res = self.session.setnx(key, timeout)
if res:
self.locks.append((key, timeout))
if info: self.session.set_dict(key + '_info', info) # 存储额外信息
if info: self.session.set_dict(self.lock_info_prefix + key, info) # 存储额外信息
return True
return False
def get_lock_info(self, key, default={}):
return self.session.get_dict(key + '_info', default=default)
return self.session.get_dict(self.lock_info_prefix + key, default=default)
def release_lock(self, key):
self.session.delete(key)
self.session.delete(key + '_info')
self.session.delete(self.lock_info_prefix + key)
def check_locks(self):
index = 0
for key, timeout in self.locks:
if timeout >= int(time.time()):
del self.locks[index]
locks = self.session.keys(self.lock_prefix + '*')
for key in locks:
val = self.session.get(key)
if val and int(val) <= time_int():
self.release_lock(key)
index += 1
@classmethod
def get_user_cookie(cls, key, default=None):

View File

@@ -8,6 +8,8 @@ from py12306.helpers.func import *
@singleton
class Config:
IS_DEBUG = False
USER_ACCOUNTS = []
# 查询任务
QUERY_JOBS = []

View File

@@ -44,16 +44,11 @@ class Notification():
method='GET', headers={
'Authorization': 'APPCODE {}'.format(appcode)
})
response_message = '-'
result = {}
try:
result = response.json()
response_message = result['showapi_res_body']['remark']
except:
pass
if response.status_code == 401 or response.status_code == 403:
result = response.json()
response_message = result.get('showapi_res_body.remark')
if response.status_code in [400, 401, 403]:
return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_FORBID).flush()
if response.status_code == 200 and 'showapi_res_body' in result and result['showapi_res_body'].get('flag'):
if response.status_code == 200 and result.get('showapi_res_body.flag'):
CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_SUCCESS.format(response_message)).flush()
return True
else:

View File

@@ -7,6 +7,7 @@ from py12306.helpers.func import *
@singleton
class Station:
stations = []
station_kvs = {}
def __init__(self):
if path.exists(Config().STATION_FILE):
@@ -20,6 +21,7 @@ class Station:
'pinyin': tmp_info[3],
'id': tmp_info[5]
})
self.station_kvs[tmp_info[1]] = tmp_info[2]
@classmethod
def get_station_by_name(cls, name):
@@ -35,7 +37,8 @@ class Station:
@classmethod
def get_station_key_by_name(cls, name):
return cls.get_station_by_name(name).get('key')
self = cls()
return self.station_kvs[name]
@classmethod
def get_station_name_by_key(cls, key):

View File

@@ -33,11 +33,14 @@ class QueryLog(BaseLog):
MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n'
MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束'
MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束\n'
MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功'
MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息'
MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER = '未配置自动下单账号,{} 秒后继续查询\n'
MESSAGE_ORDER_USER_IS_EMPTY = '未找到下单账号,{} 秒后继续查询'
cluster = None
def __init__(self):

View File

@@ -9,8 +9,6 @@ from py12306.helpers.type import UserType
from py12306.log.order_log import OrderLog
class Order:
"""
处理下单
@@ -51,6 +49,9 @@ class Order:
下单模式 暂时不清楚,使用正常步骤下单
:return:
"""
# Debug
if Config().IS_DEBUG:
return random.randint(0, 10) > 7
return self.normal_order()
def normal_order(self):

View File

@@ -96,6 +96,7 @@ class Job:
self.left_date = date
response = self.query_by_date(date)
self.handle_response(response)
if not self.is_alive: return
self.safe_stay()
if is_main_thread():
QueryLog.flush(sep='\t\t', publish=False)
@@ -143,6 +144,7 @@ class Job:
allow_seats = self.allow_seats if self.allow_seats else list(
Config.SEAT_TYPES.values()) # 未设置 则所有可用 TODO 合法检测
self.handle_seats(allow_seats, ticket_info)
if not self.is_alive: return
def handle_seats(self, allow_seats, ticket_info):
for seat in allow_seats: # 检查座位是否有票
@@ -166,12 +168,21 @@ class Job:
QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(),
train_number=self.get_info_of_train_number(),
rest_num=ticket_of_seat)
if User.is_empty():
QueryLog.add_quick_log(QueryLog.MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER.format(self.retry_time))
return stay_second(self.retry_time)
order_result = False
user = self.get_user()
if not user:
QueryLog.add_quick_log(QueryLog.MESSAGE_ORDER_USER_IS_EMPTY.format(self.retry_time))
return stay_second(self.retry_time)
lock_id = Cluster.KEY_LOCK_DO_ORDER + '_' + user.key
if Config().is_cluster_enabled():
if self.cluster.get_lock(lock_id, Cluster.lock_do_order_time,
{'node': self.cluster.node_name}): # 获得下单锁
QueryLog.add_quick_log('拿到锁' + lock_id).flush()
order_result = self.do_order(user)
if not order_result: # 下单失败,解锁
self.cluster.release_lock(lock_id)
@@ -222,9 +233,13 @@ class Job:
退出任务
:return:
"""
from py12306.query.query import Query
QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush()
# sys.exit(1) # 无法退出线程...
self.is_alive = False
# 手动移出jobs 防止单线程死循环
index = Query().jobs.index(self)
Query().jobs.pop(index)
def safe_stay(self):
interval = get_interval_num(self.interval)
@@ -241,9 +256,9 @@ class Job:
def get_user(self):
user = User.get_user(self.account_key)
if not user.check_is_ready():
# TODO user is not ready
pass
# if not user.check_is_ready(): # 这里不需要检测了,后面获取乘客时已经检测过
# #
# pass
return user
def check_passengers(self):

View File

@@ -48,13 +48,13 @@ class Query:
# return # DEBUG
self.init_jobs()
QueryLog.init_data()
app_available_check()
stay_second(3)
# 多线程
if Config().QUERY_JOB_THREAD_ENABLED: # 多线程
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST)
else:
while True:
if not self.jobs: break
jobs_do(self.jobs, 'run')
if Const.IS_TEST: return
@@ -90,7 +90,6 @@ class Query:
self = cls()
return objects_find_object_by_key_value(self.jobs, 'account_key', account_key)
# def get_jobs_from_cluster(self):
# jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS)
# return jobs

View File

@@ -83,6 +83,7 @@ class UserJob:
return True
# 只有主节点才能走到这
if self.is_first_time() or not self.check_user_is_login():
a = 1
self.is_ready = False
if not self.handle_login(): return
@@ -103,7 +104,7 @@ class UserJob:
def set_last_heartbeat(self):
if Config().is_cluster_enabled():
return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int())
self.last_heartbeat = time_now()
self.last_heartbeat = time_int()
# def init_cookies
def is_first_time(self):
@@ -227,6 +228,7 @@ class UserJob:
# 子节点访问会导致主节点登录失效 TODO 可快考虑实时同步 cookie
if user_data:
self.update_user_info({**user_data, **{'user_name': user_data.get('name')}})
self.save_user()
return True
return None

View File

@@ -68,6 +68,11 @@ class User:
if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')):
Event().user_job_destroy({'key': account.get('key')})
@classmethod
def is_empty(cls):
self = cls()
return not bool(self.users)
@classmethod
def get_user(cls, key) -> UserJob:
self = cls()