@@ -66,19 +66,18 @@ def read_register(self, register: int, byte_count: int = 1, max_tries: int = 5)
66
66
buf_r = bytearray (byte_count )
67
67
buf_r [0 ] = register
68
68
buf_w = bytearray (byte_count )
69
+ cur_try = 0
69
70
for cur_try in range (1 , 1 + max_tries ):
70
71
try :
71
72
self ._i2c_bus .writeto_then_readfrom (address = self ._i2c_adr , buffer_out = buf_r , buffer_in = buf_w )
72
73
return convert_bytearry_to_uint (buf_w )
73
- except OSError as e :
74
+ # protect against sporadic errors on actual devices
75
+ # (maybe we can do something to prevent these errors?)
76
+ except (OSError , RuntimeError ) as e :
74
77
# [Errno 121] Remote I/O error
75
- LH .warning ("[%s] Failed to read register 0x%02X (%i/%i): %s" , __name__ , register , cur_try , max_tries , e )
76
- time .sleep (0.001 )
77
- except RuntimeError as e :
78
78
LH .warning ("[%s] Unable to read register 0x%02X (%i/%i): %s" , __name__ , register , cur_try , max_tries , e )
79
79
time .sleep (0.001 )
80
- else :
81
- raise RuntimeError (f"Unable to read register 0x{ register :02X} after { cur_try } attempts. Giving up." )
80
+ raise RuntimeError (f"Unable to read register 0x{ register :02X} after { cur_try } attempts. Giving up." )
82
81
83
82
def write_register (self , register : int , value : int , byte_count : int = 1 , max_tries : int = 3 ):
84
83
"""
@@ -91,23 +90,21 @@ def write_register(self, register: int, value: int, byte_count: int = 1, max_tri
91
90
"""
92
91
_validate_register_address (register )
93
92
ba = convert_uint_to_bytearry (value , byte_count )
94
- buf = bytearray (1 + len ( ba ) )
95
- buf [ 0 ] = register
96
- for i in range ( len ( ba )):
97
- buf [ 1 + i ] = ba [ i ]
93
+ buf = bytearray ([ register ] )
94
+ for byte in ba :
95
+ buf . append ( byte )
96
+ cur_try = 0
98
97
for cur_try in range (1 , 1 + max_tries ):
99
98
try :
100
99
self ._i2c_bus .writeto (address = self ._i2c_adr , buffer = buf )
101
100
return
102
- except OSError as e :
101
+ # protect against sporadic errors on actual devices
102
+ # (maybe we can do something to prevent these errors?)
103
+ except (OSError , RuntimeError ) as e :
103
104
# [Errno 121] Remote I/O error
104
- LH .warning ("[%s] Failed to read register 0x%02X (%i/%i): %s" , __name__ , register , cur_try , max_tries , e )
105
+ LH .warning ("[%s] Unable to write register 0x%02X (%i/%i): %s" , __name__ , register , cur_try , max_tries , e )
105
106
time .sleep (0.1 )
106
- except RuntimeError as e :
107
- LH .warning ("[%s] Unable to read register 0x%02X (%i/%i): %s" , __name__ , register , cur_try , max_tries , e )
108
- time .sleep (0.1 )
109
- else :
110
- raise RuntimeError (f"Unable to read register 0x{ register :02X} after { cur_try } attempts. Giving up." )
107
+ raise RuntimeError (f"Unable to read register 0x{ register :02X} after { cur_try } attempts. Giving up." )
111
108
112
109
# it is unclear if it's possible to have a multi-byte state registers
113
110
# (a register write looks exactly like a multi-byte state write)
@@ -124,19 +121,18 @@ def get_state(self, byte_count: int = 1, max_tries: int = 5) -> int:
124
121
LH .warning ("Multi byte reads are not implemented yet! Returning a single byte instead." )
125
122
byte_count = 1
126
123
buf = bytearray (byte_count )
124
+ cur_try = 0
127
125
for cur_try in range (1 , 1 + max_tries ):
128
126
try :
129
127
self ._i2c_bus .readfrom_into (address = self ._i2c_adr , buffer = buf )
130
128
return buf [0 ]
131
- except OSError as e :
129
+ # protect against sporadic errors on actual devices
130
+ # (maybe we can do something to prevent these errors?)
131
+ except (OSError , RuntimeError ) as e :
132
132
# [Errno 121] Remote I/O error
133
- LH .warning ("[%s] Failed to read state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
134
- time .sleep (0.001 )
135
- except RuntimeError as e :
136
133
LH .warning ("[%s] Unable to read state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
137
134
time .sleep (0.001 )
138
- else :
139
- raise RuntimeError (f"Unable to read state after { cur_try } attempts. Giving up." )
135
+ raise RuntimeError (f"Unable to read state after { cur_try } attempts. Giving up." )
140
136
141
137
def set_state (self , value : int , byte_count : int = 1 , max_tries : int = 3 ):
142
138
"""
@@ -149,19 +145,18 @@ def set_state(self, value: int, byte_count: int = 1, max_tries: int = 3):
149
145
LH .warning ("Multi byte writes are not implemented yet! Returning a single byte instead." )
150
146
byte_count = 1
151
147
buf = convert_uint_to_bytearry (value , byte_count )
148
+ cur_try = 0
152
149
for cur_try in range (1 , 1 + max_tries ):
153
150
try :
154
151
self ._i2c_bus .writeto (address = self ._i2c_adr , buffer = buf )
155
152
return
156
- except OSError as e :
153
+ # protect against sporadic errors on actual devices
154
+ # (maybe we can do something to prevent these errors?)
155
+ except (OSError , RuntimeError ) as e :
157
156
# [Errno 121] Remote I/O error
158
- LH .warning ("[%s] Failed to write state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
157
+ LH .warning ("[%s] Unable to write state (%i/%i): %s" , __name__ , cur_try , max_tries , e )
159
158
time .sleep (0.1 )
160
- except RuntimeError as e :
161
- LH .warning ("[%s] Unable to write state 0x%02X (%i/%i): %s" , __name__ , cur_try , max_tries , e )
162
- time .sleep (0.1 )
163
- else :
164
- raise RuntimeError (f"Unable to write state after { cur_try } attempts. Giving up." )
159
+ raise RuntimeError (f"Unable to write state after { cur_try } attempts. Giving up." )
165
160
166
161
167
162
class BurstHandler :
@@ -182,6 +177,8 @@ def __init__(self, i2c_bus: busio.I2C, i2c_adr: int, timeout_ms: int | None = 50
182
177
self ._timeout_ms = timeout_ms
183
178
else :
184
179
raise ValueError ("Provided timeout is not a positive integer or 'None'!" )
180
+ # register '_timestart_ns' - we will populate it later on
181
+ self ._timestart_ns = 0
185
182
186
183
def __enter__ (self ) -> BurstHandle :
187
184
"""
0 commit comments