#!/bin/env python
#coding:utf-8
# 多进程查询大量 IP 信息并入库 2017 年 2 月 9 日 17:22
#author:yaungzhou

import json
import multiprocessing
import os
import socket
import struct
import subprocess
import sys
import time
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

def _escape_field(data, key):
    """Return ``data[key]`` (default 'unknow') with quotes and commas doubled."""
    # Doubling the single quote lets the caller embed the value in a
    # single-quoted SQL literal.
    # NOTE(review): doubling "," is unusual for SQL escaping - confirm intent.
    return data.get(key, 'unknow').replace("'", "''").replace(",", ",,")


def ip_show(ip):
    """Look up location and operator details for one IP address.

    Queries the dbip HTTP service and normalises the returned strings so the
    caller can store them in the database.

    Args:
        ip: IP address string substituted into the query URL.

    Returns:
        A ``(country, province, city, oper)`` tuple of UTF-8 encoded strings
        ('unknow' replaces any field missing from the reply), or ``None`` when
        the request or the parsing fails; the error is printed with a
        timestamp in that case.
    """
    # NOTE(review): the access token is hard-coded; consider moving it to
    # configuration. No timeout is passed to urlopen either.
    url = ("http://dbip.wsd.com/ip/search"
           "?token=b9539e0c40fe5b628ad735977298998f"
           "&ip={ip}&type=3&bodyEncode=utf8").format(ip=ip)
    try:
        # The service pads its JSON reply with trailing NUL bytes, which
        # json.loads rejects; strip exactly the NUL character.
        raw = urllib2.urlopen(url).read().strip('\x00')
        data = json.loads(raw).get('data')
        # The replacements keep odd characters in the IP database output
        # from breaking the later database insert.
        country = _escape_field(data, 'country')
        prov = _escape_field(data, 'province')
        city = _escape_field(data, 'city')
        oper = _escape_field(data, 'oper').replace('"', '')
        return (country.encode("UTF-8"),
                prov.encode("UTF-8").strip("'"),
                city.encode("UTF-8"),
                oper.encode("UTF-8"))
    except Exception as e:
        # Best effort: report the failure and let the caller decide what to
        # do with the missing result.
        now_time = time.strftime('%m-%d %H:%M:%S')
        print("%s %s" % (now_time, e))
        return None

# Shell command that lists the rows of halei_error_detail still waiting to be
# resolved (src_ip is null). origin_str() parses its output as one
# tab-separated record per line.
# NOTE(review): database credentials are hard-coded on the command line.
cmd_select = ('mysql --default-character-set=utf8 -upcmgr_rw -pyw#1a3 '
              '-h10.240.64.140 -P3306 conn_quality '
              '-NBe "select src_ip_str, mainip_str, backip_str '
              'from halei_error_detail where src_ip is null;"')

def origin_str(cmd=None):
    """Collect every row that still needs processing from the database.

    Args:
        cmd: Shell command whose standard output holds one tab-separated
            record (src_ip, mainip, backip) per line. Defaults to the
            module-level ``cmd_select``.

    Returns:
        A list of ``(src_ip, mainip, backip)`` string tuples. Lines with
        fewer than three fields or with an empty first field are skipped.
    """
    if cmd is None:
        cmd = cmd_select
    origin_list = []
    pipe = os.popen(cmd)
    try:
        for line in pipe:
            fields = line.strip('\n').split('\t')
            if len(fields) < 3:
                # Blank or truncated output line: nothing usable in it.
                continue
            _src_ip, _mainip, _backip = fields[:3]
            if _src_ip:
                origin_list.append((_src_ip, _mainip, _backip))
    finally:
        pipe.close()
    return origin_list

def _int_to_ip(value):
    """Convert a decimal integer (string or int) to a dotted-quad IPv4 string."""
    # "=I" packs an unsigned 32-bit integer in native byte order.
    return socket.inet_ntoa(struct.pack("=I", int(value)))


def _log(message):
    """Print *message* prefixed with the current month-day and time."""
    print("%s %s" % (time.strftime('%m-%d %H:%M:%S'), message))


