您的当前位置:首页正文

【PyQt5】在地图上绘制运行轨迹(前后端分离)

2024-11-27 来源:个人技术集锦

效果:

后端:(两个http加一个websocket接口)

import json,sys,os,rosbag,time
from flask import Flask, jsonify
from flask_cors import CORS
from flask_sockets import Sockets
from gevent import monkey,pywsgi
from geventwebsocket.handler import WebSocketHandler
from analysisData import analysis_2dMap,analysis_poseData

mapPath = r"D:\czc\bagSever\bag\siwei_bag\siwei_bag\mapping"
PCDPath = r"D:\czc\bagSever\bag\map\map\test.pcd"
PGMPath = r"D:\czc\bagSever\bag\map\map\map.pgm"

app = Flask(__name__)
sockets = Sockets(app)
CORS(app, supports_credentials=True)

sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")

monkey.patch_all()  # 非阻塞

bagList = []

def FlaskSever():
    @app.route("/queryBag", methods=["POST"])
    def queryBag():
        dirList = os.listdir(mapPath)
        global bagList
        bagList = []
        for file in dirList:
            if file[-4:] == ".bag":
                bagList.append(file)
        queryResult = []
        for i in range(len(bagList)):
            queryResult.append({"fileName": bagList[i]})

        return jsonify(fileNameList=queryResult)

    @sockets.route('/queryPoseData')
    def queryPoseData(ws):
        data = ws.receive()
        data = json.loads(data)
        print(data)
        # data = {"index":0}
        index = data["index"]
        FileName = bagList[index]
        bagFile = mapPath + "/" + FileName
        bag = rosbag.Bag(bagFile, "r")
        poseData = bag.read_messages('/updated_pose')
        analysisPoseData = analysis_poseData(poseData)
        while not ws.closed:
            item = next(analysisPoseData)
            sendData_dict = json.dumps({"x": item["x"], "y":item["y"], "z":item["z"]}, ensure_ascii=False)
            ws.send(sendData_dict)
            time.sleep(0.01)

    @app.route('/query2DMap', methods=["POST"])
    def query2DMap():
        analysis2dMap_list = analysis_2dMap(PGMPath)
        return jsonify(
            type="map2DData",
            mapData= analysis2dMap_list,
            resolution=0.2,
            origin=[-238.371414, -137.214859, 0.000000],
            originGps=[10355.72169, 3044.70807]
        )

    server = pywsgi.WSGIServer(('0.0.0.0', 5100), app, handler_class=WebSocketHandler)
    print('server start')
    server.serve_forever()

if __name__ == '__main__':
    FlaskSever()

前端:(部分展示)

import sys,time,websocket,json,requests,threading
from PyQt5 import QtGui,QtCore
from home_pane import homePane
from chooseBag_pane import chooseBagPane
import matplotlib.pyplot as plt
from PyQt5.QtWidgets import QApplication


# 定义一个接收小车运行轨迹的方法,用一个子线程调用该方法
def recvPointsOfCar():
    global XList, YList
    ws = websocket.WebSocket()
    ws.connect("ws://127.0.0.1:5100/queryPoseData")
    sendData_dict = json.dumps(
        {
            "index": BagIndex,
        },
        ensure_ascii=False
    )
    ws.send(sendData_dict)
    while True:
        if IsEnd == True:
            break
        else:
            if IsPlay == True:
                try:
                    data = json.loads(ws.recv())
                    XList.append(data["x"] / 0.2 + 238.371414 / 0.2)
                    YList.append(-data["y"] / 0.2 + 137.214859 / 0.2 - 200.214859)
                except:
                    break
            else:
                time.sleep(0.5)

if __name__ == '__main__':
    app = QApplication(sys.argv)

    home_pane = homePane()
    chooseBag_pane = chooseBagPane()

    home_pane.show()

    # 默认参数
    BagIndex = 0  # 默认选第一个bag
    IsPlay = True
    IsEnd = False
    IsStartThread = False  # 表示线程是否已经启动
    XList, YList = [], []

    responseJson_query2DMap = requests.post("http://127.0.0.1:5100/query2DMap").json()  # 使用http请求获取服务端的地图数据
    home_pane.imageData = responseJson_query2DMap["mapData"]  # 提取地图数据
    plt.imshow(home_pane.imageData, plt.cm.gray)  # 画灰度地图
    home_pane.canvas.draw()

    """home_pane"""
    def chooseTrackPoints():
        chooseBag_pane.show()
    home_pane.chooseTrackPoints_signal.connect(chooseTrackPoints)

    def playAndPause():
        global IsPlay,IsEnd
        IsEnd = False
        if home_pane.clickNum % 2 == 0:
            IsPlay = True
            icon = QtGui.QIcon()
            icon.addPixmap(QtGui.QPixmap("resource/images/pause.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
            home_pane.pushButton.setIcon(icon)
            home_pane.pushButton.setIconSize(QtCore.QSize(30, 30))

            global IsStartThread
            if IsStartThread == False:
                thread_obj = threading.Thread(target=recvPointsOfCar)
                thread_obj.setDaemon(True)
                thread_obj.start()
                IsStartThread = True
            else:
                pass
        else:
            IsPlay = False
            icon = QtGui.QIcon()
            icon.addPixmap(QtGui.QPixmap("resource/images/play.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
            home_pane.pushButton.setIcon(icon)
            home_pane.pushButton.setIconSize(QtCore.QSize(30, 30))
        home_pane.clickNum += 1
    home_pane.playAndPause_signal.connect(playAndPause)

    def update_plot():
        global XList,YList
        # print(XList)
        plt.cla()  # 清空画布
        plt.imshow(home_pane.imageData, plt.cm.gray)
        plt.plot(XList, YList, color='red', linewidth=0.1, linestyle='-', marker='.')
        home_pane.canvas.draw()
    home_pane.update_plot_signal.connect(update_plot)

    def stopPlay():
        global IsEnd
        IsEnd = True
    home_pane.stopPlay_signal.connect(stopPlay)

    """chooseBag_pane"""
    def saveBagIndex(bagIndex):
        global BagIndex
        BagIndex = bagIndex
        # print(BagIndex)
    chooseBag_pane.saveBagIndex_signal.connect(saveBagIndex)

    sys.exit(app.exec())
显示全文