Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

8.4. 替代 CentOS 7/8 中的 firewalld

8.4.1. Demo

		
########################################
# Demo Desktop PC
########################################
single = Firewall()
single.policy(single.INPUT,single.DROP)
single.policy(single.OUTPUT,single.ACCEPT)
single.policy(single.FORWARD,single.DROP)
single.input().protocol('icmp').drop()
single.input().protocol('tcp').dport(('3389','5900')).accept()
single.input().protocol('tcp').dport(('137','138','139','145')).accept()
#single.show()
#single.run()
#single.list()

########################################
# Demo Office Server
########################################
office = Firewall()
office.flush()
office.policy(office.INPUT,office.DROP)
office.policy(office.OUTPUT,office.ACCEPT)
office.policy(office.FORWARD,office.DROP)
office.input().state(('RELATED','ESTABLISHED')).accept()
office.input().protocol('icmp').accept()
office.input().inbound('eth0').protocol('udp').dport(('53','1194')).accept()
office.input().inbound('eth0').protocol('udp').dport(('68','68')).accept()
office.input().protocol('tcp').dport(('20','21','22','80')).accept()
office.input().protocol('tcp').dport(('5800','5900')).accept()
office.input().protocol('tcp').dport(('137','138','139','145')).accept()

office.show()
office.run()
office.list()
########################################
# Demo IDC Server
########################################
server = Firewall()
server.flush()
server.policy(server.INPUT,server.DROP)
server.policy(server.OUTPUT,server.DROP)
server.policy(server.FORWARD,server.DROP)
server.input().state(('RELATED','ESTABLISHED')).accept()
server.input().protocol('icmp').accept()
server.input().destination('192.168.0.0/24').accept()
server.input().protocol('tcp').dport(('21','22','80')).state('NEW').accept()
server.input().protocol('udp').dport(('53','1194')).accept()
server.input().protocol('tcp').source('172.16.1.0/24').dport('3306').accept()
server.output().protocol('icmp').accept()
server.output().destination('192.168.0.0/24').accept()
server.output().destination('172.16.0.5').reject()
server.output().destination('172.16.0.0/24').accept()
server.output().protocol('udp').dport('53').accept()
server.output().protocol('tcp').dport(('80','21','20','22','8000')).accept()
server.chain('PREROUTING').inbound('eth0').proto('tcp').dport('80').dnat('--to-destination 192.168.0.1:3128')
server.output().destination('172.16.0.10').proto('tcp').dport('3306').accept()
server.show()
server.run()
server.list()

########################################
# Linux Gateway via pppoe
########################################
gateway = Firewall()
gateway.input().drop()
gateway.output().accept()
gateway.inside().state(('RELATED','ESTABLISHED')).accept('# match test')
gateway.forward().destination('127.16.0.0/24').accept()
gateway.chain('POSTROUTING').inbound("ppp0").source('172.16.0.0/24').masquerade()
gateway.forward().source("172.16.0.1/24").protocol('tcp').string('sex').accept()
gateway.forward().dport("53").protocol('udp').time('8:00','18:00','Mon,Tue,Wed,Thu,Fri,Sat').accept()
gateway.forward().proto('udp').dport("53").string('movie').time('8:00','18:00','Mon,Tue,Wed,Thu,Fri,Sat').accept()
gateway.input().inbound('ppp0').connlimit(20).drop()
gateway.forward().reject('--reject-with icmp-host-prohibited')
gateway.show()

########################################
# Cisco ASA Style
########################################
gateway = Firewall()
gateway.inside().accept()
gateway.inside().state(('RELATED','ESTABLISHED')).accept('# match test')
gateway.outside().drop()
gateway.show()

########################################
# Juniper JunOS Style
########################################
gateway = Firewall()
gateway.trust().accept()
gateway.untrust().drop()
gateway.show()			
		
		

8.4.2. Firewall Script

		
#!/usr/bin/env python
###########################################
# Linux Firewall Management Pkg
###########################################
# homepage: http://netkiller.github.com
# author:	neo chen <openunix@163.com>
# nickname:	netkiller
###########################################
import os, sys
import types

class Service():
	
	def __init__(self):
		pass
	def name(self):
		pass
	def protocol(self):
		pass
	def port(self, src, dst):
		pass
	def www(self):
		return '80'
		
class Address():
	def __init__(self):
		pass
	def name(self):
		pass

class Protocol():
	IMCP	= 'ICMP'
	TCP		= 'TCP'
	UDP		= 'UDP'
	def __init__(self):
		pass
