Linux SoundTouch Contrôleur

Contenu du snippet

#!/usr/bin/python2
# -*- coding: iso-8859-15 -*-

from sys import exit, argv, exc_info
from os.path import expanduser, join, dirname
from websocket import create_connection, WebSocketApp
from re import compile, findall
from time import sleep, time
from gobject import threads_init

import socket
import gtk
import threading

Version = 0.02
Create = '20 juillet 2015'
Update = '25 juillet 2015'
Name = 'SoundTouch Control', 'stc'
Verbose = 0

# Réglage du volume "sourdine"
Sourdine = 8

"""
Auteur : Jean-Jacques Varvenne
eMail : jjvarvenne@gmail.com
Licence: Tous usages sauf commercial

Le but est de comprendre le fonctionnement d'un "SoundTouch 20" et de
le piloter avec une interface graphique GTK

Ce programme recherche le 1er appareil et si connecte.

Après connexion les présélections s'affichent, la station en cours et
le volume.

Un click sur une présélection change la station, le curseur permet
de régler le volume.

Pour l'instant rien d'autre!

"""

########################################################################
#
# GTK: GUI
#
########################################################################
class TGUI( object ):

def __init__( Self ):
super( TGUI, Self ).__init__()

Wnd = gtk.Window()
Wnd.set_title( 'SoundTouch Control' )
Wnd.connect( 'delete_event', gtk.main_quit )

Wnd.set_position( gtk.WIN_POS_CENTER_ALWAYS )

try:
Wnd.set_icon_from_file( join( dirname( argv[0] ), 'stc.png' ) )
except:
pass

Wnd.show()

# Création d'une boite verticale dont les compartiments sont homogène
# et avec un espace de 5 pixels
##################################################################
VBox = gtk.VBox( True, 5 )
Wnd.add( VBox )
VBox.show()

def VBoxAddWidget( Child ):
VBox.pack_start( Child )
Child.show()
return Child

# Création du cadre contenant la radio en cours
##################################################################
F = VBoxAddWidget( gtk.Frame() )
Station = gtk.Label()
Station.set_markup('<b>Un instant ...</b>' )

F.add( Station )
Station.show()

# Création des 6 boutons de préselection
##################################################################
Presets = []

for I in range(6):
Presets.append( VBoxAddWidget( gtk.Button( '-' ) ) )

# Bouton de réglage du volume et mute
##################################################################
Volume = VBoxAddWidget( gtk.HScale( gtk.Adjustment( 0, 0, 100 ) ) )
Volume.set_digits( 0 )
Volume.set_update_policy( gtk.UPDATE_DELAYED )

# Bouton "mute"
Self.Mute = VBoxAddWidget( gtk.Button( 'Sourdine' ) )

# Bouton ON/OFF
##################################################################
Self.Power = VBoxAddWidget( gtk.Button( '---' ) )

# Eléments accessibles depuis l'hôte
##################################################################
Self.Station = Station # Pour changer le nom de la radio en cours
Self.Presets = Presets # Les 6 boutons des 6 présélections
Self.Volume = Volume

return

########################################################################
#
# Contrôleur de l'appareil via le websocket
#
########################################################################
class TWS( object ):

# Initialisation de la classe et création de la connexion avec le appareil
######################################################################
def __init__( Self, CIV ):

super( TWS, Self ).__init__()

Self.CIV = CIV
Self.GUI = CIV.GUI # Plus simple par la suite
Self.WS = None
Self.IP = None
Self.DeviceID = None
Self.IsSendMsg = 0
Self.ReqID = 0
Self.BtnDown = time()
Self.BtnPresets = {}
Self.NoChangeVolume = True
Self.Volume = -1
Self.OldVolume = -1
Self.PowerStat = -1
Self.WSProto = '<msg>'
'<header deviceID="%(DeviceID)s" url="%(Url)s" method="%(Method)s">'
'<request requestID="%(ReqID)d">%(Request)s</request>'
'<info %(Info)s type="%(Type)s" />'
'</header>'
'%(Body)s'
'</msg>'

return

# Lance la connexion avec le serveur, ceci est bloquant
######################################################################
def Run( Self, URL ):

if Verbose:
print "WS1001: Tentative d'ouverture avec: %s" % URL

Self.WS = WebSocketApp( 'ws://%s:8080' % URL,
header = [ 'Sec-WebSocket-Protocol: gabbo' ],
on_message = Self.WSEventMessage,
on_open = Self.WSEventOpen,
on_error = Self.WSEventError,
on_close = Self.WSEventClose
)

# Ceci est bloquant!
Self.WS.run_forever()

return

