Files
MediaCrawler/media_platform/douyin/login.py
程序员阿江(Relakkes) d614ccf247 docs: translate comments and metadata to English
Update Chinese comments, variable descriptions, and metadata across
multiple configuration and core files to English. This improves
codebase accessibility for international developers. Additionally,
removed the sponsorship section from README files.
2026-02-12 05:30:11 +08:00

275 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/media_platform/douyin/login.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
import functools
import sys
from typing import Optional
from playwright.async_api import BrowserContext, Page
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
wait_fixed)
import config
from base.base_crawler import AbstractLogin
from cache.cache_factory import CacheFactory
from tools import utils
class DouYinLogin(AbstractLogin):
    """Drive the Douyin web login flow (QR code, phone + SMS code, or cookies) through Playwright."""

    def __init__(self,
                 login_type: str,
                 browser_context: BrowserContext,  # type: ignore
                 context_page: Page,  # type: ignore
                 login_phone: Optional[str] = "",
                 cookie_str: Optional[str] = ""
                 ):
        """
        Store the Playwright handles and the credentials needed by the selected login type.

        :param login_type: One of "qrcode", "phone" or "cookie" (validated later in begin())
        :param browser_context: Playwright browser context whose cookies/pages are inspected and updated
        :param context_page: Playwright page on which the login dialog is operated
        :param login_phone: Phone number typed into the login form for the "phone" login type
        :param cookie_str: Raw cookie string injected into the context for the "cookie" login type
        """
        # The login type is written to the shared config module; begin() reads it back from there.
        config.LOGIN_TYPE = login_type
        self.browser_context = browser_context
        self.context_page = context_page
        self.login_phone = login_phone
        # NOTE(review): not read anywhere in this class - presumably a QR-code scan timeout in seconds.
        self.scan_qrcode_time = 60
        self.cookie_str = cookie_str

    async def begin(self) -> None:
        """
        Start login douyin website

        The verification accuracy of the slider verification is not very good... If there are no special
        requirements, it is recommended not to use Douyin login, or use cookie login

        :raises ValueError: if config.LOGIN_TYPE is not "qrcode", "phone" or "cookie"
        :raises SystemExit: (status 1) if the login state cannot be confirmed before the retries run out
        """
        # popup login dialog
        await self.popup_login_dialog()

        # select login type
        if config.LOGIN_TYPE == "qrcode":
            await self.login_by_qrcode()
        elif config.LOGIN_TYPE == "phone":
            await self.login_by_mobile()
        elif config.LOGIN_TYPE == "cookie":
            await self.login_by_cookies()
        else:
            raise ValueError("[DouYinLogin.begin] Invalid Login Type Currently only supported qrcode or phone or cookie ...")

        # If the page redirects to the slider verification page, need to slide again
        await asyncio.sleep(6)
        current_page_title = await self.context_page.title()
        if "验证码中间页" in current_page_title:
            await self.check_page_display_slider(move_step=3, slider_level="hard")

        # check login state
        utils.logger.info("[DouYinLogin.begin] login finished then check login state ...")
        try:
            await self.check_login_state()
        except RetryError:
            # check_login_state() kept returning False until tenacity gave up.
            utils.logger.error("[DouYinLogin.begin] login failed please confirm ...")
            # Exit with a non-zero status so the failure is visible to the calling process.
            sys.exit(1)

        # wait for redirect
        wait_redirect_seconds = 5
        utils.logger.info(f"[DouYinLogin.begin] Login successful then wait for {wait_redirect_seconds} seconds redirect ...")
        await asyncio.sleep(wait_redirect_seconds)

    @retry(stop=stop_after_attempt(600), wait=wait_fixed(1), retry=retry_if_result(lambda value: value is False))
    async def check_login_state(self) -> bool:
        """
        Check if the current login status is successful and return True otherwise return False

        The decorator re-invokes this method once per second, up to 600 attempts, for as long as it
        returns False; when the attempts are exhausted tenacity raises RetryError to the caller.
        """
        current_cookie = await self.browser_context.cookies()
        _, cookie_dict = utils.convert_cookies(current_cookie)
        # The cookie flag does not depend on any page, so test it once up front; this also covers
        # a context that currently has no open pages.
        if cookie_dict.get("LOGIN_STATUS") == "1":
            return True

        for page in self.browser_context.pages:
            try:
                local_storage = await page.evaluate("() => window.localStorage")
                if local_storage.get("HasUserLogin", "") == "1":
                    return True
            except Exception:
                # Best effort: a page that cannot be evaluated right now is skipped after a
                # short pause; the next retry attempt will look at it again.
                await asyncio.sleep(0.1)
        return False

    async def popup_login_dialog(self) -> None:
        """If the login dialog box does not pop up automatically, we will manually click the login button"""
        dialog_selector = "xpath=//div[@id='login-panel-new']"
        try:
            # check dialog box is auto popup and wait for 10 seconds
            await self.context_page.wait_for_selector(dialog_selector, timeout=1000 * 10)
        except Exception as e:
            utils.logger.error(f"[DouYinLogin.popup_login_dialog] login dialog box does not pop up automatically, error: {e}")
            utils.logger.info("[DouYinLogin.popup_login_dialog] login dialog box does not pop up automatically, we will manually click the login button")
            login_button_ele = self.context_page.locator("xpath=//p[text() = '登录']")
            await login_button_ele.click()
            await asyncio.sleep(0.5)

    async def login_by_qrcode(self) -> None:
        """
        Locate the login QR code on the page and hand it to utils.show_qrcode in a worker thread.

        :raises SystemExit: (status 1) if no QR code image can be found
        """
        utils.logger.info("[DouYinLogin.login_by_qrcode] Begin login douyin by qrcode...")
        qrcode_img_selector = "xpath=//div[@id='animate_qrcode_container']//img"
        base64_qrcode_img = await utils.find_login_qrcode(
            self.context_page,
            selector=qrcode_img_selector
        )
        if not base64_qrcode_img:
            utils.logger.error("[DouYinLogin.login_by_qrcode] login qrcode not found please confirm ...")
            # Exit with a non-zero status so the failure is visible to the calling process.
            sys.exit(1)

        # Run show_qrcode in the default executor and do not await it, so the event loop keeps
        # running while the code is displayed; scanning is confirmed later by check_login_state().
        partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
        asyncio.get_running_loop().run_in_executor(executor=None, func=partial_show_qrcode)
        await asyncio.sleep(2)

    async def login_by_mobile(self) -> None:
        """
        Log in with phone number + SMS code.

        Fills in self.login_phone, requests the SMS code, solves the slider captcha if one shows up,
        then polls the cache once per second (for up to 2 minutes) for the key "dy_<phone>" and
        submits the code as soon as it is available.
        """
        utils.logger.info("[DouYinLogin.login_by_mobile] Begin login douyin by mobile ...")
        mobile_tap_ele = self.context_page.locator("xpath=//li[text() = '验证码登录']")
        await mobile_tap_ele.click()
        await self.context_page.wait_for_selector("xpath=//article[@class='web-login-mobile-code']")
        mobile_input_ele = self.context_page.locator("xpath=//input[@placeholder='手机号']")
        await mobile_input_ele.fill(self.login_phone)
        await asyncio.sleep(0.5)
        send_sms_code_btn = self.context_page.locator("xpath=//span[text() = '获取验证码']")
        await send_sms_code_btn.click()

        # Check if there is slider verification
        await self.check_page_display_slider(move_step=10, slider_level="easy")

        cache_client = CacheFactory.create_cache(config.CACHE_TYPE_MEMORY)
        sms_code_key = f"dy_{self.login_phone}"  # loop-invariant, so built once
        max_get_sms_code_time = 60 * 2  # Maximum time to get verification code is 2 minutes
        while max_get_sms_code_time > 0:
            utils.logger.info(f"[DouYinLogin.login_by_mobile] get douyin sms code from redis remaining time {max_get_sms_code_time}s ...")
            await asyncio.sleep(1)
            sms_code_value = cache_client.get(sms_code_key)
            if not sms_code_value:
                max_get_sms_code_time -= 1
                continue

            # The cache may hand back bytes or an already-decoded str; accept both.
            if isinstance(sms_code_value, (bytes, bytearray)):
                sms_code = sms_code_value.decode()
            else:
                sms_code = str(sms_code_value)

            sms_code_input_ele = self.context_page.locator("xpath=//input[@placeholder='请输入验证码']")
            await sms_code_input_ele.fill(value=sms_code)
            await asyncio.sleep(0.5)
            submit_btn_ele = self.context_page.locator("xpath=//button[@class='web-login-button']")
            await submit_btn_ele.click()  # Click login
            # todo ... should also check the correctness of the verification code, it may be incorrect
            break
        else:
            # The countdown ran out without a code; make that visible instead of returning silently.
            utils.logger.error("[DouYinLogin.login_by_mobile] no sms code received within the time limit ...")

    async def check_page_display_slider(self, move_step: int = 10, slider_level: str = "easy") -> None:
        """
        Check if slider verification appears on the page and, if so, try to solve it.

        :param move_step: Forwarded to move_slider()
        :param slider_level: Forwarded to move_slider()
        :raises SystemExit: (status 1) if the slider is still unsolved after 20 attempts
        :return:
        """
        # Wait for slider verification to appear
        back_selector = "#captcha-verify-image"
        try:
            await self.context_page.wait_for_selector(selector=back_selector, state="visible", timeout=30 * 1000)
        except PlaywrightTimeoutError:  # No slider verification, return directly
            return

        gap_selector = 'xpath=//*[@id="captcha_container"]/div/div[2]/img[2]'
        max_slider_try_times = 20
        while max_slider_try_times > 0:
            try:
                await self.move_slider(back_selector, gap_selector, move_step, slider_level)
                await asyncio.sleep(1)

                # If the slider is too slow or verification failed, it will prompt "The operation is too slow", click the refresh button here
                page_content = await self.context_page.content()
                if "操作过慢" in page_content or "提示重新操作" in page_content:
                    utils.logger.info("[DouYinLogin.check_page_display_slider] slider verify failed, retry ...")
                    await self.context_page.click(selector="//a[contains(@class, 'secsdk_captcha_refresh')]")
                else:
                    # After successful sliding, wait for the slider to disappear. If it does not,
                    # this raises and the attempt is counted as failed by the handler below.
                    await self.context_page.wait_for_selector(selector=back_selector, state="hidden", timeout=1000)
                    utils.logger.info("[DouYinLogin.check_page_display_slider] slider verify success ...")
                    return
            except Exception as e:
                # Any failure while solving is treated as one failed attempt.
                utils.logger.error(f"[DouYinLogin.check_page_display_slider] slider verify failed, error: {e}")
                await asyncio.sleep(1)

            # Every failed attempt - including the "too slow" refresh path - uses up one try,
            # so the loop always terminates.
            max_slider_try_times -= 1
            utils.logger.info(f"[DouYinLogin.check_page_display_slider] remaining slider try times: {max_slider_try_times}")

        utils.logger.error("[DouYinLogin.check_page_display_slider] slider verify failed ...")
        # Exit with a non-zero status so the failure is visible to the calling process.
        sys.exit(1)

    async def move_slider(self, back_selector: str, gap_selector: str, move_step: int = 10, slider_level: str = "easy") -> None:
        """
        Move the slider to the right to complete the verification

        :param back_selector: Selector for the slider verification background image
        :param gap_selector: Selector for the slider verification slider
        :param move_step: Passed to Playwright's mouse.move() as `steps`, i.e. the number of intermediate
            mousemove events sent for each track segment; a larger value gives a finer-grained drag
        :param slider_level: Slider difficulty easy hard, corresponding to the slider for mobile verification code and the slider in the middle of verification code
        :raises RuntimeError: if the trajectory is empty or the slider element cannot be located/measured
        :return:
        """
        # get slider background image
        slider_back_elements = await self.context_page.wait_for_selector(
            selector=back_selector,
            timeout=1000 * 10,  # wait 10 seconds
        )
        slide_back = str(await slider_back_elements.get_property("src"))  # type: ignore

        # get slider gap image
        gap_elements = await self.context_page.wait_for_selector(
            selector=gap_selector,
            timeout=1000 * 10,  # wait 10 seconds
        )
        gap_src = str(await gap_elements.get_property("src"))  # type: ignore

        # Identify slider position
        # NOTE(review): discern() presumably returns the horizontal offset of the gap - confirm in utils.Slide.
        slide_app = utils.Slide(gap=gap_src, bg=slide_back)
        distance = slide_app.discern()

        # Get movement trajectory
        tracks = utils.get_tracks(distance, slider_level)
        if not tracks:
            raise RuntimeError("[DouYinLogin.move_slider] empty slider trajectory")
        # Adjust the last segment so that the segments add up to exactly `distance`.
        tracks[-1] -= sum(tracks) - distance

        # Drag slider to specified position according to trajectory
        element = await self.context_page.query_selector(gap_selector)
        if element is None:
            raise RuntimeError("[DouYinLogin.move_slider] slider element not found")
        bounding_box = await element.bounding_box()
        if bounding_box is None:
            raise RuntimeError("[DouYinLogin.move_slider] slider element has no bounding box")

        # Centre of the slider piece: the drag starts here.
        x = bounding_box["x"] + bounding_box["width"] / 2
        y = bounding_box["y"] + bounding_box["height"] / 2
        await self.context_page.mouse.move(x, y)

        # Simulate sliding operation
        await element.hover()
        await self.context_page.mouse.down()
        for track in tracks:
            # Loop mouse movement according to trajectory. The pointer is kept on the slider's own
            # row (y) rather than being sent to the top edge of the viewport.
            await self.context_page.mouse.move(x + track, y, steps=move_step)
            x += track
        await self.context_page.mouse.up()

    async def login_by_cookies(self) -> None:
        """Inject every cookie parsed from self.cookie_str into the browser context for ".douyin.com"."""
        utils.logger.info("[DouYinLogin.login_by_cookies] Begin login douyin by cookie ...")
        cookies = [
            {
                'name': key,
                'value': value,
                'domain': ".douyin.com",
                'path': "/"
            }
            for key, value in utils.convert_str_cookie_to_dict(self.cookie_str).items()
        ]
        # One batched call instead of one call per cookie; nothing is sent when there are no cookies.
        if cookies:
            await self.browser_context.add_cookies(cookies)