| 知乎专栏 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-07-27
##############################################
# Best-effort dependency loading: a failed import is reported and the script
# keeps going, so every name imported after the failing line stays undefined.
try:
    import random
    import time
    import os
    import sys
    import re
    import requests
    import platform
    import shelve
    import logging
    import logging.handlers
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
    from datetime import datetime
    from urllib.parse import urlencode
    from optparse import OptionParser, OptionGroup
    from queue import Queue

    # "<grandparent directory of this file>/common" is searched first, then the
    # current working directory, so that `config` can be imported from either.
    module = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/common"
    sys.path[:0] = [module, "."]
    from config import ConfigFile
except ImportError as err:
    print(f"ImportError: {err}")
class Ropeway(ConfigFile):
    """Publish altitude-triggered announcements for a cable car over MQTT.

    GPS fixes come from gpsd (``run``) or are simulated (``demo``).  Every fix
    is compared with a per-direction table mapping an altitude (string key) to
    an announcement text; when the current altitude is within 20 units of an
    entry (presumably metres -- the demo texts speak of "海拔700米"), the text
    is published to the configured MQTT topic.

    Connection settings are read through ``self.option(section, key)``, which
    is not defined in this class (presumably inherited from ``ConfigFile``).
    """

    # Published once, the first time match() sees a speed above 4; it is then
    # set to None on the instance and is not re-armed anywhere in this class.
    speed = "缆车加速运行,请坐稳扶好"
    welcome = "欢迎乘坐华山缆车"
    # Class-level defaults. __init__ gives each instance private copies so that
    # load()/reset() do not mutate state shared through the class.
    broadcasts = {"up": {}, "down": {}}
    done = {"up": {}, "down": {}}

    def __init__(self, debug=False) -> None:
        """Parse the command line, load the config file and set up logging.

        Args:
            debug: Unused; kept for backward compatibility (debug output is
                controlled by the ``--debug`` command-line flag instead).

        Exits with status 1 when the ``ConfigFile`` initialiser raises.
        """
        usage = "usage: %prog [options] start|stop|restart"
        self.parser = OptionParser(usage)
        self.parser.add_option(
            "-c",
            "--config",
            dest="config",
            help="config file",
            default="/srv/config.ini",
            metavar="/srv/config.ini",
        )
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="log file",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="debug mode")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="run as daemon")
        self.parser.add_option("", "--demo", dest="demo", action="store_true", help="run as demo")
        (self.options, self.args) = self.parser.parse_args()

        try:
            super().__init__(cfgfile=self.options.config, logfile=self.options.logfile)
        except Exception as e:
            print(e)
            # A configuration failure is an error: report a non-zero status.
            sys.exit(1)

        log_style = {
            "format": "%(asctime)s %(levelname)-8s %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, filename=self.options.logfile, filemode="a", **log_style)
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, **log_style)
        self.logging = logging.getLogger()

        # Per-instance copies of the class-level tables (see class body).
        self.broadcasts = {key: dict(value) for key, value in type(self).broadcasts.items()}
        self.done = {key: dict(value) for key, value in type(self).done.items()}

        self.host = self.option("ropeway", "host")
        self.port = int(self.option("ropeway", "port"))
        self.username = self.option("ropeway", "username")
        self.password = self.option("ropeway", "password")
        self.topic = self.option("ropeway", "topic")
        self.queue = Queue(maxsize=10)
        self.id = self.machineId()

    def machineId(self):
        """Return an identifier for this device.

        On Linux this is the content of ``/etc/machine-id`` with surrounding
        whitespace removed (the file ends with a newline, which must not leak
        into the ``id`` request parameter built by ``traccar``).  On other
        systems, or when the file cannot be read, the platform name is used.
        """
        system = platform.system()
        if system != "Linux":
            return system
        try:
            with open("/etc/machine-id", "r") as file:
                return file.read().strip()
        except OSError as err:
            self.logging.error(f"machine-id: {err}")
            return system

    def broadcast(self, msg):
        """Publish ``msg`` to the configured MQTT topic; failures are only logged."""
        try:
            client_id = f"ropeway-{random.randint(0, 1000)}"
            # Credentials are sent only when a username is configured.
            auth = {"username": self.username, "password": self.password} if self.username else None
            publish.single(
                self.topic,
                payload=msg,
                qos=2,
                retain=False,
                hostname=self.host,
                port=self.port,
                client_id=client_id,
                keepalive=60,
                will=None,
                auth=auth,
                tls=None,
                protocol=client.MQTTv5,
                transport="tcp",
            )
            self.logging.info(f"Broadcast topic: {self.topic}, msg: {msg}")
        except Exception as err:
            self.logging.error(f"MQTT: {str(err)}")

    def is_business_hours(self):
        """Return True when the local time, truncated to the minute, is in 07:00-19:30."""
        # Zero-padded "HH:MM" strings sort in the same order as the times they
        # represent, so a plain string comparison is sufficient.
        current_time = datetime.now().strftime("%H:%M")
        return "07:00" <= current_time <= "19:30"

    def traccar(self, gps):
        """Send one GPS fix to the URL configured under ``[telpher] traccar``.

        Args:
            gps: Mapping with ``lat``, ``lon``, ``alt`` and ``speed`` keys.

        Network errors are logged instead of propagating, and the request is
        bounded by a timeout so that an unreachable server cannot stall the
        caller.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        params = {
            "id": self.id,
            "lat": gps["lat"],
            "lon": gps["lon"],
            "timestamp": timestamp,
            "hdop": "3",
            "altitude": gps["alt"],
            "speed": gps["speed"],
        }
        try:
            response = requests.get(self.option("telpher", "traccar"), params=params, timeout=10)
        except requests.RequestException as err:
            self.logging.error(f"Traccar: {err}")
            return
        self.logging.info("%s, %s" % (response.status_code, response.url))

    def match(self, gps):
        """Publish the announcement(s) whose altitude is close to the current fix.

        The travel direction is "up" when ``climb()`` is positive and "down"
        otherwise.  For every not-yet-announced entry of that direction:

        * if the entry's altitude is within 20 of ``gps["alt"]`` its text is
          published and ``reset`` marks it as the only announced entry;
        * otherwise, if ``gps["speed"]`` exceeds 4 and the speed announcement
          has not been sent yet, the speed announcement is published.

        Args:
            gps: Mapping with at least ``alt`` and ``speed`` keys.
        """
        deviation = float(20)
        direction = "up" if self.climb() > 0 else "down"
        for alt, text in self.broadcasts[direction].items():
            if self.done[direction][alt]:
                continue
            altitude = float(alt)
            if gps["alt"] - deviation < altitude < gps["alt"] + deviation:
                self.broadcast(text)
                self.reset(direction, alt)
                self.logging.debug(f"Altitude: {gps}, {alt} - {text}")
            elif gps["speed"] > 4 and self.speed:
                self.logging.debug(f"Speed: {self.speed}")
                self.broadcast(self.speed)
                self.speed = None

    def daemon(self):
        """Fork once and terminate the parent so the child keeps running."""
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def load(self):
        """Load the announcement tables from the shelve file ``/var/tmp/telpher``.

        Every top-level key of the shelf replaces the entry of the same name in
        ``self.broadcasts``; afterwards all announcements are re-armed.
        """
        with shelve.open("/var/tmp/telpher") as db:
            for direction, entries in db.items():
                self.broadcasts[direction] = entries
        self.reset()
        self.logging.info(f"Load broadcasts: {self.broadcasts}")

    def reset(self, direction=None, altitude=None):
        """Re-arm every announcement, optionally leaving one marked as done.

        Args:
            direction: "up" or "down"; only used together with ``altitude``.
            altitude: Key of the entry to mark as already announced.
        """
        for leg in ("up", "down"):
            # update() keeps any stale keys already present, like the
            # original per-key assignment did.
            self.done[leg].update(dict.fromkeys(self.broadcasts[leg], False))
        if direction and altitude:
            self.done[direction][altitude] = True
        self.logging.info(f"Reset: {self.done}")

    def climb(self):
        """Return the sum of up to five queued climb samples.

        The samples are removed from the queue while being summed; an empty
        queue yields 0.
        """
        total = 0
        for _ in range(5):
            if self.queue.empty():
                break
            total += self.queue.get(block=False)
        return total

    def gather(self, climb):
        """Queue one climb sample, discarding the oldest one when the queue is full."""
        if self.queue.full():
            self.queue.get()
        self.queue.put(climb, block=False)

    def demo(self):
        """Replay a simulated ascent and descent through ``match``.

        The ascent runs from 700 to 990 in steps of 10, the descent from 2000
        down to and including 1000 in steps of 50, with a random pause of
        1-3 seconds between fixes.
        """
        self.broadcasts = {
            "up": {
                "700": "海拔700米",
                "750": "各位游客,欢迎来到三特索道。三特索道采用先进的技术和设备,安全可靠,让您安心畅游山间。让我们一起享受这段美妙的旅程吧!,你可以这样问我,西岳华山有基座山峰?",
                "800": "海拔800米",
                "850": "乘坐北峰索道的时候,有可欣赏到一处石景。在一处巨大的石壁之上,中间部分凹陷进去,形成了像似耳朵又像似半块月亮的一处石景,传说乘坐北峰索道时,见此奇景,可祈福许愿,通过月耳崖,便可上达天听,愿望就会实现。",
                "900": "海拔900米",
            },
            "down": {
                "2000": "海拔2000米",
                "1800": "自古华山一条道”的徒步登山道。你可以这样问我,西岳华山都有哪些名人",
                "1500": "海拔1500米",
                "1300": "马上要到站了,我来带你去华山论剑了。你可以这样问我,华山论剑在那座山峰?",
                "1200": "海拔1200米",
                "1000": "各位大侠请顺着缆车运营方向往右看,有一个形状酷似手指的奇石,我们叫他先人指路。你可以这样问我,华山有哪些美丽的景色?",
            },
        }
        self.reset()
        self.broadcast("缆车系统准备就绪")
        time.sleep(2)

        # (altitudes, climb range) per leg.  The descent stops at 950 so that
        # 1000 itself is visited; with the previous stop value of 1000 the
        # "1000" announcement could never trigger.  The climb ranges exclude 0
        # because match() treats a climb of 0 as "down".
        legs = (
            (range(700, 1000, 10), (0.1, 10.0)),
            (range(2000, 950, -50), (-10.0, -0.1)),
        )
        for altitudes, (low, high) in legs:
            for alt in altitudes:
                gps = {
                    "lat": 0,
                    "lon": 0,
                    "alt": float(alt),
                    "speed": 4.5,
                    "climb": round(random.uniform(low, high), 2),
                }
                self.gather(gps["climb"])
                self.logging.debug(gps)
                self.match(gps)
                time.sleep(random.randint(1, 3))

    def run(self):
        """Stream TPV reports from gpsd and feed every valid fix to ``match``.

        Fixes with a latitude or longitude of 0.0 (also the default used for a
        missing field) and fixes outside business hours are skipped.  Any
        exception ends the loop and is logged.
        """
        self.load()
        self.broadcast("缆车系统准备就绪")
        fields = ("lat", "lon", "alt", "speed", "climb")
        try:
            # Named `gpsd` so the paho `client` module used by broadcast() is
            # not shadowed inside this method.
            with GPSDClient() as gpsd:
                for result in gpsd.dict_stream(convert_datetime=True, filter=["TPV"]):
                    time.sleep(1)
                    gps = {name: result.get(name, 0.0) for name in fields}
                    if gps["lat"] == 0.0 or gps["lon"] == 0.0:
                        continue
                    if not self.is_business_hours():
                        continue
                    self.gather(gps["climb"])
                    self.logging.debug(gps)
                    self.match(gps)
        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def main(self):
        """Entry point: honour ``--debug``/``--daemon`` and start demo or live mode."""
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)
        if self.options.daemon:
            self.daemon()
        if self.options.demo:
            self.demo()
        else:
            self.run()
# Script entry point: build the service and run it until interrupted.
if __name__ == "__main__":
    try:
        ropeway = Ropeway()
        ropeway.main()
    except KeyboardInterrupt:
        # Message typo fixed ("Crtl" -> "Ctrl").
        print("Ctrl+C Pressed. Shutting down.")
Traccar 是一个 GPS 定位追踪服务:把 GPS 坐标推送给 Traccar 之后,就可以在 Traccar 的地图上查看设备的移动轨迹。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-06-26
##############################################
# Best-effort dependency loading: a failed import is reported and the script
# keeps going, so every name imported after the failing line stays undefined.
try:
    import random
    import time
    import os
    import sys
    import re
    import requests
    import logging
    import logging.handlers
    from geopy.distance import distance
    from paho.mqtt import publish, client
    from gpsdclient import GPSDClient
    from datetime import datetime
    from urllib.parse import urlencode
    from optparse import OptionParser, OptionGroup
except ImportError as err:
    print(f"ImportError: {err}")
class Traccar:
    """Forward GPS fixes from gpsd to a Traccar server over HTTP.

    Each fix is sent as a GET request carrying ``id``, ``lat``, ``lon``,
    ``timestamp``, ``hdop``, ``altitude`` and ``speed`` query parameters (the
    method name and default port 5055 suggest Traccar's OsmAnd protocol --
    confirm against the server configuration).
    """

    # Server URL; main() replaces it with --host or the built-in default.
    host = ""

    def __init__(self, debug=False) -> None:
        """Parse the command line and configure logging.

        Args:
            debug: Unused; kept for backward compatibility (debug output is
                controlled by the ``--debug`` command-line flag instead).
        """
        usage = "usage: %prog [options] identifier"
        self.parser = OptionParser(usage)
        # The help text used to be a copy of the --logfile help.
        self.parser.add_option("", "--host", dest="host", help="Traccar 服务器地址", default=None, metavar="https://host:5055")
        self.parser.add_option("-m", "--machine-id", action="store_true", dest="machine", help="使用 Linux 机器ID作为设备ID")
        self.parser.add_option(
            "-l",
            "--logfile",
            dest="logfile",
            help="记录日志文件",
            default=None,
            metavar="/tmp/" + os.path.basename(__file__).replace(".py", ".log"),
        )
        self.parser.add_option("-t", "--test", action="store_true", dest="test", help="数据推送测试")
        self.parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="后台运行")
        self.parser.add_option("", "--debug", action="store_true", dest="debug", help="调试模式")
        (self.options, self.args) = self.parser.parse_args()

        log_style = {
            "format": "%(asctime)s %(levelname)-6s %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
        if self.options.logfile:
            logging.basicConfig(level=logging.NOTSET, filename=self.options.logfile, filemode="a", **log_style)
        else:
            logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, **log_style)
        self.logging = logging.getLogger()

    def machineId(self):
        """Return the content of ``/etc/machine-id`` without surrounding whitespace.

        The file ends with a newline, which must not end up in the ``id``
        request parameter.  An empty string is returned (and the error logged)
        when the file cannot be read.
        """
        try:
            with open("/etc/machine-id", "r") as file:
                return file.read().strip()
        except OSError as err:
            self.logging.error(err)
            return ""

    def osmand(self, gps):
        """Send one fix to ``self.host``; every failure is logged, never raised.

        Args:
            gps: Mapping with ``lat``, ``lon``, ``alt`` and ``speed`` keys.
        """
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            params = {
                "id": self.id,
                "lat": gps["lat"],
                "lon": gps["lon"],
                "timestamp": timestamp,
                "hdop": "3",
                "altitude": gps["alt"],
                "speed": gps["speed"],
            }
            # Bounded wait: without a timeout an unreachable server would
            # block the gpsd loop indefinitely.
            response = requests.get(self.host, params=params, timeout=10)
            self.logging.info(f"Traccar osmand: {response.status_code}, {response.reason}")
        except Exception as err:
            self.logging.error(err)

    def test(self):
        """Push a single randomly generated fix to the server."""
        data = {
            "lat": f"31.23{random.randint(0, 1000)}",
            "lon": f"121.47{random.randint(0, 1000)}",
            "alt": f"{random.randint(0, 1000)}",
            "speed": f"{random.randint(0, 100)}",
            "climb": -0.5,
        }
        self.osmand(data)

    def daemon(self):
        """Fork once and terminate the parent so the child keeps running."""
        pid = os.fork()
        if pid > 0:
            sys.exit(0)

    def run(self):
        """Stream TPV reports from gpsd and forward each valid fix, one per second.

        Fixes with a latitude or longitude of 0.0 (also the default used for a
        missing field) are skipped.  Any exception ends the loop and is logged.
        """
        fields = ("lat", "lon", "alt", "speed", "climb")
        try:
            with GPSDClient() as gpsd:
                for result in gpsd.dict_stream(convert_datetime=True, filter=["TPV"]):
                    data = {name: result.get(name, 0.0) for name in fields}
                    if data["lat"] != 0.0 and data["lon"] != 0.0:
                        self.logging.info(data)
                        self.osmand(data)
                    time.sleep(1)
        except Exception as err:
            self.logging.error("GPSDClient %s" % err)

    def help(self):
        """Print the command-line help and exit."""
        self.parser.print_help()
        sys.exit()

    def main(self):
        """Entry point: resolve server URL and device id, then test or stream.

        The device id is the machine id when ``--machine-id`` is given,
        otherwise the first positional argument; without either the help text
        is printed and the process exits.  ``--test`` sends one random fix and
        exits -- this now also applies together with ``--machine-id``, where
        the flag used to be ignored.
        """
        if self.options.debug:
            print("=" * 50)
            print(self.options, self.args)
            print("=" * 50)
        if self.options.daemon:
            self.daemon()

        self.host = self.options.host or "http://47.100.253.187:5055"

        if self.options.machine:
            self.id = self.machineId()
        elif self.args:
            self.id = self.args[0]
        else:
            self.help()

        if self.options.test:
            self.test()
            sys.exit()
        self.run()
# Script entry point: build the forwarder and run it until interrupted.
if __name__ == "__main__":
    try:
        traccar = Traccar()
        traccar.main()
    except KeyboardInterrupt:
        # Message typo fixed ("Crtl" -> "Ctrl").
        print("Ctrl+C Pressed. Shutting down.")