# coding=utf-8 # !/usr/bin/python import sys, re, os import base64 import hashlib import requests from typing import Tuple try: from base.spider import Spider except ModuleNotFoundError: # Ensure project root (..\..\) is on sys.path when executed directly CURRENT_FILE = os.path.abspath(__file__) PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(CURRENT_FILE))) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) from base.spider import Spider from datetime import datetime, timedelta from urllib.parse import quote, unquote from urllib3.util.retry import Retry sys.path.append('..') class Spider(Spider): def init(self, extend="{}"): # noqa: N802 (match framework signature) origin = 'https://zh.boyfriend.show' self.host = origin self.headers = { 'Origin': origin, 'Referer': f"{origin}/", 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:144.0) Gecko/20100101 Firefox/144.0' } self.stripchat_key = self.decode_key_compact() # 缓存字典 self._hash_cache = {} self.create_session_with_retry() def getName(self): return 'Boyfriend Show' def isVideoFormat(self, url): return bool(re.search(r'(m3u8|mp4)(?:\?|$)', url)) def manualVideoCheck(self): return False def destroy(self): try: self.session.close() except Exception: pass def homeContent(self, filter): CLASSES = [ {'type_name': '女主播g', 'type_id': 'girls'}, {'type_name': '情侣c', 'type_id': 'couples'}, {'type_name': '男主播m', 'type_id': 'men'}, {'type_name': '跨性别t', 'type_id': 'trans'} ] VALUE = ( {'n': '中国', 'v': 'tagLanguageChinese'}, {'n': '亚洲', 'v': 'ethnicityAsian'}, {'n': '白人', 'v': 'ethnicityWhite'}, {'n': '拉丁', 'v': 'ethnicityLatino'}, {'n': '混血', 'v': 'ethnicityMultiracial'}, {'n': '印度', 'v': 'ethnicityIndian'}, {'n': '阿拉伯', 'v': 'ethnicityMiddleEastern'}, {'n': '黑人', 'v': 'ethnicityEbony'} ) VALUE_MEN = ( {'n': '情侣', 'v': 'sexGayCouples'}, {'n': '直男', 'v': 'orientationStraight'} ) TIDS = ('girls', 'couples', 'men', 'trans') filters = { tid: [{'key': 'tag', 'value': VALUE_MEN + VALUE if tid == 'men' else VALUE}] for tid in TIDS } return { 'class': CLASSES, 'filters': filters } def homeVideoContent(self): return [] def categoryContent(self, tid, pg, filter, extend): limit = 60 offset = limit * (int(pg) - 1) url = f"{self.host}/api/front/models?improveTs=false&removeShows=false&limit={limit}&offset={offset}&primaryTag={tid}&sortBy=stripRanking&rcmGrp=A&rbCnGr=true&prxCnGr=false&nic=false" if 'tag' in extend: url += "&filterGroupTags=%5B%5B%22" + extend['tag'] + "%22%5D%5D" rsp = self.fetch(url).json() videos = [ { "vod_id": str(vod['username']).strip(), "vod_name": f"{self.country_code_to_flag(str(vod['country']).strip())}{str(vod['username']).strip()}", # 使用与站点一致的org域名 "vod_pic": f"https://img.doppiocdn.org/thumbs/{vod['snapshotTimestamp']}/{vod['id']}", "vod_remarks": "" if vod.get('status') == "public" else "🎫" } for vod in rsp.get('models', []) ] total = int(rsp.get('filteredCount', 0)) return { "list": videos, "page": pg, "pagecount": (total + limit - 1) // limit, "limit": limit, "total": total } def detailContent(self, array): username = array[0] rsp = self.fetch(f"{self.host}/api/front/v2/models/username/{username}/cam").json() info = rsp['cam'] user = rsp['user']['user'] id = str(user['id']) country = str(user['country']).strip() isLive = "" if user['isLive'] else " 已下播" flag = self.country_code_to_flag(country) remark, startAt = '', '' if show := info.get('show'): startAt = show.get('createdAt') elif show := info.get('groupShowAnnouncement'): startAt = show.get('startAt') if startAt: BJtime = (datetime.strptime(startAt, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=8)).strftime("%m月%d日 %H:%M") remark = f"🎫 始于 {BJtime}" vod = { "vod_id": id, "vod_name": str(info['topic']).strip(), "vod_pic": str(user['avatarUrl']), "vod_director": f"{flag}{username}{isLive}", "vod_remarks": remark, 'vod_play_from': 'Boyfriend Show', 'vod_play_url': f"{id}${id}" } return {'list': [vod]} def process_key(self, key: str) -> Tuple[str, str]: tags = {'G': 'girls', 'C': 'couples', 'M': 'men', 'T': 'trans'} parts = key.split(maxsplit=1) # 仅分割第一个空格 if len(parts) > 1 and (tag := tags.get(parts[0].upper())): return tag, parts[1].strip() return 'men', key.strip() def searchContent(self, key, quick, pg="1"): result = {} if int(pg) > 1: return result tag, key = self.process_key(key) url = f"{self.host}/api/front/v4/models/search/group/username?query={key}&limit=900&primaryTag={tag}" rsp = self.fetch(url).json() result['list'] = [ { "vod_id": str(user['username']).strip(), "vod_name": f"{self.country_code_to_flag(str(user['country']).strip())}{user['username']}", "vod_pic": f"https://img.doppiocdn.org/thumbs/{user['snapshotTimestamp']}/{user['id']}", "vod_remarks": "" if user['status'] == "public" else "🎫" } for user in rsp.get('models', []) if user['isLive'] ] return result def playerContent(self, flag, id, vipFlags): # 主索引m3u8 url = f"https://edge-hls.doppiocdn.net/hls/{id}/master/{id}_auto.m3u8?playlistType=lowLatency" rsp = self.fetch(url) lines = rsp.text.strip().split('\n') psch, pkey = '', '' urls = [] for i, line in enumerate(lines): if line.startswith('#EXT-X-MOUFLON:'): if parts := line.split(':'): if len(parts) >= 4: psch, pkey = parts[2], parts[3] if '#EXT-X-STREAM-INF' in line: name_start = line.find('NAME="') + 6 name_end = line.find('"', name_start) qn = line[name_start:name_end] # URL在下一行 url_base = lines[i + 1] # 组合最终的URL,并加上psch和pkey参数 full_url = f"{url_base}&psch={psch}&pkey={pkey}" proxy_url = f"{self.getProxyUrl()}&url={quote(full_url)}" # 将画质和URL添加到列表中 urls.extend([qn, proxy_url]) return { "url": urls, "parse": '0', "contentType": '', "header": self.headers } def localProxy(self, param): url = unquote(param['url']) data = self.fetch(url) if data.status_code == 403: data = self.fetch(re.sub(r'\d+p\d*\.m3u8', '160p_blurred.m3u8', url)) if data.status_code != 200: return [404, "text/plain", ""] data = data.text if "#EXT-X-MOUFLON:FILE" in data: data = self.process_m3u8_content_v2(data) return [200, "application/vnd.apple.mpegur", data] def process_m3u8_content_v2(self, m3u8_content): lines = m3u8_content.strip().split('\n') for i, line in enumerate(lines): if (line.startswith('#EXT-X-MOUFLON:FILE:') and 'media.mp4' in lines[i + 1]): encrypted_data = line.split(':', 2)[2].strip() try: decrypted_data = self.decrypt(encrypted_data, self.stripchat_key) except Exception: decrypted_data = self.decrypt(encrypted_data, "Zokee2OhPh9kugh4") lines[i + 1] = lines[i + 1].replace('media.mp4', decrypted_data) return '\n'.join(lines) def country_code_to_flag(self, country_code): if len(country_code) != 2 or not country_code.isalpha(): return country_code flag_emoji = ''.join([chr(ord(c.upper()) - ord('A') + 0x1F1E6) for c in country_code]) return flag_emoji def decode_key_compact(self): base64_str = "NTEgNzUgNjUgNjEgNmUgMzQgNjMgNjEgNjkgMzkgNjIgNmYgNGEgNjEgMzUgNjE=" decoded = base64.b64decode(base64_str).decode('utf-8') key_bytes = bytes(int(hex_str, 16) for hex_str in decoded.split(" ")) return key_bytes.decode('utf-8') def compute_hash(self, key: str) -> bytes: """计算并缓存SHA-256哈希""" if key not in self._hash_cache: sha256 = hashlib.sha256() sha256.update(key.encode('utf-8')) self._hash_cache[key] = sha256.digest() return self._hash_cache[key] def decrypt(self, encrypted_b64: str, key: str) -> str: # 修复Base64填充 padding = len(encrypted_b64) % 4 if padding: encrypted_b64 += '=' * (4 - padding) # 计算哈希并解密 hash_bytes = self.compute_hash(key) encrypted_data = base64.b64decode(encrypted_b64) # 异或解密 decrypted_bytes = bytearray() for i, cipher_byte in enumerate(encrypted_data): key_byte = hash_bytes[i % len(hash_bytes)] decrypted_bytes.append(cipher_byte ^ key_byte) return decrypted_bytes.decode('utf-8') def create_session_with_retry(self): self.session = requests.Session() retry_strategy = Retry( total=3, backoff_factor=0.3, status_forcelist=[429, 500, 502, 503, 504] ) adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) def fetch(self, url): return self.session.get(url, headers=self.headers, timeout=10)