|
5 | 5 | # This software is released under the MIT License. |
6 | 6 | # |
7 | 7 | # http://opensource.org/licenses/mit-license.php |
8 | | -import logging |
| 8 | +from contextlib import asynccontextmanager |
| 9 | +from dataclasses import dataclass |
| 10 | +from functools import lru_cache, partial |
| 11 | +from typing import AsyncIterator |
9 | 12 | from typing import Final |
| 13 | +from urllib.parse import urlparse, parse_qs |
10 | 14 |
|
11 | | -import rich_click as click |
12 | | - |
13 | | -from mcp_youtube_transcript.server import new_server |
14 | | - |
15 | | - |
16 | | -@click.command() |
17 | | -@click.option( |
18 | | - "--webshare-proxy-username", |
19 | | - metavar="NAME", |
20 | | - envvar="WEBSHARE_PROXY_USERNAME", |
21 | | - help="Webshare proxy service username.", |
22 | | -) |
23 | | -@click.option( |
24 | | - "--webshare-proxy-password", |
25 | | - metavar="PASSWORD", |
26 | | - envvar="WEBSHARE_PROXY_PASSWORD", |
27 | | - help="Webshare proxy service password.", |
28 | | -) |
29 | | -@click.option("--http-proxy", metavar="URL", envvar="HTTP_PROXY", help="HTTP proxy server URL.") |
30 | | -@click.option("--https-proxy", metavar="URL", envvar="HTTPS_PROXY", help="HTTPS proxy server URL.") |
31 | | -@click.version_option() |
32 | | -def main( |
33 | | - webshare_proxy_username: str | None, |
34 | | - webshare_proxy_password: str | None, |
35 | | - http_proxy: str | None, |
36 | | - https_proxy: str | None, |
37 | | -) -> None: |
38 | | - """YouTube Transcript MCP server.""" |
39 | | - |
40 | | - logging.basicConfig(level=logging.INFO) |
41 | | - logger = logging.getLogger(__name__) |
42 | | - |
43 | | - logger.info("starting Youtube Transcript MCP server") |
44 | | - mcp = new_server(webshare_proxy_username, webshare_proxy_password, http_proxy, https_proxy) |
45 | | - mcp.run() |
46 | | - logger.info("closed Youtube Transcript MCP server") |
47 | | - |
48 | | - |
49 | | -__all__: Final = ["main"] |
| 15 | +import requests |
| 16 | +from bs4 import BeautifulSoup |
| 17 | +from mcp.server import FastMCP |
| 18 | +from mcp.server.fastmcp import Context |
| 19 | +from pydantic import Field |
| 20 | +from youtube_transcript_api import YouTubeTranscriptApi |
| 21 | +from youtube_transcript_api.proxies import WebshareProxyConfig, GenericProxyConfig, ProxyConfig |
| 22 | + |
| 23 | + |
| 24 | +@dataclass(frozen=True) |
| 25 | +class AppContext: |
| 26 | + http_client: requests.Session |
| 27 | + ytt_api: YouTubeTranscriptApi |
| 28 | + |
| 29 | + |
| 30 | +@asynccontextmanager |
| 31 | +async def _app_lifespan(_server: FastMCP, proxy_config: ProxyConfig | None) -> AsyncIterator[AppContext]: |
| 32 | + with requests.Session() as http_client: |
| 33 | + ytt_api = YouTubeTranscriptApi(http_client=http_client, proxy_config=proxy_config) |
| 34 | + yield AppContext(http_client=http_client, ytt_api=ytt_api) |
| 35 | + |
| 36 | + |
| 37 | +@lru_cache |
| 38 | +def _get_transcript(ctx: AppContext, video_id: str, lang: str) -> str: |
| 39 | + if lang == "en": |
| 40 | + languages = ["en"] |
| 41 | + else: |
| 42 | + languages = [lang, "en"] |
| 43 | + |
| 44 | + page = ctx.http_client.get( |
| 45 | + f"https://www.youtube.com/watch?v={video_id}", headers={"Accept-Language": ",".join(languages)} |
| 46 | + ) |
| 47 | + page.raise_for_status() |
| 48 | + soup = BeautifulSoup(page.text, "html.parser") |
| 49 | + title = soup.title.string if soup.title else "Transcript" |
| 50 | + |
| 51 | + transcripts = ctx.ytt_api.fetch(video_id, languages=languages) |
| 52 | + |
| 53 | + return f"# {title}\n" + "\n".join((item.text for item in transcripts)) |
| 54 | + |
| 55 | + |
| 56 | +def server( |
| 57 | + webshare_proxy_username: str | None = None, |
| 58 | + webshare_proxy_password: str | None = None, |
| 59 | + http_proxy: str | None = None, |
| 60 | + https_proxy: str | None = None, |
| 61 | +) -> FastMCP: |
| 62 | + """Initializes the MCP server.""" |
| 63 | + |
| 64 | + proxy_config: ProxyConfig | None = None |
| 65 | + if webshare_proxy_username and webshare_proxy_password: |
| 66 | + proxy_config = WebshareProxyConfig(webshare_proxy_username, webshare_proxy_password) |
| 67 | + elif http_proxy or https_proxy: |
| 68 | + proxy_config = GenericProxyConfig(http_proxy, https_proxy) |
| 69 | + |
| 70 | + mcp = FastMCP("Youtube Transcript", lifespan=partial(_app_lifespan, proxy_config=proxy_config)) |
| 71 | + |
| 72 | + @mcp.tool() |
| 73 | + async def get_transcript( |
| 74 | + ctx: Context, |
| 75 | + url: str = Field(description="The URL of the YouTube video"), |
| 76 | + lang: str = Field(description="The preferred language for the transcript", default="en"), |
| 77 | + ) -> str: |
| 78 | + """Retrieves the transcript of a YouTube video.""" |
| 79 | + parsed_url = urlparse(url) |
| 80 | + if parsed_url.hostname == "youtu.be": |
| 81 | + video_id = parsed_url.path.lstrip("/") |
| 82 | + else: |
| 83 | + q = parse_qs(parsed_url.query).get("v") |
| 84 | + if q is None: |
| 85 | + raise ValueError(f"couldn't find a video ID from the provided URL: {url}.") |
| 86 | + video_id = q[0] |
| 87 | + |
| 88 | + app_ctx: AppContext = ctx.request_context.lifespan_context # type: ignore |
| 89 | + return _get_transcript(app_ctx, video_id, lang) |
| 90 | + |
| 91 | + return mcp |
| 92 | + |
| 93 | + |
| 94 | +__all__: Final = ["server"] |
0 commit comments