Skip to content
Snippets Groups Projects
Commit 04e74f7c authored by Guido Gunther's avatar Guido Gunther :zzz:
Browse files

Get modem and ussd interfaces async

parent dcb59c1d
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,12 @@ from gi.repository import Gio
import logging
MM_DBUS_SERVICE = 'org.freedesktop.ModemManager1'
MM_DBUS_TIMEOUT = 5000
MM_DBUS_FLAGS = (Gio.DBusProxyFlags.DO_NOT_LOAD_PROPERTIES |
Gio.DBusProxyFlags.DO_NOT_CONNECT_SIGNALS)
class ModemError(Exception):
def __init__(self, msg):
self.msg = msg
......@@ -32,14 +38,56 @@ class ModemError(Exception):
class Modem(GObject.GObject):
MM_DBUS_INTERFACE_MODEM = 'org.freedesktop.ModemManager1.Modem'
MM_DBUS_INTERFACE_MODEM_GSM_USSD = "{}.Modem3gpp.Ussd".format(MM_DBUS_INTERFACE_MODEM)
def on_new_proxy_done(self, proxy, res, iface_name):
try:
_proxy = proxy.new_for_bus_finish(res)
except GLib.Error:
logging.exception("Failed to get iface '%s' for modem %s",
iface_name, self.path)
setattr(self, "_%s_proxy" % iface_name, _proxy)
def __init__(self, path):
GObject.GObject.__init__(self)
self._path = path
self._modem_proxy = None
Gio.DBusProxy.new_for_bus(Gio.BusType.SYSTEM,
MM_DBUS_FLAGS,
None,
MM_DBUS_SERVICE,
self.path,
self.MM_DBUS_INTERFACE_MODEM,
None,
self.on_new_proxy_done,
'modem')
self._ussd_proxy = None
Gio.DBusProxy.new_for_bus(Gio.BusType.SYSTEM,
MM_DBUS_FLAGS,
None,
MM_DBUS_SERVICE,
self.path,
self.MM_DBUS_INTERFACE_MODEM_GSM_USSD,
None,
self.on_new_proxy_done,
'ussd')
@property
def path(self):
return self._path
@property
def modem_proxy(self):
return self._modem_proxy
@property
def ussd_proxy(self):
return self._ussd_proxy
class ModemManagerProxy(GObject.GObject):
"""Interface to ModemManager DBus API
@ivar request: current pending request to ModemManager
......@@ -50,15 +98,9 @@ class ModemManagerProxy(GObject.GObject):
DBUS_INTERFACE_PROPERTIES = 'org.freedesktop.DBus.Properties'
DBUS_INTERFACE_OBJECT_MANAGER = 'org.freedesktop.DBus.ObjectManager'
MM_DBUS_SERVICE = 'org.freedesktop.ModemManager1'
MM_DBUS_INTERFACE_MODEM_MANAGER = 'org.freedesktop.ModemManager1'
MM_DBUS_OBJECT_MODEM_MANAGER = '/org/freedesktop/ModemManager1'
MM_DBUS_INTERFACE_MODEM = 'org.freedesktop.ModemManager1.Modem'
MM_DBUS_INTERFACE_SIM = 'org.freedesktop.ModemManager1.Sim'
MM_DBUS_INTERFACE_MODEM_GSM_USSD = 'org.freedesktop.ModemManager1.Modem.Modem3gpp.Ussd'
MM_DBUS_TIMEOUT = 5000
MM_DBUS_FLAGS = (Gio.DBusProxyFlags.DO_NOT_LOAD_PROPERTIES |
Gio.DBusProxyFlags.DO_NOT_CONNECT_SIGNALS)
# IMSIs are 14 or 15 digits
IMSI_RE = r'\d{14,15}'
......@@ -84,7 +126,6 @@ class ModemManagerProxy(GObject.GObject):
def __init__(self):
GObject.GObject.__init__(self)
self.bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
self.request = None
self.reply_func = None
self.error_func = None
......@@ -94,9 +135,9 @@ class ModemManagerProxy(GObject.GObject):
self.object_manager = None
Gio.DBusProxy.new_for_bus(Gio.BusType.SYSTEM,
self.MM_DBUS_FLAGS,
MM_DBUS_FLAGS,
None,
self.MM_DBUS_SERVICE,
MM_DBUS_SERVICE,
self.MM_DBUS_OBJECT_MODEM_MANAGER,
self.DBUS_INTERFACE_OBJECT_MANAGER,
None,
......@@ -175,7 +216,7 @@ class ModemManagerProxy(GObject.GObject):
for obj in objs:
for path, ifaces in obj.items():
if self.MM_DBUS_INTERFACE_MODEM in ifaces:
if Modem.MM_DBUS_INTERFACE_MODEM in ifaces:
self._modems.append(Modem(path))
logging.debug("Found modems: %s", self.modems)
self.emit('got-modems', self)
......@@ -189,19 +230,20 @@ class ModemManagerProxy(GObject.GObject):
self.object_manager.call("GetManagedObjects",
None,
Gio.DBusCallFlags.NO_AUTO_START,
self.MM_DBUS_TIMEOUT,
MM_DBUS_TIMEOUT,
None,
self.on_get_managed_objects_finished,
None)
def get_imsi(self):
card = Gio.DBusProxy.new_sync(self.bus,
self.MM_DBUS_FLAGS,
None,
self.MM_DBUS_SERVICE,
self.objects()[self.modem.path][self.MM_DBUS_INTERFACE_MODEM]['Sim'],
self.DBUS_INTERFACE_PROPERTIES,
None)
card = Gio.DBusProxy.new_for_bus_sync(
Gio.BusType.SYSTEM,
MM_DBUS_FLAGS,
None,
MM_DBUS_SERVICE,
self.objects()[self.modem.path][Modem.MM_DBUS_INTERFACE_MODEM]['Sim'],
self.DBUS_INTERFACE_PROPERTIES,
None)
try:
return card.Get('(ss)', self.MM_DBUS_INTERFACE_SIM, 'Imsi')
except Exception as msg:
......@@ -215,29 +257,15 @@ class ModemManagerProxy(GObject.GObject):
@mm_request
def ussd_initiate(self, command, reply_func=None, error_func=None):
ussd = Gio.DBusProxy.new_sync(self.bus,
self.MM_DBUS_FLAGS,
None,
self.MM_DBUS_SERVICE,
self.modem.path,
self.MM_DBUS_INTERFACE_MODEM_GSM_USSD,
None)
ussd.call("Initiate", GLib.Variant('(s)', (command,)),
Gio.DBusCallFlags.NO_AUTO_START, self.MM_DBUS_TIMEOUT, None,
self.handle_dbus_reply, None)
self.modem.ussd_proxy.call("Initiate", GLib.Variant('(s)', (command,)),
Gio.DBusCallFlags.NO_AUTO_START, MM_DBUS_TIMEOUT, None,
self.handle_dbus_reply, None)
@mm_request
def _modem__enable(self, enable, reply_func=None, error_func=None):
ussd = Gio.DBusProxy.new_sync(self.bus,
self.MM_DBUS_FLAGS,
None,
self.MM_DBUS_SERVICE,
self.modem.path,
self.MM_DBUS_INTERFACE_MODEM,
None)
ussd.call("Enable", GLib.Variant('(b)', (enable,)),
Gio.DBusCallFlags.NO_AUTO_START, self.MM_DBUS_TIMEOUT, None,
self.handle_dbus_reply, None)
self.modem.modem_proxy.call("Enable", GLib.Variant('(b)', (enable,)),
Gio.DBusCallFlags.NO_AUTO_START, MM_DBUS_TIMEOUT, None,
self.handle_dbus_reply, None)
def modem_enable(self, reply_func=None, error_func=None):
self._modem__enable(True,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment