Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Wistar smart curtain (0x4F6C) #786

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .cover import dooya, dooya2
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
Expand Down Expand Up @@ -183,6 +183,9 @@
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"),
},
Expand Down
138 changes: 106 additions & 32 deletions broadlink/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ class dooya(Device):
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
packet[2] = 0xBB
packet[3] = command
packet[4] = attribute
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
packet[0x00] = 0x09
packet[0x02] = 0xBB
packet[0x03] = command
packet[0x04] = attribute
packet[0x09] = 0xFA
packet[0x0A] = 0x44

resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4]

def open(self) -> int:
Expand Down Expand Up @@ -62,44 +63,117 @@ class dooya2(Device):

TYPE = "DT360E-2"

def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
checksum = 0xC0C4 + command + attribute & 0xFFFF
packet = bytearray(32)
packet[0] = 0x16
packet[2] = 0xA5
packet[3] = 0xA5
packet[4] = 0x5A
packet[5] = 0x5A
def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[8] = 0x02
packet[9] = 0x0B
packet[10] = 0x0A
packet[15] = command
packet[16] = attribute

response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x11]
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload

def open(self) -> None:
"""Open the curtain."""
self._send(0x01)
self._send(2, [0x00, 0x01, 0x00])

def close(self) -> None:
"""Close the curtain."""
self._send(0x02)
self._send(2, [0x00, 0x02, 0x00])

def stop(self) -> None:
"""Stop the curtain."""
self._send(0x03)
self._send(2, [0x00, 0x03, 0x00])

def get_percentage(self) -> int:
"""Return the position of the curtain."""
return self._send(0x06)
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]

def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(0x09, new_percentage)
self._send(2, [0x00, 0x09, new_percentage])


class wser(Device):
"""Controls a Wistar curtain motor"""

TYPE = "WSER"

def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B

data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8

packet += bytes(data)

checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8

packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8

resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload

def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position

def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
position = resp[0x0E]
return position

def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
position = resp[0x0E]
return position

def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
position = resp[0x0E]
return position

def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
position = resp[0x0E]
return position
Loading