# Arrête la connexion avec le serveur
######################################################################
def Stop( Self ):

if Verbose:
print 'WS1002: Demande de déconnexion'

# fermeture de la connexion
if Self.WS:
Self.WS.close()

return

# Ajoute une commande d'affichage à la liste
######################################################################
def AddCmd( Self, Widget, Msg ):
Self.CIV.AddCmd( ( Widget, Msg ) )
return

# Envoi un message au appareil
######################################################################
def SendMsg( Self,
Url,
Request = '',
Info = '',
Type = 'new',
Body = ''
):

# Interdit l'envoi de 2 messages en même temps
I = 0

while Self.IsSendMsg:

I += 1

if I > 100:
# Trop d'attente!
return

if Verbose:
print 'WS1004: SendMsg[%d]: refused !' % I

sleep( 0.2 )

# Message en cours d'envoi
Self.IsSendMsg += 1
try:

Self.ReqID += 1

XML = Self.WSProto % { 'DeviceID' : Self.DeviceID,
'Url' : Url,
'Method' : len( Body ) > 5 and 'POST' or 'GET',
'ReqID' : Self.ReqID,
'Request' : Request,
'Info' : Info,
'Type' : Type,
'Body' : len( Body ) > 5 and ( '<body>' + Body + '</body>' ) or ''
}

if Verbose:
print 'WS1005: SendMsg[%d]: %s' % ( Self.ReqID, XML )

# Envoi la commande
Self.WS.send( XML )

finally:
Self.IsSendMsg -= 1

return

# Event: Ouverture de la connexion avec le appareil
######################################################################
def WSEventOpen( Self, WS ):

if Verbose:
print 'WS1101: OPEN => %s' % Self.WS.url

# Demande les infos pour obtenir le deviceID
Self.SendMsg( 'info' )
sleep( 0.2 )

# Demande la station en cours
Self.SendMsg( 'now_playing' )
sleep( 0.2 )

# Demande les présélections
Self.SendMsg( 'presets' )
sleep( 0.2 )

# Demande le volume actuel
Self.SendMsg( 'volume' )
sleep( 0.2 )

# Activation des évènements des boutons de présélections
for Btn in Self.GUI.Presets:
Btn.connect( 'pressed' , Self.EventSelectDown )
Btn.connect( 'released', Self.EventSelectUp )

# Activation de l'évènement de changement du volume
Self.GUI.Volume.connect( 'value-changed', Self.VolumeUpdate )

# Activation de l'évènement de la sourdine
Self.GUI.Mute.connect( 'clicked', Self.EventMute )

# Activation de l'évènement de la mise en pause (power)
Self.GUI.Power.connect( 'clicked', Self.EventPower )

return

# Event: Erreur provenant du appareil
######################################################################
def WSEventError( Self, WS, Error ):
if Verbose:
print 'WS1301: ERROR: %s' % Error

return

# Event: Fermeture de la connexion avec le appareil
######################################################################
def WSEventClose( Self, WS ):
if Verbose:
print 'WS1302: CLOSE'

return

# Event: Réception d'un message provenant du appareil
######################################################################
def WSEventMessage( Self, WS, Msg ):

if Verbose:
print 'WS1102: MESSAGE: %s' % Msg

try:

# Volume
##############################################################
R = reGetActualVolume.findall( Msg )

if R:

# Pour ne pas entrer dans une boucle infernale!
Self.NoChangeVolume = True
try:
V = int( R[0] )
Self.Volume = V
Self.AddCmd( Self.GUI.Volume.set_value, V )
finally:
Self.NoChangeVolume = False

return

# Station en cours
##############################################################
R = reGetPlaying.findall( Msg )

if R:
Self.AddCmd( Self.GUI.Station.set_label, u'<b>%s</b>' % R[0] )
Self.AddCmd( Self.GUI.Power.set_label , u'Pause' )
Self.PowerStat = 1
return

# Poste en stanby?
##############################################################
if Msg.find( 'STANDBY' ) > 0:
Self.AddCmd( Self.GUI.Station.set_label, u'<b>STANDBY</b>' )
Self.AddCmd( Self.GUI.Power.set_label , u'Marche' )
Self.PowerStat = 0
return

# Préselections
##############################################################
R = reGetPresets.findall( Msg )

if R:

BtnPresets = {}

# Balayage des présélections
for S in R:
I = reGetIDAndItemName.findall( S )

if I:
# Trouvé, charge le texte du bouton correspondant
Idx = int( I[0][0] ) - 1
Btn = Self.GUI.Presets[ Idx ]
Self.AddCmd( Btn.set_label, I[0][1] )

# Nouveau dictionnaire des présélections
BtnPresets[ Btn ] = S

