import argparse
import base64
import builtins
import hashlib
import json
import math
import os
import random
import re
import secrets
import string
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional
from urllib.parse import urlparse, parse_qs, urlencode, quote

from curl_cffi import requests
from rich import print

# Lock that serializes the interactive prompt across worker threads.
input_lock = threading.Lock()
# Process-wide answer to the "non-US exit node" prompt: None = not asked yet.
builtins.yasal_bypass_ip_choice = None

MAILTM_BASE = "https://api.mail.tm"


@dataclass(frozen=True)
class TempMailbox:
    """Immutable description of a temporary mailbox and its credentials."""

    email: str
    provider: str
    token: str = ""
    api_base: str = ""
    login: str = ""
    domain: str = ""
    sid_token: str = ""
    password: str = ""


def _mailtm_headers(*, token: str = "", use_json: bool = False) -> Dict[str, str]:
    """Build request headers for the mailbox API.

    ``Content-Type`` is added when ``use_json`` is true and a bearer
    ``Authorization`` header is added when ``token`` is non-empty.
    """
    headers = {"Accept": "application/json"}
    if use_json:
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return headers


def _hydra_domains(api_base: str, proxies: Any = None) -> list[str]:
    """Return the usable domains listed by ``{api_base}/domains``.

    The response may be a bare list or a dict carrying the items under
    ``hydra:member`` or ``items``. Entries that are inactive or private are
    skipped.

    Raises:
        RuntimeError: if the endpoint does not answer with HTTP 200.
    """
    resp = requests.get(
        f"{api_base}/domains",
        headers=_mailtm_headers(),
        proxies=proxies,
        impersonate="chrome",
        timeout=15,
    )
    if resp.status_code != 200:
        raise RuntimeError("[red] • 获取域名失败 [/red]")

    data = resp.json()
    if isinstance(data, list):
        items = data
    elif isinstance(data, dict):
        items = data.get("hydra:member") or data.get("items") or []
    else:
        items = []

    domains = []
    for item in items:
        if not isinstance(item, dict):
            continue
        domain = str(item.get("domain") or "").strip()
        is_active = item.get("isActive", True)
        is_private = item.get("isPrivate", False)
        if domain and is_active and not is_private:
            domains.append(domain)
    return domains


def _create_hydra_mailbox(
    *,
    api_base: str,
    provider_name: str,
    provider_key: str,
    proxies: Any = None,
    thread_id: int,
) -> Optional[TempMailbox]:
    """Create a mailbox on ``api_base`` and fetch its access token.

    Up to five address/password pairs are tried. ``provider_name`` and
    ``thread_id`` are accepted for interface compatibility but are not used
    by this function.

    Returns:
        A ``TempMailbox`` on success, otherwise ``None`` (no domains, every
        attempt failed, or any exception was raised).
    """
    try:
        domains = _hydra_domains(api_base, proxies)
        if not domains:
            # Fixed: the original markup had a stray "[" before "[red]".
            print("[red] • 没有可用域名 [/red]")
            return None

        for _ in range(5):
            local = f"oc{secrets.token_hex(5)}"
            domain = random.choice(domains)
            email = f"{local}@{domain}"
            password = secrets.token_urlsafe(18)
            credentials = {"address": email, "password": password}

            create_resp = requests.post(
                f"{api_base}/accounts",
                headers=_mailtm_headers(use_json=True),
                json=credentials,
                proxies=proxies,
                impersonate="chrome",
                timeout=15,
            )
            if create_resp.status_code not in (200, 201):
                continue

            token_resp = requests.post(
                f"{api_base}/token",
                headers=_mailtm_headers(use_json=True),
                json=credentials,
                proxies=proxies,
                impersonate="chrome",
                timeout=15,
            )
            if token_resp.status_code == 200:
                token = str(token_resp.json().get("token") or "").strip()
                if token:
                    return TempMailbox(
                        email=email,
                        provider=provider_key,
                        token=token,
                        api_base=api_base,
                        password=password,
                    )

        print("[red] • 邮箱获取Token失败 [/red]")
        return None
    except Exception:
        print("[red] • 请求API出错 [/red]")
        return None


