-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample2.py
163 lines (127 loc) · 6.27 KB
/
example2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import asyncio
import logging
from nicett6.tt6_connection import open_connection
from nicett6.ttbus_device import TTBusDeviceAddress
from nicett6.utils import parse_example_args, run_coro_after_delay
_LOGGER = logging.getLogger(__name__)
async def run_example_commands1(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await asyncio.sleep(0.5)
await writer.send_simple_command(mask_tt_addr, "MOVE_DOWN")
await asyncio.sleep(1.0)
await writer.send_simple_command(screen_tt_addr, "READ_POS")
await asyncio.sleep(3.0)
await writer.send_simple_command(mask_tt_addr, "READ_POS")
await writer.send_simple_command(mask_tt_addr, "MOVE_UP")
await asyncio.sleep(5.0)
_LOGGER.info("run_example_commands1 completed")
async def run_example_commands2(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_web_pos_request(screen_tt_addr)
await writer.send_hex_move_command(screen_tt_addr, 0xE0)
await asyncio.sleep(5.0)
await writer.send_simple_command(screen_tt_addr, "MOVE_UP")
await asyncio.sleep(5.0)
_LOGGER.info("run_example_commands2 completed")
async def run_example_commands3(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_web_pos_request(screen_tt_addr)
await writer.send_hex_move_command(screen_tt_addr, 0xE0)
await asyncio.sleep(5.0)
await writer.send_web_move_command(screen_tt_addr, 0.4)
await asyncio.sleep(5.0)
await writer.send_simple_command(screen_tt_addr, "MOVE_UP")
await asyncio.sleep(15.0)
await writer.send_web_pos_request(screen_tt_addr)
await asyncio.sleep(0.5)
_LOGGER.info("run_example_commands3 completed")
async def run_example_commands_screen_up(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(screen_tt_addr, "MOVE_UP")
_LOGGER.info("run_example_commands_screen_up completed")
async def run_example_commands_screen_up_step(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(screen_tt_addr, "MOVE_UP_STEP")
_LOGGER.info("run_example_commands_screen_up_step completed")
async def run_example_commands_mask_up_step(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(mask_tt_addr, "MOVE_UP_STEP")
_LOGGER.info("run_example_commands_mask_up_step completed")
async def run_example_commands_screen_down(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(screen_tt_addr, "MOVE_DOWN")
_LOGGER.info("run_example_commands_screen_down completed")
async def run_example_commands_screen_down_step(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(screen_tt_addr, "MOVE_DOWN_STEP")
_LOGGER.info("run_example_commands_screen_down_step completed")
async def run_example_commands_mask_down_step(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_simple_command(mask_tt_addr, "MOVE_DOWN_STEP")
_LOGGER.info("run_example_commands_mask_down_step completed")
async def run_example_commands_ar235(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_hex_move_command(screen_tt_addr, 0x12)
await writer.send_hex_move_command(mask_tt_addr, 0x4A)
_LOGGER.info("run_example_commands_mask_down_step completed")
async def run_example_commands_ar20(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_hex_move_command(screen_tt_addr, 0x0D)
await writer.send_hex_move_command(mask_tt_addr, 0xA0)
_LOGGER.info("run_example_commands_mask_down_step completed")
async def run_example_commands_ar20b(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_hex_move_command(screen_tt_addr, 0x0B)
await writer.send_hex_move_command(mask_tt_addr, 0xA0)
_LOGGER.info("run_example_commands_mask_down_step completed")
async def run_example_commands_top_gun(writer, screen_tt_addr, mask_tt_addr):
await writer.send_web_on()
await writer.send_hex_move_command(screen_tt_addr, 0x07)
await writer.send_hex_move_command(mask_tt_addr, 0xB5)
_LOGGER.info("run_example_commands_mask_down_step completed")
async def request_screen_position(writer, tt_addr):
_LOGGER.info("requesting screen position")
responses = await writer.process_request(
writer.send_simple_command(tt_addr, "READ_POS"), 0.5
)
_LOGGER.info("screen position request responses: %r", responses)
async def read_sequences_simple(reader):
_LOGGER.info("message reader started")
async for msg in reader:
_LOGGER.info(f"msg:{msg}")
_LOGGER.info("message reader finished")
async def main(serial_port, run_example_commands):
async with open_connection(serial_port) as conn:
screen_tt_addr = TTBusDeviceAddress(0x02, 0x04)
mask_tt_addr = TTBusDeviceAddress(0x03, 0x04)
reader = conn.add_reader()
reader_task = asyncio.create_task(read_sequences_simple(reader))
writer = conn.get_writer()
example_task = asyncio.create_task(
run_example_commands(writer, screen_tt_addr, mask_tt_addr)
)
request_task = asyncio.create_task(
run_coro_after_delay(request_screen_position(writer, screen_tt_addr))
)
await example_task
await request_task
await reader_task
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
examples = (
("ex1", run_example_commands1),
("ex2", run_example_commands2),
("ex3", run_example_commands3),
("up", run_example_commands_screen_up),
("down", run_example_commands_screen_down),
("up_step", run_example_commands_screen_up_step),
("down_step", run_example_commands_screen_down_step),
("mask_up_step", run_example_commands_mask_up_step),
("mask_down_step", run_example_commands_mask_down_step),
("ar235", run_example_commands_ar235),
("ar20", run_example_commands_ar20),
("ar20b", run_example_commands_ar20b),
("tg", run_example_commands_top_gun),
)
serial_port, example = parse_example_args(examples)
asyncio.run(main(serial_port, example))