常用操作总结

python相关库常用操作

Python

根据字符串计算哈希

  • 使用hashlib可跨进程保持一致性的hash,需要使用md5摘要

    1
    2
    3
    4
    import hashlib
    def consistency_hash(data):
    digest = hashlib.md5(data.encode(encoding='utf-8')).hexdigest()
    return digest
  • python自带的hash不能保持一致性

    1
    2
    3
    def disconsistency_hash(data):
    digest = hash(data)
    return digest

Docker

镜像push到个人仓库

step1——找到本地镜像的ID:docker images

step2——登陆Hub:docker login --username=username --password=password --email=email

step3——tag:docker tag <imageID> <namespace>/<image name>:<version tag eg latest>

step4——push镜像:docker push <namespace>/<image name>

MongoDB

查找操作

根据非主键简单操作

1
db.getCollection('item_spider').find({"subject":"政治"})

根据主键查找

1
db.getCollection('item_spider').find({"_id":ObjectId("6172655c091b1563a776ce76")})

py连接数据库

1
2
3
4
conn = MongoClient('x.x.x.x', 8888,username="root",password="pass)
# # 连接数据库
db = conn.item
item_spider = db.item_spider

Pandas相关操作

从数据库中导入数据

1
2
3
4
5
6
7
8
9
import pymysql
from sqlalchemy import create_engine
# 初始化数据库连接
engine = create_engine("mysql+pymysql://hujiao:hujiao@192.168.1.145:3306/zsy_tk",encoding='utf-8')

# 查询语句
sql7 = '''select * from unit;'''
# 获取数据
df_prob_kp_unit = pd.read_sql_query(sql7,con=engine)

对字符串进行聚合

既一个key有个str对应,且分布在多行,可以用以下方式,把多个str合并为一个,让一个key对应多个str。

1
df_single_agg = df_prob_single_sele.groupby("topic_id",as_index=False)['单选题选项'].agg(lambda x: x.str.cat(sep=',')) # 对单选题进行聚合

topic_id相当于key,单选题选项是需要聚合的,然后在lambda函数中将其拼接为一个。

删除指定的列

1
df_prob1234.drop(['单选题选项','多选题选项'],axis=1, inplace=True)

使用apply应用函数

默认情况下 axis=0 表示按列,axis=1 表示按行。

1
df_prob1234["选项"] = df_prob1234.apply(dealChoiceItem,axis=1)

筛选非空数据

筛选出content不为空的数据。

1
df_prob1234_filted = df_prob1234[pd.notnull(df_prob1234["content"])]

修改指定列的数据类型

1
df_prob_kp["unit_id"] = df_prob_kp["unit_id"].astype(str)

根据指定的列值查找行

1
df_prob_kp_agg.loc[df_prob_kp_agg['topic_id'] == 20]

删除列

1
del df_prob_kp_agg_bak["unit_id"]

切分DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import math
from tqdm import tqdm
def cut_df(dataFrame,n,fname):
'''
dataFrame:需要切分的dataFrame
n:需要切分的数量
fname:保存的文件名
'''
df_num = len(dataFrame)
every_epoch_num = math.floor((df_num/n))
for index in tqdm(range(n)):
file_name = './{}_{}.csv'.format(fname,index)
if index < n-1:
df_tem = dataFrame[every_epoch_num * index: every_epoch_num * (index + 1)]
else:
df_tem = dataFrame[every_epoch_num * index:]
df_tem.to_csv(file_name, index=False)

删除重复行

1
df_kp2prob_simple_filted.drop_duplicates(inplace=True) # 删除重复行

pandas将每一行转为字典

注意:要加records

1
df_need_data.head(6).to_dict('records')

随机抽取n个元素

抽取200个元素

1
df_data_filted_0_sampled = df_data_filted_0.sample(n=200)

使用apply返回多列

  • 使用zip* 返回多列

    1
    df_data['node_item'], df_data['formula_reg_list'],df_data['formula_list'] = zip(*df_data.apply(lambda row: clean_data(row["原始题干"]),axis=1))

Py相关

获取当前时间

  1. 获取当前时间转为字符串

    1
    datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

常用代码

线程池插入数据

这个可能是数据库对象的原因,没有发挥多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import pymysql
import traceback
import multiprocessing
from multiprocessing.managers import BaseManager
import os,sys,time
import random
import pandas as pd
from functools import partial # 使用偏函数传递多个参数
from hash_test import consistency_hash # md5编码

class MySql:
def __init__(self,host='192.168.3.21',user='root',passwd='lrr1996429',db='test',port=3306) -> None:
self.conn = pymysql.connect(
host=host,
port=port,
user=user,
passwd=passwd,
db=db,
charset='utf8'
)

self.cur = self.conn.cursor()

def executeSql(self,sql):
try:
res = self.cur.execute(sql)
self.conn.commit()
except Exception as e:
traceback.print_exc()

def quit(self):
self.cur.close()
self.conn.commit()
self.conn.close()

class MyManager(BaseManager):
pass

def my_Manager():
m = MyManager()
m.start()
return m

# 将MySql类注册到MyManager管理类中
MyManager.register("MySql",MySql)

def readData(f_path=r"D:\1-文件及文件夹集合\读研\2-大数据\大作业\train.csv"):
if f_path.endswith("csv"):
df_data = pd.read_csv(f_path,engine='python')
elif f_path.endswith("xlsx"):
df_data = pd.read_excel(f_path)
records = df_data.to_dict("records")
return records

def run(mysql_obj,sql_format,record):
# print("sql")
# print(record.keys())

topic_all = str(record['tsi_content']) + str(record['tsi_content2']) + str(record['content']) + str(record['选项']) + str(record['知识点名称'])
digest = consistency_hash(topic_all)
# print(digest)
# sql = sql_format % (record['topic_id'],record['topic_section_id'],record['tsi_content'],record['tsi_content2'],record['content'],record['选项'],record['知识点名称'])
sql = sql_format % (record['topic_id'],record['topic_section_id'],record['知识点名称'],digest)
mysql_obj.executeSql(sql)

pass

if __name__ == "__main__":
datas = readData(r"D:\工作相关\数学设问与条件位置判断\数据输出\数学题目信息整理_全部的选择题.xlsx")
print("数据长度:",len(datas))
# print(datas[0])
print("读取数据完成")
start = time.time()
manager = my_Manager()
mysql_obj = manager.MySql(db="excerse")
# sql_format = 'insert into topic(topic_id,topic_section_id,tsi_content,tsi_content2,content,options,kp_names,digest) values(%d,%d,"%s","%s","%s","%s","%s")'
sql_format = 'insert into topic(topic_id,tsi_section_id,kp_names,digest) values(%d,%d,"%s","%s")'
job = partial(run,mysql_obj,sql_format)
# job(datas[0])

# 并行执行时间: 249.74781250953674
pool = multiprocessing.Pool(8)
for data in datas:
pool.apply(job,args=(data,))
pool.close()
pool.join()
end = time.time()
print("并行执行时间:",end-start)

# 串行执行时间: 231.49206566810608
# for data in datas:
# job(data)
# end = time.time()
# print("串行执行时间:",end-start)


多进程插入数据-使用队列

并行用时用了70s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import os,sys
import multiprocessing
import warnings
import pymysql
import pandas as pd
from hash_test import consistency_hash # md5编码
from insert_mysql import MySql
import time


def insert_worker(sql_format,queue):
'''插入数据'''
mysql_obj = MySql(db="excerse")
while True:
record = queue.get()
if type(record) == dict:


topic_all = str(record['tsi_content']) + str(record['tsi_content2']) + str(record['content']) + str(record['选项']) + str(record['知识点名称'])
digest = consistency_hash(topic_all)
sql = sql_format % (record['topic_id'],record['topic_section_id'],record['知识点名称'],digest)
mysql_obj.executeSql(sql)
if record == "end":
break

def insert_paraller(sql_format,w=4):
queue = multiprocessing.Queue(maxsize=10*1000*2)
workers = []

for i in range(w):
p = multiprocessing.Process(target=insert_worker,args=(sql_format,queue))
p.start()
workers.append(p)

df_data = pd.read_excel(r"D:\工作相关\数学设问与条件位置判断\数据输出\数学题目信息整理_全部的选择题.xlsx")
records = df_data.to_dict("records")

# 往队列中加数据
for record in records:
queue.put(record)
# 给每个 worker 发送任务结束的信号
for i in range(w):
queue.put("end")
# 等待进程结束运行
for p in workers:
p.join()
# 并行用时用了70s
if __name__ == "__main__":
start = time.time()
sql_format = 'insert into topic(topic_id,tsi_section_id,kp_names,digest) values(%d,%d,"%s","%s")'
insert_paraller(sql_format)
end = time.time()
print("并行时间:",end-start)

单进程插入数据

串行时间: 263.9097445011139

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os,sys
import multiprocessing
import warnings
import pymysql
import pandas as pd
from hash_test import consistency_hash # md5编码
from insert_mysql import MySql
import time


def insert_paraller(sql_format,w=4):

df_data = pd.read_excel(r"D:\工作相关\数学设问与条件位置判断\数据输出\数学题目信息整理_全部的选择题.xlsx")
records = df_data.to_dict("records")
mysql_obj = MySql(db="excerse")

for record in records:
topic_all = str(record['tsi_content']) + str(record['tsi_content2']) + str(record['content']) + str(record['选项']) + str(record['知识点名称'])
digest = consistency_hash(topic_all)
sql = sql_format % (record['topic_id'],record['topic_section_id'],record['知识点名称'],digest)
mysql_obj.executeSql(sql)



# 串行时间: 263.9097445011139
if __name__ == "__main__":
start = time.time()
sql_format = 'insert into topic(topic_id,tsi_section_id,kp_names,digest) values(%d,%d,"%s","%s")'
insert_paraller(sql_format)
end = time.time()
print("串行时间:",end-start)

多进程写入日志

需要注意一下情况,

  • multiprocessing.Manager().Queue(maxsize=1000) ,这个是共享队列,在多进程是共享的
  • multiprocessing.Queue(),这个不是共享队列,在多进程不共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# coding=utf-8
import os
import sys
import re
import datetime
import time
import logging
import multiprocessing

try:
import codecs
except ImportError:
codecs = None

class MultiprocessHandler(logging.FileHandler):
"""支持多进程的TimedRotatingFileHandler"""
def __init__(self, filename, when='D', backupCount=0, encoding=None, delay=False):
'''
Args:
filename ([type]): 日志文件名
when (str, optional): 时间间隔的单位.
backupCount (int, optional): 保留文件个数.
encoding ([type], optional): [description].
delay (bool, optional): 是否开启 OutSteam缓存.
True 表示开启缓存,OutStream输出到缓存,待缓存区满后,刷新缓存区,并输出缓存数据到文件
False表示不缓存,OutStrea直接输出到文件
'''
self.prefix = filename
self.backupCount = backupCount
self.when = when.upper()
# 正则匹配 年-月-日
self.extMath = r"^\d{4}-\d{2}-\d{2}"

# S 每秒建立一个新文件
# M 每分钟建立一个新文件
# H 每天建立一个新文件
# D 每天建立一个新文件
self.when_dict = {
'S': "%Y-%m-%d-%H-%M-%S",
'M': "%Y-%m-%d-%H-%M",
'H': "%Y-%m-%d-%H",
'D': "%Y-%m-%d"
}
# 日志文件日期后缀
self.suffix = self.when_dict.get(when)
if not self.suffix:
raise ValueError(u"指定的日期间隔单位无效: %s" % self.when)
# 拼接文件路径 格式化字符串
self.filefmt = os.path.join("logs", "%s.%s" % (self.prefix, self.suffix)) # 文件名加日期
# 使用当前时间,格式化文件格式化字符串
self.filePath = datetime.datetime.now().strftime(self.filefmt)
# 获得文件夹路径
_dir = os.path.dirname(self.filefmt)
try:
# 如果日志文件夹不存在,则创建文件夹
if not os.path.exists(_dir):
os.makedirs(_dir)
except Exception:
print(u"创建文件夹失败")
print(u"文件夹路径:" + self.filePath)
pass
if codecs is None:
encoding = None
logging.FileHandler.__init__(self, self.filePath, 'a+', encoding, delay)

def shouldChangeFileToWrite(self):
'''
shouldChangeFileToWrite 更改日志写入目的写入文件
return True 表示已更改,False 表示未更改
'''
_filePath = datetime.datetime.now().strftime(self.filefmt) # 以当前时间获得新日志文件路径
'''
新日志文件日期 不等于 旧日志文件日期,则表示 已经到了日志切分的时候
更换日志写入目的为新日志文件
例如 按 天 (D)来切分日志
当前新日志日期等于旧日志日期,则表示在同一天内,还不到日志切分的时候
当前新日志日期不等于旧日志日期,则表示不在同一天内,进行日志切分,将日志内容写入新日志内。
'''
if _filePath != self.filePath:
self.filePath = _filePath
return True
return False
def doChangeFile(self):
"""输出信息到日志文件,并删除多于保留个数的所有日志文件"""
# 日志文件的绝对路径
self.baseFilename = os.path.abspath(self.filePath)
# stream is not None 表示 OutStream中还有未输出完的缓存数据
if self.stream:
# flush close 都会刷新缓冲区,flush不会关闭stream,close则关闭stream
self.stream.close()
self.stream = None #关闭stream后必须重新设置stream为None,否则会造成对已关闭文件进行IO操作。

# delay 为False 表示 不 OutStream不缓存数据 直接输出所有,只需要关闭OutStream即可
if not self.delay:
# 这个地方如果关闭colse那么就会造成进程往已关闭的文件中写数据,从而造成IO错误
# delay == False 表示的就是 不缓存直接写入磁盘
# 我们需要重新在打开一次stream
# self.stream.close()
self.stream = self._open()
# 删除多于保留个数的所有日志文件
if self.backupCount > 0:
print('删除日志')
for s in self.getFilesToDelete():
print(s)
os.remove(s)
def getFilesToDelete(self):
'''
获得过期需要删除的日志文件
分离出日志文件夹绝对路径,split返回一个元组(absFilePath,fileName)
'''
dirName, _ = os.path.split(self.baseFilename)
fileNames = os.listdir(dirName)
result = []
prefix = self.prefix + '.' # 加上 点号 . 方便获取点号后面的日期
plen = len(prefix)
for fileName in fileNames:
if fileName[:plen] == prefix:
suffix = fileName[plen:] # 日期后缀 mylog.2017-03-19 中的 2017-03-19
if re.compile(self.extMath).match(suffix):
result.append(os.path.join(dirName, fileName))
result.sort()
# 返回 待删除的日志文件
# 多于 保留文件个数 backupCount的所有前面的日志文件。
if len(result) < self.backupCount:
result = []
else:
result = result[:len(result) - self.backupCount]
return result
def emit(self, record) -> None:
'''
发送一个日志记录
覆盖FileHandler中的emit方法,logging会自动调用此方法
'''
try:
if self.shouldChangeFileToWrite():
self.doChangeFile()
logging.FileHandler.emit(self,record)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

def init_logging_handler(fname):
# 定义日志输出格式
formattler = '%(levelname)s - %(name)s - %(asctime)s - %(message)s'
fmt = logging.Formatter(formattler)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # 设置日志级别,默认是warning
# 设置handleer日志处理器,日志具体怎么处理都在日志处理器里面定义
# SteamHandler 流处理器,输出到控制台,输出方式为stdout
# 设置handler所处理的日志级别。
# 只能处理 >= 所设置handler级别的日志
# 设置日志输出格式
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)