def get_temp_mailbox(
    provider_key: str, thread_id: int, proxies: Any = None
) -> Optional[TempMailbox]:
    """Return a mailbox for ``provider_key``; only ``"mailtm"`` is handled."""
    mailbox = None
    if provider_key == "mailtm":
        mailbox = _create_hydra_mailbox(
            api_base=MAILTM_BASE,
            provider_name="Mail.tm",
            provider_key="mailtm",
            proxies=proxies,
            thread_id=thread_id,
        )
    if mailbox:
        return mailbox
    print("[red] • 邮箱获取失败 [/red]")
    return None


def _poll_hydra_oai_code(
    *, api_base: str, token: str, email: str, thread_id: int, proxies: Any = None
) -> str:
    """Poll the mailbox message list for a verification code.

    NOTE(review): everything in this function after the start of its regex
    literal (``regex = r"(?``) is missing from the reviewed source, most
    likely removed by an HTML-tag stripper. The body has deliberately NOT
    been guessed; restore it from the original file.
    """
    url_list = f"{api_base}/messages"  # noqa: F841 - kept from the original
    raise NotImplementedError(
        "_poll_hydra_oai_code: function body was lost from the source file"
    )


# NOTE(review): the ``def`` line below was also missing from the reviewed
# source. It is reconstructed from the names the body uses and from the call
# ``get_oai_code(mailbox, thread_id, proxies)`` in ``run`` - confirm against
# the original file.
def get_oai_code(mailbox: TempMailbox, thread_id: int, proxies: Any = None) -> str:
    """Return the verification code for ``mailbox``, or ``""`` on failure."""
    if mailbox.provider == "mailtm":
        if not mailbox.token:
            print("[red] • token为空,无法读取邮件 [/red]")
            return ""
        return _poll_hydra_oai_code(
            api_base=mailbox.api_base,
            token=mailbox.token,
            email=mailbox.email,
            thread_id=thread_id,
            proxies=proxies,
        )
    # Fixed: the original markup had a stray "[" before "[red]".
    print(f"[red] • 不支持 {mailbox.provider} [/red]")
    return ""


AUTH_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
DEFAULT_SCOPE = "openid email profile offline_access"


def _b64url_no_pad(raw: bytes) -> str:
    """Return the URL-safe base64 encoding of ``raw`` without ``=`` padding."""
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def _sha256_b64url_no_pad(s: str) -> str:
    """Return the unpadded URL-safe base64 SHA-256 digest of ASCII text ``s``."""
    return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())


def _random_state(nbytes: int = 16) -> str:
    """Return a random URL-safe string built from ``nbytes`` random bytes."""
    return secrets.token_urlsafe(nbytes)


def _pkce_verifier() -> str:
    """Return a random URL-safe PKCE code verifier (64 random bytes)."""
    return secrets.token_urlsafe(64)


def _parse_callback_url(callback_url: str) -> Dict[str, str]:
    """Extract ``code``, ``state``, ``error`` and ``error_description``.

    Accepts a full URL, a host/path without scheme, a bare ``?query`` or a
    bare ``key=value`` string. Values are read from the query string;
    fragment values fill in keys that are missing or blank in the query.
    Missing fields come back as empty strings.
    """
    candidate = callback_url.strip()
    if not candidate:
        return {"code": "", "state": "", "error": "", "error_description": ""}

    # Normalize scheme-less input so urlparse puts the parameters in .query.
    if "://" not in candidate:
        if candidate.startswith("?"):
            candidate = f"http://localhost{candidate}"
        elif any(ch in candidate for ch in "/?#") or ":" in candidate:
            candidate = f"http://{candidate}"
        elif "=" in candidate:
            candidate = f"http://localhost/?{candidate}"

    parsed = urllib.parse.urlparse(candidate)
    query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
    fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)

    for key, values in fragment.items():
        if key not in query or not query[key] or not (query[key][0] or "").strip():
            query[key] = values

    def get1(k: str) -> str:
        v = query.get(k, [""])
        return (v[0] or "").strip()

    code = get1("code")
    state = get1("state")
    error = get1("error")
    error_description = get1("error_description")

    # A value of the form "code#state" carries both fields in ``code``.
    if code and not state and "#" in code:
        code, state = code.split("#", 1)

    # Promote a lone description so callers only need to test ``error``.
    if not error and error_description:
        error, error_description = error_description, ""

    return {
        "code": code,
        "state": state,
        "error": error,
        "error_description": error_description,
    }


