Verified Commit a1e847a1 authored by Todd Weaver's avatar Todd Weaver
Browse files

Pulling in resilience into results rather than different results per scroll-window

Adding HEAD test per video with instance fallback
Setting up for plugin results inclusion into main results
Moving results to pull from meta rather than do tests itself (helping with plugin style future)
Adding strength of video playback tests into search (and therefore future plugins)
Cleaning up excess (unused) libraries
parent 68d9a73b
......@@ -62,6 +62,9 @@ class Instances:
not instance_uri.endswith('.i2p')):
self.check_query_api_valid(instance[1]['uri'])
# add a backup
self.check_query_api_valid('https://iteroni.com')
# check the instance can run a query on the API
def check_query_api_valid(self, uri):
# /api/v1/search?q=query
......@@ -125,4 +128,4 @@ class Instances:
return False
self.app_window.strong_instances.append(uri)
self.app_window.navigation_current.set_property('icon-name', 'object-select-symbolic')
self.app_window.status_icon.set_property('icon-name', 'object-select-symbolic')
# iteroni.py
#
# Copyright 2021 Purism, SPC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gi
gi.require_version('Soup', '2.4')
from gi.repository import GLib, Soup
import json
class Search:
def __init__(self, **kwargs):
self.add_result = kwargs.get('add_result', None)
self.set_headerbar_color = kwargs.get('set_headerbar_color', None)
self.hide_scroller_error_box = kwargs.get('hide_scroller_error_box', None)
self.show_scroller_error_box = kwargs.get('show_scroller_error_box', None)
def do_search(self, query):
esc_query = GLib.uri_escape_string(query, None, None)
self.this_instance = 'https://iteroni.com'
uri = f"{self.this_instance}/api/v1/search?q={esc_query};fields=title,videoId,author,lengthSeconds,videoThumbnails"
self.session = Soup.Session.new()
self.session.set_property("timeout", 5)
message = Soup.Message.new("GET", uri)
self.session.queue_message(message, self.show_results, message)
def show_results(self, session, result, message):
if message.status_code != 200:
self.show_scroller_error_box("Service Failure",
"There is no response from the this server.")
return False
try:
self.json = json.loads(message.response_body.data)
except:
self.show_scroller_error_box("Service Failure",
"This server response failed to parse results.")
return False
self.hide_scroller_error_box()
self.get_poster_url()
for video_meta in self.json:
self.add_result(video_meta)
self.set_headerbar_color()
def get_poster_url(self):
# tweak json with local poster url
for video_meta in self.json:
# append the strong instance for results to use
video_meta['strong_instance'] = self.this_instance
for poster in video_meta['videoThumbnails']:
if poster['quality'] == 'medium':
if poster['url'].startswith('/'):
video_meta['poster_uri'] = f"{self.this_instance}{poster['url']}"
else:
video_meta['poster_uri'] = poster['url']
......@@ -33,11 +33,7 @@ stream_sources = [
'about.py',
'window.py',
'search.py',
'iteroni.py',
'results.py',
'scrollers.py',
'instances.py',
]
......
......@@ -20,17 +20,10 @@ gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
gi.require_version('Gio', '2.0')
gi.require_version('Gst', '1.0')
gi.require_version('Handy', '1')
gi.require_version('Soup', '2.4')
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gst, Gtk, Handy, Soup
import json
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gst, Gtk
Gst.init(None)
Gst.init_check(None)
Handy.init()
import time
@Gtk.Template(resource_path='/sm/puri/Stream/ui/results.ui')
class ResultsBox(Gtk.Box):
......@@ -61,13 +54,10 @@ class ResultsBox(Gtk.Box):
window_to_player_box_padding = 28
def __init__(self, app_window, priority_index, **kwargs):
def __init__(self, app_window, **kwargs):
super().__init__(**kwargs)
self.app_window = app_window
self.priority = 0
if priority_index > 0:
self.priority = GLib.PRIORITY_LOW
# listen for motion on the player box for controls show/hide
self.event_box.add_events(Gdk.EventMask.POINTER_MOTION_MASK)
......@@ -122,66 +112,10 @@ class ResultsBox(Gtk.Box):
self.poster_image.set_from_pixbuf(pixbuf)
def stream_at_scale_async(self, poster_file):
stream = poster_file.read_async(self.priority, None,
stream = poster_file.read_async(0, None,
self.on_file_read, None)
def check_video_playable(self, video_url):
session = Soup.Session.new()
session.set_property("timeout", 2)
message = Soup.Message.new("HEAD", video_url)
session.queue_message(message, self.check_video_playable_cb, None)
def check_video_playable_cb(self, session, results, user_data):
if results.status_code != 200:
self.set_visible(False)
def parse_video_results(self, session, result, message):
if message.status_code != 200:
# remove unplayable video urls from list
self.set_visible(False)
return False
try:
self.json = json.loads(message.response_body.data)
except:
self.set_visible(False)
return False
self.video_uri = None
for format_stream in self.json['formatStreams']:
if format_stream['qualityLabel'] == "360p":
self.video_uri = format_stream['url']
# if (future) user-config desires 720p,
# check if it is available and if so use it instead
#if format_stream['qualityLabel'] == "720p":
# self.video_uri = format_stream['url']
if not self.video_uri:
self.set_visible(False)
return False
self.check_video_playable(self.video_uri)
self.get_download_uris()
self.player.set_property("uri", self.video_uri)
self.player.set_property("video-sink", self.sink)
poster_file = Gio.File.new_for_uri(self.poster_uri)
self.stream_at_scale_async(poster_file)
def get_video_details(self):
uri = f"{self.instance}/api/v1/videos/{self.video_id}?fields=adaptiveFormats,formatStreams"
self.session = Soup.Session.new()
self.session.set_property("timeout", 5)
message = Soup.Message.new("GET", uri)
self.session.queue_message(message, self.parse_video_results, message)
def setup_stream(self, video_meta):
self.instance = video_meta['strong_instance']
self.video_id = video_meta['videoId']
self.video_title = video_meta['title']
self.video_channel = video_meta['author']
......@@ -192,7 +126,26 @@ class ResultsBox(Gtk.Box):
self.channel.set_label(self.video_channel)
self.duration.set_label(video_duration)
self.get_video_details()
self.player.set_property("uri", video_meta['video_uri'])
self.player.set_property("video-sink", self.sink)
poster_file = Gio.File.new_for_uri(video_meta['poster_uri'])
self.stream_at_scale_async(poster_file)
if 'audio_dl_uri' in video_meta:
if video_meta['audio_dl_uri']:
# enable download button
self.audio_dl.set_sensitive(True)
# set the download uri for download button
self.audio_dl_uri = video_meta['audio_dl_uri']
if 'video_dl_uri' in video_meta:
if video_meta['video_dl_uri']:
# enable download button
self.video_dl.set_sensitive(True)
# set the download uri for download button
self.video_dl_uri = video_meta['video_dl_uri']
def update_slider(self):
if not self.app_window.is_playing:
......@@ -403,7 +356,7 @@ class ResultsBox(Gtk.Box):
GLib.timeout_add(50, self.delay_grab)
# horizonal scrollbar, vertical scrollbar (do last)
scroller = self.app_window.scroller_stack.get_visible_child()
scroller = self.app_window.scroller
scroller.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.EXTERNAL)
scroller.set_kinetic_scrolling(False)
......@@ -428,7 +381,7 @@ class ResultsBox(Gtk.Box):
self.resize_player(set_width, set_height)
self.app_window.resize(self.app_orig_width, self.app_orig_height)
scroller = self.app_window.scroller_stack.get_visible_child()
scroller = self.app_window.scroller
scroller.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scroller.set_kinetic_scrolling(True)
......@@ -465,50 +418,3 @@ class ResultsBox(Gtk.Box):
@Gtk.Template.Callback()
def swallow_slider_scroll_event(self, event, data):
return True
def get_download_uris(self):
# get download link urls based on (future) user-config
# video quality: ["480p", "720p", "1080p"] # default 720p
# audio structure:
# "bitrate": "142028",
# "type": "audio/webm; codecs=\"opus\"",
# "container": "webm",
# video structure:
# "bitrate": "440700",
# "type": "video/mp4; codecs=\"avc1.4d401f\"",
# "container": "mp4",
# "qualityLabel": "720p"
last_bitrate = None
self.audio_dl_uri = None
for af in self.json['adaptiveFormats']:
if af['type'].startswith('audio/mp4'):
if not self.audio_dl_uri:
last_bitrate = af['bitrate']
self.audio_dl_uri = af['url']
if af['bitrate'] > last_bitrate:
last_bitrate = af['bitrate']
self.audio_dl_uri = af['url']
video_quality = "720p"
self.video_dl_uri = None
for fs in self.json['formatStreams']:
if fs['type'].startswith('video/mp4'):
# set it to something
if not self.video_dl_uri:
self.video_dl_uri = fs['url']
if 'qualityLabel' in fs:
if fs['qualityLabel'] == "720p" and video_quality == "720p":
self.video_dl_uri = fs['url']
elif fs['qualityLabel'] == "480p" and video_quality == "480p":
self.video_dl_uri = fs['url']
elif fs['qualityLabel'] == "1080p" and video_quality == "1080p":
self.video_dl_uri = fs['url']
if self.audio_dl_uri:
self.audio_dl.set_sensitive(True)
if self.video_dl_uri:
self.video_dl.set_sensitive(True)
# scrollers.py
#
# Copyright 2021 Purism, SPC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from .results import ResultsBox
@Gtk.Template(resource_path='/sm/puri/Stream/ui/scrollers.ui')
class ScrollerBox(Gtk.ScrolledWindow):
__gtype_name__ = 'ScrollerBox'
results_list = Gtk.Template.Child()
scroller_error_box = Gtk.Template.Child()
scroller_error_heading = Gtk.Template.Child()
scroller_error_text = Gtk.Template.Child()
def __init__(self, app_window, priority_index, **kwargs):
super().__init__(**kwargs)
self.app_window = app_window
self.priority_index = priority_index
self.results_list.set_focus_vadjustment(self.props.vadjustment)
def add_result(self, video_meta):
results_box = ResultsBox(self.app_window, self.priority_index)
self.results_list.add(results_box)
results_box.setup_stream(video_meta)
def hide_scroller_error_box(self):
self.results_list.set_visible(True)
self.scroller_error_box.set_visible(False)
self.scroller_error_heading.set_label("Error")
self.scroller_error_text.set_label("...")
def show_scroller_error_box(self, heading, text):
self.results_list.set_visible(False)
self.scroller_error_box.set_visible(True)
self.scroller_error_heading.set_label(heading)
self.scroller_error_text.set_label(text)
@Gtk.Template.Callback()
def swallow_fullscreen_scroll_event(self, event, data):
if self.app_window.is_fullscreen:
return True
......@@ -26,22 +26,17 @@ class Search:
def __init__(self, **kwargs):
# for internal plugins only
self.app_window = kwargs.get('app_window', None)
self.instance_index = kwargs.get('instance_index', None)
self.sort_by = kwargs.get('sort_by', None)
self.scroller_stack = kwargs.get('scroller_stack', None)
self.spinner = kwargs.get('spinner', None)
self.si_index = 0
self.this_instance = self.app_window.strong_instances[self.si_index]
# limited access
self.add_result = kwargs.get('add_result', None)
self.set_headerbar_color = kwargs.get('set_headerbar_color', None)
self.add_result_meta = kwargs.get('add_result_meta', None)
def do_search(self, query):
esc_query = GLib.uri_escape_string(query, None, None)
this_instance = self.app_window.strong_instances[0]
if self.instance_index and len(self.app_window.strong_instances) > self.instance_index:
this_instance = self.app_window.strong_instances[self.instance_index]
uri = f"{this_instance}/api/v1/search?q={esc_query};sort_by={self.sort_by};fields=title,videoId,author,lengthSeconds,videoThumbnails"
uri = f"{self.this_instance}/api/v1/search?q={esc_query};fields=title,videoId,author,lengthSeconds,videoThumbnails"
self.session = Soup.Session.new()
self.session.set_property("timeout", 5)
......@@ -57,25 +52,118 @@ class Search:
return False
try:
self.json = json.loads(message.response_body.data)
self.search_json = json.loads(message.response_body.data)
except:
self.app_window.show_error_box("Service Failure",
"The streaming server response failed to parse results.")
return False
self.get_poster_url()
for video_meta in self.search_json:
self.get_poster_url(video_meta)
# loop to find a playable video instance
self.get_video_details(video_meta)
for video_meta in self.json:
self.add_result(video_meta)
def get_poster_url(self, video_meta):
for poster in video_meta['videoThumbnails']:
if poster['quality'] == 'medium':
if poster['url'].startswith('/'):
video_meta['poster_uri'] = f"{self.this_instance}{poster['url']}"
else:
video_meta['poster_uri'] = poster['url']
self.scroller_stack.set_visible(True)
self.set_headerbar_color()
def get_video_details(self, video_meta):
video_id = video_meta['videoId']
uri = f"{self.this_instance}/api/v1/videos/{video_id}?fields=adaptiveFormats,formatStreams"
self.session = Soup.Session.new()
self.session.set_property("timeout", 5)
message = Soup.Message.new("GET", uri)
self.session.queue_message(message, self.parse_video_results, video_meta)
def get_poster_url(self):
# tweak json with local poster url
for video_meta in self.json:
# append the strong instance for results to use
video_meta['strong_instance'] = self.app_window.strong_instances[0]
for poster in video_meta['videoThumbnails']:
if poster['quality'] == 'medium':
video_meta['poster_uri'] = poster['url']
def parse_video_results(self, session, results, video_meta):
if results.status_code != 200:
# remove unplayable video urls from list
return False
try:
self.video_json = json.loads(results.response_body.data)
except:
return False
video_meta['video_uri'] = None
for format_stream in self.video_json['formatStreams']:
if format_stream['qualityLabel'] == "360p":
video_meta['video_uri'] = format_stream['url']
# if (future) user-config desires 720p,
# check if it is available and if so use it instead
#if format_stream['qualityLabel'] == "720p":
# self.video_uri = format_stream['url']
if not video_meta['video_uri']:
return False
self.check_video_playable(video_meta)
self.get_download_uris(video_meta)
def check_video_playable(self, video_meta):
video_uri = video_meta['video_uri']
session = Soup.Session.new()
session.set_property("timeout", 2)
message = Soup.Message.new("HEAD", video_uri)
session.queue_message(message, self.check_video_playable_cb, video_meta)
def check_video_playable_cb(self, session, results, video_meta):
if results.status_code != 200:
video_meta.pop('video_uri')
self.si_index += 1
if len(self.app_window.strong_instances) > self.si_index:
self.this_instance = self.app_window.strong_instances[self.si_index]
self.get_video_details(video_meta)
else:
return False
if 'video_uri' in video_meta:
self.add_result_meta(video_meta)
def get_download_uris(self, video_meta):
# get download link urls based on (future) user-config
# video quality: ["480p", "720p", "1080p"] # default 720p
# audio structure:
# "bitrate": "142028",
# "type": "audio/webm; codecs=\"opus\"",
# "container": "webm",
# video structure:
# "bitrate": "440700",
# "type": "video/mp4; codecs=\"avc1.4d401f\"",
# "container": "mp4",
# "qualityLabel": "720p"
last_bitrate = None
video_meta['audio_dl_uri'] = None
for af in self.video_json['adaptiveFormats']:
if af['type'].startswith('audio/mp4'):
if not video_meta['audio_dl_uri']:
last_bitrate = af['bitrate']
video_meta['audio_dl_uri'] = af['url']
if af['bitrate'] > last_bitrate:
last_bitrate = af['bitrate']
video_meta['audio_dl_uri'] = af['url']
video_quality = "720p"
video_meta['video_dl_uri'] = None
for fs in self.video_json['formatStreams']:
if fs['type'].startswith('video/mp4'):
# set it to something
if not video_meta['video_dl_uri']:
video_meta['video_dl_uri'] = fs['url']
if 'qualityLabel' in fs:
if fs['qualityLabel'] == "720p" and video_quality == "720p":
video_meta['video_dl_uri'] = fs['url']
elif fs['qualityLabel'] == "480p" and video_quality == "480p":
video_meta['video_dl_uri'] = fs['url']
elif fs['qualityLabel'] == "1080p" and video_quality == "1080p":
video_meta['video_dl_uri'] = fs['url']
......@@ -6,7 +6,6 @@
<file>ui/about.ui</file>
<file>ui/window.ui</file>
<file>ui/results.ui</file>
<file>ui/scrollers.ui</file>
<file>ui/stream.css</file>
<file>ui/video-placeholder-332x186.png</file>
<file alias="gtk/help-overlay.ui">ui/help-overlay.ui</file>
......
<?xml version="1.0" encoding="UTF-8"?>
<!-- Generated with glade 3.38.2 -->
<interface>
<requires lib="gtk+" version="3.20"/>
<template class="ScrollerBox" parent="GtkScrolledWindow">
<property name="visible">True</property>
<property name="can-focus">True</property>
<property name="hexpand">False</property>
<property name="vexpand">False</property>
<property name="hscrollbar-policy">never</property>
<signal name="scroll-event" handler="swallow_fullscreen_scroll_event" swapped="no"/>
<child>
<object class="GtkStack" id="stack">
<property name="visible">True</property>
<property name="can-focus">True</property>
<property name="hexpand">False</property>
<property name="vexpand">True</property>
<child>
<object class="GtkFlowBox" id="results_list">
<property name="can-focus">True</property>
<property name="visible">True</property>
<property name="vexpand">False</property>
<property name="hexpand">False</property>
<property name="max-children-per-line">1</property>
<property name="selection-mode">none</property>
<property name="activate-on-single-click">False</property>
<child>
<placeholder/>
</child>
</object>
<packing>
<property name="position">2</property>
</packing>
</child>
<child>
<object class="GtkBox" id="scroller_error_box">
<property name="visible">False</property>
<property name="can-focus">False</property>
<property name="halign">center</property>
<property name="valign">center</property>
<property name="orientation">vertical</property>
<child>
<object class="GtkImage" id="scroller_error_image">
<property name="visible">True</property>
<property name="can-focus">False</property>
<property name="icon-name">network-error-symbolic</property>
<property name="icon_size">6</property>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">0</property>
</packing>
</child>
<child>
<object class="GtkLabel" id="scroller_error_heading">
<property name="visible">True</property>
<property name="can-focus">False</property>
<property name="margin-top">12</property>
<property name="wrap">True</property>
<property name="label" translatable="yes">Error</property>
<attributes>
<attribute name="weight" value="bold"/>
<attribute name="scale" value="1.2"/>
</attributes>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">1</property>
</packing>
</child>
<child>
<object class="GtkLabel" id="scroller_error_text">
<property name="visible">True</property>
<property name="can-focus">False</property>
<property name="margin-top">6</property>
<property name="wrap">True</property>
<property name="label" translatable="yes">...</property>