Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

2.14. GPS

2.14.1. gpsdclient

	
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home	: https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-07-27
##############################################
try:
    import random, time, os, sys, re, requests, platform, shelve
    import logging, logging.handlers
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
    from datetime import datetime
    from urllib.parse import urlencode
    from optparse import OptionParser, OptionGroup
    from queue import Queue

    module = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/common"
    sys.path.insert(0, ".")
    sys.path.insert(0, module)
    from config import ConfigFile
except ImportError as err:
    print("ImportError: %s" % (err))


class Ropeway(ConfigFile):
    speed = "缆车加速运行,请坐稳扶好"
    welcome = "欢迎乘坐华山缆车"

    broadcasts = {}
    broadcasts["up"] = {}
    broadcasts["down"] = {}
    done = {}
    done["up"] = {}
    done["down"] = {}

    def __init__(self, debug=False) -> None:
        usage = "usage: %prog [options] start|stop|restart"
        self.parser = OptionParser(usage)
        self.parser.add_option(
            "-c",
            "--config",
            dest="config",
            help="config file",
            default="/srv/config.ini",
            metavar="/srv/config.ini",
        )
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="log file",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="debug mode")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="run as daemon")
        self.parser.add_option("", "--demo", dest="demo", action="store_true", help="run as demo")
        (self.options, self.args) = self.parser.parse_args()
        try:
            super().__init__(cfgfile=self.options.config, logfile=self.options.logfile)
        except Exception as e:
            print(e)
            exit()

        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", filename=self.options.logfile, filemode="a")
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
        self.logging = logging.getLogger()

        self.host = self.option("ropeway", "host")
        self.port = int(self.option("ropeway", "port"))
        self.username = self.option("ropeway", "username")
        self.password = self.option("ropeway", "password")
        self.topic = self.option("ropeway", "topic")

        self.queue = Queue(maxsize=10)
        self.id = self.machineId()

    def machineId(self):
        id = ""
        if platform.system() == "Linux":
            with open("/etc/machine-id", "r") as file:
                id = file.read()
        else:
            id = platform.system()
        return id

    def broadcast(self, msg):
        try:
            client_id = f"ropeway-{random.randint(0, 1000)}"
            auth = None
            if self.username:
                auth = {"username": self.username, "password": self.password}
            publish.single(self.topic, payload=msg, qos=2, retain=False, hostname=self.host, port=self.port, client_id=client_id, keepalive=60, will=None, auth=auth, tls=None, protocol=client.MQTTv5, transport="tcp")
            self.logging.info(f"Broadcast topic: {self.topic}, msg: {msg}")
        except Exception as err:
            self.logging.error(f"MQTT: {str(err)}")

    def is_business_hours(self):
        current_time = datetime.now().time().strftime("%H:%M")
        # current_time = "09:30"
        start_time_str = "07:00"
        end_time_str = "19:30"

        time_obj = datetime.strptime(current_time, "%H:%M").time()
        start_time_obj = datetime.strptime(start_time_str, "%H:%M").time()
        end_time_obj = datetime.strptime(end_time_str, "%H:%M").time()

        if start_time_obj <= time_obj <= end_time_obj:
            return True
        else:
            return False

    def traccar(self, gps):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        params = {"id": self.id, "lat": gps["lat"], "lon": gps["lon"], "timestamp": timestamp, "hdop": "3", "altitude": gps["alt"], "speed": gps["speed"]}
        response = requests.get(self.option("telpher", "traccar"), params=params)
        self.logging.info("%s, %s" % (response.status_code, response.url))

    def match(self, gps):
        deviation = float(20)

        if self.climb() > 0:
            direction = "up"
        else:
            direction = "down"

        broadcasts = self.broadcasts[direction]

        for alt, text in broadcasts.items():
            altitude = float(alt)

            if self.done[direction][alt]:
                # self.logging.debug(f"Skip: {alt} - {altitude}")
                continue
            elif (gps["alt"] - deviation) < altitude and (gps["alt"] + deviation) > altitude:
                self.broadcast(text)
                self.reset(direction, alt)
                # self.logging.debug(f"Altitude: {gps['alt']}, {alt} - {altitude} - {deviation}")
                self.logging.debug(f"Altitude: {gps}, {alt} - {text}")
                # self.traccar(gps)
            elif gps["speed"] > 4 and self.speed:
                self.logging.debug(f"Speed: {self.speed}")
                self.broadcast(self.speed)
                self.speed = None
            # else:
            #     print("Speed skip: ", self.speed)

        # for broadcast in self.broadcasts:
        #     if (gps["alt"] - deviation) < broadcast["altitude"] and (gps["alt"] + deviation) > broadcast["altitude"]:
        #         if "mountainbottom" in broadcast and broadcast["mountainbottom"]:
        #             broadcast["mountainbottom"] = False
        #             broadcast["mountaintop"] = True
        #             self.broadcast(broadcast["text"])
        #             # time.sleep(3)
        #             # self.broadcast(broadcast["promotion"])
        #             self.logging.debug("mountainbottom", broadcast)
        #         elif "mountaintop" in broadcast and broadcast["mountaintop"]:
        #             broadcast["mountainbottom"] = True
        #             broadcast["mountaintop"] = False
        #             self.broadcast(broadcast["text"])
        #             # time.sleep(3)
        #             # self.broadcast(broadcast["promotion"])
        #             self.logging.debug("mountaintop", broadcast)
        #         else:
        #             self.broadcast(broadcast["text"])
        #             # time.sleep(3)
        #             # self.broadcast(broadcast["promotion"])
        #             # self.traccar(gps)
        #             self.logging.debug("Range: ", broadcast["text"])
        # else:
        #     print("Skip: ", broadcast)

    def daemon(self):
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def load(self):
        with shelve.open("/var/tmp/telpher") as db:
            for high, text in db.items():
                self.broadcasts[high] = text
        self.reset()
        self.logging.info(f"Load broadcasts: {self.broadcasts}")

    def reset(self, direction=None, altitude=None):
        for alt, text in self.broadcasts["up"].items():
            self.done["up"][alt] = False
        for alt, text in self.broadcasts["down"].items():
            self.done["down"][alt] = False

        if direction and altitude:
            self.done[direction][altitude] = True

        self.logging.info(f"Reset: {self.done}")

    def climb(self):
        sum = 0
        for n in range(5):
            if not self.queue.empty():
                sum += self.queue.get(block=False)
        # self.logging.info(f"Climb: {sum}")
        return sum

    def gather(self, climb):
        if self.queue.full():
            self.queue.get()
        self.queue.put(climb, block=False)

    def demo(self):
        # self.load()
        self.broadcasts = {
            "up": {"700": "海拔700米", "750": "各位游客,欢迎来到三特索道。三特索道采用先进的技术和设备,安全可靠,让您安心畅游山间。让我们一起享受这段美妙的旅程吧!,你可以这样问我,西岳华山有基座山峰?", "800": "海拔800米", "850": "乘坐北峰索道的时候,有可欣赏到一处石景。在一处巨大的石壁之上,中间部分凹陷进去,形成了像似耳朵又像似半块月亮的一处石景,传说乘坐北峰索道时,见此奇景,可祈福许愿,通过月耳崖,便可上达天听,愿望就会实现。", "900": "海拔900米"},
            "down": {
                "2000": "海拔2000米",
                "1800": "自古华山一条道”的徒步登山道。你可以这样问我,西岳华山都有哪些名人",
                "1500": "海拔1500米",
                "1300": "马上要到站了,我来带你去华山论剑了。你可以这样问我,华山论剑在那座山峰?",
                "1200": "海拔1200米",
                "1000": "各位大侠请顺着缆车运营方向往右看,有一个形状酷似手指的奇石,我们叫他先人指路。你可以这样问我,华山有哪些美丽的景色?",
            },
        }
        self.reset()
        self.broadcast("缆车系统准备就绪")
        time.sleep(2)
        for alt in range(700, 1000, 10):
            gps = {
                "lat": 0,
                "lon": 0,
                "alt": float(alt),
                "speed": 4.5,
                "climb": round(random.uniform(0.0, 10), 2),
            }
            self.gather(gps["climb"])
            self.logging.debug(gps)
            self.match(gps)
            time.sleep(random.randint(1, 3))

        for alt in range(2000, 1000, -50):
            gps = {
                "lat": 0,
                "lon": 0,
                "alt": float(alt),
                "speed": 4.5,
                "climb": round(random.uniform(-0.0, -10), 2),
            }
            self.gather(gps["climb"])
            self.logging.debug(gps)
            self.match(gps)
            time.sleep(random.randint(1, 3))

    def run(self):
        self.load()
        self.broadcast("缆车系统准备就绪")
        try:
            with GPSDClient() as client:
                for result in client.dict_stream(convert_datetime=True, filter=["TPV"]):
                    time.sleep(1)
                    lat = result.get("lat", 0.0)
                    lon = result.get("lon", 0.0)
                    alt = result.get("alt", 0.0)
                    climb = result.get("climb", 0.0)
                    speed = result.get("speed", 0.0)
                    point = (lat, lon)
                    gps = {
                        "lat": lat,
                        "lon": lon,
                        "alt": alt,
                        "speed": speed,
                        "climb": climb,
                    }

                    if lat == 0.0 or lon == 0.0:
                        continue
                    if not self.is_business_hours():
                        continue

                    self.gather(climb)
                    # self.logging.debug(result)
                    self.logging.debug(gps)
                    self.match(gps)

                    # elif alt < self.mountaintop and self.highSwitch == True:
                    #     self.broadcast(self.broadcasts["welcome"])
                    #     self.traccar(gps)
                    #     self.highSwitch = False
                    # elif alt < self.mountainbottom and self.highSwitch == False:
                    #     self.broadcast(self.broadcasts["welcome"])
                    #     self.traccar(gps)
                    #     self.highSwitch = True

                    # for broadcast in self.broadcasts["coordinate"]:
                    #     self.logging.debug(broadcast)
                    #     dist = distance(point, broadcast["point"]).m
                    #     self.logging.info("两点间的距离为:{:.2f}米".format(dist))
                    #     if dist < 50:
                    #         self.broadcast(broadcast["text"])
                    #         self.traccar(gps)
                    #         # time.sleep(broadcast["sleep"])

        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def main(self):
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)

        if self.options.daemon:
            self.daemon()
        # print(self.args)
        # if not self.args:
        #     self.parser.print_help()
        #     exit()
        # else:
        if self.options.demo:
            self.demo()
        else:
            self.run()


