代码大清理,删除已经不再使用的代码;国内IP段改成从Loyalsoldier/geoip项目获取;支持IPv6;支持纯IP HOST的规则查找;IP查找使用前缀树;尽量减少生成的pac文件体积;优化规则判断;美化README;

This commit is contained in:
zhiyi
2024-10-03 01:06:25 +08:00
parent f926c61f1f
commit dafd79edda
9 changed files with 18757 additions and 91240 deletions

View File

@@ -1,20 +1,12 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import re
import math
import socket
import struct
import pkgutil
import urllib.parse
import json
import logging
import urllib.request, urllib.error, urllib.parse
from argparse import ArgumentParser
import base64
gfwlist_url = 'https://raw.githubusercontent.com/gfwlist/gfwlist/master/gfwlist.txt'
import ipaddress
import json
def parse_args():
parser = ArgumentParser()
@@ -31,222 +23,70 @@ def parse_args():
parser.add_argument('--localtld-domains', dest='localtld_rule',
help='本地 TLD 规则文件, 不走代理, 每行一个,以 . 开头')
parser.add_argument('--ip-file', dest='ip_file',
help='delegated-apnic-latest from apnic.net')
help='中国IP地址段文件')
return parser.parse_args()
#from https://github.com/Leask/Flora_Pac
def ip2long(ip):
packedIP = socket.inet_aton(ip)
return struct.unpack("!L", packedIP)[0]
#from https://github.com/Leask/Flora_Pac
def fetch_ip_data():
args = parse_args()
if (args.ip_file):
with open(args.ip_file, 'r') as f:
data = f.read()
def convert_cidr(cidr):
if '/' in cidr:
network = ipaddress.ip_network(cidr.strip(), strict=False)
network_address = network.network_address
prefixlen = network.prefixlen
else:
#fetch data from apnic
print("Fetching data from apnic.net, it might take a few minutes, please wait...")
url=r'https://ftp.apnic.net/apnic/stats/apnic/delegated-apnic-latest'
# url=r'http://flora/delegated-apnic-latest' #debug
data=urllib.request.urlopen(url).read().decode('utf-8')
network = ipaddress.ip_address(cidr.strip())
network_address = network
prefixlen = network.max_prefixlen
if network.version == 4:
return hex(int(network_address))[2:] + '/' + str(prefixlen)
else:
return network.compressed
cnregex=re.compile(r'apnic\|cn\|ipv4\|[0-9\.]+\|[0-9]+\|[0-9]+\|a.*',re.IGNORECASE)
cndata=cnregex.findall(data)
def generate_cnip_cidrs():
""" 从文件中读取CIDR地址 """
args = parse_args()
with open(args.ip_file, 'r') as file:
cidrs = file.read().splitlines()
converted_cidrs = []
for cidr in cidrs:
converted_cidrs.append(convert_cidr(cidr))
results=[]
prev_net=''
cidr_list = ','.join(converted_cidrs)
return f"'{cidr_list}'.split(',')"
for item in cndata:
unit_items=item.split('|')
starting_ip=unit_items[3]
num_ip=int(unit_items[4])
imask=0xffffffff^(num_ip-1)
#convert to string
imask=hex(imask)[2:]
mask=[0]*4
mask[0]=imask[0:2]
mask[1]=imask[2:4]
mask[2]='0' #imask[4:6]
mask[3]='0' #imask[6:8]
#convert str to int
mask=[ int(i,16 ) for i in mask]
mask="%d.%d.%d.%d"%tuple(mask)
#mask in *nix format
mask2=32-int(math.log(num_ip,2))
ip=starting_ip.split('.')
ip[2] = '0'
ip[3] = '0'
starting_ip = '.'.join(ip)
if starting_ip != prev_net:
results.append((ip2long(starting_ip), ip2long(mask), mask2))
prev_net = starting_ip
results.insert(0, (ip2long('127.0.0.1'), ip2long('255.0.0.0'), 0))
results.insert(1, (ip2long('10.0.0.0'), ip2long('255.0.0.0'), 0))
results.insert(2, (ip2long('172.16.0.0'), ip2long('255.240.0.0'), 0))
results.insert(3, (ip2long('192.168.0.0'), ip2long('255.255.0.0'), 0))
def ip(item):
return item[0]
results = sorted(results, key = ip)
return results
def decode_gfwlist(content):
# decode base64 if have to
try:
if '.' in content:
raise Exception()
return base64.b64decode(content).decode('utf-8')
except:
return content
def get_hostname(something):
try:
# quite enough for GFW
if not something.startswith('http:'):
something = 'http://' + something
r = urllib.parse.urlparse(something)
return r.hostname
except Exception as e:
logging.error(e)
return None
def add_domain_to_set(s, something):
hostname = get_hostname(something)
if hostname is not None:
s.add(hostname)
def combine_lists(content, user_rule=None):
gfwlist = content.splitlines(False)
if user_rule:
gfwlist.extend(user_rule.splitlines(False))
return gfwlist
def parse_gfwlist(gfwlist):
domains = set()
for line in gfwlist:
if line.find('.*') >= 0:
continue
elif line.find('*') >= 0:
line = line.replace('*', '/')
if line.startswith('||'):
line = line.lstrip('||')
elif line.startswith('|'):
line = line.lstrip('|')
elif line.startswith('.'):
line = line.lstrip('.')
if line.startswith('!'):
continue
elif line.startswith('['):
continue
elif line.startswith('@'):
# ignore white list
continue
add_domain_to_set(domains, line)
return domains
def reduce_domains(domains):
# reduce 'www.google.com' to 'google.com'
# remove invalid domains
with open('./tld.txt', 'r') as f:
tld_content = f.read()
tlds = set(tld_content.splitlines(False))
new_domains = set()
for domain in domains:
domain_parts = domain.split('.')
last_root_domain = None
for i in range(0, len(domain_parts)):
root_domain = '.'.join(domain_parts[len(domain_parts) - i - 1:])
if i == 0:
if not tlds.__contains__(root_domain):
# root_domain is not a valid tld
break
last_root_domain = root_domain
if tlds.__contains__(root_domain):
continue
else:
break
if last_root_domain is not None:
new_domains.add(last_root_domain)
uni_domains = set()
for domain in new_domains:
domain_parts = domain.split('.')
for i in range(0, len(domain_parts)-1):
root_domain = '.'.join(domain_parts[len(domain_parts) - i - 1:])
if domains.__contains__(root_domain):
break
else:
uni_domains.add(domain)
return uni_domains
def generate_pac_fast(domains, proxy, direct_domains, cnips, local_tlds):
def generate_pac_fast(domains, proxy, direct_domains, cidrs, local_tlds):
# render the pac file
with open('./pac-template', 'r') as f:
proxy_content = f.read()
domains_dict = {}
domains_list = []
for domain in domains:
domains_dict[domain] = 1
domains_list.append(domain)
proxy_content = proxy_content.replace('__PROXY__', json.dumps(str(proxy)))
proxy_content = proxy_content.replace(
'__DOMAINS__',
json.dumps(domains_dict, indent=2, sort_keys=True)
json.dumps(domains_list, sort_keys=True, separators=(',', ':'))
)
direct_domains_dict = {}
direct_domains_list = []
for domain in direct_domains:
direct_domains_dict[domain] = 1
direct_domains_list.append(domain)
proxy_content = proxy_content.replace(
'__DIRECT_DOMAINS__',
json.dumps(direct_domains_dict, indent=2, sort_keys=True)
json.dumps(direct_domains_list, sort_keys=True, separators=(',', ':'))
)
proxy_content = proxy_content.replace(
'__CN_IPS__',
json.dumps(cnips, indent=2, sort_keys=False)
'__CIDRS__', cidrs
)
tlds_dict = {}
tlds_list = []
for domain in local_tlds:
tlds_dict[domain] = 1
tlds_list.append(domain)
proxy_content = proxy_content.replace(
'__LOCAL_TLDS__',
json.dumps(tlds_dict, indent=2, sort_keys=True)
json.dumps(tlds_list, sort_keys=True, separators=(',', ':'))
)
return proxy_content
def generate_pac_precise(rules, proxy):
def grep_rule(rule):
if rule:
if rule.startswith('!'):
return None
if rule.startswith('['):
return None
return rule
return None
# render the pac file
proxy_content = pkgutil.get_data('gfwlist2pac', './abp.js')
rules = list(filter(grep_rule, rules))
proxy_content = proxy_content.replace('__PROXY__', json.dumps(str(proxy)))
proxy_content = proxy_content.replace('__RULES__',
json.dumps(rules, indent=2))
return proxy_content
def main():
args = parse_args()
user_rule = None
@@ -292,10 +132,10 @@ def main():
else:
localtld_rule = []
cnips = fetch_ip_data()
cidrs = generate_cnip_cidrs()
# domains = reduce_domains(domains)
pac_content = generate_pac_fast(user_rule, args.proxy, direct_rule, cnips, localtld_rule)
pac_content = generate_pac_fast(user_rule, args.proxy, direct_rule, cidrs, localtld_rule)
with open(args.output, 'w') as f:
f.write(pac_content)