Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

refactoring, support for smart blinds, cli interface, updated howto #45

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
further simplications for BulbDevice, refactoring with separate utils…
….py, minor changes
dominik dienlin committed Nov 12, 2018
commit 708b1382ae41bb0705703da60fc0eaae03834f09
123 changes: 69 additions & 54 deletions pytuya/devices.py
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@


class XenonDevice(object):
def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10):
def __init__(self, dev_id, address, local_key=None, dev_type='device', connection_timeout=10):
""" Represents a Tuya device.

Args:
@@ -71,14 +71,25 @@ def _send_receive(self, payload):
Args:
payload(bytes): Data to send.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.settimeout(self.connection_timeout)
s.connect((self.address, self.port))
s.send(payload)
data = s.recv(1024)
s.close()
return data

success, tries = False, 0
while not success and tries < 3:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.settimeout(self.connection_timeout)
s.connect((self.address, self.port))
s.send(payload)
data = s.recv(1024)
s.close()
success = True
except ConnectionResetError as e:
tries += 1

if not success:
logging.warning("failed to communicate %s" % e)
else:
return data

def generate_payload(self, command, data=None):
""" Generate the payload to send.
@@ -127,9 +138,6 @@ def generate_payload(self, command, data=None):


class Device(XenonDevice):
def __init__(self, dev_id, address, local_key=None, dev_type=None):
super(Device, self).__init__(dev_id, address, local_key, dev_type)

def status(self):
log.debug('status() entry')
# open device, send request, then close connection
@@ -212,15 +220,14 @@ def set_timer(self, num_secs):

class OutletDevice(Device):
def __init__(self, dev_id, address, local_key=None):
dev_type = 'device'
super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type)
super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type='device')


class BulbDevice(Device):
DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2'
DPS_INDEX_BRIGHTNESS = '3'
DPS_INDEX_COLOURTEMP = '4'
DPS_INDEX_COLOUR_TEMP = '4'
DPS_INDEX_COLOUR = '5'

DPS = 'dps'
@@ -231,56 +238,43 @@ def __init__(self, dev_id, address, local_key=None):
dev_type = 'device'
super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type)

def _send(self, mode=None, colour=None, brightness=None, colour_temp=None):
payload_data = {self.DPS_INDEX_MODE: mode,
self.DPS_INDEX_COLOUR: colour,
self.DPS_INDEX_BRIGHTNESS: brightness,
self.DPS_INDEX_COLOUR_TEMP: colour_temp}
payload = self.generate_payload(SET, {k: v for k, v in payload_data.items() if v is not None})
return self._send_receive(payload)

def set_colour(self, r, g, b):
""" Set colour of an rgb bulb.

Args:
r(int): Value for the colour red as int from 0-255.
g(int): Value for the colour green as int from 0-255.
b(int): Value for the colour blue as int from 0-255.
"""
if not 0 <= r <= 255:
raise ValueError("The value for red needs to be between 0 and 255.")
if not 0 <= g <= 255:
raise ValueError("The value for green needs to be between 0 and 255.")
if not 0 <= b <= 255:
raise ValueError("The value for blue needs to be between 0 and 255.")

print(BulbDevice)
hex_value = Colour.rgb_to_hex_value(r, g, b)

payload = self.generate_payload(SET, {
self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR,
self.DPS_INDEX_COLOUR: hex_value})
data = self._send_receive(payload)
return data
b(int): Value for the colour blue as int from 0-255. """

for value, name in ((r, "red"), (b, "blue"), (g, "green")):
if not 0 <= value <= 255:
raise ValueError("The %s for red needs to be between 0 and 255." % name)

return self._send(self.DPS_INDEX_MODE, colour=Colour.rgb_to_hex_value(r, g, b))

def set_white(self, brightness, colour_temp):
""" Set white coloured theme of an rgb bulb.

Args:
brightness(int): Value for the brightness (25-255).
colour_temp(int): Value for the colour temperature (0-255).
"""
colour_temp(int): Value for the colour temperature (0-255). """
if not 25 <= brightness <= 255:
raise ValueError("The brightness needs to be between 25 and 255.")
if not 0 <= colour_temp <= 255:
raise ValueError("The colour temperature needs to be between 0 and 255.")

payload = self.generate_payload(SET, {
self.DPS_INDEX_MODE: self.DPS_MODE_WHITE,
self.DPS_INDEX_BRIGHTNESS: brightness,
self.DPS_INDEX_COLOURTEMP: colour_temp})

data = self._send_receive(payload)
return data
return self._send(self.DPS_MODE_WHITE, brightness=brightness, colour_temp=colour_temp)

def set_brightness(self, brightness):
""" Set the brightness value of an rgb bulb.

Args:
brightness(int): Value for the brightness (25-255).
"""
brightness(int): Value for the brightness (25-255). """
if not 25 <= brightness <= 255:
raise ValueError("The brightness needs to be between 25 and 255.")

@@ -290,24 +284,20 @@ def set_brightness(self, brightness):

def set_colour_temp(self, colour_temp):
""" Set the colour temperature of an rgb bulb.

Args:
colour_temp(int): Value for the colour temperature (0-255).
"""
colour_temp(int): Value for the colour temperature (0-255). """
if not 0 <= colour_temp <= 255:
raise ValueError("The colour temperature needs to be between 0 and 255.")

payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colour_temp})
data = self._send_receive(payload)
return data
return self._send(colour_temp=colour_temp)

def brightness(self):
""" Return brightness value """
return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS]

def colour_temp(self):
""" Return colour temperature """
return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP]
return self.status()[self.DPS][self.DPS_INDEX_COLOUR_TEMP]

def colour_rgb(self):
""" Return colour as RGB value """
@@ -324,6 +314,31 @@ def state(self):
return dict(is_on=dps[self.DPS_INDEX_ON],
mode=dps[self.DPS_INDEX_MODE],
brightness=dps[self.DPS_INDEX_BRIGHTNESS],
colourtemp=dps[self.DPS_INDEX_COLOURTEMP],
colourtemp=dps[self.DPS_INDEX_COLOUR_TEMP],
colour=dps[self.DPS_INDEX_COLOUR])


class CoverDevice(Device):
action_open = {'2': '1'}
action_close = {'2': '2'}
action_stop = {'2': '3'}

def state(self):
status = self.status()
if type(status) is bytes:
return str(status)
return {'1': "opening or open", '2': "closing or closed", '3': "stopped"}.get(status.get('dps').get('1'))

def send_action(self, action):
payload = self.generate_payload(command=SET, data=action)
self._send_receive(payload)
return

def open(self):
self.send_action(self.action_open)

def close(self):
self.send_action(self.action_close)

def stop(self):
self.send_action(self.action_stop)
Loading