知乎专栏 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-07-27
##############################################
"""Altitude-triggered cable-car announcements.

Reads TPV reports from gpsd, picks the "up" or "down" announcement table from
the sign of the reported climb rate, and publishes the text whose configured
altitude lies within 20 m of the current altitude to an MQTT topic.
"""
import logging
import logging.handlers
import os
import platform
import random
import re
import shelve
import sys
import time
from datetime import datetime
from optparse import OptionGroup, OptionParser
from queue import Queue
from urllib.parse import urlencode

# The project-local ``config`` module is looked up in the current directory
# and in ``<parent of this script's directory>/common``.
module = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/common"
sys.path.insert(0, ".")
sys.path.insert(0, module)

try:
    import requests
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
    from config import ConfigFile
except ImportError as err:
    # Continuing would only fail later with an unrelated NameError on
    # ``ConfigFile``; stop here with the real cause.
    print("ImportError: %s" % (err))
    sys.exit(1)


class Ropeway(ConfigFile):
    """Publish announcements over MQTT as the cabin passes configured altitudes."""

    # One-shot announcement sent the first time the speed exceeds 4; set to
    # None on the instance once it has been sent.
    speed = "缆车加速运行,请坐稳扶好"
    welcome = "欢迎乘坐华山缆车"
    # direction -> {altitude (string key): announcement text}
    broadcasts = {"up": {}, "down": {}}
    # direction -> {altitude (string key): True once announced}
    done = {"up": {}, "down": {}}

    def __init__(self, debug=False) -> None:
        """Parse the command line, load the config file and set up logging.

        Exits with status 1 when the ConfigFile base class fails to initialise.
        """
        usage = "usage: %prog [options] start|stop|restart"
        self.parser = OptionParser(usage)
        self.parser.add_option(
            "-c",
            "--config",
            dest="config",
            help="config file",
            default="/srv/config.ini",
            metavar="/srv/config.ini",
        )
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="log file",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="debug mode")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="run as daemon")
        self.parser.add_option("", "--demo", dest="demo", action="store_true", help="run as demo")

        (self.options, self.args) = self.parser.parse_args()

        try:
            super().__init__(cfgfile=self.options.config, logfile=self.options.logfile)
        except Exception as e:
            print(e)
            sys.exit(1)

        log_style = {
            "format": "%(asctime)s %(levelname)-8s %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, filename=self.options.logfile, filemode="a", **log_style)
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, **log_style)
        self.logging = logging.getLogger()

        self.host = self.option("ropeway", "host")
        self.port = int(self.option("ropeway", "port"))
        self.username = self.option("ropeway", "username")
        self.password = self.option("ropeway", "password")
        self.topic = self.option("ropeway", "topic")

        self.queue = Queue(maxsize=10)
        self.id = self.machineId()

    def machineId(self, path="/etc/machine-id"):
        """Return the device identifier.

        On Linux this is the content of *path* without surrounding whitespace
        (the file ends with a newline, which must not leak into the id).
        On other systems, or when the file cannot be read, the platform name
        is returned instead.
        """
        if platform.system() == "Linux":
            try:
                with open(path, "r") as file:
                    return file.read().strip()
            except OSError as err:
                self.logging.error(err)
        return platform.system()

    def broadcast(self, msg):
        """Publish *msg* to the configured MQTT topic; failures are logged, not raised."""
        try:
            client_id = f"ropeway-{random.randint(0, 1000)}"
            auth = None
            if self.username:
                auth = {"username": self.username, "password": self.password}
            publish.single(
                self.topic,
                payload=msg,
                qos=2,
                retain=False,
                hostname=self.host,
                port=self.port,
                client_id=client_id,
                keepalive=60,
                will=None,
                auth=auth,
                tls=None,
                protocol=client.MQTTv5,
                transport="tcp",
            )
            self.logging.info(f"Broadcast topic: {self.topic}, msg: {msg}")
        except Exception as err:
            self.logging.error(f"MQTT: {str(err)}")

    def is_business_hours(self):
        """Return True when the local time (minute resolution) is within 07:00-19:30 inclusive."""
        # Seconds are dropped so that e.g. 19:30:45 still counts as 19:30.
        now = datetime.now().time().replace(second=0, microsecond=0)
        opening = datetime.strptime("07:00", "%H:%M").time()
        closing = datetime.strptime("19:30", "%H:%M").time()
        return opening <= now <= closing

    def traccar(self, gps):
        """Send one position to the URL configured as [telpher] traccar.

        The request is bounded by a timeout and request errors are logged, so
        an unreachable server cannot stall or abort the caller.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        params = {
            "id": self.id,
            "lat": gps["lat"],
            "lon": gps["lon"],
            "timestamp": timestamp,
            "hdop": "3",
            "altitude": gps["alt"],
            "speed": gps["speed"],
        }
        try:
            response = requests.get(self.option("telpher", "traccar"), params=params, timeout=10)
        except requests.RequestException as err:
            self.logging.error(err)
            return
        self.logging.info("%s, %s" % (response.status_code, response.url))

    def match(self, gps):
        """Announce every pending text whose altitude is within 20 m of gps["alt"].

        The table is chosen by the sign of climb(). For entries that are out of
        range, the one-shot speed announcement is sent the first time
        gps["speed"] exceeds 4.
        """
        deviation = 20.0
        direction = "up" if self.climb() > 0 else "down"

        for alt, text in self.broadcasts[direction].items():
            altitude = float(alt)
            if self.done[direction].get(alt, False):
                continue
            if gps["alt"] - deviation < altitude < gps["alt"] + deviation:
                self.broadcast(text)
                # reset() clears every flag and marks only this entry as done.
                self.reset(direction, alt)
                self.logging.debug(f"Altitude: {gps}, {alt} - {text}")
            elif gps["speed"] > 4 and self.speed:
                self.logging.debug(f"Speed: {self.speed}")
                self.broadcast(self.speed)
                self.speed = None

    def daemon(self):
        """Fork once and let the parent process exit."""
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def load(self):
        """Load the announcement tables from the shelve at /var/tmp/telpher."""
        with shelve.open("/var/tmp/telpher") as db:
            for high, text in db.items():
                self.broadcasts[high] = text
        self.reset()
        self.logging.info(f"Load broadcasts: {self.broadcasts}")

    def reset(self, direction=None, altitude=None):
        """Clear all done flags, then mark (direction, altitude) as done if both are given."""
        for way in ("up", "down"):
            for alt in self.broadcasts[way]:
                self.done[way][alt] = False
        if direction and altitude:
            self.done[direction][altitude] = True
        self.logging.info(f"Reset: {self.done}")

    def climb(self):
        """Return the sum of up to five queued climb samples.

        The samples are removed from the queue by this call.
        """
        total = 0
        for _ in range(5):
            if not self.queue.empty():
                total += self.queue.get(block=False)
        return total

    def gather(self, climb):
        """Queue one climb sample, discarding the oldest one when the queue is full."""
        if self.queue.full():
            self.queue.get()
        self.queue.put(climb, block=False)

    def _simulate(self, altitudes, climb_from, climb_to):
        """Feed one synthetic fix per altitude through gather() and match()."""
        for alt in altitudes:
            gps = {
                "lat": 0,
                "lon": 0,
                "alt": float(alt),
                "speed": 4.5,
                "climb": round(random.uniform(climb_from, climb_to), 2),
            }
            self.gather(gps["climb"])
            self.logging.debug(gps)
            self.match(gps)
            time.sleep(random.randint(1, 3))

    def demo(self):
        """Replay a synthetic ascent (700-990 m) and descent (2000-1050 m) without gpsd."""
        self.broadcasts = {
            "up": {
                "700": "海拔700米",
                "750": "各位游客,欢迎来到三特索道。三特索道采用先进的技术和设备,安全可靠,让您安心畅游山间。让我们一起享受这段美妙的旅程吧!,你可以这样问我,西岳华山有基座山峰?",
                "800": "海拔800米",
                "850": "乘坐北峰索道的时候,有可欣赏到一处石景。在一处巨大的石壁之上,中间部分凹陷进去,形成了像似耳朵又像似半块月亮的一处石景,传说乘坐北峰索道时,见此奇景,可祈福许愿,通过月耳崖,便可上达天听,愿望就会实现。",
                "900": "海拔900米",
            },
            "down": {
                "2000": "海拔2000米",
                "1800": "自古华山一条道”的徒步登山道。你可以这样问我,西岳华山都有哪些名人",
                "1500": "海拔1500米",
                "1300": "马上要到站了,我来带你去华山论剑了。你可以这样问我,华山论剑在那座山峰?",
                "1200": "海拔1200米",
                "1000": "各位大侠请顺着缆车运营方向往右看,有一个形状酷似手指的奇石,我们叫他先人指路。你可以这样问我,华山有哪些美丽的景色?",
            },
        }
        self.reset()
        self.broadcast("缆车系统准备就绪")
        time.sleep(2)
        self._simulate(range(700, 1000, 10), 0.0, 10)
        self._simulate(range(2000, 1000, -50), -0.0, -10)

    def run(self):
        """Stream TPV reports from gpsd and match each valid fix during business hours."""
        self.load()
        self.broadcast("缆车系统准备就绪")
        try:
            with GPSDClient() as gpsd:
                for result in gpsd.dict_stream(convert_datetime=True, filter=["TPV"]):
                    time.sleep(1)
                    gps = {
                        "lat": result.get("lat", 0.0),
                        "lon": result.get("lon", 0.0),
                        "alt": result.get("alt", 0.0),
                        "speed": result.get("speed", 0.0),
                        "climb": result.get("climb", 0.0),
                    }
                    # A report without lat/lon falls back to 0.0 and is skipped.
                    if gps["lat"] == 0.0 or gps["lon"] == 0.0:
                        continue
                    if not self.is_business_hours():
                        continue
                    self.gather(gps["climb"])
                    self.logging.debug(gps)
                    self.match(gps)
        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def main(self):
        """Entry point: optional debug dump and daemonising, then demo() or run()."""
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)

        if self.options.daemon:
            self.daemon()

        if self.options.demo:
            self.demo()
        else:
            self.run()


if __name__ == "__main__":
    try:
        ropeway = Ropeway()
        ropeway.main()
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")
Traccar 是一个 GPS 定位追踪服务,把 GPS 坐标推送给 Traccar 之后,就可以在 Traccar 的地图上看到设备的移动轨迹。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-06-26
##############################################
"""Forward gpsd TPV position reports to a Traccar server via HTTP GET."""
import logging
import logging.handlers
import os
import random
import re
import sys
import time
from datetime import datetime
from optparse import OptionGroup, OptionParser
from urllib.parse import urlencode

# Third-party imports are kept apart from the standard library ones so that a
# missing package is reported without leaving stdlib names such as ``logging``
# unbound.
try:
    import requests
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
except ImportError as err:
    print("ImportError: %s" % (err))


class Traccar:
    """Command-line tool that pushes GPS fixes to a Traccar endpoint."""

    host = ""

    def __init__(self, debug=False) -> None:
        """Parse the command line and configure the root logger."""
        usage = "usage: %prog [options] identifier"
        self.parser = OptionParser(usage)
        # The help text used to be a copy of the --logfile one.
        self.parser.add_option("", "--host", dest="host", help="Traccar 服务器地址", default=None, metavar="https://host:5055")
        self.parser.add_option("-m", "--machine-id", action="store_true", dest="machine", help="使用 Linux 机器ID作为设备ID")
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="记录日志文件",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("-t", "--test", action="store_true", dest="test", help="数据推送测试")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="后台运行")
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="调试模式")

        (self.options, self.args) = self.parser.parse_args()

        log_style = {
            "format": "%(asctime)s %(levelname)-6s %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, filename=self.options.logfile, filemode="a", **log_style)
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, **log_style)
        self.logging = logging.getLogger()

    def machineId(self, path="/etc/machine-id"):
        """Return the content of *path* without surrounding whitespace.

        The file ends with a newline, which must not become part of the device
        id. Returns "" and logs the error when the file cannot be read.
        """
        try:
            with open(path, "r") as file:
                return file.read().strip()
        except OSError as err:
            self.logging.error(err)
            return ""

    def osmand(self, gps):
        """Send one fix to ``self.host`` as query parameters; errors are logged, not raised."""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            params = {
                "id": self.id,
                "lat": gps["lat"],
                "lon": gps["lon"],
                "timestamp": timestamp,
                "hdop": "3",
                "altitude": gps["alt"],
                "speed": gps["speed"],
            }
            # Without a timeout an unresponsive server would block the GPS loop forever.
            response = requests.get(self.host, params=params, timeout=10)
            self.logging.info(f"Traccar osmand: {response.status_code}, {response.reason}")
        except Exception as err:
            self.logging.error(err)

    def test(self):
        """Push a single randomly generated fix to the server."""
        data = {
            "lat": f"31.23{random.randint(0, 1000)}",
            "lon": f"121.47{random.randint(0, 1000)}",
            "alt": f"{random.randint(0, 1000)}",
            "speed": f"{random.randint(0, 100)}",
            "climb": -0.5,
        }
        self.osmand(data)

    def daemon(self):
        """Fork once and let the parent process exit."""
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def run(self):
        """Stream TPV reports from gpsd and forward each valid fix, pausing 1 s per report."""
        try:
            with GPSDClient() as gpsd:
                for result in gpsd.dict_stream(convert_datetime=True, filter=["TPV"]):
                    data = {
                        "lat": result.get("lat", 0.0),
                        "lon": result.get("lon", 0.0),
                        "alt": result.get("alt", 0.0),
                        "speed": result.get("speed", 0.0),
                        "climb": result.get("climb", 0.0),
                    }
                    # A report without lat/lon falls back to 0.0 and is not forwarded.
                    if data["lat"] != 0.0 and data["lon"] != 0.0:
                        self.logging.info(data)
                        self.osmand(data)
                    time.sleep(1)
        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def help(self):
        """Print the usage text and exit."""
        self.parser.print_help()
        sys.exit()

    def main(self):
        """Entry point: resolve host and device id, then run the test push or the GPS loop."""
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)

        if self.options.daemon:
            self.daemon()

        self.host = self.options.host or "http://47.100.253.187:5055"

        # The device id comes from --machine-id or from the first positional argument.
        if self.options.machine:
            self.id = self.machineId()
        elif self.args:
            self.id = self.args[0]
        else:
            self.help()

        # --test is honoured for both id sources; it used to be ignored with --machine-id.
        if self.options.test:
            self.test()
        else:
            self.run()


if __name__ == "__main__":
    try:
        traccar = Traccar()
        traccar.main()
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")