# encoding: utf-8
import email.utils
import logging
import multiprocessing
import os
import time
from logging.handlers import TimedRotatingFileHandler

import six
import urllib3.fields as f
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

basedir = os.path.abspath(os.path.dirname(__file__))


def my_format_header_param(name, value):
    """Replacement for urllib3.fields.format_header_param that falls back to
    RFC 2231 encoding for non-ASCII values instead of failing."""
    if not any(ch in value for ch in '"\\\r\n'):
        # Simple quoted form, used when the value encodes cleanly.
        result = '%s="%s"' % (name, value)
        try:
            result.encode('utf-8')
        except (UnicodeEncodeError, UnicodeDecodeError):
            pass
        else:
            return result
    if not six.PY3 and isinstance(value, six.text_type):  # Python 2
        value = value.encode('utf-8')
    value = email.utils.encode_rfc2231(value, 'utf-8')
    value = '%s*=%s' % (name, value)
    return value


# Monkey patch: without this, requests fails to upload files whose
# filenames contain Chinese (non-ASCII) characters.
f.format_header_param = my_format_header_param
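
# Example (sketch): with the patch applied, a multipart upload keeps a
# non-ASCII filename intact. The URL and file below are hypothetical and
# only illustrate the call shape:
#
#     import requests
#     with open(u'用例.xlsx', 'rb') as fp:
#         requests.post('http://127.0.0.1:5000/upload',
#                       files={'file': (u'用例.xlsx', fp)})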


class SafeLog(TimedRotatingFileHandler):
    """
    TimedRotatingFileHandler raises a "file in use" error when several
    processes hit the log file during rollover; this subclass works around
    that by appending to a date-suffixed file instead of renaming the
    active one.
    """

    # A lock shared across forked worker processes; creating a fresh
    # multiprocessing.Lock() inside doRollover would synchronize nothing.
    _rollover_lock = multiprocessing.Lock()

    def __init__(self, *args, **kwargs):
        super(SafeLog, self).__init__(*args, **kwargs)
        self.suffix_time = ""
        self.origin_basename = self.baseFilename

    def shouldRollover(self, record):
        # Roll over when the date suffix has changed or the current dated
        # log file no longer exists.
        time_tuple = time.localtime()
        if self.suffix_time != time.strftime(self.suffix, time_tuple) or not os.path.exists(
                self.origin_basename + '.' + self.suffix_time):
            return 1
        else:
            return 0

    def doRollover(self):
        if self.stream:
            self.stream.close()
            self.stream = None
        # Instead of renaming the active file (which fails when another
        # process still holds it open), switch to a new date-suffixed file.
        current_time_tuple = time.localtime()
        self.suffix_time = time.strftime(self.suffix, current_time_tuple)
        self.baseFilename = self.origin_basename + '.' + self.suffix_time
        self.mode = 'a'
        with self._rollover_lock:
            if self.backupCount > 0:
                for s in self.getFilesToDelete():
                    os.remove(s)
        if not self.delay:
            self.stream = self._open()

    def getFilesToDelete(self):
        # Uses self.origin_basename where the stdlib version uses
        # self.baseFilename, because baseFilename now carries a date suffix.
        dir_name, base_name = os.path.split(self.origin_basename)
        file_names = os.listdir(dir_name)
        result = []
        prefix = base_name + "."
        p_len = len(prefix)
        for file_name in file_names:
            if file_name[:p_len] == prefix:
                suffix = file_name[p_len:]
                if self.extMatch.match(suffix):
                    result.append(os.path.join(dir_name, file_name))
        if len(result) < self.backupCount:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backupCount]
        return result
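
# With when="D" and the "%Y-%m-%d.log" suffix assigned in config_log()
# below, rollover writes to date-stamped files such as
# logs/logger.2024-01-31.log (illustrative date), and files older than
# backupCount rotations are deleted.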


def config_log():
    """
    Logging configuration: a daily-rotating handler that keeps 50 backups.
    :return: the configured SafeLog handler
    """
    # Note: os.path.abspath('..') resolves against the current working
    # directory, so logs/ must exist one level above where the process starts.
    handler = SafeLog(filename=os.path.join(os.path.abspath('..'), 'logs', 'logger'),
                      interval=1, backupCount=50, when="D", encoding='UTF-8')
    handler.suffix = "%Y-%m-%d.log"
    logging_format = logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)s - %(message)s')
    handler.setFormatter(logging_format)
    # handler.setLevel(logging.INFO)
    return handler
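
# Example (sketch): wiring the handler into the root logger. The INFO level
# here is an assumption; this module does not set one.
#
#     root = logging.getLogger()
#     root.addHandler(config_log())
#     root.setLevel(logging.INFO)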


class Config:
    SECRET_KEY = 'BaSeQuie'
    basedir = os.path.abspath(os.path.dirname(__file__))
    # Path to the SQLite database file
    SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(basedir, "data.sqlite")
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    CSRF_ENABLED = True
    UPLOAD_FOLDER = '/upload'
    # DEBUG = True
    SCHEDULER_API_ENABLED = True

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data.sqlite')
    SCHEDULER_JOBSTORES = {'default': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URI)}


class ProductionConfig(Config):
    # Alternative URI with an explicit charset:
    # SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:[email protected]:3306/api_test?charset=utf8mb4'
    # 123456 is the password, api_test is the database name
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:[email protected]:3306/api_test'
    SCHEDULER_JOBSTORES = {'default': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URI,
                                                         engine_options={'pool_pre_ping': True})}
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_POOL_SIZE = 1000
    SQLALCHEMY_POOL_RECYCLE = 1800


config = {
    # 'default': DevelopmentConfig,
    'default': ProductionConfig,
    'Production': ProductionConfig,
}
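
# Example (sketch): how a Flask application factory would typically consume
# this mapping; create_app is hypothetical and not defined in this file.
#
#     from flask import Flask
#
#     def create_app(config_name='default'):
#         app = Flask(__name__)
#         app.config.from_object(config[config_name])
#         config[config_name].init_app(app)
#         return app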