效果:
后端:(两个http加一个websocket接口)
import json,sys,os,rosbag,time
from flask import Flask, jsonify
from flask_cors import CORS
from flask_sockets import Sockets
from gevent import monkey,pywsgi
from geventwebsocket.handler import WebSocketHandler
from analysisData import analysis_2dMap,analysis_poseData
# Directory that /queryBag scans for recorded ``.bag`` files.
mapPath = r"D:\czc\bagSever\bag\siwei_bag\siwei_bag\mapping"
# Point-cloud map path; not referenced by any code visible in this file.
PCDPath = r"D:\czc\bagSever\bag\map\map\test.pcd"
# Map file handed to analysis_2dMap() by the /query2DMap route.
PGMPath = r"D:\czc\bagSever\bag\map\map\map.pgm"
# Flask application and the flask_sockets wrapper used for WebSocket routes.
app = Flask(__name__)
sockets = Sockets(app)
# Enable cross-origin requests (credentials allowed) for every route.
CORS(app, supports_credentials=True)
# Add the parent directory to the module search path.
# NOTE(review): `analysisData` is already imported above, so these appends
# cannot influence that import -- confirm whether they are still needed.
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")
# NOTE(review): gevent advises calling patch_all() as early as possible,
# before other modules are imported -- confirm this late call is sufficient.
monkey.patch_all() # non-blocking
# Names of the ``.bag`` files found by the last /queryBag call; the
# /queryPoseData route indexes into this list.
bagList = []
def FlaskSever():
    """Register the HTTP/WebSocket routes and serve them on 0.0.0.0:5100.

    Routes:
        POST /queryBag      -- list the ``.bag`` files found in ``mapPath``.
        WS   /queryPoseData -- stream the ``/updated_pose`` points of one bag.
        POST /query2DMap    -- return the 2D map data plus its metadata.

    The call blocks inside ``serve_forever()``.
    """

    def listBagFiles():
        """Return the entries of ``mapPath`` whose name ends with ``.bag``."""
        return [name for name in os.listdir(mapPath) if name.endswith(".bag")]

    @app.route("/queryBag", methods=["POST"])
    def queryBag():
        """Refresh the global ``bagList`` and return it as JSON."""
        global bagList
        bagList = listBagFiles()
        return jsonify(fileNameList=[{"fileName": name} for name in bagList])

    @sockets.route('/queryPoseData')
    def queryPoseData(ws):
        """Stream the pose points of the bag selected by the client.

        The client sends one JSON message ``{"index": <int>}`` where the
        index refers to the list returned by /queryBag.  Every pose is sent
        back as a JSON object with the keys ``x``, ``y`` and ``z``, with a
        10 ms pause between messages, until the poses run out or the socket
        is closed.  Invalid requests are logged and the handler returns.
        """
        global bagList
        raw = ws.receive()
        if raw is None:
            # The client went away before sending its request.
            return
        try:
            data = json.loads(raw)
        except ValueError:
            print("queryPoseData: request is not valid JSON: %r" % (raw,))
            return
        print(data)
        if not bagList:
            # /queryBag has not been called yet; scan the directory now so
            # that the index can still be resolved.
            bagList = listBagFiles()
        try:
            FileName = bagList[data["index"]]
        except (KeyError, IndexError, TypeError):
            print("queryPoseData: no bag for request %r" % (data,))
            return
        bag = rosbag.Bag(os.path.join(mapPath, FileName), "r")
        try:
            poseData = bag.read_messages('/updated_pose')
            analysisPoseData = analysis_poseData(poseData)
            exhausted = object()  # sentinel: lets next() signal the end without raising
            while not ws.closed:
                item = next(analysisPoseData, exhausted)
                if item is exhausted:
                    break
                sendData_dict = json.dumps(
                    {"x": item["x"], "y": item["y"], "z": item["z"]},
                    ensure_ascii=False,
                )
                ws.send(sendData_dict)
                time.sleep(0.01)
        finally:
            # Always release the bag file, even if sending fails midway.
            bag.close()

    @app.route('/query2DMap', methods=["POST"])
    def query2DMap():
        """Return the parsed 2D map together with its fixed metadata."""
        analysis2dMap_list = analysis_2dMap(PGMPath)
        return jsonify(
            type="map2DData",
            mapData=analysis2dMap_list,
            resolution=0.2,
            origin=[-238.371414, -137.214859, 0.000000],
            originGps=[10355.72169, 3044.70807],
        )

    server = pywsgi.WSGIServer(('0.0.0.0', 5100), app, handler_class=WebSocketHandler)
    print('server start')
    server.serve_forever()
# Start the backend only when this file is executed as a script.
if __name__ == "__main__":
    FlaskSever()