def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
    """Decode one base64url JSON segment; return ``{}`` if it is not an object.

    Missing ``=`` padding is restored before decoding. Undecodable input and
    JSON that is not an object both yield an empty dict, so callers can use
    ``.get`` on the result unconditionally.
    """
    raw = (seg or "").strip()
    if not raw:
        return {}
    pad = "=" * ((4 - (len(raw) % 4)) % 4)
    try:
        decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii"))
        parsed = json.loads(decoded.decode("utf-8"))
    except ValueError:
        # binascii.Error, JSONDecodeError and UnicodeError all derive from
        # ValueError.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
    """Return the payload claims of ``id_token`` WITHOUT verifying the signature.

    Returns ``{}`` when the token does not contain at least two dots or when
    the payload cannot be decoded.
    """
    if not id_token or id_token.count(".") < 2:
        return {}
    return _decode_jwt_segment(id_token.split(".")[1])


def _to_int(v: Any) -> int:
    """Convert ``v`` to ``int``; return 0 when the conversion fails."""
    try:
        return int(v)
    except (TypeError, ValueError):
        return 0


def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
    """POST ``data`` as a URL-encoded form and return the decoded JSON reply.

    Raises:
        RuntimeError: if the response status is not 200 (including HTTP
            errors raised by ``urllib``); the message carries the status code
            and the response body.
    """
    body = urllib.parse.urlencode(data).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            if resp.status != 200:
                raise RuntimeError(
                    f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
                )
            return json.loads(raw.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        raw = exc.read()
        raise RuntimeError(
            f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
        ) from exc


@dataclass(frozen=True)
class OAuthStart:
    """Values produced when starting an authorization-code + PKCE flow."""

    auth_url: str
    state: str
    code_verifier: str
    redirect_uri: str


def generate_oauth_url(
    *, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE
) -> OAuthStart:
    """Build the authorization URL together with a fresh state and PKCE verifier."""
    state = _random_state()
    code_verifier = _pkce_verifier()
    code_challenge = _sha256_b64url_no_pad(code_verifier)
    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "prompt": "login",
        "id_token_add_organizations": "true",
        "codex_cli_simplified_flow": "true",
    }
    auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
    return OAuthStart(
        auth_url=auth_url,
        state=state,
        code_verifier=code_verifier,
        redirect_uri=redirect_uri,
    )


