#!/bin/env python
#coding:utf-8
# Query information for a large number of IPs with multiple processes and store it in the database.
# 2017-02-09 17:22
#author:yaungzhou
import json
import multiprocessing
import os
import socket
import struct
import subprocess
import sys
import time
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
def _escape_field(value):
    """Escape one text field returned by the IP database for use in the SQL update.

    Single quotes are doubled so they survive inside a single-quoted SQL
    literal; commas are doubled as in the original code.
    """
    # NOTE(review): the comma replacement is reproduced as found; the original
    # characters may have been altered by the export - confirm the intent.
    return value.replace("'", "''").replace(",", ",,")


def ip_show(ip):
    """Look up location information for an IP address via the dbip.wsd.com HTTP API.

    Args:
        ip: IP address string substituted into the ``ip`` query parameter.

    Returns:
        A ``(country, province, city, operator)`` tuple of UTF-8 encoded
        strings. Fields missing from the response's ``data`` object default
        to ``'unknow'``. Returns ``None`` when the request or the parsing
        fails; the error is printed with a timestamp.
    """
    # NOTE(review): the API token is hard-coded here; consider moving it to configuration.
    url = ("http://dbip.wsd.com/ip/search?token=b9539e0c40fe5b628ad735977298998f"
           "&ip={ip}&type=3&bodyEncode=utf8").format(ip=ip)
    try:
        response = urllib2.urlopen(url)
        try:
            # Quirk of the IP database API: the JSON text is padded with NUL
            # bytes, which must be removed before parsing.
            raw = response.read().strip('\x00')
        finally:
            response.close()
        data = json.loads(raw).get('data')
        # The escaping below keeps strings containing assorted special
        # characters from the IP database safe to store.
        country = _escape_field(data.get('country', 'unknow'))
        prov = _escape_field(data.get('province', 'unknow'))
        city = _escape_field(data.get('city', 'unknow'))
        # The operator name additionally has double quotes removed.
        oper = _escape_field(data.get('oper', 'unknow')).replace('"', '')
        return (
            country.encode("UTF-8"),
            prov.encode("UTF-8").strip("'"),
            city.encode("UTF-8"),
            oper.encode("UTF-8"),
        )
    except Exception as e:
        # Best effort: report the failure and let the caller decide what to do.
        now_time = time.strftime('%m-%d %H:%M:%S')
        print("%s %s" % (now_time, e))
        return None
# Shell command that prints, tab-separated and without column headers (-N -B),
# the src_ip_str / mainip_str / backip_str columns of every halei_error_detail
# row whose src_ip is still NULL.
# NOTE(review): the database password is embedded in the command line.
cmd_select = (
    'mysql --default-character-set=utf8 -upcmgr_rw -pyw#1a3 -h10.240.64.140 -P3306 '
    'conn_quality -NBe "select src_ip_str, mainip_str, backip_str '
    'from halei_error_detail where src_ip is null;"'
)
def origin_str():
    """Fetch from the database all rows that still need to be updated.

    Runs the module-level ``cmd_select`` command through the shell and parses
    its tab-separated output.

    Returns:
        A list of ``(src_ip_str, mainip_str, backip_str)`` string tuples, one
        per output line that has at least three columns and a non-empty
        first column.
    """
    origin_list = []
    pipe = os.popen(cmd_select)
    try:
        for line in pipe:
            fields = line.strip('\n').split('\t')
            if len(fields) < 3:
                # Blank or malformed line: skip it instead of raising IndexError.
                continue
            _src_ip, _mainip, _backip = fields[:3]
            if _src_ip:
                origin_list.append((_src_ip, _mainip, _backip))
    finally:
        pipe.close()
    return origin_list
def _int_str_to_ip(value):
    """Convert a decimal integer string into a dotted-quad IP string.

    The integer is packed in native byte order ("=I"), as in the original code.
    """
    return socket.inet_ntoa(struct.pack("=I", int(value)))


def _log(message):
    """Print ``message`` prefixed with a month-day time stamp."""
    print("%s %s" % (time.strftime('%m-%d %H:%M:%S'), message))


