-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbt-headset-connect.py
executable file
·144 lines (125 loc) · 4.99 KB
/
bt-headset-connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python3.6
import time
import Bluetoothctl as blctl
import utils as utils
import argparse
import pulsectl
import sys
logger = utils.getLogger()
bl = blctl.Bluetoothctl()
pulse = pulsectl.Pulse('pulse-bt')
headset = utils.getHeadsetDetails()
args = None
def handleArguments():
"""Handles CLI arguments and saves them globally"""
parser = argparse.ArgumentParser(
description="A tool which tries to keep a Bluetooth headset connection alive. Also available: disconnect and version info"
)
parser.add_argument("--keep_alive", "-k", help="Set a flag whether to keep function alive. Take True/False as value")
parser.add_argument("--version", "-v", action="version",
version="%(prog)s " + "0.0.1")
parser.add_argument("--disconnect", "-d", help="Disconnect device", action="store_true")
parser.add_argument("--reconnect", "-r", help="Reconnect device", action="store_true")
global args
args = parser.parse_args()
def isSinkA2dp():
"""Checks whether current headset sink is A2DP sink"""
sink_list = pulse.sink_list()
if len(sink_list) < 1:
logger.error("No sinks detected")
return False
for sink in sink_list:
try:
if headset["mac_address"] == sink.proplist["device.string"]:
if "a2dp" in sink.name:
logger.info("Current sink is A2DP sink")
return True
else:
logger.info("Current sink is not A2DP sink: {}".format(sink.name))
return False
except KeyError:
logger.warn("Sink does not have property 'device.string': {}".format(str(sink)))
continue
# in case none of the sinks are related to the headset
return False
def getHeadsetCard():
"""Returns sound card which matches with headset description"""
card_list = pulse.card_list()
card_valid = False
while len(card_list) < 1 or not card_valid:
if len(card_list) < 1:
logger.warn("No cards detected")
else:
for card in card_list:
if utils.replaceColonWithUnderline(headset["mac_address"]) in card.name:
return card
else:
logger.debug("Card name ({}) does not match headset MAC address ({}). Trying next".format(card.name, headset["mac_address"]))
logger.warn("No card found. Available card names: {}".format(card_list))
reconnect()
card_list = pulse.card_list()
def changeCardActiveProfileToA2dp(card):
"""Changes active profile of card to A2DP"""
a2dp_profile_filter_result = filter(lambda x : x.name == "a2dp_sink", card.profile_list)
a2dp_profile_filter_result_list = list(a2dp_profile_filter_result)
if len(a2dp_profile_filter_result_list) != 1:
logger.error("No or multiple A2DP profile(s) found")
sys.exit(1)
a2dp_profile = a2dp_profile_filter_result_list[0]
if card.profile_active == a2dp_profile:
logger.info("Current card profile is A2DP")
else:
logger.info("Current card profile not A2DP: {}".format(card.profile_active))
logger.info("Trying to change to A2DP profile")
try:
pulse.card_profile_set(card, a2dp_profile)
except pulsectl.pulsectl.PulseOperationFailed:
logger.warning("Got PulseOperationFailed error (most likely because sound settings are screwed up). Reconnecting headset and trying again")
reconnect()
def ensureConnected():
"""Checks the connection state to the headset and eventually tries to reconnect"""
logger.info("Checking if headset is connected")
connected = bl.is_connected_with_headset()
while not connected:
connect()
connected = bl.is_connected_with_headset()
def connect():
"""Connects to the device and sleeps for a bit"""
logger.info("Trying to connect to headset")
bl.connect(headset['mac_address'])
time.sleep(5)
def disconnect():
"""Disconnects from the device and sleeps for a bit"""
logger.info("Trying to disconnect to headset")
bl.disconnect(headset['mac_address'])
time.sleep(5)
def reconnect():
"""Reconnects to the device"""
disconnect()
connect()
def ensureA2dp():
"""Checks the card profile state and eventually sets it to A2DP"""
logger.info("Checking if headset is in A2DP mode")
is_a2dp = isSinkA2dp()
while not is_a2dp:
logger.info("Trying to disconnect headset to enforce A2DP mode")
changeCardActiveProfileToA2dp(getHeadsetCard())
time.sleep(5)
is_a2dp = isSinkA2dp()
def main():
handleArguments()
if args.reconnect:
ensureConnected()
return reconnect()
elif args.disconnect:
ensureConnected()
return disconnect()
else:
ensureConnected()
ensureA2dp()
if args.keep_alive:
time.sleep(10)
main()
logger.info("Device is connected and in A2DP mode. Exiting")
if __name__ == "__main__":
main()