Self.BtnPresets = BtnPresets

return

# Info?
##############################################################
R = reGetDeviceIDAndIP.findall( Msg )

if R:
# Ne sert pas à grand chose pour l'instant
Self.DeviceID = R[0][0]
Self.IP = R[0][1]
return

except:
# Affiche un code d'erreur
Self.AddCmd( Self.GUI.Station.set_label, u'WS1400: Erreur générale : %d' % LineError() )
pass

return

# Sélection d'une station / Changement de présélection
######################################################################
def EventSelectDown( Self, Widget ):
Self.BtnDown = time()
return

def EventSelectUp( Self, Widget ):

# Click de moins de 2 secondes?
if time() - Self.BtnDown < 2:

# Oui, alors change la station avec la présélection correspondante
Self.AddCmd( Self.GUI.Station.set_label, u'<b>. . .</b>' )

S = Self.BtnPresets[ Widget ]
I = S.find( '<ContentItem' )

if I > 0:
# Envoi la commande de changement
Self.SendMsg( 'select',
Body = S[I:]
)
return

# Changement de présélection avec la station en cours
# A faire...

return

# Changement de volume
######################################################################
def VolumeUpdate( Self, Widget ):

if Self.NoChangeVolume:
return

# Volume en cours de réglage
Self.Volume = int( Widget.get_adjustment().get_value() )

Self.SendMsg( 'volume',
Info = 'mainNode="volume"',
Body = '<volume>%d</volume>' % Self.Volume
)

return

# Mise en sourdine
######################################################################
def EventMute( Self, Widget ):

if Self.NoChangeVolume:
return

# Sortie de la sourdine?
if Self.OldVolume > 0:
# Oui, alors remise à l'ancien niveau
Self.AddCmd( Self.GUI.Volume.set_value, Self.OldVolume )
Self.OldVolume = -1
else:
# Non, alors mise en sourdine
Self.OldVolume = Self.Volume
Self.AddCmd( Self.GUI.Volume.set_value, Sourdine )

return

# Mise en marche / arrêt
######################################################################
def EventPower( Self, Widget ):

Self.SendMsg( 'key',
Info = 'mainNode="keyPress"',
Body = '<key state="press" sender="Gabbo">POWER</key>'
)

sleep( 0.02 )

Self.SendMsg( 'key',
Info = 'mainNode="keyRelease"',
Body = '<key state="release" sender="Gabbo">POWER</key>'
)

sleep( 0.2 )

# Demande l'état actuel
Self.SendMsg( 'now_playing' )

return

########################################################################
#
# Contrôleur de l'interface visuelle
#
########################################################################
class TCIV( threading.Thread ):

# Initialisation de la classe
######################################################################
def __init__( Self, GUI ):
super( TCIV, Self ).__init__()
Self.GUI = GUI
Self.IsRun = None
Self.Event = threading.Event()
Self.Lock = threading.Lock()
Self.CmdList = []
return

# Contrôle du thread: run
######################################################################
def run( Self ):

Self.IsRun = True

# Boucle tant que possible
while Self.IsRun:

# Attend un évènement "set"
# "wait" retourne True si un isSet() est déclenché, sinon la
# fonction fait une pause pendant 10 secondes.
# Passé les 10 secondes, un tour de boucle (while) est accomplie,
# histoire de faire quelque chose à interval régulier
if Self.Event.wait( 1 ):

# Boucle de "vidage" de la liste des commandes en cours
with Self.Lock:

while Self.CmdList:

Cmd = Self.CmdList[0]
del Self.CmdList[0]

if Self.GUI:
gtk.threads_enter()
try:
Cmd[0]( Cmd[1] )
finally:
gtk.threads_leave()

# Réinitialise le "set", sans cela la boucle de "vidage" se
# relance immédiatement à cause du "wait" qui passe outre
Self.Event.clear()

else:
# Fin du délai d'attente du wait()
pass

return

# Contrôle du thread: stop
######################################################################
def stop( Self ):
Self.IsRun = False
Self.Event.set()
return

# Contrôle du thread: ajoute une commande dans la liste
######################################################################
def AddCmd( Self, Cmd ):
with Self.Lock:
Self.CmdList.append( Cmd )
Self.Event.set()
return

########################################################################
#
# Recherche du premier appareil
#
########################################################################
class TDeviceCrontrol( threading.Thread ):

# Initialisation de la classe
######################################################################
def __init__( Self, CIV ):
super( TDeviceCrontrol, Self ).__init__()
Self.CIV = CIV
Self.Event = threading.Event()
Self.IP = None
return

# Recherche du 1er appareil connecté
######################################################################
def run( Self ):

