upload_table.py
#!/usr/bin/python3
import argparse
import csv
import io
import logging
import time

import requests

import config
from salesforce import get_SalesforceBulk
from tabledesc import TableDesc

DEFAULT_MAX_UPLOAD_SIZE = 10000000
DEFAULT_MAX_UPLOAD_RECORDS = 10000

csvdialect = {
    'delimiter': ',',
    'doublequote': True,
    'escapechar': None,
    'lineterminator': '\n',
    'quotechar': '"',
    'quoting': csv.QUOTE_MINIMAL,
    'skipinitialspace': False,
    'strict': True,
}
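
# The dialect above matches PostgreSQL's CSV export format; an illustrative
# (not part of this script) way to produce such a file is:
#   COPY mytable TO '/tmp/mytable.csv' WITH (FORMAT csv, HEADER true);
# Embedded '"' characters come out doubled ("") and fields containing commas
# or newlines are quoted, which QUOTE_MINIMAL reproduces on re-serialization.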

def csv_reader(csvfilename):
    """
    Takes a PostgreSQL csv file (commas, headers, escape " as "")
    Yields rows as serialized csv strings (not lists)
    """
    with open(csvfilename, encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, **csvdialect)
        for line in reader:
            # Re-serialize each parsed row with the same dialect so quoting
            # is normalized before rows are grouped into chunks.
            buf = io.StringIO()
            writer = csv.writer(buf, **csvdialect)
            writer.writerow(line)
            yield buf.getvalue()
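
# A minimal usage sketch; the file path and contents are hypothetical:
#   for row in csv_reader('/tmp/contacts.csv'):
#       print(repr(row))  # e.g. 'Id,Name\n' then '003xx1,"Doe, Jane"\n'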

def csv_split(
        csvfilename,
        max_size=DEFAULT_MAX_UPLOAD_SIZE,
        max_records=DEFAULT_MAX_UPLOAD_RECORDS):
    """
    Takes a PostgreSQL csv file (commas, headers, escape " as "")
    Yields readable BytesIOs in the same format, each holding the header
    plus at most max_records records and no more than max_size bytes
    """
    logger = logging.getLogger(__name__)
    headers = None
    buff = ''
    chunk_nb_lines = 0
    for line in csv_reader(csvfilename):
        if headers is None:
            # The first row is the header; it is repeated at the top of
            # every chunk so each batch is a self-contained csv file.
            headers = line
            buff = headers
            chunk_nb_lines = 0
            continue
        if (chunk_nb_lines >= max_records
                or len(buff) + len(line) >= max_size):
            logger.debug(
                "Chunk with %s bytes, %s lines",
                len(buff), chunk_nb_lines)
            yield io.BytesIO(buff.encode())
            buff = headers
            chunk_nb_lines = 0
        buff += line
        chunk_nb_lines += 1
    logger.debug("Chunk with %s bytes, %s lines", len(buff), chunk_nb_lines)
    yield io.BytesIO(buff.encode())
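
# A minimal sketch of inspecting the chunking, with hypothetical limits and
# file path:
#   for chunk in csv_split('/tmp/contacts.csv', max_size=1000, max_records=50):
#       data = chunk.read()  # bytes: header line plus up to 50 records
#       print(len(data), 'bytes,', len(data.splitlines()) - 1, 'records')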

def upload_csv(
        tabledesc,
        csvfilename,
        max_size=DEFAULT_MAX_UPLOAD_SIZE,
        max_records=DEFAULT_MAX_UPLOAD_RECORDS):
    logger = logging.getLogger(__name__)
    bulk = get_SalesforceBulk()
    jobid = bulk.create_update_job(tabledesc.name, contentType='CSV')
    for chunk in csv_split(csvfilename, max_size, max_records):
        batchid = bulk.post_batch(jobid, chunk)
        while True:
            # Retry the wait on transient network errors; any other
            # exception propagates.
            try:
                bulk.wait_for_batch(jobid, batchid)
            except requests.exceptions.ConnectionError as exc:
                logger.error('wait_for_batch failed, retrying...: %s', exc)
                time.sleep(1)
            else:
                break
        logger.debug("%s", bulk.get_batch_results(batchid))
    bulk.close_job(jobid)
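
# A minimal programmatic sketch of what the command-line entry point below
# does; the table and file names are placeholders:
#   td = TableDesc('Contact')
#   upload_csv(td, '/tmp/contacts.csv')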

if __name__ == '__main__':
    def main():
        parser = argparse.ArgumentParser(
            description='Upload a csv file into a Salesforce table',
            epilog='This uses a single Salesforce Bulk V1 "update" job.'
                   ' The CSV file is cut in chunks.'
                   ' Each chunk is submitted as a batch in the job.'
        )
        parser.add_argument(
            '--max-upload-size',
            type=int,
            default=DEFAULT_MAX_UPLOAD_SIZE,
            help='cut the csv file in chunks no larger than %(metavar)s'
                 ' bytes. default=%(default)s',
            metavar='SIZE',
        )
        parser.add_argument(
            '--max-upload-records',
            type=int,
            default=DEFAULT_MAX_UPLOAD_RECORDS,
            help='cut the csv file in chunks with no more than %(metavar)s'
                 ' records. default=%(default)s',
            metavar='LIMIT',
        )
        parser.add_argument(
            'sftable',
            help='Salesforce table name')
        parser.add_argument(
            'csvfile',
            help='csv file to upload')
        args = parser.parse_args()

        logging.basicConfig(
            filename=config.LOGFILE,
            format=config.LOGFORMAT.format(
                'upload_table ' + args.sftable + ' ' + args.csvfile),
            level=config.LOGLEVEL)

        td = TableDesc(args.sftable)
        csvfilename = args.csvfile
        upload_csv(
            td,
            csvfilename,
            max_size=args.max_upload_size,
            max_records=args.max_upload_records)

    main()
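
# Typical invocation, with hypothetical table and file names:
#   ./upload_table.py --max-upload-records 5000 Contact /tmp/contacts.csv
# Log output goes to config.LOGFILE at config.LOGLEVEL.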