小鹅通 M3U8 视频下载器

支持自动解密、多线程并发与浏览器联动的本地命令行下载工具,配合浏览器,能够无缝接管课程下载任务,真正进入高效的“边看边存”状态

核心特性
浏览器联动,一键发送:运行后在本地启动 HTTP 服务(端口 8910),配合浏览器脚本,在课程页面点击“发送到下载”即可自动拉起后台下载,零繁琐链接复制,彻底解放双手。
开箱即用,免配依赖:脚本内置环境自检逻辑。首次运行自动检测并安装缺失的 Python 依赖(requests, pycryptodome, tqdm)。
全自动解密与合并,效率翻倍:
自动解密 → 自动获取并解析 AES-128 密钥与 IV,在内存中完成 TS 切片解密。
智能合并 → 极速调用 FFmpeg 将切片合并为 MP4,内置“Stream Copy”与“重编码”双重回退机制,确保视频完美合并。
多线程并发,极速下载:采用 ThreadPoolExecutor 提供多线程并发下载,搭配 tqdm 进度条,直观展示下载进度与切片完成情况。
稳定可靠,日志追溯:提供“控制台+本地日志(downloader.log)”双重记录,所有下载历史有迹可循。

重点说明:
// @match ://.xiaoeknow.com/
// @match
://.xet.tech/
// @match ://.xiaoe-tech.com/
// @match
://.xet.pomoho.com/
// @match ://.xet-pc.citv.cn/*
因平台支持自定义域名,因此若脚本中不包含所购买的课程的自定义域名,则需要手动进行添加

使用步骤:一、环境与工具准备安装扩展:为实现网页端联动,用户需要手动在浏览器(如 Chrome、Edge)的扩展商店中安装 Tampermonkey(油猴) 插件。添加脚本:在油猴插件中,添加并启用文件目录下的网页端用户脚本(用于在页面生成交互按钮)
图片[1]-小鹅通 M3U8 视频下载器
免配置环境:程序已设置好本地 Winpython32-3.8.3.0 环境,无需手动创建环境,无需任何编译,真正做到开箱即用。
启动服务:直接运行工具,程序会自动完成自检。当控制台出现“等待浏览器发送下载任务…”提示时,代表本地监听服务已成功启动。

二、一键发送任务,全自动下载
保持下载器的命令行窗口在后台运行,切勿关闭。
打开课程到播放页面,如需特定清晰度需要手动调整,等获取到视频地址后,直接点击由油猴脚本生成的“发送到下载”按钮。
接收到任务后,下载器将在本地 downloads 文件夹下全自动完成切片下载、密钥解密与最终的 MP4 视频合并导出。
图片[2]-小鹅通 M3U8 视频下载器
图片[3]-小鹅通 M3U8 视频下载器
最低系统可支持到win7 32位,若存在不兼容问题可以直接替换环境
[Python]

"""小鹅通 M3U8 视频下载器 — 交互式命令行版"""
import os, time, shutil, subprocess, binascii, re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from tqdm import tqdm

# 强制 stdout 行缓冲,确保 Windows CMD 中实时输出
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
OUT = os.path.join(BASE_DIR, 'downloads')
os.makedirs(OUT, exist_ok=True)
LOG_FILE = os.path.join(BASE_DIR, 'downloader.log')

def _console_encoding():
    """获取 Windows 控制台实际代码页,避免 UTF-8/GBK 猜错"""
    try:
        import ctypes
        cp = ctypes.windll.kernel32.GetConsoleOutputCP()
        if cp == 65001:
            return 'utf-8'
        elif cp == 936:
            return 'gbk'
        else:
            return f'cp{cp}'
    except Exception:
        return (sys.stdout.encoding or 'utf-8').lower()

CONSOLE_ENC = _console_encoding()

def log(msg, end='\n'):
    """同时输出到控制台和日志文件。控制台按实际代码页编码,避免乱码"""
    ts = time.strftime('%H:%M:%S')
    line = f'[{ts}] {msg}'
    # 1. 文件日志(始终可靠)
    try:
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(line)
            if end:
                f.write(end)
            f.flush()
    except Exception:
        pass
    # 2. 控制台:按真实代码页编码
    try:
        data = (line + end).encode(CONSOLE_ENC, errors='replace')
    except Exception:
        data = (line + end).encode('utf-8', errors='replace')
    try:
        os.write(1, data)
    except Exception:
        try:
            os.write(2, data)
        except Exception:
            pass

