使用Flask SocketIO实现WebSocket
使用 HTML 5 的 WebSocket 实现实时交互通信功能,替代 Ajax 轮训等方法,个人觉得比较适合实时监控类网站。我就尝试使用 Flask-SocketIO 实现支持 WebSocket 的服务器端,使用 socket.io 库实现客户端。
一个简单的 Flask-SocketIO 应用
仅完成就简单的通讯功能。
服务器端
使用 HTML 5 的 WebSocket 实现实时交互通信功能,替代 Ajax 轮训等方法,个人觉得比较适合实时监控类网站。我就尝试使用 Flask-SocketIO 实现支持 WebSocket 的服务器端,使用 socket.io 库实现客户端。
一个简单的 Flask-SocketIO 应用
仅完成就简单的通讯功能。
服务器端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from
flask
import
Flask
,
render_template
from
flask
.
ext
.
socketio
import
SocketIO
,
emit
app
=
Flask
(
__name__
)
app
.
debug
=
True
app
.
config
[
'SECRET_KEY'
]
=
'windroc-nwpc-project'
socketio
=
SocketIO
(
app
)
@
app
.
route
(
'/'
)
def
get_index_page
(
)
:
return
render_template
(
'index.html'
)
@
socketio
.
on
(
'connect'
,
namespace
=
'/test'
)
def
test_connect
(
)
:
emit
(
'my response'
,
{
'data'
:
'Connected'
,
'count'
:
0
}
)
@
socketio
.
on
(
'my event'
,
namespace
=
'/test'
)
def
test_message
(
message
)
:
emit
(
'my response'
,
{
'data'
:
message
[
'data'
]
,
'count'
:
2
}
)
if
__name__
==
"__main__"
:
socketio
.
run
(
app
,
host
=
'0.0.0.0'
,
port
=
5101
)
|
创建 Flask-SocketIO 应用的方法与 Flask 应用相似
1
2
3
|
app
=
Flask
(
__name__
)
socketio
=
SocketIO
(
app
)
|
开发时简单运行的方法也与不同的 Flask 应用相同。
1
2
3
|
if
__name__
==
"__main__"
:
socketio
.
run
(
app
,
host
=
'0.0.0.0'
,
port
=
5101
)
|
SocketIO 使用 event 表示客户端和服务端接受到的消息。后面可以看到,客户端使用 Javascript 回调函数处理。服务器端使用类似视图的路由注册函数来处理事件。
1
2
3
4
5
6
7
8
|
@
socketio
.
on
(
'connect'
,
namespace
=
'/test'
)
def
test_connect
(
)
:
emit
(
'my response'
,
{
'data'
:
'Connected'
,
'count'
:
0
}
)
@
socketio
.
on
(
'my event'
,
namespace
=
'/test'
)
def
test_message
(
message
)
:
emit
(
'my response'
,
{
'data'
:
message
[
'data'
]
,
'count'
:
2
}
)
|
客户端
使用 socket.io 的客户端 js 库
1
2
|
<script
type
=
"text/javascript"
src
=
"//cdnjs.cloudflare.com/ajax/libs/socket.io/0.9.16/socket.io.min.js"
>
</script>
|
监听 socket 事件的 js 代码:
1
2
3
4
5
6
7
|
$
(
document
)
.
ready
(
function
(
)
{
var
socket
=
io
.
connect
(
'http://127.0.0.1:5101/test'
)
;
socket
.
on
(
'connect'
,
function
(
)
{
socket
.
emit
(
'my event'
,
{
data
:
'I\'m connected!'
}
)
;
}
)
;
}
)
;
|
当 socket 连接成功时,向服务器发送 my event 事件,上一节的服务端使用 test_message 函数处理 my event 事件,向客户端发送 my response 事件。
下面看一个稍微复杂一些的应用,实现一个有实际价值的功能。
一个简单的实时监控应用
单位服务器使用 LoadLeveler 管理任务调度,绝大部分作业脚本都会提交到 LoadLeveler 上运行,很多时候需要运维使用 llq 命令查看 LoadLeveler 队列中的任务状态。或许可以开发一种可以实时获取信息的工具,以更直观的方式展示数据。
从最简单入手,实时获取 LoadLeveler 队列中任务数量和每种状态的任务数量。
数据生成
一个典型的 llq 输出如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<
br
/
>
Id
Owner
Submitted
ST
PRI
Class
Running
On
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
-
--
--
-
--
--
--
--
--
--
--
--
--
--
--
-
cma20n01
.
1737149.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n05
cma20n02
.
2425650.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n07
.
.
.
cma20n01
.
1738085.0
bcccsmxp
5
/
11
06
:
41
I
50
special2
.
.
.
149
job
step
(
s
)
in
queue
,
57
waiting
,
0
pending
,
92
running
,
0
held
,
0
preempted
Id
Owner
Submitted
ST
PRI
Class
Running
On
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
-
--
--
-
--
--
--
--
--
--
--
--
--
--
--
-
cma20n01
.
1737149.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n05
cma20n02
.
2425650.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n07
.
.
.
cma20n01
.
1738085.0
bcccsmxp
5
/
11
06
:
41
I
50
special2
.
.
.
|
149 job step(s) in queue, 57 waiting, 0 pending, 92 running, 0 held, 0 preempted
其中…表示省略的条目。上述要求其实就是解析 llq 输出的最后一行,使用正则表达式解析。
1
2
3
4
5
6
7
8
9
10
11
|
total_pattern
=
"^(\d+) job step\(s\) in queue, (\d+) waiting, (\d+) pending, (\d+) running, (\d+) held, (\d+) preempted"
total_prog
=
re
.
compile
(
total_pattern
)
total_prog_result
=
total_prog
.
match
(
result_lines
[
-
2
]
)
llq_summary
=
dict
(
)
llq_summary
[
'in_queue'
]
=
total_prog_result
.
group
(
1
)
llq_summary
[
'waiting'
]
=
total_prog_result
.
group
(
2
)
llq_summary
[
'pending'
]
=
total_prog_result
.
group
(
3
)
llq_summary
[
'running'
]
=
total_prog_result
.
group
(
4
)
llq_summary
[
'held'
]
=
total_prog_result
.
group
(
6
)
llq_summary
[
'preempted'
]
=
total_prog_result
.
group
(
5
)
|
另外,该程序不在单位的服务器上运行,需要用 ssh 连接到服务器。我使用 paramiko 库执行远程命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
bin_path
=
'llq'
bin_param
=
''
try
:
ssh
=
paramiko
.
SSHClient
(
)
ssh
.
set_missing_host_key_policy
(
paramiko
.
AutoAddPolicy
(
)
)
ssh
.
connect
(
hostname
,
port
,
username
,
password
)
ssh_command
=
bin_path
+
' '
+
bin_param
ssh_stdin
,
ssh_stdout
,
ssh_stderr
=
ssh
.
exec_command
(
ssh_command
)
except
paramiko
.
SSHException
,
e
:
print
e
return
"{'error':'error'}"
result_lines
=
ssh_stdout
.
read
(
)
.
split
(
"\n"
)
ssh
.
close
(
)
|
如此可以得到数据。
应用结构
socket 服务器接受外界传来的 llq 队列消息,并将该消息发送给连接到 socket 服务器的所有客户端。因为对 socket 不够了解,我还是使用传统的 HTTP 方式传送消息到服务器,再使用 Flask-SocketIO 发送消息到客户端。
服务器端
添加一个 http 路由函数,Flask-SocketIO 对象可以使用 emit 函数直接向客户端发送消息。
1
2
3
4
5
6
7
8
9
10
11
|
@
app
.
route
(
'/api/v1/hpc/llq/info'
,
methods
=
[
'POST'
]
)
def
get_hpc_llq_info
(
)
:
r
=
request
hpc_llq_info_message
=
json
.
loads
(
request
.
form
[
'message'
]
)
print
"Receive llq info:"
,
hpc_llq_info_message
socketio
.
emit
(
'send_llq_info'
,
hpc_llq_info_message
,
namespace
=
'/hpc'
)
result
=
{
'status'
:
'ok'
}
return
jsonify
(
result
)
|
数据收集
每隔一段时间收集一次数据:
1
2
3
4
5
6
7
8
9
|
while
True
:
quota_result
=
hpcloadleveler
.
get_llq
(
hostname
,
port
,
username
,
password
)
logging
.
info
(
quota_result
[
'total'
]
)
post_data
=
{
'message'
:
json
.
dumps
(
quota_result
[
'total'
]
)
}
requests
.
post
(
post_url
,
data
=
post_data
)
time
.
sleep
(
2
)
|
客户端
接受数据后,更新界面显示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
var
socket
=
io
.
connect
(
'http://127.0.0.1:5101/hpc'
)
;
socket
.
on
(
'connect'
,
function
(
)
{
console
.
log
(
'I\'m connected!'
)
;
}
)
;
socket
.
on
(
'send_llq_info'
,
function
(
msg
)
{
/
*
console
.
log
(
msg
)
;
*
/
var
total
=
parseInt
(
msg
.
in_queue
)
;
var
waiting
=
parseInt
(
msg
.
waiting
)
;
var
held
=
parseInt
(
msg
.
held
)
;
var
running
=
parseInt
(
msg
.
running
)
;
var
pending
=
parseInt
(
msg
.
pending
)
;
var
preempted
=
parseInt
(
msg
.
preempted
)
;
var
llq_info
=
[
{
name
:
'total'
,
value
:
total
}
,
{
name
:
'waiting'
,
value
:
waiting
}
,
{
name
:
'pending'
,
value
:
pending
}
,
{
name
:
'running'
,
value
:
running
}
,
{
name
:
'held'
,
value
:
held
}
,
{
name
:
'preempted'
,
value
:
preempted
}
]
;
update_llq_type_chart
(
llq_info
)
;
$
(
'#total_llq_job_number'
)
.
html
(
total
)
;
$
(
'#waiting_llq_job_number'
)
.
html
(
waiting
)
;
$
(
'#held_llq_job_number'
)
.
html
(
held
)
;
$
(
'#running_llq_job_number'
)
.
html
(
running
)
;
$
(
'#pending_llq_job_number'
)
.
html
(
pending
)
;
$
(
'#preempted_llq_job_number'
)
.
html
(
preempted
)
;
}
)
;
|
创建 Flask-SocketIO 应用的方法与 Flask 应用相似
1
2
3
|
app
=
Flask
(
__name__
)
socketio
=
SocketIO
(
app
)
|
开发时简单运行的方法也与不同的 Flask 应用相同。
1
2
3
|
if
__name__
==
"__main__"
:
socketio
.
run
(
app
,
host
=
'0.0.0.0'
,
port
=
5101
)
|
SocketIO 使用 event 表示客户端和服务端接受到的消息。后面可以看到,客户端使用 Javascript 回调函数处理。服务器端使用类似视图的路由注册函数来处理事件。
1
2
3
4
5
6
7
8
|
@
socketio
.
on
(
'connect'
,
namespace
=
'/test'
)
def
test_connect
(
)
:
emit
(
'my response'
,
{
'data'
:
'Connected'
,
'count'
:
0
}
)
@
socketio
.
on
(
'my event'
,
namespace
=
'/test'
)
def
test_message
(
message
)
:
emit
(
'my response'
,
{
'data'
:
message
[
'data'
]
,
'count'
:
2
}
)
|
客户端
使用 socket.io 的客户端 js 库
1
2
|
<script
type
=
"text/javascript"
src
=
"//cdnjs.cloudflare.com/ajax/libs/socket.io/0.9.16/socket.io.min.js"
>
</script>
|
监听 socket 事件的 js 代码:
1
2
3
4
5
6
7
|
$
(
document
)
.
ready
(
function
(
)
{
var
socket
=
io
.
connect
(
'http://127.0.0.1:5101/test'
)
;
socket
.
on
(
'connect'
,
function
(
)
{
socket
.
emit
(
'my event'
,
{
data
:
'I\'m connected!'
}
)
;
}
)
;
}
)
;
|
当 socket 连接成功时,向服务器发送 my event 事件,上一节的服务端使用 test_message 函数处理 my event 事件,向客户端发送 my response 事件。
下面看一个稍微复杂一些的应用,实现一个有实际价值的功能。
一个简单的实时监控应用
单位服务器使用 LoadLeveler 管理任务调度,绝大部分作业脚本都会提交到 LoadLeveler 上运行,很多时候需要运维使用 llq 命令查看 LoadLeveler 队列中的任务状态。或许可以开发一种可以实时获取信息的工具,以更直观的方式展示数据。
从最简单入手,实时获取 LoadLeveler 队列中任务数量和每种状态的任务数量。
数据生成
一个典型的 llq 输出如下:
1
2
3
4
5
6
7
8
9
10
|
Id
Owner
Submitted
ST
PRI
Class
Running
On
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
--
-
--
--
-
--
--
--
--
--
--
--
--
--
--
--
-
cma20n01
.
1737149.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n05
cma20n02
.
2425650.0
nwp
_qu
5
/
11
03
:
20
R
100
serial_op
cma18n07
.
.
.
cma20n01
.
1738085.0
bcccsmxp
5
/
11
06
:
41
I
50
special2
.
.
.
149
job
step
(
s
)
in
queue
,
57
waiting
,
0
pending
,
92
running
,
0
held
,
0
preempted
|
其中…表示省略的条目。上述要求其实就是解析 llq 输出的最后一行,使用正则表达式解析。
1
2
3
4
5
6
7
8
9
10
11
|
total_pattern
=
"^(\d+) job step\(s\) in queue, (\d+) waiting, (\d+) pending, (\d+) running, (\d+) held, (\d+) preempted"
total_prog
=
re
.
compile
(
total_pattern
)
total_prog_result
=
total_prog
.
match
(
result_lines
[
-
2
]
)
llq_summary
=
dict
(
)
llq_summary
[
'in_queue'
]
=
total_prog_result
.
group
(
1
)
llq_summary
[
'waiting'
]
=
total_prog_result
.
group
(
2
)
llq_summary
[
'pending'
]
=
total_prog_result
.
group
(
3
)
llq_summary
[
'running'
]
=
total_prog_result
.
group
(
4
)
llq_summary
[
'held'
]
=
total_prog_result
.
group
(
6
)
llq_summary
[
'preempted'
]
=
total_prog_result
.
group
(
5
)
|
另外,该程序不在单位的服务器上运行,需要用 ssh 连接到服务器。我使用 paramiko 库执行远程命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
bin_path
=
'llq'
bin_param
=
''
try
:
ssh
=
paramiko
.
SSHClient
(
)
ssh
.
set_missing_host_key_policy
(
paramiko
.
AutoAddPolicy
(
)
)
ssh
.
connect
(
hostname
,
port
,
username
,
password
)
ssh_command
=
bin_path
+
' '
+
bin_param
ssh_stdin
,
ssh_stdout
,
ssh_stderr
=
ssh
.
exec_command
(
ssh_command
)
except
paramiko
.
SSHException
,
e
:
print
e
return
"{'error':'error'}"
result_lines
=
ssh_stdout
.
read
(
)
.
split
(
"\n"
)
ssh
.
close
(
)
|
如此可以得到数据。
应用结构
socket 服务器接受外界传来的 llq 队列消息,并将该消息发送给连接到 socket 服务器的所有客户端。因为对 socket 不够了解,我还是使用传统的 HTTP 方式传送消息到服务器,再使用 Flask-SocketIO 发送消息到客户端。
服务器端
添加一个 http 路由函数,Flask-SocketIO 对象可以使用 emit 函数直接向客户端发送消息
1
2
3
4
5
6
7
8
9
10
11
|
@
app
.
route
(
'/api/v1/hpc/llq/info'
,
methods
=
[
'POST'
]
)
def
get_hpc_llq_info
(
)
:
r
=
request
hpc_llq_info_message
=
json
.
loads
(
request
.
form
[
'message'
]
)
print
"Receive llq info:"
,
hpc_llq_info_message
socketio
.
emit
(
'send_llq_info'
,
hpc_llq_info_message
,
namespace
=
'/hpc'
)
result
=
{
'status'
:
'ok'
}
return
jsonify
(
result
)
|
数据收集
每隔一段时间收集一次数据:
1
2
3
4
5
6
7
8
9
|
while
True
:
quota_result
=
hpcloadleveler
.
get_llq
(
hostname
,
port
,
username
,
password
)
logging
.
info
(
quota_result
[
'total'
]
)
post_data
=
{
'message'
:
json
.
dumps
(
quota_result
[
'total'
]
)
}
requests
.
post
(
post_url
,
data
=
post_data
)
time
.
sleep
(
2
)
|
客户端
接受数据后,更新界面显示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
var
socket
=
io
.
connect
(
'http://127.0.0.1:5101/hpc'
)
;
socket
.
on
(
'connect'
,
function
(
)
{
console
.
log
(
'I\'m connected!'
)
;
}
)
;
socket
.
on
(
'send_llq_info'
,
function
(
msg
)
{
/
*
console
.
log
(
msg
)
;
*
/
var
total
=
parseInt
(
msg
.
in_queue
)
;
var
waiting
=
parseInt
(
msg
.
waiting
)
;
var
held
=
parseInt
(
msg
.
held
)
;
var
running
=
parseInt
(
msg
.
running
)
;
var
pending
=
parseInt
(
msg
.
pending
)
;
var
preempted
=
parseInt
(
msg
.
preempted
)
;
var
llq_info
=
[
{
name
:
'total'
,
value
:
total
}
,
{
name
:
'waiting'
,
value
:
waiting
}
,
{
name
:
'pending'
,
value
:
pending
}
,
{
name
:
'running'
,
value
:
running
}
,
{
name
:
'held'
,
value
:
held
}
,
{
name
:
'preempted'
,
value
:
preempted
}
]
;
update_llq_type_chart
(
llq_info
)
;
$
(
'#total_llq_job_number'
)
.
html
(
total
)
;
$
(
'#waiting_llq_job_number'
)
.
html
(
waiting
)
;
$
(
'#held_llq_job_number'
)
.
html
(
held
)
;
$
(
'#running_llq_job_number'
)
.
html
(
running
)
;
$
(
'#pending_llq_job_number'
)
.
html
(
pending
)
;
$
(
'#preempted_llq_job_number'
)
.
html
(
preempted
)
;
}
)
;
|