Skip to content

Commit 580f72c

Browse files
authored
Merge pull request #48 from jkawamoto/timeline
Add transcript fetching with timestamps
2 parents 9dd29cc + 81f1e72 commit 580f72c

File tree

3 files changed

+238
-23
lines changed

3 files changed

+238
-23
lines changed

manifest.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
"name": "get_transcript",
3333
"description": "Fetches the transcript of a specified YouTube video."
3434
},
35+
{
36+
"name": "get_timed_transcript",
37+
"description": "Fetches the transcript of a specified YouTube video with timestamps."
38+
},
3539
{
3640
"name": "get_video_info",
3741
"description": "Fetches the metadata of a specified YouTube video."

src/mcp_youtube_transcript/__init__.py

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
# This software is released under the MIT License.
66
#
77
# http://opensource.org/licenses/mit-license.php
8+
from __future__ import annotations
9+
810
from contextlib import asynccontextmanager
911
from dataclasses import dataclass
1012
from datetime import datetime, timedelta
@@ -21,7 +23,7 @@
2123
from mcp.server import FastMCP
2224
from mcp.server.fastmcp import Context
2325
from pydantic import Field, BaseModel
24-
from youtube_transcript_api import YouTubeTranscriptApi
26+
from youtube_transcript_api import YouTubeTranscriptApi, FetchedTranscriptSnippet
2527
from youtube_transcript_api.proxies import WebshareProxyConfig, GenericProxyConfig, ProxyConfig
2628
from yt_dlp import YoutubeDL
2729
from yt_dlp.extractor.youtube import YoutubeIE
@@ -50,6 +52,31 @@ class Transcript(BaseModel):
5052
next_cursor: str | None = Field(description="Cursor to retrieve the next page of the transcript", default=None)
5153

5254

55+
class TranscriptSnippet(BaseModel):
    """Transcript snippet of a YouTube video."""

    text: str = Field(description="Text of the transcript snippet")
    start: float = Field(description="The timestamp at which this transcript snippet appears on screen in seconds.")
    # The original description was ungrammatical; the wording is fixed here.
    duration: float = Field(description="The duration of the snippet in seconds.")

    def __len__(self) -> int:
        """Return the length of this snippet's JSON serialization."""
        return len(self.model_dump_json())

    @classmethod
    def from_fetched_transcript_snippet(cls, snippet: FetchedTranscriptSnippet) -> TranscriptSnippet:
        """Build a snippet by copying ``text``, ``start`` and ``duration`` from a fetched snippet.

        Args:
            snippet: Snippet object obtained from ``youtube_transcript_api``.

        Returns:
            A new ``TranscriptSnippet`` carrying the same three values.
        """
        return cls(text=snippet.text, start=snippet.start, duration=snippet.duration)
70+
71+
72+
class TimedTranscript(BaseModel):
    """Transcript of a YouTube video with timestamps."""

    # Field order is kept as title, snippets, next_cursor.
    title: str = Field(description="Title of the video")
    snippets: list[TranscriptSnippet] = Field(
        description="Transcript snippets of the video",
    )
    # Left as None when there is no further page to request.
    next_cursor: str | None = Field(
        default=None,
        description="Cursor to retrieve the next page of the transcript",
    )
78+
79+
5380
class VideoInfo(BaseModel):
5481
"""Video information."""
5582

@@ -68,8 +95,19 @@ def _parse_time_info(date: int, timestamp: int, duration: int) -> Tuple[datetime
6895
return upload_date, duration_str
6996

7097

98+
def _parse_video_id(url: str) -> str:
99+
parsed_url = urlparse(url)
100+
if parsed_url.hostname == "youtu.be":
101+
return parsed_url.path.lstrip("/")
102+
else:
103+
q = parse_qs(parsed_url.query).get("v")
104+
if q is None:
105+
raise ValueError(f"couldn't find a video ID from the provided URL: {url}.")
106+
return q[0]
107+
108+
71109
@lru_cache
72-
def _get_transcript(ctx: AppContext, video_id: str, lang: str) -> Tuple[str, list[str]]:
110+
def _get_transcript_snippets(ctx: AppContext, video_id: str, lang: str) -> Tuple[str, list[FetchedTranscriptSnippet]]:
73111
if lang == "en":
74112
languages = ["en"]
75113
else:
@@ -83,7 +121,7 @@ def _get_transcript(ctx: AppContext, video_id: str, lang: str) -> Tuple[str, lis
83121
title = soup.title.string if soup.title and soup.title.string else "Transcript"
84122

85123
transcripts = ctx.ytt_api.fetch(video_id, languages=languages)
86-
return title, [item.text for item in transcripts]
124+
return title, transcripts.snippets
87125

88126

89127
@lru_cache
@@ -124,16 +162,9 @@ async def get_transcript(
124162
next_cursor: str | None = Field(description="Cursor to retrieve the next page of the transcript", default=None),
125163
) -> Transcript:
126164
"""Retrieves the transcript of a YouTube video."""
127-
parsed_url = urlparse(url)
128-
if parsed_url.hostname == "youtu.be":
129-
video_id = parsed_url.path.lstrip("/")
130-
else:
131-
q = parse_qs(parsed_url.query).get("v")
132-
if q is None:
133-
raise ValueError(f"couldn't find a video ID from the provided URL: {url}.")
134-
video_id = q[0]
135165

136-
title, transcripts = _get_transcript(ctx.request_context.lifespan_context, video_id, lang)
166+
title, snippets = _get_transcript_snippets(ctx.request_context.lifespan_context, _parse_video_id(url), lang)
167+
transcripts = (item.text for item in snippets)
137168

138169
if response_limit is None or response_limit <= 0:
139170
return Transcript(title=title, transcript="\n".join(transcripts))
@@ -148,6 +179,34 @@ async def get_transcript(
148179

149180
return Transcript(title=title, transcript=res[:-1], next_cursor=cursor)
150181

182+
@mcp.tool()
async def get_timed_transcript(
    ctx: Context[ServerSession, AppContext],
    url: str = Field(description="The URL of the YouTube video"),
    lang: str = Field(description="The preferred language for the transcript", default="en"),
    next_cursor: str | None = Field(description="Cursor to retrieve the next page of the transcript", default=None),
) -> TimedTranscript:
    """Retrieves the transcript of a YouTube video with timestamps."""
    # The docstring above is left untouched on purpose; details live in comments.
    #
    # Pagination: when `response_limit` is a positive number, snippets are added
    # to the page until the running size (title length plus the JSON length of
    # each snippet, plus one per item) would exceed the limit. The index of the
    # first snippet left out is returned as `next_cursor`.

    title, snippets = _get_transcript_snippets(ctx.request_context.lifespan_context, _parse_video_id(url), lang)

    # No positive limit configured: return every snippet in a single response.
    if response_limit is None or response_limit <= 0:
        return TimedTranscript(
            title=title, snippets=[TranscriptSnippet.from_fetched_transcript_snippet(s) for s in snippets]
        )

    res: list[TranscriptSnippet] = []
    size = len(title) + 1
    cursor = None
    # Resume at the snippet index encoded in the cursor (0 when absent).
    for i, s in islice(enumerate(snippets), int(next_cursor or 0), None):
        snippet = TranscriptSnippet.from_fetched_transcript_snippet(s)
        # NOTE(review): if the snippet at the cursor position exceeds the limit
        # on its own, the page is empty and the same cursor is returned.
        if size + len(snippet) + 1 > response_limit:
            cursor = str(i)
            break
        res.append(snippet)
        # Fix: accumulate the page size. Without this the limit was compared
        # against a single snippet only and the response was never split.
        size += len(snippet) + 1

    return TimedTranscript(title=title, snippets=res, next_cursor=cursor)
209+
151210
@mcp.tool()
152211
def get_video_info(
153212
ctx: Context[ServerSession, AppContext],
@@ -159,4 +218,4 @@ def get_video_info(
159218
return mcp
160219

161220

162-
__all__: Final = ["server", "Transcript", "VideoInfo"]
221+
__all__: Final = ["server", "Transcript", "TimedTranscript", "TranscriptSnippet", "VideoInfo"]

tests/test_mcp.py

Lines changed: 162 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import yt_dlp
2020
from yt_dlp.extractor.youtube import YoutubeIE
2121

22-
from mcp_youtube_transcript import Transcript, VideoInfo, _parse_time_info
22+
from mcp_youtube_transcript import Transcript, VideoInfo, _parse_time_info, TimedTranscript, TranscriptSnippet
2323

2424

2525
def fetch_title(url: str, lang: str) -> str:
@@ -37,10 +37,20 @@ async def mcp_client_session() -> AsyncGenerator[ClientSession, None]:
3737
yield session
3838

3939

40+
@pytest.fixture(scope="module")
async def mcp_client_session_with_response_limit() -> AsyncGenerator[ClientSession, None]:
    """Yield an initialized client session for a server started with ``--response-limit 3000``."""
    server_args = ["run", "mcp-youtube-transcript", "--response-limit", "3000"]
    params = StdioServerParameters(command="uv", args=server_args)
    # Both context managers are entered in one statement; the session is built
    # from the first two stream objects handed out by the stdio client.
    async with stdio_client(params) as streams, ClientSession(streams[0], streams[1]) as session:
        await session.initialize()
        yield session
47+
48+
4049
@pytest.mark.anyio
async def test_list_tools(mcp_client_session: ClientSession) -> None:
    """The server must advertise all three tools."""
    listing = await mcp_client_session.list_tools()
    advertised = [tool.name for tool in listing.tools]
    assert "get_transcript" in advertised
    assert "get_timed_transcript" in advertised
    assert "get_video_info" in advertised
4555

4656

@@ -158,15 +168,6 @@ async def test_get_transcript_with_short_url(mcp_client_session: ClientSession)
158168
assert not res.isError
159169

160170

161-
@pytest.fixture(scope="module")
162-
async def mcp_client_session_with_response_limit() -> AsyncGenerator[ClientSession, None]:
163-
params = StdioServerParameters(command="uv", args=["run", "mcp-youtube-transcript", "--response-limit", "3000"])
164-
async with stdio_client(params) as streams:
165-
async with ClientSession(streams[0], streams[1]) as session:
166-
await session.initialize()
167-
yield session
168-
169-
170171
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
171172
@pytest.mark.default_cassette("LPZh9BOjkQs.yaml")
172173
@pytest.mark.vcr
@@ -199,6 +200,157 @@ async def test_get_transcript_with_response_limit(mcp_client_session_with_respon
199200
assert transcript[:-1] == expect.transcript
200201

201202

203+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("LPZh9BOjkQs.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript(mcp_client_session: ClientSession) -> None:
    """A regular watch URL returns the full timed transcript."""
    video_id = "LPZh9BOjkQs"

    # Expected value is built directly from the transcript API.
    expected_title = fetch_title(video_id, "en")
    expected_snippets = [
        TranscriptSnippet.from_fetched_transcript_snippet(s) for s in YouTubeTranscriptApi().fetch(video_id)
    ]
    expect = TimedTranscript(title=expected_title, snippets=expected_snippets)

    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    res = await mcp_client_session.call_tool("get_timed_transcript", arguments={"url": watch_url})
    assert isinstance(res.content[0], TextContent)

    transcript = TimedTranscript.model_validate_json(res.content[0].text)
    assert transcript == expect
    assert not res.isError
224+
225+
226+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("WjAXZkQSE2U.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript_with_language(mcp_client_session: ClientSession) -> None:
    """Requesting ``lang="ja"`` returns the Japanese timed transcript."""
    video_id = "WjAXZkQSE2U"

    expect = TimedTranscript(
        title=fetch_title(video_id, "ja"),
        snippets=[
            TranscriptSnippet.from_fetched_transcript_snippet(s) for s in YouTubeTranscriptApi().fetch(video_id, ["ja"])
        ],
    )

    res = await mcp_client_session.call_tool(
        "get_timed_transcript",
        arguments={"url": f"https://www.youtube.com/watch?v={video_id}", "lang": "ja"},
    )
    assert isinstance(res.content[0], TextContent)
    # The leftover debug print of the raw response was removed here.

    transcript = TimedTranscript.model_validate_json(res.content[0].text)
    assert transcript == expect
    assert not res.isError
250+
251+
252+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("LPZh9BOjkQs.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript_fallback_language(
    mcp_client_session: ClientSession,
) -> None:
    """An unknown language code yields the same result as the default English request."""
    video_id = "LPZh9BOjkQs"

    expected_title = fetch_title(video_id, "en")
    expected_snippets = [
        TranscriptSnippet.from_fetched_transcript_snippet(s) for s in YouTubeTranscriptApi().fetch(video_id)
    ]
    expect = TimedTranscript(title=expected_title, snippets=expected_snippets)

    arguments = {
        "url": f"https://www.youtube.com/watch?v={video_id}",
        "lang": "unknown",
    }
    res = await mcp_client_session.call_tool("get_timed_transcript", arguments=arguments)
    assert isinstance(res.content[0], TextContent)

    transcript = TimedTranscript.model_validate_json(res.content[0].text)
    assert transcript == expect
    assert not res.isError
278+
279+
280+
@pytest.mark.anyio
async def test_get_timed_transcript_invalid_url(mcp_client_session: ClientSession) -> None:
    """A URL without a ``v`` query parameter is reported as a tool error."""
    arguments = {"url": "https://www.youtube.com/watch?vv=abcdefg"}
    res = await mcp_client_session.call_tool("get_timed_transcript", arguments=arguments)
    assert res.isError
286+
287+
288+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("error.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript_not_found(mcp_client_session: ClientSession) -> None:
    """A video ID that cannot be resolved is reported as a tool error."""
    arguments = {"url": "https://www.youtube.com/watch?v=a"}
    res = await mcp_client_session.call_tool("get_timed_transcript", arguments=arguments)
    assert res.isError
297+
298+
299+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("LPZh9BOjkQs.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript_with_short_url(mcp_client_session: ClientSession) -> None:
    """A ``youtu.be`` short link returns the full timed transcript."""
    video_id = "LPZh9BOjkQs"

    expected_title = fetch_title(video_id, "en")
    expected_snippets = [
        TranscriptSnippet.from_fetched_transcript_snippet(s) for s in YouTubeTranscriptApi().fetch(video_id)
    ]
    expect = TimedTranscript(title=expected_title, snippets=expected_snippets)

    short_url = f"https://youtu.be/{video_id}"
    res = await mcp_client_session.call_tool("get_timed_transcript", arguments={"url": short_url})
    assert isinstance(res.content[0], TextContent)

    transcript = TimedTranscript.model_validate_json(res.content[0].text)
    assert transcript == expect
    assert not res.isError
320+
321+
322+
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
@pytest.mark.default_cassette("LPZh9BOjkQs.yaml")
@pytest.mark.vcr
@pytest.mark.anyio
async def test_get_timed_transcript_with_response_limit(mcp_client_session_with_response_limit: ClientSession) -> None:
    """Following ``next_cursor`` until it is None reassembles the complete transcript."""
    video_id = "LPZh9BOjkQs"

    expected_title = fetch_title(video_id, "en")
    expected_snippets = [
        TranscriptSnippet.from_fetched_transcript_snippet(s) for s in YouTubeTranscriptApi().fetch(video_id)
    ]
    expect = TimedTranscript(title=expected_title, snippets=expected_snippets)

    collected = []
    cursor = None
    while True:
        res = await mcp_client_session_with_response_limit.call_tool(
            "get_timed_transcript",
            arguments={"url": f"https://www.youtube.com/watch?v={video_id}", "next_cursor": cursor},
        )
        assert not res.isError
        assert isinstance(res.content[0], TextContent)

        page = TimedTranscript.model_validate_json(res.content[0].text)
        collected += page.snippets
        # Stop once the server reports no further page.
        cursor = page.next_cursor
        if cursor is None:
            break

    # The title is checked on the last page received.
    assert page.title == expect.title
    assert collected == expect.snippets
352+
353+
202354
@pytest.mark.skipif(os.getenv("CI") == "true", reason="Skipping this test on CI")
203355
@pytest.mark.anyio
204356
async def test_get_video_info(mcp_client_session: ClientSession) -> None:

0 commit comments

Comments
 (0)