FFMPEG = os.path.join(BASE_DIR, 'ffmpeg.exe')

def safe_name(name):
    """去除文件名中的非法字符"""
    return re.sub(r'[<>:"/\\|?*]', '_', name).strip()

def find_ffmpeg():
    for p in [FFMPEG, shutil.which('ffmpeg'), shutil.which('ffmpeg.exe')]:
        if p and os.path.isfile(p):
            return p
    return None

def download_one(name, m3u8_url):
    """下载单个视频"""
    s = requests.Session()
    s.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://xiaoe-tech.com/',
    })

    log(f'\n  {name}')
    log(f'  {"─" * 50}')

    # 1. 下载 M3U8
    log('  [1/4] 下载索引...', end=' ')
    try:
        resp = s.get(m3u8_url, timeout=30)
        resp.raise_for_status()
    except Exception as e:
        log(f'失败: {e}')
        return False
    m3u8 = resp.text
    base = '/'.join(m3u8_url.split('/')[:-1]) + '/'

    # 2. 解析
    segments = []
    key_url = None
    iv = b'\x00' * 16
    for line in m3u8.split('\n'):
        line = line.strip()
        if 'URI=' in line and 'AES-128' in line:
            a = line.find('URI="') + 5
            b = line.find('"', a)
            key_url = line[a:b]
            if not key_url.startswith('http'):
                key_url = urljoin(base, key_url)
            iv_s = line.find('IV=0x')
            if iv_s != -1:
                iv = binascii.unhexlify(line[iv_s + 5:iv_s + 37])
        elif line and not line.startswith('#'):
            u = line if line.startswith('http') else urljoin(base, line)
            segments.append(u)

    log(f'{len(segments)} 片段')

    # 3. 获取密钥
    log('  [2/4] 获取密钥...', end=' ')
    try:
        key = s.get(key_url, timeout=15).content
        log(f'{len(key)} 字节')
    except Exception as e:
        log(f'失败: {e}')
        return False

    # 4. 并行下载
    tmp = os.path.join(OUT, f'tmp_{int(time.time())}')
    os.makedirs(tmp, exist_ok=True)

    def dl_one(url, key, iv, idx):
        for _ in range(3):
            try:
                data = s.get(url, timeout=60).content
                break
            except:
                time.sleep(1)
        else:
            return None
        seg_iv = iv[:12] + idx.to_bytes(4, 'big')
        dec = AES.new(key, AES.MODE_CBC, iv=seg_iv).decrypt(data)
        try:
            dec = unpad(dec, AES.block_size)
        except:
            pass
        fp = os.path.join(tmp, f's_{idx:05d}.ts')
        with open(fp, 'wb') as f:
            f.write(dec)
        return fp

    t0 = time.time()
    results = {}
    with ThreadPoolExecutor(max_workers=6) as ex:
        fut = {ex.submit(dl_one, u, key, iv, i): i for i, u in enumerate(segments)}
        with tqdm(total=len(segments), desc='  [3/4] 下载中', unit='片',
                  ncols=60, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}') as pbar:
            for f in as_completed(fut):
                if f.result():
                    results[fut[f]] = f.result()
                    pbar.update(1)
    elapsed = time.time() - t0

    if not results:
        log('  所有片段下载失败')
        shutil.rmtree(tmp, ignore_errors=True)
        return False

    # 5. 合并
    log(f'  [4/4] 合并 ({elapsed:.1f}s 下载, {len(results)}/{len(segments)} 成功)...', end=' ')
    files = [results[i] for i in sorted(results)]
    output = os.path.join(OUT, f'{safe_name(name)}.mp4')

    # Concat 列表
    lst = os.path.join(OUT, '_concat.txt')
    with open(lst, 'w', encoding='utf-8') as f:
        for fp in files:
            f.write(f"file '{fp.replace(os.sep, '/')}'\n")

    if os.path.exists(output):
        os.remove(output)

    cmd = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
           '-i', lst, '-c', 'copy', '-movflags', '+faststart', '-y', output]
    r = subprocess.run(cmd, capture_output=True, encoding='utf-8',
                       errors='replace', timeout=600)
    if os.path.exists(lst):
        os.remove(lst)

    if r.returncode != 0:
        # 重编码回退
        lst2 = os.path.join(OUT, '_concat2.txt')
        with open(lst2, 'w', encoding='utf-8') as f:
            for fp in files:
                f.write(f"file '{fp.replace(os.sep, '/')}'\n")
        cmd2 = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0',
                '-i', lst2, '-c:v', 'libx264', '-c:a', 'aac',
                '-movflags', '+faststart', '-y', output]
        subprocess.run(cmd2, capture_output=True, encoding='utf-8',
                       errors='replace', timeout=600)
        if os.path.exists(lst2):
            os.remove(lst2)

    shutil.rmtree(tmp, ignore_errors=True)

    if os.path.exists(output):
        mb = os.path.getsize(output) / 1048576
        log(f'  {mb:.1f} MB  ->  {output}')
        return True
    else:
        log('  合并失败')
        return False