def submit_callback_url(
    *,
    callback_url: str,
    expected_state: str,
    code_verifier: str,
    redirect_uri: str = DEFAULT_REDIRECT_URI,
) -> str:
    """Exchange the code found in ``callback_url`` for tokens.

    Returns:
        A compact JSON string with the tokens, the account id and e-mail read
        from the (unverified) ID token, and refresh/expiry timestamps in UTC.

    Raises:
        RuntimeError: if the callback carries an OAuth error or the token
            endpoint rejects the exchange.
        ValueError: if ``code`` or ``state`` is missing, or ``state`` does
            not equal ``expected_state``.
    """
    cb = _parse_callback_url(callback_url)
    if cb["error"]:
        desc = cb["error_description"]
        raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
    if not cb["code"]:
        raise ValueError("callback url missing ?code=")
    if not cb["state"]:
        raise ValueError("callback url missing ?state=")
    if cb["state"] != expected_state:
        raise ValueError("state mismatch")

    token_resp = _post_form(
        TOKEN_URL,
        {
            "grant_type": "authorization_code",
            "client_id": CLIENT_ID,
            "code": cb["code"],
            "redirect_uri": redirect_uri,
            "code_verifier": code_verifier,
        },
    )

    access_token = (token_resp.get("access_token") or "").strip()
    refresh_token = (token_resp.get("refresh_token") or "").strip()
    id_token = (token_resp.get("id_token") or "").strip()
    expires_in = _to_int(token_resp.get("expires_in"))

    claims = _jwt_claims_no_verify(id_token)
    email = str(claims.get("email") or "").strip()
    auth_claims = claims.get("https://api.openai.com/auth") or {}
    if not isinstance(auth_claims, dict):
        # Guard: a non-object claim would otherwise break the .get below.
        auth_claims = {}
    account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()

    now = int(time.time())
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    expired_rfc3339 = time.strftime(
        timestamp_format, time.gmtime(now + max(expires_in, 0))
    )
    now_rfc3339 = time.strftime(timestamp_format, time.gmtime(now))

    config = {
        "id_token": id_token,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "account_id": account_id,
        "last_refresh": now_rfc3339,
        "email": email,
        "type": "codex",
        "expired": expired_rfc3339,
    }
    return json.dumps(config, ensure_ascii=False, separators=(",", ":"))


def get_auto_proxy() -> Optional[str]:
    """Return ``http://127.0.0.1:<port>`` for the first listed port that accepts
    a TCP connection, or ``None`` when none of them does."""
    import socket

    common_ports = [7890, 1080, 10809, 10808, 8888]
    for port in common_ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)
            if sock.connect_ex(("127.0.0.1", port)) == 0:
                return f"http://127.0.0.1:{port}"
    return None