'''
使用我们写的多进程版Handler理器,定义日志输出到mylog.log文件内
文件打开方式默认为 a
按分钟进行日志切割
'''
file_handler = MultiprocessHandler(fname, when='D',encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)

# 对logger增加handler日志处理器
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
return logger

logger = init_logging_handler("test")
def test(num,m_queue):
time.sleep(2)
m_queue.put('日志测试{}:{}'.format(os.getpid(),num))
print("添加完毕")

def write_logging(m_queue):
while True:
data = m_queue.get() # 队列为空的话,其是阻塞在这里
if data:
if data == "end":
break
print("--->",data)
logger.debug(data)

if __name__ == "__main__":
# pass

m_queue = multiprocessing.Manager().Queue(maxsize=1000) # 这里这个才可以起到
# m_queue = multiprocessing.Queue() # 只用这个不能起到共享的作用



pool = multiprocessing.Pool(processes=10)


# 日志管理进程
p1 = multiprocessing.Process(target=write_logging, args=(m_queue,))
p1.start()

# 向队列中添加数据
print("开始添加数据")
for i in range(10):
pool.apply_async(func=test, args=(i, m_queue,))
pool.close()
pool.join()
print("添加数据完毕")
print("发送end结束信号")
m_queue.put("end")
print("接受完毕")
p1.join()
print("执行完毕")
# logger.debug("执行完毕")
作者

bd160jbgm

发布于

2021-11-07

更新于

2021-11-27

许可协议