A Simple Python Microservice Mini-Framework

Build your small services fast

Preface

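Microservices are everywhere these days, and many projects depend on small, autonomous services that work together. Python already has microservice frameworks such as grpc and nameko, and web frameworks such as Flask and Django. Could we instead write a lightweight, reusable service framework of our own?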


Prerequisites

Python: 2.7+ / 3.6+

Main libraries: PyMySQL, redis, DBUtils, requests, supervisor, urllib3, Werkzeug, gevent, gunicorn

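Overview

We build the framework mainly on Werkzeug, gevent and gunicorn.

Werkzeug is a WSGI toolkit that can serve as the low-level library of a web framework, because it already wraps much of what a framework needs, such as Request and Response objects. Flask, for example, is built on top of Werkzeug.

gevent is a greenlet-based concurrency library for Python. Built around lightweight greenlets (micro-threads), it gets its efficiency from the epoll event notification mechanism and a number of other optimizations.

Gunicorn is a widely used, high-performance Python WSGI HTTP server for UNIX. It works with most web frameworks and is simple, lightweight and fast.

Most microservices are, at heart, collections of simple CRUD operations on a database; a service typically consists of database connections, logging and business logic. Let's look at a basic implementation of each.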

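Implementation

Database connections

For MySQL we build a connection pool with pymysql and DBUtils; for caching we use redis.

Database operations are logged as well; the logging setup is described in the next section.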

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : db.py
# @Author : cgDeepLearn
# @Create Date : 2020/11/16-3:30 PM


import redis
import pymysql
# DBUtils 1.x import path; DBUtils >= 2.0 uses `from dbutils.pooled_db import PooledDB`
from DBUtils.PooledDB import PooledDB

from conf import config
from utils.log import logger


class RedisOps(object):
    FIELD_EXIST = 0
    NEW_FIELD = 1

    def __init__(self, host, port, password, db):
        pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
        self.rd = redis.Redis(connection_pool=pool)


class MysqlOps(object):

    def __init__(self, host, port, user, passwd, db):
        self.pool = PooledDB(
            pymysql,
            mincached=10,       # idle connections opened at startup
            maxcached=30,       # max idle connections kept in the pool
            maxconnections=0,   # 0 = no limit on total connections
            host=host,
            user=user,
            passwd=passwd,
            db=db,
            port=port,
            charset='utf8')
        # base names of the sharded tables (shard N is <name>_<N>)
        self.user_apply = 'user_apply'
        self.user_base = 'user_base'
        self.flows = 'flows'
        self.table_list = list()

    def _execute(self, sql, values):
        '''
        Borrow a fresh connection from the pool for every call.
        '''
        conn = self.pool.connection()
        try:
            cur = conn.cursor()
            cur.execute(sql, values)
            conn.commit()
        finally:
            # return the connection to the pool even if the query fails
            conn.close()
        return cur

    def _check_parameter(self, sql, values):
        # Reject the query if it has placeholders and any value is falsy
        # (note: this also rejects legitimate values such as 0 or '').
        count = sql.count('%s')
        if count > 0:
            for elem in values:
                if not elem:
                    return False
        return True

    def _get_table_list(self):
        # Lazily load the shard count (= number of rows in data_split_info).
        if len(self.table_list) == 0:
            sql = '''SELECT COUNT(id) FROM data_split_info'''
            table_num = list(self.select(sql))[0][0]
            self.table_list = list(range(0, table_num))

    def _replace(self, sql, table, num):
        if num == 0:
            # shard 0 keeps the base table name; filter out soft-deleted rows
            if table in sql:
                string = ' AND %s.deleted_at is null' % table
                sql = sql + string
        else:
            # shard N lives in table <table>_<N>
            pattern = '%s' % table
            string = '%s_%d' % (table, num)
            sql = sql.replace(pattern, string)
        return sql

    def _mulselect(self, apply_id, sql, values):
        self._get_table_list()

        mulcur = list()
        for num in self.table_list:
            temp_c = 0
            sql_tmp = sql
            sql_tmp = self._replace(sql_tmp, self.user_apply, num)
            sql_tmp = self._replace(sql_tmp, self.user_base, num)
            sql_tmp = self._replace(sql_tmp, self.flows, num)

            cur = self._execute(sql_tmp, values)
            for row in cur:
                temp_c = temp_c + 1
                mulcur.append(row)
            logger.info('apply_id:%d _mulselect sql:%s, values:%s, result:%s',
                        apply_id, sql_tmp, values, temp_c)

        return mulcur

    def mulselect(self, sql, values=(), apply_id=0, check=False, log=True):
        '''
        Query across all shard tables.
        1. Supports basic MySQL queries only; aggregate functions,
           GROUP BY, ORDER BY and the like are not supported.
        '''
        # replace newlines with spaces so tokens on adjacent lines don't merge
        sql = sql.replace('\n', ' ')
        if check and not self._check_parameter(sql, values):
            return
        if log:
            logger.info('apply_id:%d mulselect sql:%s, values:%s', apply_id,
                        sql, values)
        cur = self._mulselect(apply_id, sql, values)
        for row in cur:
            yield row

    def sinselect(self, sql, values=(), apply_id=0, check=False, log=True):
        sql = sql.replace('\n', ' ')
        if check and not self._check_parameter(sql, values):
            return
        # transition period: also filter on deleted_at
        sql = self._replace(sql, self.user_apply, num=0)
        sql = self._replace(sql, self.user_base, num=0)
        sql = self._replace(sql, self.flows, num=0)

        if log:
            logger.info('apply_id:%d sinselect sql:%s, values:%s', apply_id,
                        sql, values)
        cur = self._execute(sql, values)
        for row in cur:
            yield row

    def select(self, sql, values=(), apply_id=0, check=False, log=True):
        sql = sql.replace('\n', ' ')
        if check and not self._check_parameter(sql, values):
            return
        if log:
            logger.info('apply_id:%d select sql:%s, values:%s', apply_id, sql,
                        values)
        cur = self._execute(sql, values)
        for row in cur:
            yield row

    def execute(self, sql, values=(), apply_id=0, check=False, log=True):
        sql = sql.replace('\n', ' ')
        if check and not self._check_parameter(sql, values):
            return
        if log:
            logger.info('apply_id:%d execute sql:%s, values:%s', apply_id, sql,
                        values)
        # return the cursor so callers can read rowcount / lastrowid
        return self._execute(sql, values)


redis_op = RedisOps(
    host=config.redis_host, port=config.redis_port, password=config.redis_pwd, db=config.redis_db)


mysql_op = MysqlOps(
    host=config.mysql_host,
    port=config.mysql_port,
    user=config.mysql_user,
    passwd=config.mysql_pwd,
    db=config.mysql_db)


if __name__ == '__main__':
    print(dir(redis_op))
    print(dir(mysql_op))
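The sharding logic in _replace and _mulselect is easy to misread, so here is a standalone copy of it that shows the SQL each shard would receive. The names replace_table and expand_shards are ours, not part of db.py, and the sketch assumes the table layout implied by the code: shard 0 is the unsuffixed table and shard N is <table>_<N>.

```python
def replace_table(sql, table, num):
    """Standalone copy of MysqlOps._replace, for illustration only."""
    if num == 0:
        # Shard 0 keeps the base name; soft-deleted rows are filtered out.
        if table in sql:
            sql = sql + ' AND %s.deleted_at is null' % table
    else:
        # Shard N is stored in <table>_<N>; every occurrence is renamed.
        sql = sql.replace(table, '%s_%d' % (table, num))
    return sql


def expand_shards(sql, tables, table_count):
    """Return the SQL that _mulselect would run on each shard, in order."""
    statements = []
    for num in range(table_count):
        sql_tmp = sql
        for table in tables:
            sql_tmp = replace_table(sql_tmp, table, num)
        statements.append(sql_tmp)
    return statements


TABLES = ['user_apply', 'user_base', 'flows']  # same order as _mulselect
query = 'SELECT * FROM user_apply WHERE user_apply.uid = %s'
for stmt in expand_shards(query, TABLES, 3):
    print(stmt)
# SELECT * FROM user_apply WHERE user_apply.uid = %s AND user_apply.deleted_at is null
# SELECT * FROM user_apply_1 WHERE user_apply_1.uid = %s
# SELECT * FROM user_apply_2 WHERE user_apply_2.uid = %s
```

Because the rewrite is a plain str.replace, any identifier that merely contains a table name is renamed too (a column called user_apply_id becomes user_apply_1_id on shard 1), and the shard-0 branch only produces valid SQL when the query ends inside a WHERE clause.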

Logging

We use the standard logging module with time-based log rotation.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : log.py
# @Author : cgDeepLearn
# @Create Date : 2020/11/12-2:50 PM


import logging
import logging.config

logger = logging.getLogger("debug")


def init_log(log_path, log_name, log_level="DEBUG"):
    """Initialize logging.
    log_path: directory the log files are written to
    log_name: log file name (without the .log suffix)
    log_level: log level, DEBUG by default
    """
    log_level = log_level.upper()
    LOG_PATH_DEBUG = "%s/%s.log" % (log_path, log_name)
    LOG_PATH_ERROR = "%s/process_server_error.log" % log_path  # error log path
    LOG_FILE_BACKUP_COUNT = 7  # number of daily log files to keep
    # logging configuration dictionary
    log_conf = {
        "version": 1,
        "formatters": {
            "format1": {
                # log line format
                "format": '%(asctime)-15s [%(thread)d] - [%(filename)s %(lineno)d] %(levelname)s %(message)s',
            },
        },
        "handlers": {
            # every record at log_level or above, rotated at midnight
            "handler1": {
                "class": "logging.handlers.TimedRotatingFileHandler",
                "level": log_level,
                "formatter": "format1",
                "when": "midnight",
                "backupCount": LOG_FILE_BACKUP_COUNT,
                "filename": LOG_PATH_DEBUG
            },
            # ERROR and above only, in a separate file
            "handler2": {
                "class": "logging.handlers.TimedRotatingFileHandler",
                "level": 'ERROR',
                "formatter": "format1",
                "when": "midnight",
                "backupCount": LOG_FILE_BACKUP_COUNT,
                "filename": LOG_PATH_ERROR
            },
        },
        "loggers": {
            "debug": {
                "handlers": ["handler1", "handler2"],
                "level": log_level
            },
        }
    }
    logging.config.dictConfig(log_conf)


def close_log():
    logging.shutdown()
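To check what the two-handler setup actually does, the sketch below reproduces the same dictConfig pattern in a temporary directory: every record goes to <name>.log, and only ERROR and above also reach error.log. The function name, the file names and the shortened format string are illustrative, not the project's.

```python
import logging
import logging.config
import os
import tempfile


def init_demo_log(log_dir, name):
    """Same two-handler layout as init_log, with a shorter format."""
    def rotating(level, filename):
        return {
            "class": "logging.handlers.TimedRotatingFileHandler",
            "level": level,
            "formatter": "plain",
            "when": "midnight",  # rotate once a day
            "backupCount": 7,    # keep 7 rotated files
            "filename": os.path.join(log_dir, filename),
        }
    logging.config.dictConfig({
        "version": 1,
        "formatters": {"plain": {"format": "%(levelname)s %(message)s"}},
        "handlers": {
            "all": rotating("DEBUG", name + ".log"),
            "errors": rotating("ERROR", "error.log"),
        },
        "loggers": {"debug": {"handlers": ["all", "errors"], "level": "DEBUG"}},
    })
    return logging.getLogger("debug")


log_dir = tempfile.mkdtemp()
log = init_demo_log(log_dir, "server")
log.info("hello")
log.error("boom")
logging.shutdown()  # flush and close the handlers, as close_log() does

with open(os.path.join(log_dir, "server.log")) as f:
    all_text = f.read()    # contains both records
with open(os.path.join(log_dir, "error.log")) as f:
    error_text = f.read()  # contains only the ERROR record
```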

Requests and responses

We wrote an expose decorator to handle routing:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : decorator.py
# @Author : cgDeepLearn
# @Create Date : 2020/11/12-2:56 PM

import time
from functools import wraps
from utils.log import logger
from werkzeug.routing import Map, Rule

# server.py also imports `timer` from this module; its definition is not
# included in this listing.

url_map = Map()


def expose(rule, **kw):
    """Register the decorated function under `rule`, using its name as the endpoint."""
    def decorate(f):
        kw['endpoint'] = f.__name__
        url_map.add(Rule(rule, **kw))
        return f

    return decorate
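server.py imports timer from decorator.py, but its definition is not part of the listing above (the otherwise unused time, wraps and logger imports suggest it lives in the same file). Here is a minimal, hypothetical sketch of such a decorator, assuming it simply logs how long each call takes:

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("debug")  # same logger name as utils/log.py


def timer(f):
    """Hypothetical timer: log the wall-clock duration of every call."""
    @wraps(f)  # keep f.__name__, which expose() uses as the endpoint name
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            return f(*args, **kwargs)
        finally:
            logger.info('%s cost %.3fs', f.__name__, time.time() - start)
    return wrapper


@timer
def api_demo(x):
    return x * 2
```

functools.wraps matters here: in server.py, @expose is stacked on top of @timer, so expose sees the wrapper, and without wraps every handler would be registered under the endpoint name "wrapper".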
  • In app.py, url_map is bound to each request's WSGI environ:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File : app.py
    # @Author : cgDeepLearn
    # @Create Date : 2020/11/12-2:47 PM


    # Monkey-patch the standard library first so blocking I/O yields to gevent.
    from gevent import monkey
    monkey.patch_all()

    from traceback import format_exc
    from werkzeug.wrappers import Request, Response
    from werkzeug.exceptions import HTTPException
    from server import g_server
    from utils.log import logger
    from decorator import url_map


    def g_app(environ, start_response):
        """WSGI entry point: route the request to a method of g_server."""
        request = Request(environ)
        adapter = url_map.bind_to_environ(environ)
        try:
            # endpoint is the handler's function name, registered by @expose
            endpoint, values = adapter.match()
            response = getattr(g_server, endpoint)(request, **values)
        except HTTPException as e:
            # NotFound, MethodNotAllowed, BadRequest, ... are WSGI responses themselves
            response = e
        except Exception:
            response = Response('Uncaught Error', status=500)
            logger.error('app uncaught error, exception:%s', format_exc())
        return response(environ, start_response)
  • Example: handling a request to /test (server.py)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : server.py
# @Author : edgar.chen
# @Create Date : 2020/11/12-2:47 PM

import os
import json
import uuid
from werkzeug.wrappers import Response
from werkzeug.exceptions import BadRequest
from traceback import format_exc

from conf import config
from utils.log import logger, init_log
from utils.errors import ServerProcessError, ParameterError
from decorator import timer, expose
from api import TestApi


class Server(object):

    def __init__(self):
        self._pid = os.getpid()

        if not os.path.exists(config.log_path):
            os.makedirs(config.log_path)
        # one log file per worker process, e.g. <log_name>.<pid>.log
        log_name = "{}.{}".format(config.log_name, self._pid)
        init_log(config.log_path, log_name)
        logger.debug('server is running...')

    def _rsp_encode(self, rsp):
        return json.dumps(rsp, separators=(',', ':'))

    def _req_decode(self, req):
        try:
            return json.loads(req)
        except Exception:
            raise ParameterError("post data not json string!")

    def _errcode(self, code=0, msg='ok'):
        # error fields in both camelCase and snake_case
        return dict(errCode=code, errMsg=msg, err_code=code, err_msg=msg)

    def _response(self, response):
        """Wrap the result in a JSON response."""
        response['version'] = config.version
        encode_response = self._rsp_encode(response)
        return Response(encode_response, mimetype='application/json')

    def _get_query_args(self, data, apply_detail):
        """Extract and validate the request parameters."""
        try:
            apply_detail['order_id'] = int(data["order_id"])
        except (KeyError, TypeError, ValueError):
            raise ParameterError('request params not complete or format not right')

    @expose('/test', methods=['POST'])
    @timer
    def api_test(self, request):
        """Handle POST /test."""
        # defined before the try block so every error branch can log them
        req_id = str(uuid.uuid1())
        order_id = None
        try:
            request_data = request.get_data()
            if not request_data:
                raise BadRequest()

            logger.debug('req_id: [{}] - request:{}'.format(req_id, request_data))
            # parse the request body into a dict
            data = self._req_decode(request_data)

            # extract the parameters we need
            apply_detail = dict()
            apply_detail['req_id'] = req_id
            self._get_query_args(data, apply_detail)
            order_id = apply_detail["order_id"]

            # business logic
            res_data = TestApi(req_data=apply_detail).process()

            # build the response
            response = self._errcode(0)
            response.update({"order_id": order_id,
                             "uuid": req_id,
                             "data": res_data})
            logger.debug('req_id: [%s] - order_id: %s, response:%s', req_id, order_id, response)
        # error handling
        except BadRequest:
            logger.error('bad request, request params needed!')
            response = self._errcode(-2, 'bad request, request params needed!')
        except ParameterError as e:
            logger.error(str(e))
            response = self._errcode(-4, str(e))
        except ServerProcessError as e:
            logger.error('req_id: [%s] - order_id: [%s] except: %s', req_id, order_id, str(e))
            response = self._errcode(-3, str(e))
        except Exception:
            logger.error('req_id: [%s] - order_id: [%s] except: %s', req_id, order_id, format_exc())
            response = self._errcode(-1, 'server error')
        return self._response(response)


g_server = Server()
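The error-code contract of api_test (0 for success, -2 for an empty body, -4 for bad parameters, -1 for unexpected failures) can be exercised without Werkzeug or a database. The framework-free sketch below mirrors that control flow; ParameterError is a local stand-in for utils.errors.ParameterError, which the post does not show, and the version string is a placeholder for config.version.

```python
import json
import uuid


class ParameterError(Exception):
    """Local stand-in for utils.errors.ParameterError (not shown in the post)."""


def errcode(code=0, msg='ok'):
    # same shape as Server._errcode: camelCase and snake_case keys
    return dict(errCode=code, errMsg=msg, err_code=code, err_msg=msg)


def handle_test(raw_body, version='0.0.0'):
    """Framework-free mirror of Server.api_test's control flow."""
    req_id = str(uuid.uuid1())
    try:
        if not raw_body:
            # api_test raises BadRequest here and maps it to -2
            response = errcode(-2, 'bad request, request params needed!')
        else:
            try:
                data = json.loads(raw_body)
            except ValueError:
                raise ParameterError('post data not json string!')
            try:
                order_id = int(data['order_id'])
            except (KeyError, TypeError, ValueError):
                raise ParameterError('request params not complete or format not right')
            response = errcode(0)
            # 'data' would come from TestApi(...).process() in the real handler
            response.update({'order_id': order_id, 'uuid': req_id, 'data': {}})
    except ParameterError as e:
        response = errcode(-4, str(e))
    except Exception:
        response = errcode(-1, 'server error')
    response['version'] = version  # placeholder for config.version
    return json.dumps(response, separators=(',', ':'))
```

Keeping the mapping in one pure function makes each error branch straightforward to unit-test.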

To add a new route, just add a corresponding handler method to the server module.
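To illustrate what adding a route involves, here is a dependency-free sketch of the same pattern: a decorator records each handler's function name per path, and the WSGI callable dispatches with getattr, as g_app does with Werkzeug's Map. It swaps Werkzeug routing for a plain dict (no URL variables, no automatic HEAD handling), and the /ping route and all names here are hypothetical.

```python
import json
from wsgiref.util import setup_testing_defaults

ROUTES = {}  # hypothetical registry: path -> (allowed methods, endpoint name)


def expose(rule, methods=('GET',)):
    def decorate(f):
        ROUTES[rule] = (tuple(methods), f.__name__)
        return f
    return decorate


class DemoServer(object):
    @expose('/test', methods=['POST'])
    def api_test(self, environ):
        return {'err_code': 0, 'err_msg': 'ok'}

    # adding a route is just one more decorated method
    @expose('/ping')
    def api_ping(self, environ):
        return {'err_code': 0, 'err_msg': 'pong'}


demo_server = DemoServer()


def demo_app(environ, start_response):
    route = ROUTES.get(environ.get('PATH_INFO', '/'))
    if route is None:
        status, body = '404 Not Found', {'err_msg': 'not found'}
    elif environ.get('REQUEST_METHOD', 'GET') not in route[0]:
        status, body = '405 Method Not Allowed', {'err_msg': 'method not allowed'}
    else:
        status, body = '200 OK', getattr(demo_server, route[1])(environ)
    payload = json.dumps(body).encode('utf-8')
    start_response(status, [('Content-Type', 'application/json'),
                            ('Content-Length', str(len(payload)))])
    return [payload]


def call(path, method='GET'):
    """Invoke demo_app in-process, without starting a server."""
    environ = {'PATH_INFO': path, 'REQUEST_METHOD': method}
    setup_testing_defaults(environ)
    seen = {}

    def start_response(status, headers):
        seen['status'] = status

    body = b''.join(demo_app(environ, start_response))
    return seen['status'], json.loads(body.decode('utf-8'))
```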

  • A simple business-logic example:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File : api.py
    # @Author : cgDeepLearn
    # @Create Date : 2020/11/16-3:40 PM

    import random
    import time


    class TestApi(object):
        def __init__(self, req_data):
            self.req_data = req_data

        def process(self):
            """
            Process the request and return the result.
            """
            results = [-1, 0, 1]
            res = {
                # int(time.time()) also works on Python 2.7;
                # datetime.timestamp() is Python 3 only
                'timestamp': int(time.time()),
                'result': random.choice(results)
            }
            return res

gunicorn configuration

We use gevent (coroutine) workers and configure the server IP, port, number of worker processes and other parameters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : gunicorn_config.py
# @Author : cgDeepLearn
# @Create Date : 2020/11/12-3:17 PM


import os
import datetime

server_ip = '127.0.0.1'  # server address
# to bind to the eth0 address on Linux instead:
# server_ip = os.popen('ifconfig eth0|grep inet|grep -v 127.0.0.1|grep -v inet6|awk \'{print $2}\'|tr -d "addr:"').readlines()[0].strip('\r\n')
server_port = 10001  # port
bind = '%s:%s' % (server_ip, server_port)
workers = 1  # number of worker processes
keepalive = 600  # keep-alive timeout in seconds (10 minutes), to avoid many short-lived connections

backlog = 2048  # the maximum number of pending connections
worker_connections = 2048  # the maximum number of simultaneous clients
# worker type: sync, eventlet, gevent, tornado or gthread (default: sync); we use gevent coroutines
worker_class = 'gevent'


loglevel = 'info'
# daemon = False  # whether to run as a daemon
script_path = os.path.dirname(os.path.abspath(__file__))
work_path = os.path.dirname(script_path)
log_name = 'server'
pidfile = '{}/gunicorn.pid'.format(script_path)
errorlog = '{}/log/gunicorn_error.log'.format(work_path)
chdir = '{}/src'.format(work_path)


def worker_exit(server, worker):
    """Worker exit hook: append the current date to the worker's log file name."""
    pid = worker.pid
    logfile = os.path.join(work_path, 'log/{}.{}.log'.format(log_name, pid))

    newfile = os.path.join(work_path, 'log/{}.{}.log.{}'.format(
        log_name, pid, datetime.datetime.now().strftime('%Y-%m-%d')))
    if os.path.exists(logfile):
        os.rename(logfile, newfile)
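workers = 1 is fine for a demo. For production, gunicorn's documentation suggests (2 x number of cores) + 1 as a starting point for the worker count; a sketch of deriving it inside the config file (the helper name is ours):

```python
import multiprocessing


def suggested_workers(cpu_count=None):
    """(2 x cores) + 1: the starting point suggested by gunicorn's docs."""
    if cpu_count is None:
        cpu_count = multiprocessing.cpu_count()
    return cpu_count * 2 + 1


# in gunicorn_config.py this would replace the hard-coded `workers = 1`
workers = suggested_workers()
```

Treat the result as a starting point only: with worker_class = 'gevent' each process already multiplexes many connections, so fewer processes may be enough. Also, since this config leaves preload_app at its default (off), every worker imports db.py and builds its own PooledDB with mincached=10, so MySQL sees roughly ten idle connections per worker.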

Startup script

The script starts the service with gunicorn and uses kill to stop, reload or restart it.

#!/bin/sh

# IP=`/sbin/ifconfig | grep 'inet addr:'| grep -v '127.0.0.1' | cut -d: -f2 | awk '{ print $1}'` # linux
# project root = parent directory of this script's directory
BASE_DIR=$(cd "$(dirname "$0")"; cd ..; pwd)

PROCESS_NAME=${BASE_DIR##*/}
CONFIG_PATH=$BASE_DIR/script/gunicorn_config.py
PID_FILE=$BASE_DIR/script/gunicorn.pid
CMD="gunicorn --daemon -c $CONFIG_PATH app:g_app"

# POSIX sh has no `function` keyword, so plain name() definitions are used
start_gunicorn() {
    cd "$BASE_DIR" || exit 1
    $CMD
    echo "$PROCESS_NAME is running."
}

stop_gunicorn() {
    if [ -f "$PID_FILE" ]; then
        PID=$(cat "$PID_FILE")
        echo "gunicorn: kill PID=${PID}"
        kill -TERM "$PID"  # graceful shutdown
    else
        echo "gunicorn: $PID_FILE does not exist..."
    fi
}

reload_gunicorn() {
    if [ -f "$PID_FILE" ]; then
        PID=$(cat "$PID_FILE")
        kill -HUP "$PID"  # reload config and gracefully restart the workers
        echo "$PID reloaded..."
    else
        echo "$PID_FILE does not exist..."
    fi
}

restart_gunicorn() {
    stop_gunicorn
    sleep 1
    start_gunicorn
}

case "$1" in
    start)
        start_gunicorn
        exit $?
        ;;
    stop)
        stop_gunicorn
        exit $?
        ;;
    restart)
        restart_gunicorn
        exit $?
        ;;
    reload)
        reload_gunicorn
        exit $?
        ;;
    *)
        echo "Usage: $0 { start | stop | restart | reload }"
        exit 1
        ;;
esac

Example request

  • Request:

    curl http://127.0.0.1:10001/test -d '{"order_id":12345}'
  • Response:

    {"order_id": 12345,"uuid": "e3bd4a7e-35eb-11eb-bd7e-5254008f07ce","err_msg":"ok","err_code":0,"data": {"timestamp":1589193745,"result":1}}
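The same request can be sent from Python with only the standard library (urllib) instead of curl. The base URL matches the bind address in gunicorn_config.py, and the helper names are illustrative. Note that curl -d sends the body as application/x-www-form-urlencoded; since api_test reads the raw body with request.get_data() and never touches request.form, either content type should work with the handler shown.

```python
import json

try:
    from urllib.request import Request, urlopen  # Python 3
except ImportError:  # Python 2.7
    from urllib2 import Request, urlopen

BASE_URL = 'http://127.0.0.1:10001'  # bind address from gunicorn_config.py


def build_test_request(order_id, base_url=BASE_URL):
    """Build a POST /test request with a JSON body (no network I/O yet)."""
    body = json.dumps({'order_id': order_id}).encode('utf-8')
    return Request(base_url + '/test', data=body,
                   headers={'Content-Type': 'application/json'})


def call_test(order_id, timeout=5):
    """Send the request and decode the JSON reply; needs the service running."""
    resp = urlopen(build_test_request(order_id), timeout=timeout)
    try:
        return json.loads(resp.read().decode('utf-8'))
    finally:
        resp.close()
```

With the service started, call_test(12345) returns the decoded response dict, whose err_code should be 0 for a well-formed order_id.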

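Summary

Above, we used Werkzeug, gevent and gunicorn to build a lightweight Python service. Adjust the configuration and add your business logic, and you can quickly stand up a microservice for your own use case. Project code: https://github.com/cgDeepLearn/pyserver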

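Going further

  • Use supervisor to monitor the service and restart it automatically.
  • Use docker for containerized deployment.
  • Use cookiecutter to generate configurable project code.
  • Use a service mesh for service registration and discovery.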
