-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyb_files.py
94 lines (83 loc) · 3.32 KB
/
pyb_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import textwrap
import sys
from pyboard import PyboardError
BUFFER_SIZE = 1024
class DirectoryExistsError(Exception):
...
class Files(object):
def __init__(self, pyboard):
self._pyboard = pyboard
def mkdir(self, directory=None, exists_okay=True, directory_list: list = None):
try:
self._pyboard.enter_raw_repl()
except PyboardError:
raise PyboardError
if directory_list is not None:
if directory is not None:
directory_list.append(directory)
for dir in directory_list:
# Execute os.mkdir command on the board.
command = """
try:
import os
except ImportError:
import uos as os
os.mkdir('{0}')
""".format(
dir
)
try:
self._pyboard.exec_(textwrap.dedent(command))
print(f"Directory Created: {dir}")
except PyboardError as ex:
# Check if this is an OSError #17, i.e. directory already exists.
if (
ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST")
!= -1
):
if not exists_okay:
raise DirectoryExistsError(
"Directory already exists: {0}".format(directory)
)
if exists_okay:
print(f"Directory already exists -> Verified: {dir}")
else:
raise ex
self._pyboard.exit_raw_repl()
def put(
self,
files_and_data_to_bulk_write,
):
try:
self._pyboard.enter_raw_repl()
except PyboardError:
raise PyboardError
try:
file_count = len(files_and_data_to_bulk_write.keys())
current_file = 0
for file, data in files_and_data_to_bulk_write.items():
current_file += 1
self._pyboard.exec_("f = open('{0}', 'wb')".format(file))
size = len(data)
written = 0
# Loop through and write a buffer size chunk of data at a time.
for i in range(0, size, BUFFER_SIZE):
sys.stdout.write(
f'\r[{current_file} of {file_count}] "{file}" >>> {written} of {size}'
)
sys.stdout.flush()
chunk_size = min(BUFFER_SIZE, size - i)
chunk = repr(data[i : i + chunk_size])
# Make sure to send explicit byte strings (handles python 2 compatibility).
if not chunk.startswith("b"):
chunk = "b" + chunk
self._pyboard.exec_("f.write({0})".format(chunk))
written = i
self._pyboard.exec_("f.close()")
sys.stdout.write(
f'\r[{current_file} of {file_count}] "{file}" >>> {size} of {size}\n'
)
except PyboardError as ex:
print(ex.args[2].decode("utf-8"))
raise ex
self._pyboard.exit_raw_repl()