前端:(部分展示)
import sys,time,websocket,json,requests,threading
from PyQt5 import QtGui,QtCore
from home_pane import homePane
from chooseBag_pane import chooseBagPane
import matplotlib.pyplot as plt
from PyQt5.QtWidgets import QApplication
# Receives the car's track from the server; meant to be run in a worker thread.
def recvPointsOfCar():
    """Receive pose points over WebSocket and append them to XList/YList.

    Connects to the local ``/queryPoseData`` endpoint, requests the bag
    selected by the global ``BagIndex`` and converts every received pose to
    map coordinates.  The loop ends when the global ``IsEnd`` becomes true
    or when receiving/decoding a message fails; while the global ``IsPlay``
    is false it idles in 0.5 s steps.  The socket is closed on exit.
    """
    global XList, YList
    ws = websocket.WebSocket()
    try:
        ws.connect("ws://127.0.0.1:5100/queryPoseData")
        sendData_dict = json.dumps({"index": BagIndex}, ensure_ascii=False)
        ws.send(sendData_dict)
        while not IsEnd:
            if not IsPlay:
                time.sleep(0.5)
                continue
            try:
                data = json.loads(ws.recv())
                # 0.2 and the two offsets mirror the resolution/origin the
                # server reports for the 2D map.
                # NOTE(review): the extra 200.214859 shift on y is not
                # explained anywhere in this file -- confirm its meaning.
                x = data["x"] / 0.2 + 238.371414 / 0.2
                y = -data["y"] / 0.2 + 137.214859 / 0.2 - 200.214859
            except (websocket.WebSocketException, OSError,
                    ValueError, KeyError, TypeError):
                # Connection closed or malformed message: stop receiving.
                break
            # Append only after both values are known, so a failure on y
            # cannot leave the two lists with different lengths.
            XList.append(x)
            YList.append(y)
    finally:
        ws.close()
if __name__ == '__main__':
    app = QApplication(sys.argv)
    home_pane = homePane()
    chooseBag_pane = chooseBagPane()
    home_pane.show()

    # Default parameters
    BagIndex = 0  # the first bag is selected by default
    IsPlay = True
    IsEnd = False
    IsStartThread = False  # whether the receiver thread has been started
    XList, YList = [], []

    # Fetch the map data from the server with an HTTP request.
    responseJson_query2DMap = requests.post("http://127.0.0.1:5100/query2DMap").json()
    home_pane.imageData = responseJson_query2DMap["mapData"]  # extract the map data
    plt.imshow(home_pane.imageData, plt.cm.gray)  # draw the grayscale map
    home_pane.canvas.draw()

    # ---- home_pane handlers ----

    def chooseTrackPoints():
        """Open the bag selection pane."""
        chooseBag_pane.show()
    home_pane.chooseTrackPoints_signal.connect(chooseTrackPoints)

    def setPlayButtonIcon(imagePath):
        """Show the image at ``imagePath`` as a 30x30 icon on the play button."""
        icon = QtGui.QIcon()
        icon.addPixmap(QtGui.QPixmap(imagePath), QtGui.QIcon.Normal, QtGui.QIcon.Off)
        home_pane.pushButton.setIcon(icon)
        home_pane.pushButton.setIconSize(QtCore.QSize(30, 30))

    def playAndPause():
        """Toggle between playing and pausing on every click.

        Even click counts switch to "playing" and start the receiver thread
        if it has not been started yet; odd click counts switch to "paused".
        """
        global IsPlay, IsEnd, IsStartThread
        IsEnd = False
        if home_pane.clickNum % 2 == 0:
            IsPlay = True
            setPlayButtonIcon("resource/images/pause.png")
            # NOTE(review): IsStartThread is never reset, so after stopPlay()
            # has ended the receiver no new thread is started -- confirm
            # whether restarting after a stop is expected.
            if not IsStartThread:
                thread_obj = threading.Thread(target=recvPointsOfCar, daemon=True)
                thread_obj.start()
                IsStartThread = True
        else:
            IsPlay = False
            setPlayButtonIcon("resource/images/play.png")
        home_pane.clickNum += 1
    home_pane.playAndPause_signal.connect(playAndPause)

    def update_plot():
        """Redraw the map and the track received so far."""
        # The receiver thread appends to XList and YList one after the other,
        # so the lists can differ in length at this moment; plot a snapshot
        # cut to the common length to keep x and y consistent.
        xs, ys = list(XList), list(YList)
        count = min(len(xs), len(ys))
        plt.cla()  # clear the canvas
        plt.imshow(home_pane.imageData, plt.cm.gray)
        plt.plot(xs[:count], ys[:count], color='red', linewidth=0.1, linestyle='-', marker='.')
        home_pane.canvas.draw()
    home_pane.update_plot_signal.connect(update_plot)

    def stopPlay():
        """Ask the receiver thread to stop."""
        global IsEnd
        IsEnd = True
    home_pane.stopPlay_signal.connect(stopPlay)

    # ---- chooseBag_pane handlers ----

    def saveBagIndex(bagIndex):
        """Remember the index of the bag chosen in the selection pane."""
        global BagIndex
        BagIndex = bagIndex
    chooseBag_pane.saveBagIndex_signal.connect(saveBagIndex)

    sys.exit(app.exec())