# ============================================================
#  HTTP 服务 — 接收浏览器发来的下载任务
# ============================================================

import json
import traceback
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

PORT = 8910
total_count = 0
count_lock = threading.Lock()

# 线程池:支持同时下载最多3个
download_pool = ThreadPoolExecutor(max_workers=3)

class Handler(BaseHTTPRequestHandler):
    def log_message(self, *args):
        pass

    def do_OPTIONS(self):
        log(f'HTTP OPTIONS {self.path}')
        self._cors()

    def do_GET(self):
        log(f'HTTP GET {self.path}')
        self._cors()
        self._json({'status': 'running', 'port': PORT, 'total': total_count})

    def do_POST(self):
        global total_count
        log(f'HTTP POST {self.path} 开始处理')
        try:
            length = int(self.headers.get('Content-Length', 0))
            raw = self.rfile.read(length)
            data = json.loads(raw)
            name = data.get('name', '').strip()
            url = data.get('url', '').strip()

            log(f'HTTP POST {self.path} body_len={length}')
            log(f'JSON: name={name!r} url={url[:60]}...')

            if not name or not url:
                log('拒绝任务:name 或 url 为空')
                self._cors()
                self._json({'status': 'error', 'msg': 'missing name or url'})
                return

            log(f'>>> 收到任务: [{name}]')
            log(f'    URL: {url[:80]}...')

            self._cors()
            self._json({'status': 'accepted', 'msg': f'已接收: {name}'})
            log(f'已响应 accepted,提交后台下载...')

            download_pool.submit(self._do_download, name, url)

        except Exception as e:
            tb = traceback.format_exc()
            log(f'HTTP POST 异常: {e}')
            log(f'异常堆栈: {tb}')
            self._cors()
            self._json({'status': 'error', 'msg': str(e)})

    def _do_download(self, name, url):
        """在单独线程中执行下载"""
        global total_count
        log(f'开始下载: [{name}]')
        ok = download_one(name, url)
        with count_lock:
            if ok:
                total_count += 1
            log(f'--- 任务结束: [{name}] {"成功" if ok else "失败"} (累计: {total_count}) ---')
            log('等待新任务...')

    def _cors(self):
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Type', 'application/json')
        self.end_headers()

    def _json(self, obj):
        body = json.dumps(obj, ensure_ascii=False).encode('utf-8')
        self.wfile.write(body)

def main():
    ffmpeg = find_ffmpeg()
    if not ffmpeg:
        log('未找到 ffmpeg.exe,请放到本目录下')
        sys.exit(1)

    server = HTTPServer(('127.0.0.1', PORT), Handler)

    log('=' * 50)
    log('小鹅通 M3U8 视频下载器 启动')
    log(f'ffmpeg: {ffmpeg}')
    log(f'输出:   {OUT}')
    log(f'端口:   localhost:{PORT}')
    log('等待浏览器发送下载任务...')
    log('在课程页面点击 "发送到下载" 即可')
    log('按 Ctrl+C 退出')
    log('=' * 50)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        log('\n退出。本次共下载 {} 个视频。'.format(total_count))
        server.shutdown()

if __name__ == '__main__':
    main()

点击下载

© 版权声明
THE END
喜欢就支持一下吧
点赞5 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容