| 知乎专栏 |
sp = subprocess.check_output(cmd)
text = sp.decode('UTF8')
print(text)
获取IP地址
import subprocess
command = "/usr/bin/ip addr show eth0 | grep 'inet ' | awk '{print $2}' | cut -d/ -f1"
screen = subprocess.check_output(command, shell=True)
print(screen.decode().replace("\n", ""))
制定运行目录
#!/usr/bin/python
# -*-coding:utf-8-*-
import subprocess
output = subprocess.check_output("ls", cwd="/")
print(output.decode())
output = subprocess.check_output("/usr/bin/git pull", cwd="/opt/netkiller", shell=True)
print(output.decode())
from threading import Thread
import time
def fun1():
print("fun1 begin")
time.sleep(2)
print("fun1 end")
def fun2():
print("fun2 begin")
time.sleep(6)
print("fun2 end")
threads = []
threads.append(Thread(target=fun1))
threads.append(Thread(target=fun2))
print(threads)
if __name__ == "__main__":
for t in threads:
print(t)
t.start()
print("Done")
import threading
class MyThread(threading.Thread):
def
__init__(self, name=None):
threading.Thread.__init__(self)
self.name =
name
def run(self):
print self.name
def test():
for i in range(0, 100):
t = MyThread("thread_" + str(i))
t.start()
if __name__=='__main__':
test()
这里实现了一个计数器 count 这个全局变量会被多个线程同时操作,使其能够被顺序相加,需要靠线程锁的帮助。
#-*- encoding: utf-8 -*-
import threading
import time
class Test(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self._run_num = num
def run(self):
global count, mutex
threadname = threading.currentThread().getName()
for x in range(int(self._run_num)):
mutex.acquire()
count = count + 1
mutex.release()
print (threadname, x, count)
time.sleep(1)
if __name__ == '__main__':
global count, mutex
threads = []
num = 5
count = 0
# 创建锁
mutex = threading.Lock()
# 创建线程对象
for x in range(num):
threads.append(Test(10))
# 启动线程
for t in threads:
t.start()
# 等待子线程结束
for t in threads:
t.join()
ref: http://www.ibm.com/developerworks/aix/library/au-threadingpython/
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"]
queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and prints first 1024 bytes of page
url = urllib2.urlopen(host)
print url.read(1024)
#signals to queue job is done
self.queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
#wait on the queue until everything has been processed
queue.join()
main()
print "Elapsed Time: %s" % (time.time() - start)
http://www.myelin.co.nz/post/2003/3/13/#200303135
#!/usr/bin/env python
import os, sys
print "I'm going to fork now - the child will write something to a pipe, and the parent will read it back"
r, w = os.pipe() # r,w是文件描述符, 不是文件对象
pid = os.fork()
if pid:
# 父进程
os.close(w) # 关闭一个文件描述符
r = os.fdopen(r) # 将r转化为文件对象
print "parent: reading"
txt = r.read()
os.waitpid(pid, 0) # 确保子进程被撤销
else:
# 子进程
os.close(r)
w = os.fdopen(w, 'w')
print "child: writing"
w.write("here's some text from the child")
w.close()
print "child: closing"
sys.exit(0)
print "parent: got it; text =", txt
import sys, os
if __name__ == "__main__":
# do the UNIX double-fork magic, see Stevens' "Advanced
# Programming in the UNIX Environment" for details (ISBN 0201563177)
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent, print eventual PID before
print "Daemon PID %d" % pid
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# start the daemon main loop
# Redirect standard file descriptors
sys.stdin = open('/dev/null', 'r')
sys.stdout = open('/dev/null', 'w')
sys.stderr = open('/dev/null', 'w')