知乎专栏 |
watchdog 提供了指定目录/文件的变化监控,对于指定目录内的操作,被视为一次事件。如添加删除文件或目录、重命名文件或目录、修改文件内容等,每种变化都会触发一次事件,事件是用户定义的业务逻辑代码。
安装
pip install watchdog
演示程序
import sys import time import logging from watchdog.observers import Observer from watchdog.events import LoggingEventHandler if __name__ == "__main__": path = r'/tmp' logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') event_handler = LoggingEventHandler() observer = Observer() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
# 设置超时时间 watchdog.observers.Observer(timeout=30) # 监控指定路径path,该路径触发任何事件都会调用event_handler来处理,如果path是目录,recursive=True 开启会递归模式,监控该目录下的所有变化。 observer.schedule(event_handler, path, recursive=False) # 添加一个事件处理器到watch中 observer.add_handler_for_watch(event_handler, watch) # 从watch中移除一个事件处理器 observer.remove_handler_for_watch(event_handler, watch) # 移除一个watch及这个watch上的所有事件处理器 observer.unschedule(watch) # 移除所有watch及关联的事件处理器 observer.unschedule_all() # 等同于observer.unschedule_all() observer.on_thread_stop() # 启动 observer 线程 observer.start() # 停止observer线程 observer.stop()
Watchdog 几种 Observer 类型:
默认 Observer 会判断操作系统类型,选择最佳的方式。下面👇是 Observer 的源码
# Linux系统 if platform.is_linux(): try: from .inotify import InotifyObserver as Observer except UnsupportedLibc: from .polling import PollingObserver as Observer # darwin系统 elif platform.is_darwin(): # FIXME: catching too broad. Error prone try: from .fsevents import FSEventsObserver as Observer except: try: from .kqueue import KqueueObserver as Observer warnings.warn("Failed to import fsevents. Fall back to kqueue") except: from .polling import PollingObserver as Observer warnings.warn("Failed to import fsevents and kqueue. Fall back to polling.") # bsd系统 elif platform.is_bsd(): from .kqueue import KqueueObserver as Observer # windows系统 elif platform.is_windows(): # TODO: find a reliable way of checking Windows version and import # polling explicitly for Windows XP try: from .read_directory_changes import WindowsApiObserver as Observer except: from .polling import PollingObserver as Observer warnings.warn("Failed to import read_directory_changes. Fall back to polling.") else: from .polling import PollingObserver as Observer
手工选择 Observer,注意下面代码 observer = PollingObserver()
import sys import time import logging # from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver from watchdog.events import LoggingEventHandler if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') path = sys.argv[1] if len(sys.argv) > 1 else '.' event_handler = LoggingEventHandler() observer = PollingObserver() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
定义创建/删除/修改/移动四种事件
from watchdog.observers import Observer from watchdog.events import * import time class DemoFileSystemEventHandler(FileSystemEventHandler): def __init__(self): FileSystemEventHandler.__init__(self) def on_moved(self, event): if event.is_directory: print("directory moved from {0} to {1}".format( event.src_path, event.dest_path)) else: print("file moved from {0} to {1}".format( event.src_path, event.dest_path)) def on_created(self, event): if event.is_directory: print("directory created:{0}".format(event.src_path)) else: print("file created:{0}".format(event.src_path)) def on_deleted(self, event): if event.is_directory: print("directory deleted:{0}".format(event.src_path)) else: print("file deleted:{0}".format(event.src_path)) def on_modified(self, event): if event.is_directory: print("directory modified:{0}".format(event.src_path)) else: print("file modified:{0}".format(event.src_path)) if __name__ == "__main__": path = r'/tmp' observer = Observer() event_handler = DemoFileSystemEventHandler() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
on_any_event 任何事件都会触发
import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class MyHandler(FileSystemEventHandler): def on_any_event(self, event): if event.is_directory: print('Directory', event.event_type, event.src_path) else: print('File', event.event_type, event.src_path) if __name__ == "__main__": monitor_path = '/tmp' event_handler = MyHandler() observer = Observer() observer.schedule(event_handler, path=monitor_path, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
import time import logging from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from watchdog.events import LoggingEventHandler class MyEventHandler(FileSystemEventHandler): def on_any_event(self, event): if event.is_directory: print('Directory', event.event_type, event.src_path) else: print('File', event.event_type, event.src_path) if __name__ == "__main__": watch_path = '/tmp' logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') loggingEventHandler = LoggingEventHandler() myEventHandler = MyEventHandler() observer = Observer() watch1 = observer.schedule( loggingEventHandler, path=watch_path, recursive=True) watch2 = observer.schedule( myEventHandler, path=watch_path, recursive=True) observer.add_handler_for_watch(loggingEventHandler, watch1) observer.add_handler_for_watch(myEventHandler, watch2) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
事情的 起因 - 经过 - 结果
做视频剪辑有一段时间了,苹果笔记本电脑硬盘比较小,视频剪辑空间不够,所以必须外挂一个剪辑盘。
我使用了由两块 SSD 组成的 RAID 硬盘盒子,起初是追求速度快,使用 RAID 0 模式,后期为了安全 切换到了 RAID 1 模式。这样硬件的安全就解决了,再也不用担心硬盘损坏数据丢失了。
某一天,就如日常剪辑一样,当我试图打开 Final cut pro 资源库的时候,悲剧了!!!
Fcpx 的工程文件打不开了,闪退后提示错误,这次翻车丢失了《Netkiller Architect 手札》6个视频,不过还好,那些视频都是初期做品,录制和剪辑都不算好,本就有重做的想法。
这次经历让我不在相信 fcpx 的资源库管理能力,鸡蛋放在一个篮子里及其危险的,《Netkiller Python 手札》我把一章内容放在一个资源库中,每节一个事件。
之前也偶尔备份,手工复制,有时比较懒,就没有备份。于是我便写了一个程序,来自动备份视频剪辑盘。
实现功能:
#!/usr/bin/python3 # -*- coding: UTF-8 -*- # ================================================================================ # 视频剪辑盘自动备份程序 # 作者: netkiller | netkiller@msn.com | http://www.netkiller.cn # ================================================================================ import time import os import signal import sys import atexit import subprocess # from atexit import register from watchdog.observers import Observer from watchdog.events import * class FileEventHandler(FileSystemEventHandler): src_path = '' dst_path = '' watchpath = {} def __init__(self, logger, watchpath): FileSystemEventHandler.__init__(self) self.watchpath = watchpath self.logger = logger def execute(self, cmd): fr = os.popen(cmd) # text = fr.read() text = fr.readlines() fr.close() # print(len(text), text) return(text) def isBusy(self, path): cmd = "lsof {}".format(path) # print(cmd) if len(self.execute(cmd)) == 0: self.logger.info("目录空闲: {0}".format(path)) return False else: self.logger.info("目录繁忙: {0}".format(path)) return True def mirror(self): self.logger.info("开始备份") cmd = "/usr/bin/rsync -auzv --delete --exclude=.Spotlight-V100 --exclude=.fseventsd {0} {1}".format( self.src_path, self.dst_path) # self.logger.info(cmd) sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) for line in iter(sp.stdout.readline, b''): self.logger.info(line.decode('UTF8').replace("\n", '')) sp.stdout.close() sp.wait() self.logger.info("备份完成") def umount(self): if not self.isBusy(self.src_path): self.logger.info("卸载移动硬盘") os.system("diskutil eject {0}".format( os.path.basename(self.src_path))) else: self.logger.info("移动硬盘 {0} 繁忙".format(self.src_path)) def backup(self): if not os.path.exists(self.dst_path): os.mkdir(self.dst_path) if not self.isBusy(self.src_path): if not self.isBusy(self.dst_path): self.mirror() time.sleep(5) self.umount() def on_created(self, event): if event.is_directory: self.logger.info("发现移动硬盘:{0}".format(event.src_path)) # self.path = self.watchpath[event.src_path] self.src_path = event.src_path self.dst_path = self.watchpath[event.src_path] self.backup() def on_deleted(self, event): if event.is_directory: self.logger.info("卸载移动硬盘: {0}".format(event.src_path)) class Backup(): pidfile = "/tmp/netkiller.pid" logdir = "/tmp" logfile = logdir+"/netkiller.log" logger = None loop = True job = True watchpath = {'/Volumes/Video': '/tmp/Backup', '/Volumes/Photo': '/tmp/Backup'} observer = Observer() def __init__(self): logging.basicConfig(level=logging.NOTSET, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=self.logfile, filemode='a') self.logger = logging.getLogger() # 保存进程ID def savepid(self, pid): with open(self.pidfile, 'w') as f: f.write(str(os.getpid())) # 注册退出函数,进程退出时自动移除pidfile文件 atexit.register(os.remove, self.pidfile) # 从pidfile中读取进程ID def getpid(self): pid = 0 try: with open(self.pidfile, 'r') as f: pid = int(f.readline()) except FileNotFoundError as identifier: print(identifier) return pid # 信号处理 def signalHandler(self, signum, frame): # global loop, job # print("SIGN ",str(signum)); if signum == signal.SIGHUP: # 优雅重启 self.job = False self.logger.info("优雅重启完毕") elif signum == signal.SIGINT: # 正常退出 self.loop = False self.job = False self.observer.stop() self.logger.info("正常退出") elif signum == signal.SIGUSR1: # 日志切割 pass def daemonize(self): # global job self.logger.info("启动守护进程") signal.signal(signal.SIGHUP, self.signalHandler) signal.signal(signal.SIGINT, self.signalHandler) signal.signal(signal.SIGUSR1, self.signalHandler) signal.alarm(5) pid = os.fork() sys.stdout.flush() sys.stderr.flush() if pid: sys.exit(0) # print(os.getpid()) self.savepid(str(os.getpid())) while self.loop: self.logger.info("Start!!!") self.main() self.logger.info("Exit!!!") def start(self): if os.path.isfile(self.pidfile): print("程序已经启动") sys.exit(1) else: self.daemonize() def stop(self): try: os.kill(self.getpid(), signal.SIGINT) except ProcessLookupError as identifier: print(identifier) def reloads(self): try: os.kill(self.getpid(), signal.SIGHUP) except ProcessLookupError as identifier: print(identifier) def logrotate(self): try: os.kill(self.getpid(), signal.SIGUSR1) except ProcessLookupError as identifier: print(identifier) def main(self): # 业务逻辑 event_handler = FileEventHandler(self.logger, self.watchpath) for src, dst in self.watchpath.items(): self.logger.info("监控 {0} => {1}".format(src, dst)) self.observer.schedule( event_handler, path=src, recursive=False) self.logger.info("启动 Watchdog") self.observer.start() self.observer.join() # 业务逻辑 def usage(): print(sys.argv[0] + " start | stop | restart | reload | log") if __name__ == "__main__": # print(sys.argv) backup = Backup() if len(sys.argv) > 1: arg = sys.argv[1] if arg == "start": backup.start() elif arg == "stop": backup.stop() elif arg == "restart": backup.stop() backup.start() elif arg == "reload": backup.reloads() else: usage() else: usage()