def run(
    proxy: Optional[str], provider_key: str, thread_id: int
) -> Optional[tuple[str, str]]:
    """Execute one round of the flow.

    NOTE(review): block nesting in this function was reconstructed from a
    whitespace-collapsed source; verify it against the original file.

    Returns:
        ``(token_json, mailbox_password)`` on success, ``None`` on any
        failure (all exceptions are caught and reported as a generic error).
    """
    proxies: Any = None
    if proxy:
        proxies = {"http": proxy, "https": proxy}

    impersonate_list = ["chrome", "chrome110", "chrome116", "safari", "edge"]
    current_impersonate = random.choice(impersonate_list)
    s = requests.Session(proxies=proxies, impersonate=current_impersonate)

    try:
        trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10).text
        loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
        loc = loc_re.group(1) if loc_re else None

        if loc != "US":
            # Ask once per process; later threads reuse the stored answer.
            with input_lock:
                if builtins.yasal_bypass_ip_choice is None:
                    print(f"[yellow] • 当前节点IP ({loc}) 非 US [/yellow]")
                    ans = (
                        input("[yellow] • 是否强行绕过限制 [/yellow] (Y/n) ")
                        .strip()
                        .lower()
                    )
                    builtins.yasal_bypass_ip_choice = ans != "n"

            if not builtins.yasal_bypass_ip_choice:
                print("[red] • 退出线程 [/red]")
                return None

            if loc in ("CN", "HK") and builtins.yasal_bypass_ip_choice:
                print(f"[yellow] • 检测到敏感地区 ({loc}),尝试进行绕过... [/yellow]")
                if not proxy:
                    auto_p = get_auto_proxy()
                    if auto_p:
                        proxies = {"http": auto_p, "https": auto_p}
                        s.proxies = proxies
                    else:
                        print(
                            "[yellow] • 没有找到存活的本地代理端口,尝试本地直连 [/yellow]"
                        )
    except Exception:
        print("[red] • 网络连接断开 [/red]")
        return None

    mailbox = get_temp_mailbox(provider_key, thread_id, proxies)
    if not mailbox:
        return None
    email = mailbox.email

    oauth = generate_oauth_url()
    url = oauth.auth_url

    try:
        s.get(url, timeout=15)
        did = s.cookies.get("oai-did")
        print(f"[线程 {thread_id}] Device ID: {did}")

        signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
        sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'

        sen_resp = requests.post(
            "https://sentinel.openai.com/backend-api/sentinel/req",
            headers={
                "origin": "https://sentinel.openai.com",
                "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
                "content-type": "text/plain;charset=UTF-8",
            },
            data=sen_req_body,
            proxies=proxies,
            impersonate=current_impersonate,
            timeout=15,
        )
        if sen_resp.status_code != 200:
            print("[red] • 指纹被识别 [/red]")
            return None

        sen_token = sen_resp.json()["token"]
        sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}'

        signup_resp = s.post(
            "https://auth.openai.com/api/accounts/authorize/continue",
            headers={
                "referer": "https://auth.openai.com/create-account",
                "accept": "application/json",
                "content-type": "application/json",
                "openai-sentinel-token": sentinel,
            },
            data=signup_body,
        )
        if signup_resp.status_code in (403, 429):
            print("[red] • 注册被拒绝 [/red]")
            return None

        s.post(
            "https://auth.openai.com/api/accounts/passwordless/send-otp",
            headers={
                "referer": "https://auth.openai.com/create-account/password",
                "accept": "application/json",
                "content-type": "application/json",
            },
        )

        code = get_oai_code(mailbox, thread_id, proxies)
        if not code:
            return None

        code_body = f'{{"code":"{code}"}}'
        s.post(
            "https://auth.openai.com/api/accounts/email-otp/validate",
            headers={
                "referer": "https://auth.openai.com/email-verification",
                "accept": "application/json",
                "content-type": "application/json",
            },
            data=code_body,
        )

        # Random draws happen in the same order as in the original
        # single-expression f-string.
        first_name = random.choice(
            ["Alex", "Chris", "Jordan", "Taylor", "Morgan", "Sam", "Casey"]
        )
        last_name = random.choice(
            ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller"]
        )
        year = random.randint(1980, 2002)
        month = random.randint(1, 9)
        day = random.randint(10, 28)
        create_account_body = f'{{"name":"{first_name} {last_name}","birthdate":"{year}-0{month}-{day}" }}'

        create_account_resp = s.post(
            "https://auth.openai.com/api/accounts/create_account",
            headers={
                "referer": "https://auth.openai.com/about-you",
                "accept": "application/json",
                "content-type": "application/json",
            },
            data=create_account_body,
        )
        if create_account_resp.status_code != 200:
            err_msg = create_account_resp.text
            print("[red] • 创建失败 [/red]")
            if "unsupported_email" in err_msg:
                # Fixed: the original markup had a stray "[" before "[red]".
                print("[red] • 当前邮箱域名被封禁 [/red]")
            return None

        auth_cookie = s.cookies.get("oai-client-auth-session")
        if not auth_cookie:
            print("[red] • 未获取授权Cookie [/red]")
            return None

        auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
        workspaces = auth_json.get("workspaces") or []
        if not workspaces:
            print("[red] • 授权Cookie里没有workspace信息 [/red]")
            return None

        workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
        select_body = f'{{"workspace_id":"{workspace_id}"}}'
        select_resp = s.post(
            "https://auth.openai.com/api/accounts/workspace/select",
            headers={
                "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
                "content-type": "application/json",
            },
            data=select_body,
        )
        if select_resp.status_code != 200:
            print("[red] • 选择 workspace 失败 [/red]")
            return None

        continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
        if not continue_url:
            return None

        # Follow redirects by hand (at most six hops) so the callback URL can
        # be captured instead of being requested.
        current_url = continue_url
        for _ in range(6):
            final_resp = s.get(current_url, allow_redirects=False, timeout=15)
            location = final_resp.headers.get("Location") or ""
            if final_resp.status_code not in [301, 302, 303, 307, 308]:
                break
            if not location:
                break
            next_url = urllib.parse.urljoin(current_url, location)
            if "code=" in next_url and "state=" in next_url:
                token_json = submit_callback_url(
                    callback_url=next_url,
                    code_verifier=oauth.code_verifier,
                    redirect_uri=oauth.redirect_uri,
                    expected_state=oauth.state,
                )
                return token_json, mailbox.password
            current_url = next_url

        print("[red] • 未能捕获到Callback URL [/red]")
        return None
    except Exception:
        print("[red] • 运行时错误 [/red]")
        return None