if __name__ == "__main__":
    try:
        ropeway = Ropeway()
        ropeway.main()

    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")
	
	
		

2.14.2. Traccar

Traccar 是一个地图服务,把GPS坐标推送给 Traccar 之后,可以在 Traccar 上看到移动轨迹。

		
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home	: https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-06-26
##############################################
try:
    import random, time, os, sys, re, requests
    import logging, logging.handlers
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
    from datetime import datetime
    from urllib.parse import urlencode
    from optparse import OptionParser, OptionGroup

except ImportError as err:
    print("ImportError: %s" % (err))


class Traccar:
    host = ""

    def __init__(self, debug=False) -> None:
        usage = "usage: %prog [options] identifier"
        self.parser = OptionParser(usage)
        self.parser.add_option("", "--host", dest="host", help="记录日志文件", default=None, metavar="https://host:5055")
        self.parser.add_option("-m", "--machine-id", action="store_true", dest="machine", help="使用 Linux 机器ID作为设备ID")
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="记录日志文件",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("-t", "--test", action="store_true", dest="test", help="数据推送测试")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="后台运行")
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="调试模式")

        (self.options, self.args) = self.parser.parse_args()

        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, format="%(asctime)s %(levelname)-6s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", filename=self.options.logfile, filemode="a")
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s %(levelname)-6s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
        self.logging = logging.getLogger()

    def machineId(self):
        id = ""
        try:
            with open("/etc/machine-id", "r") as file:
                id = file.read()
        except Exception as err:
            self.logging.error(err)
        return id

    def osmand(self, gps):
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            params = {"id": self.id, "lat": gps["lat"], "lon": gps["lon"], "timestamp": timestamp, "hdop": "3", "altitude": gps["alt"], "speed": gps["speed"]}
            response = requests.get(self.host, params=params)
            self.logging.info(f"Traccar osmand: {response.status_code}, {response.reason}")
        except Exception as err:
            self.logging.error(err)

    def test(self):
        data = {
            "lat": f"31.23{random.randint(0, 1000)}",
            "lon": f"121.47{random.randint(0, 1000)}",
            "alt": f"{random.randint(0, 1000)}",
            "speed": f"{random.randint(0, 100)}",
            "climb": -0.5,
        }
        self.osmand(data)

    def daemon(self):
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def run(self):
        try:
            with GPSDClient() as client:
                for result in client.dict_stream(convert_datetime=True, filter=["TPV"]):
                    lat = result.get("lat", 0.0)
                    lon = result.get("lon", 0.0)
                    alt = result.get("alt", 0.0)
                    climb = result.get("climb", 0.0)
                    speed = result.get("speed", 0.0)

                    data = {
                        "lat": lat,
                        "lon": lon,
                        "alt": alt,
                        "speed": speed,
                        "climb": climb,
                    }
                    if lat == 0.0 or lon == 0.0:
                        time.sleep(1)
                        continue
                    self.logging.info(data)
                    self.osmand(data)
                    time.sleep(1)

        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def help(self):
        self.parser.print_help()
        exit()

    def main(self):
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)

        if self.options.daemon:
            self.daemon()

        if self.options.host:
            self.host = self.options.host
        else:
            self.host = "http://47.100.253.187:5055"
            # self.help()

        if self.options.machine:
            self.id = self.machineId()
            self.run()
            exit()

        if not self.args:
            self.help()
        else:
            self.id = self.args[0]

        if self.options.test:
            self.test()
            exit()

        self.run()


if __name__ == "__main__":
    try:
        traccar = Traccar()
        traccar.main()

    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")