使用pandas
生成日报图表并发送邮件
前言
平常我们在项目中可能会用到Prometheus这样的监控报警系统,来了解项目系统内部的实际运行状态,以及做趋势分析、对照分析、告警与故障定位、数据可视化等。
虽然他有诸多优点,但是对于小项目来说,这个又有点太重。如果我们想在小项目里做一些简单的数据分析和可定制化的报表生成,是否有简单可复用的方法呢?有的!下面我们就用pandas结合python其他的一些基本的库,来做一个日报图表生成发送的小模板。
准备
需求(示例)
- 现在每天有很多业务都要查询几个三方提供的数据服务接口,我们想要知道近几日每个数据源的查询总量、成功失败量这些基本的情况,以及每天对于各业务对各数据源的详细查询情况。(每个数据源我们都做了查询的记录(查询来源,查询状态等))
- 每天定时分析数据,生成图表
- 发送邮件(或者只针对异常情况发送告警邮件)
基本每日数据简略如下 :
- A数据源:
请求编号 | 产品 | 查询状态 | 结果数据 | 创建时间 | 更新时间 |
---|---|---|---|---|---|
10001 | 产品1 | 成功 | … | 2020-11-09 12:00:01 | 2020-11-09 12:01:03 |
10002 | 产品2 | 成功 | … | 2020-11-09 12:03:01 | 2020-11-09 12:04:03 |
10003 | 产品2 | 失败 | … | 2020-11-09 12:05:01 | 2020-11-09 12:06:12 |
10004 | 产品3 | 特殊失败 | … | 2020-11-09 12:06:01 | 2020-11-09 12:06:15 |
… | … | … | … | … | … |
- B/C/D数据源类似
开发环境准备
- python版本: 2.7+/3.6+
- python包(示例):
1
2
3
4
5
6
7
8
9
10DBUtils==2.0
matplotlib==3.3.3
numpy==1.19.4
pandas==1.1.4
Pillow==8.0.1
PyMySQL==0.10.1
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.4
six==1.15.0
开发
整个小项目分为获取数据、清洗数据、分析数据、生成图表、发送邮件大致几个模块。
获取数据
连接数据库
- 数据库engine
1 | #!/usr/bin/env python |
- 数据库连接
1 | # -*- coding: utf-8 -*- |
获取近几日数据概况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def _get_range_data(self, begin_date, end_date):
"""
获取起止日期之间的数据
"""
STATUS_SUCCESS = 1 # 查询成功
STATUS_CreditA_FAIL = 2 # A数据源查询失败
STATUS_QUERYING = 3 # 查询中
STATUS_CreditB_FAIL = 4 # B数据源查询失败
STATUS_CreditC_FAIL = 5 # C数据源查询失败
STATUS_CreditD_FAIL = 6 # D数据源查询失败
total_cnt = 0 # 总量
success_cnt = 0 # 成功量
querying_cnt = 0 # 查询中量
fail_cnt = 0 # 失败量
special_fail_cnt = 0 # 特殊失败量
begin_date_str = datetime.datetime.strftime(begin_date, '%Y-%m-%d')
sql = '''
SELECT status, count(1) FROM {}
where source = 1 and method regexp 'rule' and created_at >= %s and created_at < %s
group by status
'''.format(self.credit_table) # 按照查询状态统计
values = [begin_date, end_date]
for row in db_interface.db1.select(sql, values):
total_cnt += int(row[1]) # 总量统计
if int(row[0]) == STATUS_SUCCESS:
success_cnt += int(row[1]) # 成功量统计
elif int(row[0]) == STATUS_QUERYING:
querying_cnt += int(row[1]) # 查询中统计
else:
# 失败量统计
fail_cnt += int(row[1])
# 失败细分统计
if self.credit_type == 'CreditA' and int(row[0]) == STATUS_CreditA_FAIL:
# A数据源失败
special_fail_cnt += int(row[1])
elif self.credit_type == 'CreditB' and int(row[0]) == STATUS_CreditB_FAIL:
# B数据源失败
special_fail_cnt += int(row[1])
elif self.credit_type == 'CreditC' and int(row[0]) == STATUS_CreditC_FAIL:
# C数据源失败
special_fail_cnt += int(row[1])
elif self.credit_type == 'CreditD' and int(row[0]) == STATUS_CreditD_FAIL:
# D数据源失败
special_fail_cnt += int(row[1])
data = [begin_date_str, self.credit_type, total_cnt, success_cnt,
querying_cnt, fail_cnt, special_fail_cnt] # 近几天时间序列数据
return data
def get_summary_data(self):
"""获取近几天的数据"""
sum_datas = list()
today = datetime.date.today()
# 日期起始列表
days_range = range(1, self.report_days + 1, 1) # self.report_days 是类的近几日的报告天数,可根据需求动态配置
date_pairs = [(today - datetime.timedelta(days=i), today - datetime.timedelta(days=i - 1)) for i in days_range]
for date_pair in date_pairs:
data = self._get_range_data(date_pair[0], date_pair[1])
sum_datas.append(data)
return sum_datas
获取昨日详细数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def get_yestoday_data(self, include_today=False):
"""获取前一日数据
include_today为True时,即为获取截止到当前时间的今天的数据,默认为获取昨天的数据"""
datas = list()
end_date = datetime.date.today()
begin_date = end_date - datetime.timedelta(days=1)
begin_date_str = datetime.datetime.strftime(begin_date, '%Y-%m-%d')
end_date_str = datetime.datetime.strftime(end_date, '%Y-%m-%d')
if include_today:
end_date += datetime.timedelta(days=1)
begin_date += datetime.timedelta(days=1)
sql = """ SELECT {} FROM {}
WHERE source = 1 AND created_at >= %s AND created_at < %s
""".format(self.query_fileds, self.credit_table) # 参数为要查询的字段和表名,可配置
values = [begin_date_str, end_date_str]
for row in db_interface.db1_check.select(sql, values):
record = [int(r) if index <= 1 else r for index, r in enumerate(row)] # apply_id/uniq_id 和status转int
datas.append(record)
return datas
清洗转换数据
使用pandas
将获取到的数据清理转换为DataFrame
,根据需求,进行轴的转换以及聚合分析
- 单个数据源处理
1 | credit_name_map['CreditA'] = 'A数据源' |
- 汇总数据源处理
1 | # 选取想要分析的列生成DataFrame |
分析数据生成图表
生成图片
- 利用
matplotlib
对于处理好的数据生成对应的柱状图、折线图等
1 | #!/usr/bin/env python |
近几日汇总数据
- 近几日查询汇总数据生成
1 | # 汇总数据 |
- 生成结果示例图如下:
前一日数据
- 前一日数据分析生成
1 | credit_name_map = OrderedDict() |
生成表格
行列数据添加相应的html代码,使得能在邮件中显示
1 | #!/usr/bin/python |
- 昨日详细请求数据表格生成
1 | table_datas = table_data_df.values.tolist() |
发送邮件
使用smtplib
和 email
,添加对应配置,方邮件
1 | #!/usr/bin/python |
last
这样一个基本的数据获取、分析、生成报表发送邮件的基本框架就搭好了,当然还可以配置定时任务,来定时生成。或者对于异常的数据进行特殊告警发送。
总结
此项目github地址: daily_report
这只是一个基本的数据生成报表发送的简单示例,如果数据更加复杂,我们可能需要利用pandas
进行更加精细的操作.
或者如果有很多这样的项目需要数据可视化和实时告警,那么我们还是推荐使用Prometheus
这样的工具,来配合云平台或者容器平台。