def worker(
    thread_id: int,
    proxy: Optional[str],
    once: bool,
    sleep_min: int,
    sleep_max: int,
    provider_key: str,
) -> None:
    """Call ``run`` in a loop and persist each successful result.

    A success writes ``output/token_<email>_<ts>.json`` and appends a line to
    ``output/accounts.txt``. Between rounds the thread sleeps a random number
    of seconds between ``sleep_min`` and ``sleep_max`` (plus 10 after a
    failed round). With ``once`` set, the loop stops after one round.
    """
    # Fixed: randint raises ValueError when min > max, which used to kill the
    # thread because the call sits outside the try block.
    sleep_low, sleep_high = sorted((sleep_min, sleep_max))

    count = 0
    while True:
        count += 1
        print(f"\n———————————————————[bold blue] [{thread_id}] ·第 {count} 轮注册[/bold blue]———————————————")
        try:
            result = run(proxy, provider_key, thread_id)
            is_success = False
            if result:
                token_json, password = result
                try:
                    t_data = json.loads(token_json)
                    fname_email = t_data.get("email", "unknown").replace("@", "_")
                    raw_email = t_data.get("email", "unknown")
                    refresh_token = t_data.get("refresh_token", "")
                except Exception:
                    fname_email = f"unknown_{thread_id}"
                    raw_email = "unknown"
                    refresh_token = ""

                os.makedirs("output", exist_ok=True)
                file_name = f"output/token_{fname_email}_{int(time.time())}.json"
                with open(file_name, "w", encoding="utf-8") as f:
                    f.write(token_json)
                with open("output/accounts.txt", "a", encoding="utf-8") as f:
                    f.write(f"{raw_email}----{password}----{refresh_token}\n")

                print("[green] • 注册成功[/green]")
                is_success = True
            else:
                # Fixed: "[/boldred]" is not a valid closing tag for
                # "[bold red]" and made rich raise MarkupError.
                print("[bold red] • 注册失败 [/bold red]")
        except Exception:
            print("[red] • 线程内部错误 [/red]")
            is_success = False

        if once:
            break

        wait_time = random.randint(sleep_low, sleep_high)
        if not is_success:
            wait_time += 10
        time.sleep(wait_time)


def main() -> None:
    """Parse CLI options, start one daemon worker per provider entry, and wait
    until every worker has finished or the user presses Ctrl-C."""
    parser = argparse.ArgumentParser(description="OpenAI 注册")
    parser.add_argument("--proxy", default=None, help="代理地址")
    parser.add_argument("--once", action="store_true", help="只运行一次")
    parser.add_argument("--sleep-min", type=int, default=10)
    parser.add_argument("--sleep-max", type=int, default=30)
    args = parser.parse_args()

    providers_list = ["mailtm", "mailtm", "mailtm"]
    threads = []
    # One thread per list entry; thread ids start at 1.
    for i, provider_key in enumerate(providers_list, start=1):
        t = threading.Thread(
            target=worker,
            args=(i, args.proxy, args.once, args.sleep_min, args.sleep_max, provider_key),
        )
        t.daemon = True
        t.start()
        threads.append(t)

    try:
        while True:
            time.sleep(1)
            if not any(t.is_alive() for t in threads):
                break
    except KeyboardInterrupt:
        print("\n • 正在结束任务...")


if __name__ == "__main__":
    main()