0x00、前言&思路
一直以来,蜜罐系统建立对溯源系统来说有鸡肋之嫌,基于公网的蜜罐系统最早的产品形式是: wooyun当时做的一套蜜罐系统,主要是模拟底层服务弱口令(ssh、rdp、telnet)、数据库服务弱口令(mysql、postgresql、redis)、web应用(nginx、apache、tomcat等)、应用漏洞(OpenSSL心脏出血、Java RMI 反序列化代码执行、Struts2远程代码执行等)但是,这种主要是针对具有扫描器功能漏洞渗透自动化程序其作用,获取攻击源IP地址,获取植入恶意样本,然后沙盒运行样本做C&C地址分析。但是为了获取蜜罐样本的全面性,我们需要投入大量的基础设施建设成本,同时针对,我们需要DDoS定向溯源的地址还不一定有帮助。
那有没有更好的方式方法,可以第一时间发现DDoS呢?我们能不能通过一种更简便更轻资产的办法获得目前公网存活的恶意样本?根据我们的DDoS溯源的以往的经验上看,很多黑客喜欢通过http共享恶意文件方式,HFS是他们首选,,那我们是不是可以把全网HFS扫描出来,然后把这些恶意程序放到一个养鸡场,当说发起DDoS攻击的时候,我们养鸡场中的Agent也接到攻击命令,我们就知道哪个C&C Server发起功能,攻击的目标是谁。在这里我们引入一个概念:养鸡场,就是把认为是恶意样本都放到一起,正常饲养起来。
0x01、概念验证&PoC
完成以上想法需要业务模块:
1. 快速识别全国HFS服务器
(1)现有网络空间搜索引擎系统调研
shodan
a. 免费版只能翻两页 20条记录。
censys
a. 是免费也提供API操作,但是准确性存疑,免费的东西。。。
zoomeye
a. 要求实名注册,国内的限制,这个。。。
FOFA
a. 专业版1个月3000提供API,在线查询10000条/次,基础版在线1000条。穷。。。
(2)扫描技术原理调研
masscan
a. banner识别有限
zmap+grab
a. censys就使用这种方式,结果不准确
nmap
a. 服务指纹识别准确但是速度是短板
Zoomeye
a. 前端:vue
b. 后端:Django
c. 数据库:MongoDB+Elasticsearch
d. 扫描Worker/调度:Lucifer,
e. 组件指纹识别引擎Wmap/Xmap
某朋友的扫描系统
a. 客户端:masscan+nmap+nginx+redis+haproxy
b. 服务器端:Elasticsearch+ celery + django+ spark
(3)我的解决方案
目前系统处在验证阶段先简单架构,先扫描一下国内IP端。
端口扫描:masscan
服务指纹识别:nmap
数据库:postgresql
恶意文件验证:下载HFS服务器文件通过API发送virustotal校验。
2. 养鸡场部署
(1)蜜罐系统选择
更少的资源跑更多的蜜罐(杜鹃)
(2)外网监控
IDS监控IP和DNS对外连接情况(Suricata+ES)
(3)DDoS监控
外网发包pps监控告警
0x02、代码实现
在这之前写过一篇有关扫描器的文章(Freebuf bt0sea),当时的需求只是监控本公司IP段高危端口连接。没几个B段。但是现在监控的是全中国IP地址段,这有点多,直接nmap猴年马月也扫不完呀。
Problem 1:如果准确的拿到国内IP段
主要第一次做先把范围缩小一点。
Apanic提供了每日更新的亚太地区IPv4,IPv6,AS号分配的信息表,访问url是http://ftp.apnic.net/apnic/stats/apnic/delegated-apnic-latest我们过滤其中的ipv4数据即可。
apnic|CN|ipv4|42.156.64.0|16384|20110412|allocated
apnic|CN|ipv4|42.156.128.0|32768|20110316|allocated
apnic|CN|ipv4|42.157.0.0|65536|20110316|allocated
apnic|CN|ipv4|42.158.0.0|65536|20110224|allocated
apnic|CN|ipv4|42.159.0.0|65536|20110224|allocated
apnic|CN|ipv4|42.160.0.0|1048576|20110216|allocated
apnic|CN|ipv4|42.176.0.0|524288|20110222|allocated
apnic|CN|ipv4|42.184.0.0|131072|20110228|allocated
apnic|CN|ipv4|42.186.0.0|65536|20110316|allocated
apnic|CN|ipv4|42.187.0.0|16384|20110412|allocated
def getip(f,ipinfo=None):
if ipinfo is None:
ipinfo = []
for i in (f):
if i.find("apnic") > -1 and i.find("ipv4") > -1 and i.find("summary") == -1:
iplist = i.split("|")
country = iplist[1]
if country == 'CN':
ipinfo.append(i)
return(ipinfo)
然后入库sqlite。
Problem 2:端口扫描
启动masscan 30个进程
def Scan():
try:
global g_queue
while not g_queue.empty():
item = g_queue.get()
result = "result"+item+".json"
p = subprocess.Popen("/root/masscan/masscan "+item+" -p80 -oJ "+result, shell=True)
p.wait()
if g_queue.qsize() == 0:
print (u'公网IP端口扫描结束')
return "ok"
except Exception,e:
print e
return e
if __name__ == '__main__':
reload(sys)
sys.setdefaultencoding('utf-8')
listThread = []
### 获取ip列表加入全局变量中
conn = sqlite3.connect("ipwhois.db")
cur = conn.cursor()
print "Opened database successfully"
cursor = cur.execute("SELECT inetnum from ipwhois")
for row in cursor:
tmpstring = re.split(' - ',row[0])
# print tmpstring[0], tmpstring[1],
tmp=tmpstring[0]+'-'+tmpstring[1]
g_queue.put(tmp)
print tmp
conn.close()
for i in xrange(g_threadNum):
thread = ScanThread(Scan)
thread.start()
listThread.append(thread)
for thread in listThread:
thread.join()
print thread
一共大约3亿2000万IP,当然没计算已经取消分配的IP段。大约扫描了3天时间。需要优化,我发现有的一个进程扫描一个/8的ip段需要一天的时间,2000多万IP,所以可以切割成多个B段进行扫描。
Problem 3:服务指纹识别扫描
(1)提取IP
def store(result):
fw = open('iplist.txt', 'a')
with open(result,'r') as f:
for line in f:
if line.startswith('{ '}:
try:
temp = json.loads(line[:-2])
fw.writelines(temp["ip"]+'n')
print temp["ip"]
except:
continue
def list_dir_names(urPath):
dirNames = os.listdir(urPath)
for ones in dirNames:
ones.decode('gbk')
return dirNames
if __name__ == '__main__':
reload(sys)
sys.setdefaultencoding('utf-8')
filelist=[]
filelist=glob.glob("*.json")
for item in filelist:
if os.path.getsize(item)!=0:
print item
store(item)
else:
continue
提取后大约开80端口IP为4685409。
def Scan():
try:
global g_queue
while not g_queue.empty():
item = g_queue.get()
result = "result"+item.strip()+".xml"
p = subprocess.Popen("/usr/bin/nmap -oX "+result+" -sV -p80 "+item, shell=True)
p.wait()
print "size = ", g_queue.qsize()
if g_queue.qsize() == 0:
print (u'公网IP扫描结束')
return "ok"
except Exception,e:
print e
return e
def ImportiIPlist():
storejson_file=open('iplist.txt','rb')
lines = storejson_file.readlines()
for line in lines:
print line
g_queue.put(line)
g_totalsize = g_queue.qsize()
print g_totalsize
if __name__ == '__main__':
reload(sys)
sys.setdefaultencoding('utf-8')
listThread = []
ImportiIPlist()
for i in xrange(g_threadNum):
thread = ScanThread(Scan)
thread.start()
listThread.append(thread)
for thread in listThread:
thread.join()
print thread
扫描大约5-6天时间。
Problem 4:扫描结果入库
def Scan():
try:
timeout = 300
tableName = "%s_%s" % ("scanresult", time.strftime("%Y%m%d"))
conn1 = psycopg2.connect(database="postgres", user="postgres", password="Qwe123!@#", host="127.0.0.1",
port="5432")
cur1 = conn1.cursor()
g_totalsize = g_queue.qsize()
while not g_queue.empty():
item = g_queue.get()
print item
ctime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
print ctime
nmap_report = NmapParser.parse_fromfile(item)
for scanned_hosts in nmap_report.hosts:
print scanned_hosts.address
for serv in scanned_hosts.services:
if serv.state == "open":
m = serv.service_dict.get('extrainfo', '')
print m
if m.find(''')!=-1:
pass
else:
sql = "insert into %s (task_id,ctime, address,port,service,product,product_version,product_extrainfo,os) VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s')"
sqlCmd = sql%(tableName,g_task_id,ctime,scanned_hosts.address,str(serv.port),serv.service,serv.service_dict.get('product', ''),serv.service_dict.get('version', ''),serv.service_dict.get('extrainfo', ''),'NULL')
print sqlCmd
cur1.execute(sqlCmd)
conn1.commit()
print "size = ", g_queue.qsize()
g_size = g_queue.qsize()
num = "%.2f" %(float(g_size)/float(g_totalsize))
if g_queue.qsize() == 0:
print (u'公网IP扫描结束')
return "ok"
except Exception,e:
print e
pass
def CreateTable():
curC = connC.cursor()
sqlCreate = "create table if not exists %s (
task_id TEXT,
ctime TEXT,
address TEXT,
port TEXT,
service TEXT,
product TEXT ,
product_version TEXT,
product_extrainfo TEXT,
os TEXT
)"
tableName = "%s_%s"%("scanresult", time.strftime("%Y%m%d"))
sqlCmd = sqlCreate%tableName
curC.execute(sqlCmd)
curC.close()
connC.commit()
connC.close()
def SetTask():
filelist = []
filelist = glob.glob("*.xml")
for item in filelist:
if os.path.getsize(item) != 0:
print item
g_queue.put(item)
else:
continue
def main():
listThread = []
SetTask()
CreateTable()
for i in xrange(g_threadNum):
thread = ScanThread(Scan)
thread.start()
listThread.append(thread)
for thread in listThread:
thread.join()
print thread
if __name__ == '__main__':
reload(sys)
sys.setdefaultencoding('utf-8')
main()
SELECT DISTINCT(address),port,product,product_version,product_extrainfo FROM scanresult_20171231 WHERE product LIKE '%HttpFileServer%'
以下是部分结果
Virustotal
威胁情报查询结果(360):
对外发包监控用开源IDS就好。
0x02、总结
通过以上的概念验证,我们可以获得我们自己想到的样本。只要稍微整理一下验证的代码。就可以快速建立起一个养鸡场,实时监控DDOS趋势,为DDoS溯源添砖加瓦。
领取专属 10元无门槛券
私享最新 技术干货