def imp_data(origin_tuple):
    """Resolve one pending row through the IP database and update the DB.

    Args:
        origin_tuple: ``(src_ip, mainip, backip)`` where each item is the
            decimal integer form of an IPv4 address, as a string.

    Returns:
        None. On success one ``update`` statement is run through the mysql
        command-line client. If an address cannot be converted or any lookup
        fails, the problem is printed and the row is left untouched.
    """
    _src_ip = origin_tuple[0]
    try:
        src_ip, mainip, backip = [_int_to_ip(v) for v in origin_tuple[:3]]
    except (ValueError, TypeError, struct.error) as e:
        _log(e)
        return

    # ip_show() returns None on failure; unpacking that would raise and abort
    # the whole pool.map() call, so skip this row instead.
    lookups = [ip_show(addr) for addr in (src_ip, mainip, backip)]
    if any(info is None for info in lookups):
        _log("ip lookup failed for %s, row skipped" % _src_ip)
        return
    (country, prov, city, ap) = lookups[0]
    (main_ser_country, main_ser_prov, main_ser_city, main_ser_ap) = lookups[1]
    (back_ser_country, back_ser_prov, back_ser_city, back_ser_ap) = lookups[2]

    sql = ("update halei_error_detail set src_ip='%s',country='%s',"
           "province='%s',city='%s', ap='%s',mainip='%s',"
           "main_ser_country='%s',main_ser_province='%s',"
           "main_ser_city='%s',main_ser_ap='%s', backip='%s',"
           "back_ser_country='%s',back_ser_province='%s',"
           "back_ser_city='%s',back_ser_ap='%s' "
           "where src_ip_str='%s' and ap is null") % (
               src_ip, country, prov, city, ap,
               mainip, main_ser_country, main_ser_prov, main_ser_city,
               main_ser_ap,
               backip, back_ser_country, back_ser_prov, back_ser_city,
               back_ser_ap,
               _src_ip)

    # Pass an argument list instead of a shell string so text coming from the
    # IP database cannot be interpreted by the shell.
    argv = ['mysql', '--default-character-set=UTF8', '-upcmgr_rw',
            '-pyw#1a3', '-h10.240.64.140', '-P3306', 'conn_quality',
            '-NBe', sql]
    try:
        status = subprocess.call(argv)
    except OSError as e:
        # Raised when the mysql client cannot be started at all.
        _log(e)
        return
    if status != 0:
        _log("mysql update for %s exited with status %s" % (_src_ip, status))

def main():
    """Run the resolve-and-update cycle in an endless loop.

    Each cycle exits if too many copies of the script are already running,
    purges malformed reports, loads the pending rows with origin_str() and
    processes them with imp_data() in a pool of four worker processes, then
    prints the elapsed time.
    """
    # NOTE(review): indentation was lost in the source; the whole cycle is
    # assumed to sit inside the ``while True`` loop. There is no pause between
    # cycles, so an empty work list makes this loop spin against the DB.
    while True:
        # Count running processes whose command line mentions this script
        # (presumably the file name it is deployed under - confirm).
        process_moni_cmd = ("ps -ef | grep 'conn_error_detail_multi.py' "
                            "| grep -v grep | wc -l")
        process_num = os.popen(process_moni_cmd).read()
        if int(process_num) > 4:
            now_tim = time.strftime('%m-%d %H:%M:%S')
            print("%s process exist,quit" % now_tim)
            sys.exit(1)

        # Delete abnormal reports so they cannot break the script
        # (added 2016-12-15).
        del_cmd = ('mysql --default-character-set=UTF8 -upcmgr_rw -pyw#1a3 '
                   '-h10.240.64.140 -P3306 conn_quality -NBe '
                   '"delete from ver_hijack_cnt where mainserverip '
                   'not regexp \'^[0-9]+$\'";')
        os.system(del_cmd)

        origin_list = origin_str()

        # Worker processes; ThreadPool(4) is the thread-based alternative.
        pool = multiprocessing.Pool(4)

        start_time = time.time()
        pool.map(imp_data, origin_list)
        pool.close()
        pool.join()
        stop_time = time.time()

        used_time = stop_time - start_time
        print("work used %ds." % used_time)


if __name__ == '__main__':
    main()

# 已使用 Microsoft OneNote 2016 创建。