基于Tornado的异步非阻塞自定义Web框架

自定义异步非阻塞Web框架

支持异步非阻塞Web框架– Tornado,Nodejs

  • 异步IO模块:
    我们作为客户端向服务端发起“并发”请求,select监听socket是否已经有变化
    future= Future()主要的

    1. 挂起当前请求,线程可以处理其他请求
    2. future设置值,当前挂起的请求返回
  • IO请求不占CPU,计算型需要使用CPU

  • 自定义web框架
    • 同步
      • 从Socket开始
      • 使用IO多路复用,当不发请求的时候,其他的可以过来,只是监听有没有变化
      • 步骤:
        • 请求头中获取url,将其处理为字典格式
        • 去路由中匹配,获取指定的函数
        • 执行函数,获取返回值
        • 将返回值send回去
    • 异步
      • 使用Future()对象,只要是future对象,来一个请求,只要future的result里面还没有给值,就可以将该请求挂起,仍然可以执行其他的请求,当future对象设置了值的时候,表示该请求结束,断开连接
      • 使用Future()对象,通过设置超时时间来判断请求是否断开,当请求到来时,不用管它,可以处理其他的请求,当连接的时间超过了预先设定的超时时间的时候,主动断开连接,结束请求,将socket移除

说明:详细的WebSocket讲解地址

使用

1.基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from snow import Snow
from snow import HttpResponse


def index(request):
return HttpResponse('OK')


routes = [
(r'/index/', index),
]

app = Snow(routes)
app.run(port=8012)
  1. 异步非阻塞:超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from snow import Snow
from snow import HttpResponse
from snow import TimeoutFuture

request_list = []


def async(request):
obj = TimeoutFuture(5)
yield obj


def home(request):
return HttpResponse('home')


routes = [
(r'/home/', home),
(r'/async/', async),
]

app = Snow(routes)
app.run(port=8012)
  1. 异步非阻塞,等待
    基于等待模式可以完成自定制操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from snow import Snow
from snow import HttpResponse
from snow import Future

request_list = []


def callback(request, future):
return HttpResponse(future.value)


def req(request):
obj = Future(callback=callback)
request_list.append(obj)
yield obj


def stop(request):
obj = request_list[0]
del request_list[0]
obj.set_result('done')
return HttpResponse('stop')


routes = [
(r'/req/', req),
(r'/stop/', stop),
]

app = Snow(routes)
app.run(port=8012)

源码

源码下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import socket
import select
import time


class HttpResponse(object):
"""
封装响应信息
"""
def __init__(self, content=''):
self.content = content

self.headers = {}
self.cookies = {}

def response(self):
return bytes(self.content, encoding='utf-8')


class HttpNotFound(HttpResponse):
"""
404时的错误提示
"""
def __init__(self):
super(HttpNotFound, self).__init__('404 Not Found')


class HttpRequest(object):
"""
用户封装用户请求信息
"""
def __init__(self, conn):
self.conn = conn

self.header_bytes = bytes()
self.header_dict = {}
self.body_bytes = bytes()

self.method = ""
self.url = ""
self.protocol = ""

self.initialize()
self.initialize_headers()

def initialize(self):

header_flag = False
while True:
try:
received = self.conn.recv(8096)
except Exception as e:
received = None
if not received:
break
if header_flag:
self.body_bytes += received
continue
temp = received.split(b'\r\n\r\n', 1)
if len(temp) == 1:
self.header_bytes += temp
else:
h, b = temp
self.header_bytes += h
self.body_bytes += b
header_flag = True

@property
def header_str(self):
return str(self.header_bytes, encoding='utf-8')

def initialize_headers(self):
headers = self.header_str.split('\r\n')
first_line = headers[0].split(' ')
if len(first_line) == 3:
self.method, self.url, self.protocol = headers[0].split(' ')
for line in headers:
kv = line.split(':')
if len(kv) == 2:
k, v = kv
self.header_dict[k] = v


class Future(object):
"""
异步非阻塞模式时封装回调函数以及是否准备就绪
"""
def __init__(self, callback):
self.callback = callback
self._ready = False
self.value = None

def set_result(self, value=None):
self.value = value
self._ready = True

@property
def ready(self):
return self._ready


class TimeoutFuture(Future):
"""
异步非阻塞超时
"""
def __init__(self, timeout):
super(TimeoutFuture, self).__init__(callback=None)
self.timeout = timeout
self.start_time = time.time()

@property
def ready(self):
current_time = time.time()
if current_time > self.start_time + self.timeout:
self._ready = True
return self._ready


class Snow(object):
"""
微型Web框架类
"""
def __init__(self, routes):
self.routes = routes
self.inputs = set()
self.request = None
self.async_request_handler = {}

def run(self, host='localhost', port=9999):
"""
事件循环
:param host:
:param port:
:return:
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port,))
sock.setblocking(False)
sock.listen(128)
sock.setblocking(0)
self.inputs.add(sock)
try:
while True:
readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
for conn in readable_list:
if sock == conn:
client, address = conn.accept()
client.setblocking(False)
self.inputs.add(client)
else:
gen = self.process(conn)
if isinstance(gen, HttpResponse):
conn.sendall(gen.response())
self.inputs.remove(conn)
conn.close()
else:
yielded = next(gen)
self.async_request_handler[conn] = yielded
self.polling_callback()

except Exception as e:
pass
finally:
sock.close()

def polling_callback(self):
"""
遍历触发异步非阻塞的回调函数
:return:
"""
for conn in list(self.async_request_handler.keys()):
yielded = self.async_request_handler[conn]
if not yielded.ready:
continue
if yielded.callback:
ret = yielded.callback(self.request, yielded)
conn.sendall(ret.response())
self.inputs.remove(conn)
del self.async_request_handler[conn]
conn.close()

def process(self, conn):
"""
处理路由系统以及执行函数
:param conn:
:return:
"""
self.request = HttpRequest(conn)
func = None
for route in self.routes:
if re.match(route[0], self.request.url):
func = route[1]
break
if not func:
return HttpNotFound()
else:
return func(self.request)

snow.py