# Tente la connexion avec la précédente IP
###########################################################
try:
F = open( LastIPFile )
R = F.readlines()
IP = R[0].strip()

F.close()

# Lance directement le contrôleur
try:
Self.WS = TWS( Self.CIV )
Self.WS.Run( IP )

# OK, arrêt du thread
return

except:
print 'DC1001: Erreur générale du contrôleur websocket:', LineError()

except:
# Raté, soit l'appareil n'est pas connecté, soit il à changé de IP
pass

# Début de la procédure de recherche d'un appareil
###########################################################
try:

BIP = socket.gethostbyname( socket.gethostname() ).split( '.' )

# Balayage de toutes les IPs
for I in range( 1, 255 ):

if Self.Event.isSet():
break

# Construction de l'adresse IP de test
BIP[3] = '%d' % I

try:

# IP en chaîne
IP = '.'.join( BIP )

# Affichage dans l'interface de la progression
Self.CIV.AddCmd(
( CIV.GUI.Station.set_label,
u"Recherche d'un appareil: %d %%" % ( float( I ) / 254 * 100 )
) )

# Tente une ouverture de socket
Sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
Sock.settimeout( 0.2 )
Conn = Sock.connect( ( IP, 8080 ) )
Sock.close()

try:
WS = create_connection( "ws://%s:8080" % IP,
header = [ 'Sec-WebSocket-Protocol: gabbo' ],
timeout = 0.4
)

# Recherche le nom de l'appareil
WS.send( 'info' )

Self.CIV.AddCmd(
( CIV.GUI.Station.set_label,
u'Trouvé : %sn<b>%s</b>' % ( IP, reGetDeviceName.findall( WS.recv() )[0] )
) )

WS.close()

# Renseigne l'adresse IP de l'appareil
Self.IP = IP

# Device localisé, enregistre l'IP pour la prochaine fois
F = open( LastIPFile, 'wb' )
F.write( IP + 'n' )
F.close()

# Lance le contrôleur
try:
Self.WS = TWS( Self.CIV )
Self.WS.Run( IP )
except:
print 'DC1002: Erreur générale du contrôleur websocket'

# Fin du thread
break

except:
# Essaie l'ip suivante
continue

except:
Sock.close()

else:
Self.CIV.AddCmd( ( CIV.GUI.Station.set_label, u'Aucun appareil de localisé' ) )

except:
try:
Self.CIV.AddCmd(
( CIV.GUI.Station.set_label,
u'DC1098: Erreur générale (%d)' % LineError()
) )
except:
print 'DC1099: Erreur générale: %d' % LineError()

return

# Contrôle du thread: stop
######################################################################
def stop( Self ):
Self.WS.Stop()
Self.Event.set()
return

########################################################################
#
# Initialisatons et créations des controleurs
#
########################################################################

# Permet de tenter une relance du précédent appareil directement
LastIPFile = join( expanduser( '~' ), '.stc.ip' )

# Décodages des réponses
reGetDeviceName = compile( r'<name>([^<]+)' )
reGetDeviceIDAndIP = compile( r'<info.+deviceID="([^"]+).+<ipAddress>([^0<]+)<' )
reGetPlaying = compile( r'<stationName>([^<]+)' )
reGetPresets = compile( r'<preset (.*?)</preset>' )
reGetIDAndItemName = compile( r'id="([^"]+).+<itemName>([^<]+)' )
reGetActualVolume = compile( r'<actualvolume>([^<]+)' )

# Permet de débugger plus facilement en affichant le numéro de la ligne
# qui à produite l'erreur
def LineError():
return exc_info()[2].tb_lineno

# Interface visuelle GTK
threads_init()
GUI = TGUI()

# Contrôleur de l'interface visuelle
CIV = TCIV( GUI )
CIV.start()
sleep( 0.3 )

# Contrôleur de l'appareil: recherche, connexion, contrôle
DC = TDeviceCrontrol( CIV )
DC.start()

# Lance le contrôle de l'interface visuelle GTK
try:
gtk.main()
finally:

# Arrête le contrôleur de l'appareil
DC.stop()

# Arrête le contrôleur de l'interface visuelle
CIV.stop()

# Attends les arrêts des threads
DC.join()
CIV.join()

Compatibilité : 0.02

A voir également

Vous n'êtes pas encore membre ?

inscrivez-vous, c'est gratuit et ça prend moins d'une minute !

Les membres obtiennent plus de réponses que les utilisateurs anonymes.

Le fait d'être membre vous permet d'avoir un suivi détaillé de vos demandes et codes sources.

Le fait d'être membre vous permet d'avoir des options supplémentaires.