1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
| import os import sys import re import datetime import time import logging import multiprocessing try: import codecs except ImportError: codecs = None
class MultiprocessHandler(logging.FileHandler): """支持多进程的TimedRotatingFileHandler""" def __init__(self, filename, when='D', backupCount=0, encoding=None, delay=False): ''' Args: filename ([type]): 日志文件名 when (str, optional): 时间间隔的单位. backupCount (int, optional): 保留文件个数. encoding ([type], optional): [description]. delay (bool, optional): 是否开启 OutSteam缓存. True 表示开启缓存,OutStream输出到缓存,待缓存区满后,刷新缓存区,并输出缓存数据到文件 False表示不缓存,OutStrea直接输出到文件 ''' self.prefix = filename self.backupCount = backupCount self.when = when.upper() self.extMath = r"^\d{4}-\d{2}-\d{2}"
self.when_dict = { 'S': "%Y-%m-%d-%H-%M-%S", 'M': "%Y-%m-%d-%H-%M", 'H': "%Y-%m-%d-%H", 'D': "%Y-%m-%d" } self.suffix = self.when_dict.get(when) if not self.suffix: raise ValueError(u"指定的日期间隔单位无效: %s" % self.when) self.filefmt = os.path.join("logs", "%s.%s" % (self.prefix, self.suffix)) self.filePath = datetime.datetime.now().strftime(self.filefmt) _dir = os.path.dirname(self.filefmt) try: if not os.path.exists(_dir): os.makedirs(_dir) except Exception: print(u"创建文件夹失败") print(u"文件夹路径:" + self.filePath) pass if codecs is None: encoding = None logging.FileHandler.__init__(self, self.filePath, 'a+', encoding, delay)
def shouldChangeFileToWrite(self): ''' shouldChangeFileToWrite 更改日志写入目的写入文件 return True 表示已更改,False 表示未更改 ''' _filePath = datetime.datetime.now().strftime(self.filefmt) ''' 新日志文件日期 不等于 旧日志文件日期,则表示 已经到了日志切分的时候 更换日志写入目的为新日志文件 例如 按 天 (D)来切分日志 当前新日志日期等于旧日志日期,则表示在同一天内,还不到日志切分的时候 当前新日志日期不等于旧日志日期,则表示不在同一天内,进行日志切分,将日志内容写入新日志内。 ''' if _filePath != self.filePath: self.filePath = _filePath return True return False def doChangeFile(self): """输出信息到日志文件,并删除多于保留个数的所有日志文件""" self.baseFilename = os.path.abspath(self.filePath) if self.stream: self.stream.close() self.stream = None
if not self.delay: self.stream = self._open() if self.backupCount > 0: print('删除日志') for s in self.getFilesToDelete(): print(s) os.remove(s) def getFilesToDelete(self): ''' 获得过期需要删除的日志文件 分离出日志文件夹绝对路径,split返回一个元组(absFilePath,fileName) ''' dirName, _ = os.path.split(self.baseFilename) fileNames = os.listdir(dirName) result = [] prefix = self.prefix + '.' plen = len(prefix) for fileName in fileNames: if fileName[:plen] == prefix: suffix = fileName[plen:] if re.compile(self.extMath).match(suffix): result.append(os.path.join(dirName, fileName)) result.sort() if len(result) < self.backupCount: result = [] else: result = result[:len(result) - self.backupCount] return result def emit(self, record) -> None: ''' 发送一个日志记录 覆盖FileHandler中的emit方法,logging会自动调用此方法 ''' try: if self.shouldChangeFileToWrite(): self.doChangeFile() logging.FileHandler.emit(self,record) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record)
def init_logging_handler(fname): formattler = '%(levelname)s - %(name)s - %(asctime)s - %(message)s' fmt = logging.Formatter(formattler) logger = logging.getLogger() logger.setLevel(logging.DEBUG) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setLevel(logging.DEBUG) stream_handler.setFormatter(fmt)
''' 使用我们写的多进程版Handler理器,定义日志输出到mylog.log文件内 文件打开方式默认为 a 按分钟进行日志切割 ''' file_handler = MultiprocessHandler(fname, when='D',encoding="utf-8") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(fmt)
logger.addHandler(stream_handler) logger.addHandler(file_handler) return logger
logger = init_logging_handler("test") def test(num,m_queue): time.sleep(2) m_queue.put('日志测试{}:{}'.format(os.getpid(),num)) print("添加完毕")
def write_logging(m_queue): while True: data = m_queue.get() if data: if data == "end": break print("--->",data) logger.debug(data) if __name__ == "__main__": m_queue = multiprocessing.Manager().Queue(maxsize=1000)
pool = multiprocessing.Pool(processes=10)
p1 = multiprocessing.Process(target=write_logging, args=(m_queue,)) p1.start()
print("开始添加数据") for i in range(10): pool.apply_async(func=test, args=(i, m_queue,)) pool.close() pool.join() print("添加数据完毕") print("发送end结束信号") m_queue.put("end") print("接受完毕") p1.join() print("执行完毕")
|