Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | 51CTO学院 | CSDN程序员研修院 | OSChina 博客 | 腾讯云社区 | 阿里云栖社区 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏多维度架构

23.3. 监视文件系统

23.3.1. watchdog

watchdog 提供了指定目录/文件的变化监控,对于指定目录内的操作,被视为一次事件。如添加删除文件或目录、重命名文件或目录、修改文件内容等,每种变化都会触发一次事件,事件是用户定义的业务逻辑代码。

安装

			
pip install watchdog			
			
		

演示程序

		
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":

    path = r'/tmp'

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    event_handler = LoggingEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
		
		

23.3.1.1. Observer

			
# 设置超时时间		
watchdog.observers.Observer(timeout=30)			

# 监控指定路径path,该路径触发任何事件都会调用event_handler来处理,如果path是目录,recursive=True 开启会递归模式,监控该目录下的所有变化。
observer.schedule(event_handler, path, recursive=False)

# 添加一个事件处理器到watch中
observer.add_handler_for_watch(event_handler, watch)

# 从watch中移除一个事件处理器
observer.remove_handler_for_watch(event_handler, watch)

# 移除一个watch及这个watch上的所有事件处理器
observer.unschedule(watch)

# 移除所有watch及关联的事件处理器
observer.unschedule_all()
# 等同于observer.unschedule_all()
observer.on_thread_stop()

# 启动 observer 线程
observer.start()

# 停止observer线程
observer.stop()
			
			

Watchdog 几种 Observer 类型:

  • InotifyObserver,Linux系统默认使用的观察目录的调度事件,效率比较高。
  • PollingObserver,与平台无关,轮询目录以检测文件的更改,效率比较低。
  • WindowsApiObserver,Windows系统默认使用的观察目录的调度事件,效率比较高。
  • FSEventsObserver,macOS 系统默认使用的调度事件
  • KqueueObserver,FreeBSD 系统默认使用

默认 Observer 会判断操作系统类型,选择最佳的方式。下面👇是 Observer 的源码

			
# Linux系统
if platform.is_linux():
    try:
        from .inotify import InotifyObserver as Observer
    except UnsupportedLibc:
        from .polling import PollingObserver as Observer
# darwin系统
elif platform.is_darwin():
    # FIXME: catching too broad. Error prone
    try:
        from .fsevents import FSEventsObserver as Observer
    except:
        try:
            from .kqueue import KqueueObserver as Observer
            warnings.warn("Failed to import fsevents. Fall back to kqueue")
        except:
            from .polling import PollingObserver as Observer
            warnings.warn("Failed to import fsevents and kqueue. Fall back to polling.")
# bsd系统
elif platform.is_bsd():
    from .kqueue import KqueueObserver as Observer
# windows系统
elif platform.is_windows():
    # TODO: find a reliable way of checking Windows version and import
    # polling explicitly for Windows XP
    try:
        from .read_directory_changes import WindowsApiObserver as Observer
    except:
        from .polling import PollingObserver as Observer
        warnings.warn("Failed to import read_directory_changes. Fall back to polling.")

else:
    from .polling import PollingObserver as Observer		
			
			

手工选择 Observer,注意下面代码 observer = PollingObserver()

			
import sys
import time
import logging
# from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = LoggingEventHandler()
    observer = PollingObserver()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
			
			

23.3.1.2. 创建/删除/修改/移动

定义创建/删除/修改/移动四种事件

			
from watchdog.observers import Observer
from watchdog.events import *
import time