class Firewall(Service, Address):
	INPUT 	= 'INPUT'
	OUTPUT	= 'OUTPUT'
	FORWARD	= 'FORWARD'
	PREROUTING	= 'PREROUTING'
	POSTROUTING	= 'POSTROUTING'

	ACCEPT	= 'ACCEPT'
	DROP	= 'DROP'
	REJECT	= 'REJECT'
	
	def __init__(self):
		self.accesslist = []
		self.match 		= []
		self.nic		= []
		self.iptables = 'iptables'
		self.A = ''
		self.p = ''
		self.src = ''
		self.dst = ''
		self.port = ''
		self.ip = ''
		self.m = ''
		self.err 	= True
		#self.clear()
	def clear(self):
		self.match 		= []
		self.nic		= []
		self.A = ''
		self.p = ''
		self.src = ''
		self.dst = ''
		self.port = ''
		self.ip = ''
		self.m = ''
		self.err 	= True
		pass
	def flush(self):
		self.accesslist.append('iptables -F')
		self.accesslist.append('iptables -F -t nat')
		self.accesslist.append('iptables -F -t filter')
		self.accesslist.append('iptables -t nat -P PREROUTING ACCEPT')
		self.accesslist.append('iptables -t nat -P POSTROUTING ACCEPT')
	def policy(self,chain = None, target = None):
		if chain and target:
			self.accesslist.append('iptables -P '+chain+' '+target)
		else:
			self.accesslist.append('iptables -P INPUT ACCEPT')
			self.accesslist.append('iptables -P OUTPUT ACCEPT')
			self.accesslist.append('iptables -P FORWARD ACCEPT')
		pass
	def chain(self,tmp):
		if tmp in ('INPUT', 'OUTPUT', 'FORWARD'):
			self.A = '-A ' + tmp
			self.err = False
		elif tmp in ('PREROUTING', 'POSTROUTING'):
			self.A = '-t nat -A ' + tmp
			self.err = False
		else:
			self.A = None
			self.err = True
		return( self )
	def input(self):
		return self.chain('INPUT')
	def output(self):
		return self.chain('OUTPUT')
	def forward(self):
		return self.chain('FORWARD')	
	def inside(self):
		return self.chain('OUTPUT')
	def outside(self):
		return self.chain('INPUT')
	def trust(self):
		return self.chain('OUTPUT')
	def untrust(self):
		return self.chain('INPUT')		
	def interface(self,inter, name):
		if inter and name:
			self.nic.append(inter + ' ' + name)
		return( self )	
	def inbound(self,tmp):
		if tmp:
			self.interface('-i', tmp)
		return( self )
	def outbound(self,tmp):
		if tmp:
			self.interface('-o', tmp)
		return( self )	
	def protocol(self,tmp):
		if tmp in ('tcp', 'udp', 'icmp','gre'):
			self.p = "-p " + tmp
		else:
			self.p = ''
		return( self )
	def proto(self,tmp):
		return self.protocol(tmp)
	def source(self, src):
		if src :
			self.src = "-s " + src
		else:
			self.src = ''
		return( self )
	def destination(self, dst):
		if dst:
			self.dst = "-d " + dst
		else:
			self.dst = ''
		return( self )
	def state(self, tmp):
		if type(tmp) == types.StringType:
			self.match.append('-m state --state ' + tmp)
		elif type(tmp) == types.TupleType:
			self.match.append('-m state --state ' + ','.join(tmp))
		else:
			pass
		return( self )
	def string(self, tmp):
		if tmp:
			self.match.append('-m string --string "' +tmp+'"')
		else:
			pass
		return( self )
	def time(self, start, stop, days):
		if start and stop and days:
			self.match.append('-m time --timestart '+start+' --timestop '+stop+' --days ' +days+' ')
		else:
			pass
		return( self )
	def connlimit(self, tmp):
		if tmp:
			self.match.append('-m connlimit --connlimit-above ' +str(tmp)+'') 
		else:
			pass
		return( self )
	def sport(self,tmp):			
		if type(tmp) == types.StringType:
			self.match.append('--sport ' + tmp)
		elif type(tmp) == types.TupleType:
			self.match.append('-m multiport --sports ' + ','.join(tmp))
		else:
			pass
		return( self )
	def dport(self,tmp):
		type(tmp)
		if type(tmp) == types.StringType:
			self.match.append('--dport ' + str(tmp))
		elif type(tmp) == types.TupleType:
			self.match.append('-m multiport --dports ' + ','.join(tmp))
		else:
			pass
		return( self )
		
	def target(self, targetname, desc = None):
		if targetname in ('ACCEPT', 'DROP', 'REJECT', 'RETURN', 'QUEUE', 'MASQUERADE', 'DNAT', 'SNAT'):
			self.acl_line = []
			self.acl_line.append(self.iptables)
			if self.A: 		self.acl_line.append(self.A)
			if self.nic:	self.acl_line.append(' '.join(self.nic))
			if self.p: 		self.acl_line.append(self.p)
			if self.src: 	self.acl_line.append(self.src)
			if self.dst:	self.acl_line.append(self.dst)
			if self.match:		self.acl_line.append(' '.join(self.match))
			self.acl_line.append('-j ' + targetname)
		if desc:
			self.acl_line.append(desc)
			
		if self.err:
			acsess_list = '# ' + ' '.join(self.acl_line)
		else:
			acsess_list = ' '.join(self.acl_line)
		self.accesslist.append(acsess_list)
		self.clear()
		
	def accept(self,desc = None):
		self.target('ACCEPT', desc)

	def reject(self,desc = None):
		self.target('REJECT', desc)
	
	def drop(self,desc = None):
		self.target('DROP', desc)

	def masquerade(self):
		self.target('MASQUERADE')
	def dnat(self,desc = None):
		self.target('DNAT',desc)
	def snat(self,desc = None):
		self.target('SNAT',desc)
	def show(self):
		print('\n'.join(self.accesslist))
	def run(self):
		for line in self.accesslist:
			os.system(line)
	def save(self,filename):
		try:
			ipt = open(filename,'w')
			for line in self.accesslist:
				ipt.write(line)
				ipt.write("\n")
			ipt.close()
		except IOError as e:
			print(e)
	def list(self):
		os.system('sudo iptables -S')
		#os.system('sudo iptables -L --line-numbers')