Adding ability to download to local storage

parent 79bf533f
......@@ -21,7 +21,10 @@ gi.require_version('Gdk', '3.0')
gi.require_version('Gio', '2.0')
gi.require_version('Gst', '1.0')
gi.require_version('Handy', '1')
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gst, Gtk, Handy
gi.require_version('Soup', '2.4')
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gst, Gtk, Handy, Soup
import json
Gst.init()
Gst.init_check()
......@@ -40,7 +43,9 @@ class ResultsBox(Gtk.Box):
slider = Gtk.Template.Child()
audio_dl = Gtk.Template.Child()
audio_dl_image = Gtk.Template.Child()
video_dl = Gtk.Template.Child()
video_dl_image = Gtk.Template.Child()
speed = Gtk.Template.Child()
fullscreen = Gtk.Template.Child()
unfullscreen = Gtk.Template.Child()
......@@ -89,11 +94,11 @@ class ResultsBox(Gtk.Box):
def on_file_read(self, poster_file, async_res, user_data):
stream = poster_file.read_finish(async_res)
GdkPixbuf.Pixbuf.new_from_stream_at_scale_async(stream,
self.video_box_width, self.video_box_height,
True, # preserve_aspect_ratio
None, # cancellable
self.on_stream_load, # callback
None) # user_data
self.video_box_width, self.video_box_height,
True, # preserve_aspect_ratio
None, # cancellable
self.on_stream_load, # callback
None) # user_data
def on_stream_load(self, source, async_res, context):
pixbuf = GdkPixbuf.Pixbuf.new_from_stream_finish(async_res)
......@@ -102,29 +107,66 @@ class ResultsBox(Gtk.Box):
self.poster_image.set_from_pixbuf(pixbuf)
def stream_at_scale_async(self, poster_file):
stream = poster_file.read_async(GLib.PRIORITY_DEFAULT, None, self.on_file_read, None)
stream = poster_file.read_async(GLib.PRIORITY_DEFAULT, None,
self.on_file_read, None)
def setup_stream(self, video_meta):
video_title = video_meta['title']
video_channel = video_meta['author']
video_id = video_meta['videoId']
poster_uri = video_meta['poster_uri']
self.get_duration(video_meta['lengthSeconds'])
def parse_video_results(self, session, result, message):
if message.status_code != 200:
print("video json details did not respond")
return False
self.title.set_label(video_title)
self.channel.set_label(video_channel)
self.duration.set_label(self.video_duration)
try:
self.json = json.loads(message.response_body.data)
except:
print("video json did not parse.")
return False
self.video_uri = None
for format_stream in self.json['formatStreams']:
uri = f"http://iteroni.com/latest_version?id={video_id}&itag=18"
if format_stream['qualityLabel'] == "360p":
self.video_uri = format_stream['url']
# this should be done on play button press
self.player.set_property("uri", uri)
# if (future) user-config desires 720p,
# check if it is available and if so use it instead
#if format_stream['qualityLabel'] == "720p":
# self.video_uri = format_stream['url']
if not self.video_uri:
# remove unplayable video urls from list
self.set_visible(False)
return False
self.get_download_uris()
self.player.set_property("uri", self.video_uri)
self.player.set_property("video-sink", self.sink)
poster_file = Gio.File.new_for_uri(poster_uri)
poster_file = Gio.File.new_for_uri(self.poster_uri)
self.stream_at_scale_async(poster_file)
def get_video_details(self):
uri = f"{self.instance}/api/v1/videos/{self.video_id}?fields=adaptiveFormats,formatStreams"
self.session = Soup.Session.new()
self.session.set_property("timeout", 5)
message = Soup.Message.new("GET", uri)
self.session.queue_message(message, self.parse_video_results, message)
def setup_stream(self, video_meta):
self.instance = video_meta['strong_instance']
self.video_id = video_meta['videoId']
self.video_title = video_meta['title']
self.video_channel = video_meta['author']
self.poster_uri = video_meta['poster_uri']
self.get_duration(video_meta['lengthSeconds'])
self.title.set_label(self.video_title)
self.channel.set_label(self.video_channel)
self.duration.set_label(self.video_duration)
self.get_video_details()
def update_slider(self):
if not self.is_playing:
return False
......@@ -153,13 +195,101 @@ class ResultsBox(Gtk.Box):
return True
def strictify_name(self, s):
return "".join( x for x in s if (x.isalnum() or x in "_- "))
def download_audio_uri(self, uri):
dl_stream = Gio.File.new_for_uri(uri)
dest_ext = "m4a"
try:
dest_dir = GLib.get_user_special_dir(GLib.UserDirectory.DIRECTORY_MUSIC)
except:
print("No Music Directory")
return False
dest_title = self.strictify_name(self.video_title)
dest_path = f"{dest_dir}/{dest_title}.{dest_ext}"
dest = Gio.File.new_for_path(dest_path)
dl_stream.copy_async(dest, Gio.FileCopyFlags.OVERWRITE,
GLib.PRIORITY_DEFAULT, None,
self.progress_audio_cb, (),
self.ready_audio_cb, None)
def download_video_uri(self, uri):
dl_stream = Gio.File.new_for_uri(uri)
dest_ext = "mp4"
try:
dest_dir = GLib.get_user_special_dir(GLib.UserDirectory.DIRECTORY_VIDEOS)
except:
print("No Videos Directory")
return False
dest_title = self.strictify_name(self.video_title)
dest_path = f"{dest_dir}/{dest_title}.{dest_ext}"
dest = Gio.File.new_for_path(dest_path)
dl_stream.copy_async(dest, Gio.FileCopyFlags.OVERWRITE,
GLib.PRIORITY_DEFAULT, None,
self.progress_video_cb, (),
self.ready_video_cb, None)
def show_success_icon(self, button):
if button == 'audio':
self.audio_dl_image.set_property('icon-name', 'object-select-symbolic')
elif button == 'video':
self.video_dl_image.set_property('icon-name', 'object-select-symbolic')
def show_progress_icon(self, button):
# could show progress in icon
if button == 'audio':
self.audio_dl_image.set_property('icon-name', 'document-save-as-symbolic')
elif button == 'video':
self.video_dl_image.set_property('icon-name', 'document-save-as-symbolic')
def show_error_icon(self, button):
if button == 'audio':
self.audio_dl_image.set_property('icon-name', 'dialog-error-symbolic')
elif button == 'video':
self.video_dl_image.set_property('icon-name', 'dialog-error-symbolic')
def progress_audio_cb(self, current_num_bytes, total_num_bytes, *user_data):
percentage = round(current_num_bytes / total_num_bytes * 100)
#print(f"Audio Downloading: {percentage}%", end="\r")
def progress_video_cb(self, current_num_bytes, total_num_bytes, *user_data):
percentage = round(current_num_bytes / total_num_bytes * 100)
#print(f"Video Downloading: {percentage}%", end="\r")
def ready_audio_cb(self, src, async_res, user_data):
try:
src.copy_finish(async_res)
except GLib.Error as e:
self.show_error_icon('audio')
print("Failed to write file stream %s", e.message)
return False
self.show_success_icon('audio')
def ready_video_cb(self, src, async_res, user_data):
try:
src.copy_finish(async_res)
except GLib.Error as e:
self.show_error_icon('video')
print("Failed to write file stream %s", e.message)
return False
self.show_success_icon('video')
@Gtk.Template.Callback()
def audio_dl_button(self, button):
print("audio_dl_button")
#print('requested to download ' + str(self.audio_dl_uri))
self.audio_dl.set_sensitive(False)
self.show_progress_icon('audio')
self.download_audio_uri(self.audio_dl_uri)
@Gtk.Template.Callback()
def video_dl_button(self, button):
print("video_dl_button")
#print('requested to download ' + str(self.video_dl_uri))
self.video_dl.set_sensitive(False)
self.show_progress_icon('video')
self.download_video_uri(self.video_dl_uri)
@Gtk.Template.Callback()
def play_button(self, button):
......@@ -254,15 +384,65 @@ class ResultsBox(Gtk.Box):
seek * Gst.SECOND / self.percent)
def poll_mouse(self):
if self.controls_box.get_visible():
self.controls_box.set_visible(False)
now_is = int(GLib.get_current_time())
if (int(now_is) - (self.last_move)) >= 1:
if self.controls_box.get_visible():
self.controls_box.set_visible(False)
@Gtk.Template.Callback()
def mouse_move(self, event, data):
self.last_move = int(GLib.get_current_time())
GLib.timeout_add_seconds(2, self.poll_mouse)
if not self.controls_box.get_visible():
self.controls_box.set_visible(True)
GLib.timeout_add_seconds(3, self.poll_mouse)
@Gtk.Template.Callback()
def swallow_slider_scroll_event(self, event, data):
return True
def get_download_uris(self):
# get download link urls based on (future) user-config
# video quality: ["480p", "720p", "1080p"] # default 720p
# audio structure:
# "bitrate": "142028",
# "type": "audio/webm; codecs=\"opus\"",
# "container": "webm",
# video structure:
# "bitrate": "440700",
# "type": "video/mp4; codecs=\"avc1.4d401f\"",
# "container": "mp4",
# "qualityLabel": "720p"
video_quality = "720p"
last_bitrate = None
self.audio_dl_uri = None
self.video_dl_uri = None
for af in self.json['adaptiveFormats']:
if af['type'].startswith('audio/mp4'):
if not self.audio_dl_uri:
last_bitrate = af['bitrate']
self.audio_dl_uri = af['url']
if af['bitrate'] > last_bitrate:
last_bitrate = af['bitrate']
self.audio_dl_uri = af['url']
elif af['type'].startswith('video/mp4'):
# set it to something
if not self.video_dl_uri:
self.video_dl_uri = af['url']
if 'qualityLabel' in af:
if af['qualityLabel'] == "720p" and video_quality == "720p":
self.video_dl_uri = af['url']
elif af['qualityLabel'] == "480p" and video_quality == "480p":
self.video_dl_uri = af['url']
elif af['qualityLabel'] == "1080p" and video_quality == "1080p":
self.video_dl_uri = af['url']
if self.audio_dl_uri:
self.audio_dl.set_sensitive(True)
if self.video_dl_uri:
self.video_dl.set_sensitive(True)
......@@ -34,6 +34,14 @@ class Search:
def get_strong_instance(self):
# lookup instances, get a strong one, to be done (stubbed for now)
# get urls from api.invidious.io
# https://api.invidious.io/instances.json?sort_by=health
# api urls to confirm are strong (some throw forbidden)
# api/v1/videos/{videoId}
# api/v1/search/query
#self.instance = "https://ytprivate.com"
#self.instance = "https://vid.puffyan.us"
#self.instance = "https://iteroni.com/"
self.instance = "https://invidious.xyz"
def clear_entries(self):
......@@ -87,6 +95,8 @@ class Search:
def get_poster_url(self):
# tweak json with local poster url
for video_meta in self.json:
# append the strong instance for results to use
video_meta['strong_instance'] = self.instance
for poster in video_meta['videoThumbnails']:
if poster['quality'] == 'medium':
video_meta['poster_uri'] = poster['url']
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment