199 lines
5.4 KiB
Python
199 lines
5.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
import re
|
|
import os
|
|
import sys
|
|
import gzip
|
|
import json
|
|
import base64
|
|
import string
|
|
import urllib.request
|
|
from datetime import datetime
|
|
|
|
"""
|
|
youtube initial player response
|
|
|
|
it's probably best to use this through a config file; see res/yt-ipr.conf
|
|
|
|
but if you want to use plain arguments instead then:
|
|
-v srv/ytm:ytm:w:rw,ed
|
|
:c,e2ts:c,e2dsa
|
|
:c,sz=16k-1m:c,maxn=10,300:c,rotf=%Y-%m/%d-%H
|
|
:c,mtp=yt-id,yt-title,yt-author,yt-channel,yt-views,yt-private,yt-manifest,yt-expires=bin/mtag/yt-ipr.py
|
|
:c,mte=yt-id,yt-title,yt-author,yt-channel,yt-views,yt-private,yt-manifest,yt-expires
|
|
|
|
see res/yt-ipr.user.js for the example userscript to go with this
|
|
"""
|
|
|
|
|
|
def main():
    """Load the JSON document in the file named by ``sys.argv[1]`` and tag it.

    The file is first read as gzip-compressed text; if that fails for any
    ordinary reason it is read again as plain text. Everything before the
    first ``{`` is discarded. If the JSON document is followed by trailing
    data, only the part up to the decoder's reported error position is parsed.

    A document with a top-level ``videoDetails`` key is handed to
    ``parse_youtube``; anything else is handed to ``parse_freg``.

    Raises:
        IndexError: if no file argument was given.
        ValueError: if the file contains no ``{`` at all, or if the text is
            not valid JSON even after cutting at the error position
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    path = sys.argv[1]

    try:
        with gzip.open(path, "rt", encoding="utf-8", errors="replace") as f:
            txt = f.read()
    except Exception:
        # Not gzip (or not readable as such): retry as plain text.
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # SystemExit are not mistaken for "this is not a gzip file".
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            txt = f.read()

    # drop whatever precedes the JSON object
    start = txt.find("{")
    if start < 0:
        raise ValueError("no JSON object found in " + path)

    txt = txt[start:]

    try:
        pd = json.loads(txt)
    except json.decoder.JSONDecodeError as ex:
        # trailing data after the object; parse only up to where it begins
        pd = json.loads(txt[: ex.pos])

    # print(json.dumps(pd, indent=2))

    if "videoDetails" in pd:
        parse_youtube(pd)
    else:
        parse_freg(pd)
|
|
|
|
|
|
def get_expiration(url):
    """Return the ``expire`` query parameter of ``url`` as a UTC time string.

    Args:
        url: a URL whose query string contains ``expire=<digits>``; the
            digits are interpreted as a unix timestamp in seconds.

    Returns:
        The timestamp formatted as ``YYYY-MM-DD, HH:MM`` in UTC.

    Raises:
        ValueError: if ``url`` has no ``expire`` parameter.
    """
    # function-scope import; the module itself only imports ``datetime``
    from datetime import timezone

    match = re.search(r"[?&]expire=([0-9]+)", url)
    if match is None:
        raise ValueError("no expire parameter in url")

    # timezone-aware conversion; utcfromtimestamp() is deprecated since 3.12
    expires = datetime.fromtimestamp(int(match.group(1)), tz=timezone.utc)
    return expires.strftime("%Y-%m-%d, %H:%M")
|
|
|
|
|
|
def parse_youtube(pd):
    """Print the ``yt-*`` tags for a player response, then run ``freg_conv``.

    Reads ``videoDetails`` and ``streamingData`` from ``pd``, takes the
    expiration time from the url of the first adaptive format, and prints
    one JSON object with the tags to stdout.
    """
    details = pd["videoDetails"]
    streaming = pd["streamingData"]

    # expiration is read from the first adaptive format's url
    first_url = streaming["adaptiveFormats"][0]["url"]
    expires = get_expiration(first_url)

    # which manifest kinds are offered, in the order dash, hls
    manifests = [
        label
        for key, label in (("dashManifestUrl", "dash"), ("hlsManifestUrl", "hls"))
        if key in streaming
    ]

    # tags copied verbatim from videoDetails
    copied = (
        ("yt-id", "videoId"),
        ("yt-title", "title"),
        ("yt-author", "author"),
        ("yt-channel", "channelId"),
        ("yt-views", "viewCount"),
        ("yt-private", "isPrivate"),
    )
    tags = {tag: details[key] for tag, key in copied}
    tags["yt-manifest"] = ",".join(manifests)
    tags["yt-expires"] = expires

    print(json.dumps(tags))

    freg_conv(pd)
|
|
|
|
|
|
def parse_freg(pd):
    """Print the ``yt-*`` tags for a document that has a ``metadata`` section.

    The channel tag is the last path segment of ``metadata.channelURL``;
    the expiration is taken from the first value in ``pd["video"]``.
    """
    meta = pd["metadata"]

    tags = {}
    for tag, key in (("yt-id", "id"), ("yt-title", "title"), ("yt-author", "channelName")):
        tags[tag] = meta[key]

    # last path component of the channel url, ignoring surrounding slashes
    tags["yt-channel"] = meta["channelURL"].strip("/").rsplit("/", 1)[-1]

    video_urls = list(pd["video"].values())
    tags["yt-expires"] = get_expiration(video_urls[0])

    print(json.dumps(tags))
|
|
|
|
|
|
def freg_conv(pd):
    """Write ``<video-id>.urls.json`` next to the input file.

    The output layout is based on getURLs.js v1.5 (2021-08-07): a
    ``metadata`` section built from ``pd["videoDetails"]`` (plus an embedded
    thumbnail as a data url), a ``version``, a ``createTime`` in UTC, and for
    each of ``video`` / ``audio`` the url of the first itag in the priority
    list that is present in ``pd["streamingData"]["adaptiveFormats"]``.

    The thumbnail is cached as ``<video-id>.jpg`` in the same directory; it
    is only downloaded when it is not already there and its url starts with
    one of the two accepted youtube image prefixes. If no thumbnail file can
    be read, a built-in placeholder jpeg is embedded instead.

    Nothing is written (and ``malicious json`` is printed to stderr) when the
    video id is empty, not a string, or contains characters other than
    ascii letters, digits, ``_`` and ``-``.

    Args:
        pd: the decoded player response (a dict).

    Raises:
        KeyError: if ``videoDetails`` or ``streamingData`` lack required keys.
        OSError: if the urls file cannot be written.
    """
    # function-scope import; the module itself only imports ``datetime``
    from datetime import timezone

    # fmt: off
    priority = {
        "video": [
            337, 315, 266, 138,  # 2160p60
            313, 336,            # 2160p
            308,                 # 1440p60
            271, 264,            # 1440p
            335, 303, 299,       # 1080p60
            248, 169, 137,       # 1080p
            334, 302, 298,       # 720p60
            247, 136             # 720p
        ],
        "audio": [
            251, 141, 171, 140, 250, 249, 139
        ]
    }
    # fmt: on

    details = pd["videoDetails"]
    vid_id = details["videoId"]
    chan_id = details["channelId"]

    # The id becomes part of filenames below, so reject anything that is not
    # a plain, non-empty token before touching the filesystem.
    allowed = set(string.ascii_letters + string.digits + "_-")
    if not isinstance(vid_id, str) or not vid_id or not allowed.issuperset(vid_id):
        print("malicious json", file=sys.stderr)
        return

    try:
        micro = pd["microformat"]["playerMicroformatRenderer"]
        thumb_url = micro["thumbnail"]["thumbnails"][0]["url"]
        start_ts = micro["liveBroadcastDetails"]["startTimestamp"]
    except (KeyError, IndexError, TypeError):
        # either value missing: fall back to defaults for both
        thumb_url = f"https://img.youtube.com/vi/{vid_id}/maxresdefault.jpg"
        start_ts = ""

    metadata = {
        "title": details["title"],
        "id": vid_id,
        "channelName": details["author"],
        "channelURL": "https://www.youtube.com/channel/" + chan_id,
        "description": details["shortDescription"],
        "thumbnailUrl": thumb_url,
        "startTimestamp": start_ts,
    }

    # os.path.join keeps an empty dirname relative to the working directory;
    # plain "/" concatenation would point at the filesystem root instead.
    basepath = os.path.dirname(sys.argv[1])
    thumb_fn = os.path.join(basepath, f"{vid_id}.jpg")
    tmp_fn = f"{thumb_fn}.{os.getpid()}"

    trusted_prefixes = (
        "https://img.youtube.com/vi/",
        "https://i.ytimg.com/vi/",
    )
    wants_download = (
        not os.path.exists(thumb_fn)
        and isinstance(thumb_url, str)
        and thumb_url.startswith(trusted_prefixes)
    )
    if wants_download:
        try:
            # timeout so an unresponsive server cannot stall the script
            with urllib.request.urlopen(thumb_url, timeout=30) as fi:
                data = fi.read()

            # write to a per-process temp name, then move into place
            with open(tmp_fn, "wb") as fo:
                fo.write(data)

            os.replace(tmp_fn, thumb_fn)
        except Exception:
            # the thumbnail is optional; clean up and use the placeholder
            try:
                os.unlink(tmp_fn)
            except OSError:
                pass

    try:
        with open(thumb_fn, "rb") as f:
            thumb = base64.b64encode(f.read()).decode("ascii")
    except OSError:
        # placeholder jpeg used when no thumbnail file is available
        thumb = "/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAMCAgICAgMCAgIDAwMDBAYEBAQEBAgGBgUGCQgKCgkICQkKDA8MCgsOCwkJDRENDg8QEBEQCgwSExIQEw8QEBD/yQALCAABAAEBAREA/8wABgAQEAX/2gAIAQEAAD8A0s8g/9k="

    metadata["thumbnail"] = "data:image/jpeg;base64," + thumb

    ret = {
        "metadata": metadata,
        "version": "1.5",
        "createTime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

    formats = pd["streamingData"]["adaptiveFormats"]
    for stream, itags in priority.items():
        for itag in itags:
            # url of the first format carrying this itag; a format without a
            # usable url is skipped so the next itag in the list gets a chance
            url = next(
                (afmt.get("url") for afmt in formats if afmt.get("itag") == itag),
                None,
            )
            if url:
                ret[stream] = {itag: url}
                break

    fn = os.path.join(basepath, f"{vid_id}.urls.json")
    with open(fn, "w", encoding="utf-8", errors="replace") as f:
        json.dump(ret, f, indent=4)
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Failures are deliberately silent: on any error the script simply
        # prints no tags. ``Exception`` (not a bare ``except``) so that
        # Ctrl-C and SystemExit still terminate the process normally.
        # Uncomment the next line while debugging to see the traceback.
        # raise
        pass
|