python异常处理
try的工作原理是,当开始一个try语句后,python就在当前程序的上下文中作标记,这样当异常出现时就可以回到这里,try子句先执行,接下来会发生什么依赖于执行时是否出现异常。
try:
<语句> #运行别的代码
except <名字>:
<语句> #如果在try部份引发了'name'异常
except <名字>,<数据>:
<语句> #如果引发了'name'异常,获得附加的数据
else:
<语句> #如果没有异常发生
finaly:
<语句> #总会执行
try的工作原理是,当开始一个try语句后,python就在当前程序的上下文中作标记,这样当异常出现时就可以回到这里,try子句先执行,接下来会发生什么依赖于执行时是否出现异常。
- 如果当try后的语句执行时发生异常,python就跳回到try并执行第一个匹配该异常的except子句,异常处理完毕,控制流就通过整个try语句(除非在处理异常时又引发新的异常)。
- 如果在try后的语句里发生了异常,却没有匹配的except子句,异常将被递交到上层的try,或者到程序的最上层(这样将结束程序,并打印默认的出错信息)。
- 如果在try子句执行时没有发生异常,python将执行else语句后的语句(如果有else的话),然后控制流通过整个try语句。
触发异常
raise [Exception [, args [, traceback]]]
flask
def add_app_hook(app):
@app.errorhandler(APIException)
def handler_api_exception(exception):
return http_error_handler(exception)
@app.errorhandler(Exception)
def handler_all_exception(exception):
return http_error_handler(exception)
django
viewset(raise)
class UsersGenericMixinAPIView(ListModelMixin,
CreateModelMixin,
RetrieveModelMixin,
UpdateModelMixin,
DestroyModelMixin,
GenericViewSet):
queryset = Work.objects.all()
serializer_class =
filter_class =
filter_backends = (DjangoFilterBackend, OrderingFilter)
ordering = ("name",)
pagination_class = None
def create(self, request, *args, **kwargs):
try:
...
except:
raize MyError
return Response()
APIView(no raise)
class UserView(APIView):
def get(req):
try:
...
except:
return Response()
return Response()
gin
func RecoveryMiddleware(f func(c *gin.Context, err error)) gin.HandlerFunc {
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
f(c, err.(error))
}
}()
c.Next()
}
}
func StopExec(err error) {
if err == nil {
return
}
panic(err)
}
引申出来的一个问题,如何保证一个程序的健康
- 异常检测处理
- 生命周期记录-日志
flask生命周期
def log_app(app): from flask import request @app.before_request def log_request(): request_uuid = getattr(thread_local, "log_uuid", None) if not request_uuid: thread_local.log_uuid = request_uuid = str(uuid4()) LOG.info( "Request %s: Method: %s, Url: %s, Headers: %s, Body: %s", *(request_uuid, request.method, request.url, request.headers, request.get_data()) ) @app.after_request def log_response(response): response.direct_passthrough = False request_uuid = getattr(thread_local, "log_uuid", None) if not request_uuid: thread_local.log_uuid = request_uuid = str(uuid4()) LOG.info( "Response %s: Code: %s, Headers: %s, Body: %s", *(request_uuid, response.status_code, response.headers, response.get_data()) ) return response
gin生命周期
```go func GinLogMiddleware() gin.HandlerFunc { // 一个日志地方,处理两个信息 // 一个是记录输入 // 一个是记录最后的输出 // 一个请求的完整的生命周期都可以看得到 return func(c *gin.Context) { startT := time.Now() path := c.Request.RequestURI method := c.Request.Method ctype := strings.ToLower(c.Request.Header.Get(“Content-Type”)) clientIp := c.ClientIP() reqMsg := fmt.Sprintf(“[Req][%s][%s][%s][%s]”, method, path, clientIp, ctype)
// 判断是否打印 header if PrintHeader { hmsg := "" for k, v := range c.Request.Header { val := strings.Join(v, " ") hmsg += fmt.Sprintf("%s:%s\n", k, val) } if hmsg != "" { reqMsg += "\n" + hmsg[0: len(hmsg) - 1] } } // 判断是否有 request body,如果有,就转存读取 if c.Request.ContentLength > 0 && !isIgnoreReadBodyPath(c.Request.URL.Path) { body, err := ioutil.ReadAll(c.Request.Body) if err != nil { // 忽略掉错误,不继续处理 Logger.Errorf(GetReqId(c), "读取请求body失败, %s", err.Error()) } else { // 还原回去 c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) if len(body) > 0 { reqMsg += "\n" + string(body) } } } Logger.Info(GetReqId(c), reqMsg) // rewrite writer,方便后续转存数据 c.Writer = &respWriter{ ResponseWriter: c.Writer, cache: bytes.NewBufferString(""), } c.Next() // 下面的内容会在请求结束后执行 statusCode := c.Writer.Status() respBody := "" respMsg := fmt.Sprintf("[Resp][%s][%s][%d]", method, path, statusCode) rw, ok := c.Writer.(*respWriter) if !ok { Logger.Warnf(GetReqId(c), "处理response数据,转回respwriter失败") } else { if rw.cache.Len() > 0 && !isIgnoreReadBodyPath(c.Request.URL.Path) { respBody = "\n" + rw.cache.String() } } latency := time.Now().Sub(startT) respMsg += fmt.Sprintf("[%v]", latency) if respBody != "" { respMsg += respBody } Logger.Info(GetReqId(c), respMsg) } }
```