Generating Daily Report Charts with pandas and Sending Them by Email

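Preface

In our projects we often use a monitoring and alerting system such as Prometheus to understand how the system is actually running internally, and to do trend analysis, comparative analysis, alerting and fault localization, data visualization, and so on.

Although it has many advantages, it is somewhat heavyweight for a small project. If we want to do some simple data analysis and generate customizable reports in a small project, is there a simple, reusable approach? There is! Below we use pandas together with a few other basic Python libraries to build a small template that generates a daily report with charts and sends it by email.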


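Preparation

Requirements (example)

  • Every day many business lines query several data-service APIs provided by third parties. We want to know the basics for each data source over the last few days (total queries, successes, failures), as well as the detailed daily breakdown of queries from each business line to each data source. (Every query to a data source is logged, including the query origin, query status, and so on.)
  • Analyze the data on a daily schedule and generate charts
  • Send an email (or only send an alert email when something abnormal happens)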

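A simplified view of the daily data:

  • Data source A:
Request ID | Product   | Query status    | Result data | Created at          | Updated at
10001      | Product 1 | Success         |             | 2020-11-09 12:00:01 | 2020-11-09 12:01:03
10002      | Product 2 | Success         |             | 2020-11-09 12:03:01 | 2020-11-09 12:04:03
10003      | Product 2 | Failure         |             | 2020-11-09 12:05:01 | 2020-11-09 12:06:12
10004      | Product 3 | Special failure |             | 2020-11-09 12:06:01 | 2020-11-09 12:06:15
  • Data sources B/C/D are similar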
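
To make the requirement concrete, here is a minimal sketch that tallies the sample rows above by query status using only the standard library. The record layout, the `summarize` helper, and the English status labels are illustrative assumptions, not the project's actual schema.

```python
from collections import Counter

# Sample rows mirroring the table above: (request_id, product, status).
# The English status labels are illustrative stand-ins.
RECORDS = [
    (10001, "Product 1", "success"),
    (10002, "Product 2", "success"),
    (10003, "Product 2", "fail"),
    (10004, "Product 3", "special_fail"),
]


def summarize(records):
    """Return total, success and failure counts for one data source."""
    by_status = Counter(status for _, _, status in records)
    total = sum(by_status.values())
    success = by_status["success"]
    # Every non-success status counts as a failure in this sketch.
    return {
        "total": total,
        "success": success,
        "fail": total - success,
        "special_fail": by_status["special_fail"],
    }


summary = summarize(RECORDS)
print(summary)
```

The same counts are what the SQL `GROUP BY status` query later in this post computes on the database side.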

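Development environment

  • Python version: 2.7+/3.6+ (note that the package versions pinned below, such as pandas 1.1.4 and matplotlib 3.3.3, only support Python 3.6+)
  • Python packages (example):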
    DBUtils==2.0
    matplotlib==3.3.3
    numpy==1.19.4
    pandas==1.1.4
    Pillow==8.0.1
    PyMySQL==0.10.1
    pyparsing==2.4.7
    python-dateutil==2.8.1
    pytz==2020.4
    six==1.15.0

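Development

The project is divided into roughly five modules: fetching data, cleaning data, analyzing data, generating charts, and sending email.

Fetching data

Connecting to the database

  • Database engine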
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pymysql
try:
    from dbutils.pooled_db import PooledDB  # DBUtils >= 2.0 (lowercase module names)
except ImportError:
    from DBUtils.PooledDB import PooledDB  # DBUtils 1.x

from srf_log import logger


class MySQLEngine(object):
    '''
    MySQL engine
    '''
    __tablename__ = None
    placeholder = '%s'

    def connect(self, **kwargs):
        '''
        mincached: number of idle connections opened at startup (default 0 means none are created at startup)
        maxcached: maximum number of idle connections kept in the pool (default 0 means the pool size is unlimited)
        maxshared: maximum number of shared connections allowed (default 0 means all connections are dedicated).
                   Once the maximum is reached, connections requested as shareable are shared.
        maxconnections: maximum number of connections allowed (default 0 means unlimited)
        blocking: behavior when the maximum is reached (default 0 or False raises an error;
                  anything else blocks until the number of connections decreases)
        maxusage: maximum number of times a single connection may be reused (default 0 or False means unlimited).
                  When the limit is reached, the connection is automatically reset (closed and reopened).
        '''
        db_host = kwargs.get('db_host', 'localhost')
        db_port = kwargs.get('db_port', 3306)
        db_user = kwargs.get('db_user', 'root')
        db_pwd = kwargs.get('db_pwd', '')
        db = kwargs.get('db', '')

        self.pool = PooledDB(pymysql, maxconnections=5, mincached=1, maxcached=5, blocking=True, host=db_host,
                             user=db_user, passwd=db_pwd, db=db, port=db_port, charset='utf8')

        # The password is deliberately left out of the log
        logger.info('connect mysql db_host:%s db_port:%d db_user:%s db:%s',
                    db_host, db_port, db_user, db)

    @staticmethod
    def escape(string):
        pass

    def _check_parameter(self, sql_query, values, req_id=None):
        # Reject the query if any bound value is empty (note: this also rejects 0 and '')
        count = sql_query.count('%s')
        if count > 0:
            for elem in values:
                if not elem:
                    if req_id:
                        logger.debug('req_id:%s sql_query:%s values:%s check failed',
                                     req_id, sql_query, values)
                    return False
        return True

    def _execute(self, sql_query, values=(), req_id=None):
        '''
        Take a fresh connection from the pool for every call
        '''
        if not self._check_parameter(sql_query, values):
            return
        conn = self.pool.connection()
        try:
            cur = conn.cursor()
            cur.execute(sql_query, values)
            conn.commit()
        finally:
            conn.close()  # hands the connection back to the pool
        # pymysql's default cursor has already buffered the rows, so it can still be iterated
        return cur

    def select(self, sql_query, values=(), req_id=None):
        # Collapse newlines and repeated spaces into single spaces
        sql_query = ' '.join(sql_query.split())
        if not self._check_parameter(sql_query, values, req_id):
            return
        cur = self._execute(sql_query, values, req_id)
        for row in cur:
            yield row

    def execute(self, sql_query, values=(), req_id=None):
        sql_query = ' '.join(sql_query.split())
        self._execute(sql_query, values)


sql_engine = MySQLEngine()
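
The whitespace cleanup performed in `select` and `execute` can be tried in isolation. The sketch below is a stand-alone helper under stated assumptions: `normalize_sql` and the table name `credit_table` are hypothetical and not part of the project. Note that this approach would also collapse whitespace inside quoted string literals, so it only suits queries whose values are passed as parameters.

```python
def normalize_sql(sql_query):
    """Collapse all runs of whitespace (including newlines) to single spaces."""
    return " ".join(sql_query.split())


# `credit_table` is a made-up table name used only for this example.
raw_sql = """
    SELECT status, count(1) FROM credit_table
    WHERE source = 1 AND created_at >= %s AND created_at < %s
    GROUP BY status
"""
clean_sql = normalize_sql(raw_sql)
print(clean_sql)

# The number of %s placeholders should match the number of bound values.
placeholder_count = clean_sql.count("%s")
```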
  • Database connection
# -*- coding: utf-8 -*-

import config
from utils import mysql_tools


class DBInterface(object):
    def __init__(self):
        self.db1_check = mysql_tools.MySQLEngine()
        self.db1_check.connect(db_host=config.db1_host,
                               db_port=config.db1_port,
                               db_user=config.db1_user,
                               db_pwd=config.db1_pwd,
                               db=config.db1_db)

        self.db2_check = mysql_tools.MySQLEngine()
        self.db2_check.connect(db_host=config.db2_host,
                               db_port=config.db2_port,
                               db_user=config.db2_user,
                               db_pwd=config.db2_pwd,
                               db=config.db2_db)


db_interface = DBInterface()

Fetching the overview of the last few days

import datetime

# The two functions below are methods of the report's data-fetching class
# (the class definition is omitted).

def _get_range_data(self, begin_date, end_date):
    """
    Fetch the data between the begin and end dates
    """
    STATUS_SUCCESS = 1  # query succeeded
    STATUS_CreditA_FAIL = 2  # data source A query failed
    STATUS_QUERYING = 3  # query in progress
    STATUS_CreditB_FAIL = 4  # data source B query failed
    STATUS_CreditC_FAIL = 5  # data source C query failed
    STATUS_CreditD_FAIL = 6  # data source D query failed
    # Source-specific ("special") failure status of the current data source
    special_fail_status = {
        'CreditA': STATUS_CreditA_FAIL,
        'CreditB': STATUS_CreditB_FAIL,
        'CreditC': STATUS_CreditC_FAIL,
        'CreditD': STATUS_CreditD_FAIL,
    }.get(self.credit_type)
    total_cnt = 0  # total
    success_cnt = 0  # successes
    querying_cnt = 0  # still in progress
    fail_cnt = 0  # failures
    special_fail_cnt = 0  # special failures
    # begin_date is a datetime.date, so call strftime on the object itself
    begin_date_str = begin_date.strftime('%Y-%m-%d')
    sql = '''
        SELECT status, count(1) FROM {}
        where source = 1 and method regexp 'rule' and created_at >= %s and created_at < %s
        group by status
    '''.format(self.credit_table)  # count by query status
    values = [begin_date, end_date]
    # DBInterface exposes the engine as db1_check
    for row in db_interface.db1_check.select(sql, values):
        status, cnt = int(row[0]), int(row[1])
        total_cnt += cnt  # total
        if status == STATUS_SUCCESS:
            success_cnt += cnt  # successes
        elif status == STATUS_QUERYING:
            querying_cnt += cnt  # in progress
        else:
            fail_cnt += cnt  # all failures
            if status == special_fail_status:
                special_fail_cnt += cnt  # failures specific to this data source
    data = [begin_date_str, self.credit_type, total_cnt, success_cnt,
            querying_cnt, fail_cnt, special_fail_cnt]  # one row of the time series for the last few days
    return data

def get_summary_data(self):
    """Fetch the data for the last few days"""
    sum_datas = list()
    today = datetime.date.today()
    # List of (begin, end) date pairs
    days_range = range(1, self.report_days + 1, 1)  # self.report_days is the number of days to report on; configurable as needed
    date_pairs = [(today - datetime.timedelta(days=i), today - datetime.timedelta(days=i - 1)) for i in days_range]
    for date_pair in date_pairs:
        data = self._get_range_data(date_pair[0], date_pair[1])
        sum_datas.append(data)
    return sum_datas
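
The day-by-day windows built in `get_summary_data` are half-open intervals `[day, day + 1)`, which is why the SQL uses `>=` for the start and `<` for the end. A minimal stand-alone sketch of that step follows; `build_date_pairs` and the fixed `today` value are hypothetical and used only for illustration.

```python
import datetime


def build_date_pairs(today, report_days):
    """Return (begin, end) date pairs for the `report_days` days before `today`.

    Each pair covers one calendar day as a half-open interval [begin, end),
    most recent day first.
    """
    one_day = datetime.timedelta(days=1)
    return [(today - i * one_day, today - (i - 1) * one_day)
            for i in range(1, report_days + 1)]


pairs = build_date_pairs(datetime.date(2020, 11, 10), report_days=3)
for begin, end in pairs:
    print(begin.isoformat(), "->", end.isoformat())
```

Because consecutive windows share a boundary but exclude the end date, no record is counted twice or skipped.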

Fetching yesterday's detailed data

def get_yestoday_data(self, include_today=False):
    """Fetch the previous day's data.
    When include_today is True, fetch today's data up to the current time instead;
    by default, fetch yesterday's data."""
    datas = list()
    end_date = datetime.date.today()
    begin_date = end_date - datetime.timedelta(days=1)
    if include_today:
        end_date += datetime.timedelta(days=1)
        begin_date += datetime.timedelta(days=1)
    # Format the dates only after the window has been shifted, otherwise include_today has no effect
    begin_date_str = begin_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')
    sql = """ SELECT {} FROM {}
        WHERE source = 1 AND created_at >= %s AND created_at < %s
    """.format(self.query_fileds, self.credit_table)  # the fields to select and the table name, both configurable
    values = [begin_date_str, end_date_str]
    for row in db_interface.db1_check.select(sql, values):
        record = [int(r) if index <= 1 else r for index, r in enumerate(row)]  # convert apply_id/uniq_id and status to int
        datas.append(record)
    return datas
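
The query window used by `get_yestoday_data` can be expressed as a small pure function, which makes the effect of `include_today` easy to check without a database. This is a sketch only; `query_window` is a hypothetical helper, not part of the project.

```python
import datetime


def query_window(today, include_today=False):
    """Return (begin_str, end_str) for the half-open window [begin, end)."""
    one_day = datetime.timedelta(days=1)
    begin, end = today - one_day, today
    if include_today:
        # Shift the window forward by one day so that it covers today.
        begin, end = begin + one_day, end + one_day
    return begin.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")


today = datetime.date(2020, 11, 10)
print(query_window(today))
print(query_window(today, include_today=True))
```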

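Cleaning and transforming data

Use pandas to clean the fetched data and convert it into a DataFrame, then reshape the axes and aggregate as the requirements dictate.

  • Processing a single data source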
from collections import OrderedDict

import numpy as np
import pandas as pd

credit_name_map = OrderedDict()
credit_name_map['CreditA'] = 'Data source A'
credit_name_map['CreditB'] = 'Data source B'
credit_name_map['CreditC'] = 'Data source C'
credit_name_map['CreditD'] = 'Data source D'
credit_special_fail_map = {
    'CreditA': 'Data source A special failures',
    'CreditB': 'Data source B special failures',
    'CreditC': 'Data source C special failures',
    'CreditD': 'Data source D special failures',
}
all_sum_datas = list()
for credit_type, credit_name in credit_name_map.items():
    sum_data = get_data(credit_type=credit_type, summary=True, report_days=7)
    all_sum_datas.extend(sum_data)  # add to the combined summary data
    # The next four lines prepare the HTML table
    table_name = '{} query summary for the last 7 days'.format(credit_name)
    header = ['', 'Data source', 'Daily total queries', 'Successful queries', 'Queries in progress',
              'Failed queries', 'Special failures']
    header[-1] = credit_special_fail_map.get(credit_type, 'Special failures')
    self.html.add_table(table_name, header, sum_data)
    # Analysis of a single data source
    # Convert to a DataFrame
    pd_data = pd.DataFrame(
        np.array(sum_data),
        columns=['date', 'credit_type', 'total', 'success', 'querying', 'fail', 'special_fail'])
    # Use the date as the row index, sorted in ascending order
    pd_data = pd_data.set_index(['date']).drop(labels=['credit_type'], axis=1).sort_index(ascending=True)
    pd_data = pd_data.apply(pd.to_numeric)  # convert to numeric types
  • Processing the combined data sources
# Build a DataFrame from the columns we want to analyze
all_sum_datas_pd = pd.DataFrame(np.array(all_sum_datas), columns=['date', 'credit_type', 'total', 'success', 'querying', 'fail', 'special_fail'])
# Use the date and the data source as the row index
all_sum_count_pd = all_sum_datas_pd.set_index(['date', 'credit_type']).apply(pd.to_numeric)  # re-index
all_sum_count_goupby_date = all_sum_count_pd.groupby('date').sum()  # aggregate by date
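
As a small, self-contained illustration of the indexing and aggregation steps above, the sketch below runs them on a handful of rows. The numbers are made up purely for demonstration and do not come from the project.

```python
import pandas as pd

# Made-up summary rows:
# [date, credit_type, total, success, querying, fail, special_fail]
all_sum_datas = [
    ["2020-11-08", "CreditA", 10, 8, 0, 2, 1],
    ["2020-11-08", "CreditB", 5, 5, 0, 0, 0],
    ["2020-11-09", "CreditA", 12, 9, 1, 2, 2],
    ["2020-11-09", "CreditB", 7, 6, 0, 1, 0],
]
columns = ["date", "credit_type", "total", "success", "querying", "fail", "special_fail"]

df = pd.DataFrame(all_sum_datas, columns=columns)
# Index by (date, credit_type) so that only the numeric columns remain as data.
indexed = df.set_index(["date", "credit_type"]).apply(pd.to_numeric)
# Sum over all data sources for each date.
per_date = indexed.groupby("date").sum()
print(per_date)
```

Each row of `per_date` is one day's total across all data sources, which is the shape the summary bar chart expects.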

Analyzing data and generating charts

Generating images

  • Use matplotlib to turn the processed data into bar charts, line charts, and so on
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : gen_img.py
# @Author : cgDeepLearn
# @Create Date : 2020/11/4-11:18 AM

import matplotlib as mpl
# mpl.rcParams['font.sans-serif'] = ['SimHei']
# mpl.rcParams['font.serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
mpl.use('Agg')  # non-interactive backend, so no display is needed

import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

CUR_PATH = os.path.dirname(os.path.abspath(__file__))
IMG_PATH = os.path.join(os.path.dirname(CUR_PATH), "files")  # where images are saved


class GenIMG(object):
    def __init__(self, img_name, pd_data):
        self.img_name = img_name  # image name
        self.pd_data = pd_data  # pandas data

    def process(self, sum=False):
        kind = 'line'  # line chart
        title = 'query {} info'.format(self.img_name)
        if sum:
            # The combined summary uses a bar chart
            kind = 'bar'
            title = 'all credit summary info'
        self.pd_data.plot(kind=kind)
        plt.title(title)  # title
        plt.xlabel("date")  # x axis
        plt.ylabel("num")  # y axis
        plt.legend(loc="best")  # legend
        plt.grid(True)  # grid
        full_path_filename = os.path.join(IMG_PATH, '{}.png'.format(self.img_name))  # full path of the image
        plt.savefig(full_path_filename)  # render and save
        plt.close()  # release the figure so repeated calls do not accumulate open figures
        return full_path_filename

Summary of the last few days

  • Generating the summary of queries over the last few days
# Summary data
all_sum_datas_pd = pd.DataFrame(np.array(all_sum_datas), columns=['date', 'credit_type', 'total', 'success', 'querying', 'fail', 'special_fail'])
all_sum_count_pd = all_sum_datas_pd.set_index(['date', 'credit_type']).apply(pd.to_numeric)  # re-index
all_sum_count_goupby_date = all_sum_count_pd.groupby('date').sum()  # aggregate by date

# Generate the chart (a method of the report class)
def df_to_img(self, img_name, df, summary=False):
    gen_image = GenIMG(img_name=img_name, pd_data=df)
    img_file = gen_image.process(sum=summary)
    self.images.append(img_file)  # files to attach to the email
    self.html.add_img(img_name)  # reference the image in the HTML body

self.df_to_img(img_name='sum', df=all_sum_count_goupby_date, summary=True)  # generate the image
  • An example of the generated result:

Figure: overview of queries over the last few days

Previous day's data

  • Analyzing the previous day's data
from collections import OrderedDict

credit_name_map = OrderedDict()
credit_name_map['CreditA'] = 'Data source A'
credit_name_map['CreditB'] = 'Data source B'
credit_name_map['CreditC'] = 'Data source C'
credit_name_map['CreditD'] = 'Data source D'
credit_special_fail_map = {
    'CreditA': 'Data source A special failures',
    'CreditB': 'Data source B special failures',
    'CreditC': 'Data source C special failures',
    'CreditD': 'Data source D special failures',
}
all_yestoday_data = list()
for credit_type, credit_name in credit_name_map.items():  # items() works on both Python 2 and 3
    credit_data = get_data(credit_type=credit_type, include_today=include_today)  # fetch the previous day's data
    if not credit_data:
        continue  # skip this data source if it has no data
    all_yestoday_data.extend(credit_data)  # add to yesterday's overall data
    # Build the DataFrame
    pd_data = pd.DataFrame(np.array(credit_data),
                           columns=['apply_id', 'status', 'method', 'cache_key', 'product_id'])
    pd_data['status'] = pd_data['status'].apply(int)  # type conversion
    # Rule queries only; copy() so that adding a column does not trigger SettingWithCopyWarning
    rule_df = pd_data[pd_data['method'].str.contains('rule')].copy()
    rule_df['product_name'] = rule_df.apply(cachekey_to_product, axis=1)  # map to the product name
    count_df = rule_df[['product_name', 'status']]  # only analyze product and query status
    groupby_product = count_df.groupby(['product_name', 'status']).size()  # count by product and status
    product_status_count = groupby_product.unstack(level=1, fill_value=0)  # unstack status into columns
    col_name = sorted(product_status_count.columns.tolist())  # column names
    col_name_str = [status_map[col] for col in col_name]  # column names as display strings

    product_status_count['query_total'] = product_status_count.sum(axis=1)  # new column: total queries
    # Add the requests column
    col_name.insert(0, 'query_total')
    col_name_str.insert(0, 'Requests')

    col_sums = product_status_count.sum(axis=0)
    sum_data = [col_sums[col] for col in col_name]
    # Add the totals row
    sum_info = ['Total'] + sum_data

    product_status_count['product_name'] = product_status_count.index
    # Add the product column
    col_name.insert(0, 'product_name')
    col_name_str.insert(0, 'Product')

    table_data_df = product_status_count.reindex(columns=col_name)  # reorder by the new column list
    table_datas = table_data_df.values.tolist()  # rows as lists
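
The core of this step is the `groupby(...).size().unstack(...)` pivot. The sketch below isolates it on a few made-up rows (the product names and numeric statuses are illustrative only) so that the resulting shape is easy to see.

```python
import pandas as pd

# Made-up detail rows: one row per query with its product and numeric status.
rows = [
    ("Product 1", 1),
    ("Product 2", 1),
    ("Product 2", 2),
    ("Product 3", 4),
    ("Product 2", 1),
]
count_df = pd.DataFrame(rows, columns=["product_name", "status"])

# Count rows per (product, status), then pivot the status values into columns.
product_status_count = (
    count_df.groupby(["product_name", "status"]).size().unstack(level=1, fill_value=0)
)
# Add a per-product total across all statuses.
product_status_count["query_total"] = product_status_count.sum(axis=1)
# Column totals give the totals row of the final table.
totals = product_status_count.sum(axis=0)
print(product_status_count)
```

`fill_value=0` matters here: without it, product/status combinations that never occurred would show up as NaN instead of 0 in the emailed table.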

Generating tables

Wrap the row and column data in the corresponding HTML so that it can be displayed in the email.

#!/usr/bin/python
# coding:utf8


class GetHtml(object):
    def __init__(self):
        self._html_head = "<html><body>"
        self._format_html_foot = """<p style="font-family: verdana,arial,sans-serif;font-size:10px;font-weight:lighter;">%s</p>"""
        self._format_html_head = """<p style="font-family: verdana,arial,sans-serif;font-size:12px;font-weight:bold;">%s</p>"""
        # <br> is a void element, so there is no closing </br>
        self._format_html_img = """<br><img src="cid:%s" alt="" width="1200" height="600"><br>"""
        self._html_tail = "</body></html>"
        self._html_p_head = """<p style="font-family: verdana,arial,sans-serif;font-size:12px;font-weight:bold;">%s</p>"""

        # add_table opens each <tr> itself, so the table head must not open one
        self._table_head = """<table style="font-family: verdana,arial,sans-serif;font-size:11px;color:#333333;border-width: 1px;border-color: #666666;border-collapse: collapse;" border="1">"""
        self._format_table_th = """<th style="border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;" nowrap>%s</th>"""

        self._format_table_td = """<td style="border-width: 1px;padding: 8px;text-align: right;border-style: solid;border-color: #666666;background-color: #ffffff;" align="right" nowrap>%s</td>"""
        self._table_tail = "</table>"
        self._content = ""

        self._table_html = []

    def add_table(self, table_title, th_info, td_info_list):
        """Add a table: a title, one header row and the data rows"""
        table_p_head = self._html_p_head % (str(table_title))
        table_str = table_p_head + self._table_head
        # th
        table_str += "<tr>"
        for th in th_info:
            temp_str = self._format_table_th % (str(th))
            table_str += temp_str
        table_str += "</tr>"
        # td
        for td_info in td_info_list:
            table_str += "<tr>"
            for td in td_info:
                temp_str = self._format_table_td % (str(td))
                table_str += temp_str
            table_str += "</tr>"
        #
        table_str += self._table_tail
        self._table_html.append(table_str)

    def add_head(self, head):
        """Add a heading"""
        head_str = self._format_html_head % (str(head))
        self._table_html.append(head_str)

    def add_foot(self, foot):
        """Add a table footnote"""
        foot_str = self._format_html_foot % (str(foot))
        self._table_html.append(foot_str)

    def add_img(self, img):
        """Add an image (referenced by its Content-ID)"""
        img_str = self._format_html_img % (str(img))
        self._table_html.append(img_str)

    def output_html(self):
        """Return the complete HTML"""
        html_content = self._html_head
        for s in self._table_html:
            html_content += s
        html_content += self._html_tail
        return html_content


if __name__ == "__main__":
    gh = GetHtml()
    p_title = "test"
    th = [1, 2, 3, 4]
    td = [[1, 2, 3, 4], [4, 5, 5, 56], [3, 3, 3, 3]]
    gh.add_table(p_title, th, td)
    cont = gh.output_html()
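
`add_table` interpolates cell values into the markup as-is. If a cell could ever contain characters such as `<` or `&` (for example a product name that comes from user input), one possible hardening is to escape each value first. The sketch below uses the standard library's `html.escape` (Python 3); the `render_row` helper is hypothetical and not part of the project.

```python
import html


def render_row(cells, cell_tag="td"):
    """Render one table row, escaping every cell value for safe HTML output."""
    rendered = "".join(
        "<{tag}>{value}</{tag}>".format(tag=cell_tag, value=html.escape(str(cell)))
        for cell in cells
    )
    return "<tr>" + rendered + "</tr>"


row_html = render_row(["A & B", "<script>", 42])
print(row_html)
```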
  • Generating the table of yesterday's detailed requests
table_datas = table_data_df.values.tolist()
table_datas.append(sum_info)  # append the totals row
table_name = '{} query details'.format(credit_name)
header = col_name_str
self.html.add_table(table_name, header, table_datas)
self.html.output_html()

Figure: yesterday's details

Sending email

Use smtplib and email, add the corresponding configuration, and send the message.

#!/usr/bin/python
# coding:utf-8
# Ported to Python 3 syntax (the original used Python 2 only constructs
# such as unicode(), "print e" and "except Exception, e").

import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from config import email_configs  # email configuration


def _to_text(value):
    """Decode bytes as UTF-8, falling back to GBK; text is returned unchanged"""
    if isinstance(value, bytes):
        try:
            return value.decode('utf8')
        except UnicodeDecodeError:
            return value.decode('gbk')
    return value


class Email:
    def __init__(self, to_list, sub, content):
        '''
        to_list: list of recipients
        sub: subject
        content: body
        e.g. send_mail(["aaa@126.com"], "sub", "content")
        '''
        # Server, user name and mailbox suffix
        self.to_list = to_list
        self.mail_host = 'smtp.exmail.qq.com'  # mail host
        self.mail_user = 'report'  # sending user
        self.mail_postfix = 'xxx.com'  # mailbox suffix, replace with your own

        self.me = self.mail_user + "<" + self.mail_user + "@" + self.mail_postfix + ">"
        # Make sure the body and subject are text, not bytes
        # (__init__ must not return a value, so decoding errors simply propagate)
        content = _to_text(content)
        sub = _to_text(sub)

        self.msg = MIMEMultipart('related')  # HTML body plus inline images
        self.msg['Subject'] = sub  # subject
        self.msg['From'] = 'report@xxx.com'  # create a report user if you do not have one, or use another registered account
        self.msg['To'] = ", ".join(to_list)  # recipients; addresses in a header are comma-separated

        txt = MIMEText(content, 'html', 'utf-8')
        self.msg.attach(txt)

    def add_image(self, file_name):
        """Attach an image"""
        with open(file_name, 'rb') as f:
            image = MIMEImage(f.read())
        # The Content-ID is the file name without directory and extension;
        # it must match the cid: reference in the HTML body
        tag = os.path.basename(file_name).split('.')[0]
        image.add_header('Content-ID', '<' + tag + '>')
        # image.add_header("Content-Disposition", "inline", filename=file_name)
        # image.add_header('Content-Disposition', 'attachment', filename=file_name)

        self.msg.attach(image)

    def send_mail(self):
        try:
            s = smtplib.SMTP(self.mail_host, 587)  # connect to the mail server
            s.ehlo()
            s.starttls()
            s.ehlo()
            # s.set_debuglevel(1)
            s.login('report@xxx.com', '***')  # *** is the password

            s.sendmail(self.me, self.to_list, self.msg.as_string())  # send
            s.quit()
            return True
        except Exception as e:
            print(e)
            return False


def get_tag_name(pic_name):
    end_index = pic_name.rfind('/')
    if end_index != -1:
        tag = pic_name[end_index + 1:]
    else:
        tag = pic_name
    return tag


def send_mail(mail_list, sub, content, images=None):
    email_sender = Email(mail_list, sub, content)
    if images:
        for image in images:
            email_sender.add_image(image)
    return email_sender.send_mail()
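
The link between the HTML body and the attached charts is the Content-ID: an `<img src="cid:sum">` tag in the body refers to the attachment whose `Content-ID` header is `<sum>`. The sketch below builds such a message without sending it, so the structure can be inspected offline. `build_report_message`, the `example.com` addresses, and the placeholder image bytes are assumptions made for illustration.

```python
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_report_message(subject, sender, recipients, html_body, images):
    """Build a multipart/related message with inline images.

    `images` maps a Content-ID (without angle brackets) to raw PNG bytes.
    """
    msg = MIMEMultipart("related")
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    msg.attach(MIMEText(html_body, "html", "utf-8"))
    for cid, data in images.items():
        # Passing the subtype explicitly avoids relying on content sniffing.
        image = MIMEImage(data, _subtype="png")
        image.add_header("Content-ID", "<{}>".format(cid))
        msg.attach(image)
    return msg


# Placeholder bytes stand in for a real PNG file in this sketch.
message = build_report_message(
    subject="daily report",
    sender="report@example.com",
    recipients=["a@example.com", "b@example.com"],
    html_body='<html><body><img src="cid:sum"></body></html>',
    images={"sum": b"\x89PNG\r\n\x1a\n"},
)
parts = message.get_payload()
print(message["To"], [part.get_content_type() for part in parts])
```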

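Finally

With that, a basic framework for fetching data, analyzing it, generating a report, and sending it by email is in place. You can of course also configure a scheduled job to generate the report on a timer, or send special alerts when the data looks abnormal.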
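
For the alert-only variant, one simple rule is to compare each data source's failure rate against a threshold. The sketch below is illustrative only: `should_alert`, the 20% threshold, and the minimum-volume guard are assumptions, not values taken from the project.

```python
def should_alert(total, fail, threshold=0.2, min_total=10):
    """Return True when the failure rate exceeds `threshold`.

    Days with fewer than `min_total` queries are ignored so that a
    single failure on a quiet day does not trigger an alert.
    """
    if total < min_total:
        return False
    return fail / float(total) > threshold


# Rows shaped like the summary data: (date, credit_type, total, fail).
summary_rows = [
    ("2020-11-09", "CreditA", 100, 5),
    ("2020-11-09", "CreditB", 50, 20),
    ("2020-11-09", "CreditC", 3, 3),
]
alerts = [row[1] for row in summary_rows if should_alert(row[2], row[3])]
print(alerts)
```

The daily run itself can be triggered by any scheduler that is already available on the host, such as cron.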

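Summary

GitHub repository for this project: daily_report

This is only a simple example of generating and sending a data report. If the data is more complex, we may need to do more fine-grained work with pandas.
And if many projects like this need data visualization and real-time alerting, we would still recommend a tool such as Prometheus, used together with a cloud or container platform.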