class DemoFileSystemEventHandler(FileSystemEventHandler):

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            print("directory moved from {0} to {1}".format(
                event.src_path, event.dest_path))
        else:
            print("file moved from {0} to {1}".format(
                event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            print("directory created:{0}".format(event.src_path))
        else:
            print("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            print("directory deleted:{0}".format(event.src_path))
        else:
            print("file deleted:{0}".format(event.src_path))

    def on_modified(self, event):
        if event.is_directory:
            print("directory modified:{0}".format(event.src_path))
        else:
            print("file modified:{0}".format(event.src_path))


if __name__ == "__main__":

    path = r'/tmp'

    observer = Observer()
    event_handler = DemoFileSystemEventHandler()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
			
			
			

on_any_event 任何事件都会触发

			
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class MyHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        if event.is_directory:
            print('Directory', event.event_type, event.src_path)
        else:
            print('File', event.event_type, event.src_path)


if __name__ == "__main__":

    monitor_path = '/tmp'

    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path=monitor_path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
			
			
			

23.3.1.3. 多事件绑定

			
import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import LoggingEventHandler


class MyEventHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        if event.is_directory:
            print('Directory', event.event_type, event.src_path)
        else:
            print('File', event.event_type, event.src_path)


if __name__ == "__main__":

    watch_path = '/tmp'

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    loggingEventHandler = LoggingEventHandler()
    myEventHandler = MyEventHandler()

    observer = Observer()

    watch1 = observer.schedule(
        loggingEventHandler, path=watch_path, recursive=True)
    watch2 = observer.schedule(
        myEventHandler, path=watch_path, recursive=True)

    observer.add_handler_for_watch(loggingEventHandler, watch1)
    observer.add_handler_for_watch(myEventHandler, watch2)

    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
			
			
			

23.3.1.4. 自动备份程序

事情的 起因 - 经过 - 结果

做视频剪辑有一段时间了,苹果笔记本电脑硬盘比较小,视频剪辑空间不够,所以必须外挂一个剪辑盘。

我使用了由两块 SSD 组成的 RAID 硬盘盒子,起初是追求速度快,使用 RAID 0 模式,后期为了安全 切换到了 RAID 1 模式。这样硬件的安全就解决了,再也不用担心硬盘损坏数据丢失了。

某一天,就如日常剪辑一样,当我试图打开 Final cut pro 资源库的时候,悲剧了!!!

Fcpx 的工程文件打不开了,闪退后提示错误,这次翻车丢失了《Netkiller Architect 手札》6个视频,不过还好,那些视频都是初期做品,录制和剪辑都不算好,本就有重做的想法。

这次经历让我不在相信 fcpx 的资源库管理能力,鸡蛋放在一个篮子里及其危险的,《Netkiller Python 手札》我把一章内容放在一个资源库中,每节一个事件。

之前也偶尔备份,手工复制,有时比较懒,就没有备份。于是我便写了一个程序,来自动备份视频剪辑盘。

实现功能:

  • 插入U设备,发现移动硬盘
  • 剪辑盘和备份盘没有其他程序占用
  • 开始备份
  • 备份完成后自动弹出移动设备
			
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# ================================================================================
# 视频剪辑盘自动备份程序
# 作者: netkiller | netkiller@msn.com | http://www.netkiller.cn
# ================================================================================

import time
import os
import signal
import sys
import atexit
import subprocess
# from atexit import register
from watchdog.observers import Observer
from watchdog.events import *


class FileEventHandler(FileSystemEventHandler):
    src_path = ''
    dst_path = ''
    watchpath = {}

    def __init__(self, logger, watchpath):
        FileSystemEventHandler.__init__(self)
        self.watchpath = watchpath
        self.logger = logger

    def execute(self, cmd):
        fr = os.popen(cmd)
        # text = fr.read()
        text = fr.readlines()
        fr.close()
        # print(len(text), text)
        return(text)

    def isBusy(self, path):
        cmd = "lsof {}".format(path)
        # print(cmd)
        if len(self.execute(cmd)) == 0:
            self.logger.info("目录空闲: {0}".format(path))
            return False
        else:
            self.logger.info("目录繁忙: {0}".format(path))
            return True

    def mirror(self):
        self.logger.info("开始备份")
        cmd = "/usr/bin/rsync -auzv --delete --exclude=.Spotlight-V100 --exclude=.fseventsd {0} {1}".format(
            self.src_path, self.dst_path)
        # self.logger.info(cmd)

        sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        for line in iter(sp.stdout.readline, b''):
            self.logger.info(line.decode('UTF8').replace("\n", ''))
        sp.stdout.close()
        sp.wait()
        self.logger.info("备份完成")

    def umount(self):
        if not self.isBusy(self.src_path):
            self.logger.info("卸载移动硬盘")
            os.system("diskutil eject {0}".format(
                os.path.basename(self.src_path)))
        else:
            self.logger.info("移动硬盘 {0} 繁忙".format(self.src_path))

    def backup(self):
        if not os.path.exists(self.dst_path):
            os.mkdir(self.dst_path)
        if not self.isBusy(self.src_path):
            if not self.isBusy(self.dst_path):
                self.mirror()
                time.sleep(5)
                self.umount()

    def on_created(self, event):
        if event.is_directory:
            self.logger.info("发现移动硬盘:{0}".format(event.src_path))
            # self.path = self.watchpath[event.src_path]
            self.src_path = event.src_path
            self.dst_path = self.watchpath[event.src_path]
            self.backup()

    def on_deleted(self, event):
        if event.is_directory:
            self.logger.info("卸载移动硬盘: {0}".format(event.src_path))


class Backup():

    pidfile = "/tmp/netkiller.pid"
    logdir = "/tmp"
    logfile = logdir+"/netkiller.log"
    logger = None
    loop = True
    job = True

    watchpath = {'/Volumes/Video': '/tmp/Backup',
                 '/Volumes/Photo': '/tmp/Backup'}

    observer = Observer()

    def __init__(self):
        logging.basicConfig(level=logging.NOTSET,
                            format='%(asctime)s %(levelname)-8s %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S',
                            filename=self.logfile,
                            filemode='a')
        self.logger = logging.getLogger()

    # 保存进程ID
    def savepid(self, pid):
        with open(self.pidfile, 'w') as f:
            f.write(str(os.getpid()))
        # 注册退出函数,进程退出时自动移除pidfile文件
        atexit.register(os.remove, self.pidfile)

    # 从pidfile中读取进程ID
    def getpid(self):
        pid = 0
        try:
            with open(self.pidfile, 'r') as f:
                pid = int(f.readline())
        except FileNotFoundError as identifier:
            print(identifier)
        return pid

    # 信号处理
    def signalHandler(self, signum, frame):
        # global loop, job
        # print("SIGN ",str(signum));
        if signum == signal.SIGHUP:
            # 优雅重启
            self.job = False
            self.logger.info("优雅重启完毕")
        elif signum == signal.SIGINT:
            # 正常退出
            self.loop = False
            self.job = False
            self.observer.stop()
            self.logger.info("正常退出")
        elif signum == signal.SIGUSR1:
            # 日志切割
            pass

    def daemonize(self):
        # global job
        self.logger.info("启动守护进程")
        signal.signal(signal.SIGHUP, self.signalHandler)
        signal.signal(signal.SIGINT, self.signalHandler)
        signal.signal(signal.SIGUSR1, self.signalHandler)
        signal.alarm(5)

        pid = os.fork()
        sys.stdout.flush()
        sys.stderr.flush()
        if pid:
            sys.exit(0)
        # print(os.getpid())
        self.savepid(str(os.getpid()))
        while self.loop:
            self.logger.info("Start!!!")
            self.main()
            self.logger.info("Exit!!!")

    def start(self):
        if os.path.isfile(self.pidfile):
            print("程序已经启动")
            sys.exit(1)
        else:
            self.daemonize()

    def stop(self):
        try:
            os.kill(self.getpid(), signal.SIGINT)
        except ProcessLookupError as identifier:
            print(identifier)

    def reloads(self):
        try:
            os.kill(self.getpid(), signal.SIGHUP)
        except ProcessLookupError as identifier:
            print(identifier)

    def logrotate(self):
        try:
            os.kill(self.getpid(), signal.SIGUSR1)
        except ProcessLookupError as identifier:
            print(identifier)

    def main(self):
        # 业务逻辑
        event_handler = FileEventHandler(self.logger, self.watchpath)
        for src, dst in self.watchpath.items():
            self.logger.info("监控 {0} => {1}".format(src, dst))
            self.observer.schedule(
                event_handler, path=src, recursive=False)
        self.logger.info("启动 Watchdog")
        self.observer.start()
        self.observer.join()
        # 业务逻辑


def usage():
    print(sys.argv[0] + " start | stop | restart | reload | log")


if __name__ == "__main__":
    # print(sys.argv)
    backup = Backup()
    if len(sys.argv) > 1:
        arg = sys.argv[1]
        if arg == "start":
            backup.start()
        elif arg == "stop":
            backup.stop()
        elif arg == "restart":
            backup.stop()
            backup.start()
        elif arg == "reload":
            backup.reloads()
        else:
            usage()
    else:
        usage()
			
			
			

23.3.2. pyinotify