答案1
嗯,这可能不是完美的,但它是一个用于pactl
监控的很好的 shell 原型。
以下是通话运行时的部分输出:
pactl list source-outputs
Source Output #2
Driver: protocol-native.c
Owner Module: 8
Client: 11
Source: 2
Sample Specification: s16le 2ch 44100Hz
Channel Map: front-left,front-right
Format: pcm, format.sample_format = "\"s16le\"" format.rate = "44100" format.channels = "2" format.channel_map = "\"front-left,front-right\""
Corked: no
Mute: no
Volume: front-left: 65536 / 100% / 0.00 dB, front-right: 65536 / 100% / 0.00 dB
balance 0.00
Buffer Latency: 0 usec
Source Latency: 66 usec
Resample method: n/a
Properties:
application.icon_name = "chromium-browser"
media.name = "RecordStream"
application.name = "Chrome input"
native-protocol.peer = "UNIX socket client"
native-protocol.version = "30"
application.process.id = "17053"
application.process.user = "sneetsher"
application.process.host = "sneetsher-blueskies"
application.process.binary = "skypeforlinux"
window.x11.display = ":0.0"
application.language = "en_US.UTF-8"
application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
application.process.session_id = "c4"
module-stream-restore.id = "source-output-by-application-name:Chrome input"
pactl list sink-inputs
Sink Input #166
Driver: protocol-native.c
Owner Module: 8
Client: 16
Sink: 1
Sample Specification: float32le 2ch 44100Hz
Channel Map: front-left,front-right
Format: pcm, format.sample_format = "\"float32le\"" format.rate = "44100" format.channels = "2" format.channel_map = "\"front-left,front-right\""
Corked: no
Mute: no
Volume: front-left: 65536 / 100% / 0.00 dB, front-right: 65536 / 100% / 0.00 dB
balance 0.00
Buffer Latency: 51519 usec
Sink Latency: 11124 usec
Resample method: copy
Properties:
application.icon_name = "chromium-browser"
media.name = "Playback"
application.name = "Skype for Linux Beta"
native-protocol.peer = "UNIX socket client"
native-protocol.version = "30"
application.process.id = "17053"
application.process.user = "sneetsher"
application.process.host = "sneetsher-blueskies"
application.process.binary = "skypeforlinux"
window.x11.display = ":0.0"
application.language = "en_US.UTF-8"
application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
application.process.session_id = "c4"
module-stream-restore.id = "sink-input-by-application-name:Skype for Linux Beta"
Sink Input #167
Driver: protocol-native.c
Owner Module: 8
Client: 17
Sink: 1
Sample Specification: float32le 2ch 44100Hz
Channel Map: front-left,front-right
Format: pcm, format.sample_format = "\"float32le\"" format.rate = "44100" format.channels = "2" format.channel_map = "\"front-left,front-right\""
Corked: no
Mute: no
Volume: front-left: 65536 / 100% / 0.00 dB, front-right: 65536 / 100% / 0.00 dB
balance 0.00
Buffer Latency: 28480 usec
Sink Latency: 11061 usec
Resample method: copy
Properties:
application.icon_name = "chromium-browser"
media.name = "Playback"
application.name = "Skype for Linux Beta"
native-protocol.peer = "UNIX socket client"
native-protocol.version = "30"
application.process.id = "17053"
application.process.user = "sneetsher"
application.process.host = "sneetsher-blueskies"
application.process.binary = "skypeforlinux"
window.x11.display = ":0.0"
application.language = "en_US.UTF-8"
application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
application.process.session_id = "c4"
module-stream-restore.id = "sink-input-by-application-name:Skype for Linux Beta"
答案2
好的,我明白了。它仍然不够完善,并且只在 Ubuntu 16.04 和 Python 2.7 上进行了测试,但无论如何:
#!/usr/bin/python
# This monitors pulseaudio streams via a dbus event-listener.
#
# On Ubuntu to enable pulseaudio dbus support:
#
# echo -e '.ifexists module-dbus-protocol.so\nload-module module-dbus-protocol\n.endif' >> ~/.pulse/pulse/default.pa
# # globally: /etc/pulse/default.pa
#
# And restart pulseaudio:
#
# pkill pulseaudio; pulseaudio
#
from __future__ import division, print_function, unicode_literals
import dbus, os, gobject, logging
from dbus.mainloop.glib import DBusGMainLoop
from logging import info, debug, error, warning as warn
#logging.getLogger().setLevel(logging.DEBUG)
skypestream = []
cstr = 'org.PulseAudio.Core1'
# convert byte array to string
def dbus2str(db):
if type(db)==dbus.Struct:
return str(tuple(dbus2str(i) for i in db))
if type(db)==dbus.Array:
return "".join([dbus2str(i) for i in db])
if type(db)==dbus.Dictionary:
return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
if type(db)==dbus.String:
return db+''
if type(db)==dbus.UInt32:
return str(db+0)
if type(db)==dbus.Byte:
return chr(db)
if type(db)==dbus.Boolean:
return db==True
if type(db)==dict:
return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
return "(%s:%s)" % (type(db), db)
def sig_handler(path=None, sender=None, msg=None):
debug( '\n\npath: %s\n%s\n\nsender: %s\n%s\n\nmsg: %s\n%s\n\n',
path, dir(path), sender, dir(sender), msg, dir(msg) )
mem = msg.get_member()
dbus_pstreams = (
dbus.Interface(
pulse_bus.get_object(object_path=path),
dbus_interface='org.freedesktop.DBus.Properties'
) for path in core1.Get(
cstr,
'PlaybackStreams',
dbus_interface='org.freedesktop.DBus.Properties' )
)
pstreams = {}
for pstream in dbus_pstreams:
try:
pstreams[pstream.Get(cstr+'.Stream', 'Index')] = pstream
except dbus.exceptions.DBusException:
pass
if pstreams:
for stream in pstreams.keys():
plist = pstreams[stream].Get(cstr+'.Stream', 'PropertyList')
appname = dbus2str(plist.get('application.name', None))
if mem == 'PlaybackStreamRemoved' and path in skypestream:
skypestream.remove(path)
print('no-skype')
if appname.find('Skype') > -1:
if mem == 'NewPlaybackStream':
skypestream.append(path)
print('busy', appname)
def pulse_bus_address():
address = None
if 'PULSE_DBUS_SERVER' in os.environ:
address = os.environ['PULSE_DBUS_SERVER']
else:
bus = dbus.SessionBus()
server_lookup = bus.get_object("org.PulseAudio1",
"/org/pulseaudio/server_lookup1")
address = server_lookup.Get("org.PulseAudio.ServerLookup1",
"Address", dbus_interface="org.freedesktop.DBus.Properties")
debug(address)
if not address: raise RuntimeError('No pulseaudio dbus address found!')
return address
if __name__ == "__main__":
DBusGMainLoop(set_as_default=True)
loop = gobject.MainLoop()
pulse_bus = dbus.connection.Connection(pulse_bus_address())
core1 = pulse_bus.get_object(object_path='/org/pulseaudio/core1')
core1.ListenForSignal(cstr+'.NewPlaybackStream', dbus.Array(signature="o"))
core1.ListenForSignal(cstr+'.PlaybackStreamRemoved', dbus.Array(signature="o"))
pulse_bus.add_signal_receiver(sig_handler, message_keyword='msg')
loop.run()