def imp_data(origin_tuple):
    """Look up one row's IPs in the IP database and write the results to the db.

    Args:
        origin_tuple: ``(src_ip_str, mainip_str, backip_str)`` where each item
            is the decimal integer form of an IPv4 address, as a string.

    Returns:
        None. The row is skipped (and a message printed) when a value is not
        a valid 32-bit integer or when any of the three lookups fails; it is
        left untouched in the database in that case.
    """
    _src_ip = origin_tuple[0]
    try:
        src_ip, mainip, backip = [_int_str_to_ip(v) for v in origin_tuple[:3]]
    except (TypeError, ValueError, struct.error) as e:
        # e.g. the literal text NULL printed by the mysql client, or an
        # out-of-range number.
        _log("skip row %r: %s" % (origin_tuple, e))
        return

    lookups = [ip_show(addr) for addr in (src_ip, mainip, backip)]
    if any(result is None for result in lookups):
        # ip_show already printed the error; unpacking None would raise
        # TypeError and abort the whole pool.map call.
        _log("skip row %r: ip lookup failed" % (origin_tuple,))
        return
    (country, prov, city, ap) = lookups[0]
    (main_ser_country, main_ser_prov, main_ser_city, main_ser_ap) = lookups[1]
    (back_ser_country, back_ser_prov, back_ser_city, back_ser_ap) = lookups[2]

    sql = (
        "update halei_error_detail set src_ip='%s',country='%s',province='%s',city='%s', "
        "ap='%s',mainip='%s',main_ser_country='%s',main_ser_province='%s',"
        "main_ser_city='%s',main_ser_ap='%s', "
        "backip='%s',back_ser_country='%s',back_ser_province='%s',"
        "back_ser_city='%s',back_ser_ap='%s' "
        "where src_ip_str='%s' and ap is null"
    ) % (
        src_ip, country, prov, city, ap,
        mainip, main_ser_country, main_ser_prov, main_ser_city, main_ser_ap,
        backip, back_ser_country, back_ser_prov, back_ser_city, back_ser_ap,
        _src_ip,
    )
    # Pass the statement as a single argument without a shell, so characters
    # in the looked-up text cannot be interpreted by the shell.
    # NOTE(review): the database password is embedded in the command line.
    cmd_update = [
        'mysql', '--default-character-set=UTF8', '-upcmgr_rw', '-pyw#1a3',
        '-h10.240.64.140', '-P3306', 'conn_quality', '-NBe', sql,
    ]
    try:
        status = subprocess.call(cmd_update)
    except OSError as e:
        # The mysql client could not be started at all.
        _log(e)
        return
    if status != 0:
        _log("mysql update for src_ip_str=%s exited with status %s" % (_src_ip, status))
if __name__ == '__main__':
    # NOTE(review): the original indentation was lost; the whole body is
    # assumed to sit inside the endless loop - confirm against the deployed script.
    # NOTE(review): there is no pause between iterations, so the database is
    # polled continuously even when no rows are pending.
    while True:
        # Count running processes whose command line mentions this script.
        process_moni_cmd = "ps -ef | grep 'conn_error_detail_multi.py' | grep -v grep | wc -l"
        ps_pipe = os.popen(process_moni_cmd)
        try:
            process_num = ps_pipe.read()
        finally:
            ps_pipe.close()
        if int(process_num) > 4:
            now_tim = time.strftime('%m-%d %H:%M:%S')
            print("%s process exist,quit" % now_tim)
            sys.exit(1)

        # Delete abnormal reports (non-numeric mainserverip) so the script
        # does not fail on them (20161215).
        del_cmd = (
            'mysql --default-character-set=UTF8 -upcmgr_rw -pyw#1a3 -h10.240.64.140 -P3306 '
            'conn_quality -NBe "delete from ver_hijack_cnt where mainserverip '
            "not regexp '^[0-9]+$'\";"
        )
        os.system(del_cmd)

        origin_list = origin_str()
        pool = multiprocessing.Pool(4)  # multi-process
        # Multi-thread alternative: pool = ThreadPool(4)
        start_time = time.time()
        pool.map(imp_data, origin_list)
        pool.close()
        pool.join()
        stop_time = time.time()
        used_time = stop_time - start_time
        print("work used %ds." % used_time)
# Created with Microsoft OneNote 2016.