再把Driller看一下,主要參考Driller工具分析,源碼有更新。
1.Driller介紹
Driller是基于fuzz工具AFL和符號執(zhí)行工具angr來實現(xiàn)的,當模糊程序卡住時調用符號執(zhí)行來求解能夠到達新路徑的輸入,使得fuzz能夠快速突破條件判斷語句。具體技術請參考文末Driller和angr的論文鏈接。
具體實現(xiàn)是,通過監(jiān)測AFL的執(zhí)行,我們可以決定什么時候開始符號執(zhí)行以探索新路徑。如果AFL執(zhí)行了x輪后,bitmap上顯示沒有發(fā)現(xiàn)新的狀態(tài)轉換(也即新的代碼塊轉移),說明AFL卡住了,這時候調用angr進行符號執(zhí)行。每個具體輸入對應于PathGroup中的單個路徑, 在PathGroup的每一步中,檢查每個分支以確保最新的跳轉指令引入先前AFL未知的路徑。 當發(fā)現(xiàn)這樣的跳轉時,SMT求解器被查詢以創(chuàng)建一個輸入來驅動執(zhí)行到新的跳轉。這個輸入反饋給AFL,AFL在未來的模糊步驟中進行變異。 這個反饋循環(huán)使我們能夠將昂貴的符號執(zhí)行時間與廉價的模糊時間進行平衡,并且減輕了模糊對程序操作的低語義洞察力。
2.使用方法
安裝教程參考Driller安裝教程,也可直接使用docker更方便—shellphish/mechaphish。
(1)命令行
官方推薦的driller的使用方法是通過shellphuzz工具來使用,使用方式如下,“-i”選項指定afl-fuzz的線程數,“-d”選項指定driller(即符號執(zhí)行工具)的線程數,如果不使用-d或者-d 0,則不使用符號執(zhí)行。
# fuzz with 4 AFL cores
$ shellphuzz -i -c 4 /path/to/binary
# perform symbolic-assisted fuzzing with 4 AFL cores and 2 symbolic tracing (drilling) cores.
$ shellphuzz -i -c 4 -d 2 /path/to/binary
(2)代碼調用
方法一:直接調用Fuzzer對象(包含了fuzz和angr)
#change from /fuzzer/shellphuzz.py
import driller
import time
import fuzzer
def test():
def robo_fuzzer():
"""fuzz a single cb,copy it from shellphuzz"""
work_path = './work'
print "[*] Drilling..."
drill_extension = driller.LocalCallback(num_workers=4)
grease_extension = None
# Timeout=1800
first_crash = True
stuck_callback = (
(lambda f: (grease_extension(f), drill_extension(f))) if drill_extension and grease_extension
else drill_extension or grease_extension
)
print "[*] Creating fuzzer..."
fuzz = fuzzer.Fuzzer(
"./20190529", "./work", afl_count=1, force_interval=None,
create_dictionary=False, stuck_callback=stuck_callback, time_limit=1000000
)
# start it!
print "[*] Starting fuzzer..."
fuzz.start()
start_time = time.time()
robo_fuzzer()
test()
方法二:Driller和AFL并行運行的過程(需安裝AFL并提供接口),將driller中求解輸入的部分和AFL fuzz的部分分別放在兩個terminal運行 。
#terminal 1:用AFL進行fuzz
mkdir -p workdir/input
echo 'init' > workdir/input/seed1 # 提供初始化種子輸入
echo core | sudo tee /proc/sys/kernel/core_pattern
afl-2.52b/afl-fuzz -M fuzzer-master -i workdir/input/ -o workdir/output/ -Q ./buggy
提供了一個運行腳本run_driller.py
#改編自/driller/driller/local_callback.py
#!/usr/bin/env python
import errno
import os
import os.path
import sys
import time
from driller import Driller
def save_input(content, dest_dir, count):
"""Saves a new input to a file where AFL can find it.
File will be named id:XXXXXX,driller (where XXXXXX is the current value of
count) and placed in dest_dir.
"""
name = 'id:%06d,driller' % count
with open(os.path.join(dest_dir, name), 'w') as destfile:
destfile.write(content)
def main():
if len(sys.argv) != 3:
print 'Usage: %s <binary> <fuzzer_output_dir>' % sys.argv[0]
sys.exit(1)
_, binary, fuzzer_dir = sys.argv
# Figure out directories and inputs
with open(os.path.join(fuzzer_dir, 'fuzz_bitmap')) as bitmap_file:
fuzzer_bitmap = bitmap_file.read()
source_dir = os.path.join(fuzzer_dir, 'queue')
dest_dir = os.path.join(fuzzer_dir, '..', 'driller', 'queue')
# Make sure destination exists
try:
os.makedirs(dest_dir)
except os.error as e:
if e.errno != errno.EEXIST:
raise
seen = set() # Keeps track of source files already drilled
count = len(os.listdir(dest_dir)) # Helps us name outputs correctly
# Repeat forever in case AFL finds something new
while True:
# Go through all of the files AFL has generated, but only once each
for source_name in os.listdir(source_dir):
if source_name in seen or not source_name.startswith('id:'):
continue
seen.add(source_name)
with open(os.path.join(source_dir, source_name)) as seedfile:
seed = seedfile.read()
#就是把卡住的種子和bitmap輸入給driller,然后生成能執(zhí)行新路徑的輸入,寫到種子隊列中去。
print 'Drilling input: %s' % seed
for _, new_input in Driller(binary, seed, fuzzer_bitmap).drill_generator():
save_input(new_input, dest_dir, count)
count += 1
# Try a larger input too because Driller won't do it for you
seed = seed + '0000'
print 'Drilling input: %s' % seed
for _, new_input in Driller(binary, seed, fuzzer_bitmap).drill_generator():
save_input(new_input, dest_dir, count)
count += 1
time.sleep(10)
if __name__ == '__main__':
main()
#terminal 2:調用driller
source venv/bin/activate
python run_driller.py ./buggy workdir/output/fuzzer-master
3.源碼分析
這里主要分析shellphuzz、driller、afl部分的代碼,主要解釋3個工具如何一起工作,符號執(zhí)行的時機是什么時候等。
(1) shellphuzz
shellphuzz工具是AFL工具的一層Python接口,可以看做afl的python封裝。shellphuzz支持啟動afl、添加slave worker、注入或刪除測試case,檢查性能數據,使用符號執(zhí)行等。
shellphuzz啟動流程:
#/fuzzer/shellphuzz.py
if args.driller_workers:
print "[*] Drilling..."
drill_extension = driller.LocalCallback(num_workers=args.driller_workers)
stuck_callback = (
(lambda f: (grease_extension(f), drill_extension(f))) if drill_extension and grease_extension
else drill_extension or grease_extension
)
print "[*] Creating fuzzer..."
fuzzer = fuzzer.Fuzzer(
args.binary, args.work_dir, afl_count=args.afl_cores, force_interval=args.force_interval,
create_dictionary=not args.no_dictionary, stuck_callback=stuck_callback, time_limit=args.timeout
)
# start it!
print "[*] Starting fuzzer..."
fuzzer.start()
start_time = time.time()
shellphuzz開始運行后,如果加了-d選項,會注冊一個指向driller模塊的callback;然后實例化一個Fuzzer類的對象,然后啟動fuzzer。
#/fuzzer/fuzzer/fuzzer.py
def start(self):
'''
start fuzzing
'''
# spin up the AFL workers
self._start_afl() # 根據參數啟動多個afl線程
# start the callback timer
self._timer.start() # 啟動一個InfiniteTimer類的對象,會周期性的調用stuck_callback,即driller的callback。
(2)Driller
- driller_callback:
如上所述,shellphuzz通過計時器,周期性的調用driller的callback方法。該方法的代碼如下:
#/driller/driller/local_callback.py:
def driller_callback(self, fuzz):
l.warning("Driller stuck callback triggered!")
# remove any workers that aren't running
self._running_workers = [x for x in self._running_workers if x.is_alive()]
# get the files in queue
queue = self._queue_files(fuzz)
#for i in range(1, fuzz.fuzz_id):
# fname = "fuzzer-%d" % i
# queue.extend(self.queue_files(fname))
# start drilling
not_drilled = set(queue) - self._already_drilled_inputs
if len(not_drilled) == 0:
l.warning("no inputs left to drill")
while len(self._running_workers) < self._num_workers and len(not_drilled) > 0:
to_drill_path = list(not_drilled)[0]
not_drilled.remove(to_drill_path)
self._already_drilled_inputs.add(to_drill_path)
proc = multiprocessing.Process(target=_run_drill, args=(self, fuzz, to_drill_path))
proc.start()
self._running_workers.append(proc)
__call__ = driller_callback
對隊列queue中沒有經過符號執(zhí)行的輸入文件進行符號執(zhí)行,同一時間在運行的符號執(zhí)行器的個數不超過-d選項指定的個數。
該方法最后調用_run_driller方法來啟動driller。
- _run_driller:
_run_driller方法提取了driller所需要的參數,并調用main來真正啟動driller
#/driller/driller/local_callback.py:
def _run_drill(drill, fuzz, _path_to_input_to_drill):
_binary_path = fuzz.binary_path
_fuzzer_out_dir = fuzz.out_dir
_bitmap_path = os.path.join(_fuzzer_out_dir, 'fuzzer-master', "fuzz_bitmap")
_timeout = drill._worker_timeout
l.warning("starting drilling of %s, %s", os.path.basename(_binary_path), os.path.basename(_path_to_input_to_drill))
args = (
"timeout", "-k", str(_timeout+10), str(_timeout),
sys.executable, os.path.abspath(__file__),
_binary_path, _fuzzer_out_dir, _bitmap_path, _path_to_input_to_drill
)
p = subprocess.Popen(args, stdout=subprocess.PIPE)
print p.communicate()
- main:
main的基本流程為(這里可以學習學習):
- 設置參數,打開文件
- 初始化driller對象
- 通過driller的drill_generator方法進行符號執(zhí)行,并生成滿足afl需求的新輸入文件
- 保存能執(zhí)行新路徑的輸入,存入driller_queue_dir目錄
#/driller/driller/local_callback.py:
# this is for running with bash timeout
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Driller local callback")
parser.add_argument('binary_path')
parser.add_argument('fuzzer_out_dir')
parser.add_argument('bitmap_path')
parser.add_argument('path_to_input_to_drill')
parser.add_argument('--length-extension', help="Try extending inputs to driller by this many bytes", type=int)
args = parser.parse_args()
logcfg_file = os.path.join(os.getcwd(), '.driller.ini')
if os.path.isfile(logcfg_file):
logging.config.fileConfig(logcfg_file)
binary_path, fuzzer_out_dir, bitmap_path, path_to_input_to_drill = sys.argv[1:5]
fuzzer_bitmap = open(args.bitmap_path, "rb").read()
# create a folder
driller_dir = os.path.join(args.fuzzer_out_dir, "driller")
driller_queue_dir = os.path.join(driller_dir, "queue")
try: os.mkdir(driller_dir)
except OSError: pass
try: os.mkdir(driller_queue_dir)
except OSError: pass
l.debug('drilling %s', path_to_input_to_drill)
# get the input
inputs_to_drill = [open(args.path_to_input_to_drill, "rb").read()]
if args.length_extension:
inputs_to_drill.append(inputs_to_drill[0] + b'\0' * args.length_extension)
for input_to_drill in inputs_to_drill:
d = driller.Driller(args.binary_path, input_to_drill, fuzzer_bitmap)
count = 0
for new_input in d.drill_generator():
id_num = len(os.listdir(driller_queue_dir))
fuzzer_from = args.path_to_input_to_drill.split("sync/")[1].split("/")[0] + args.path_to_input_to_drill.split("id:")[1].split(",")[0]
filepath = "id:" + ("%d" % id_num).rjust(6, "0") + ",from:" + fuzzer_from
filepath = os.path.join(driller_queue_dir, filepath)
with open(filepath, "wb") as f:
f.write(new_input[1])
count += 1
l.warning("found %d new inputs", count)
關鍵代碼在/driller/driller/driller_main.py中。
drill_generator:
生成滿足需求的文件的driller接口。最終調用_drill_input這個方法真正的去實現(xiàn)符號執(zhí)行。_drill_input:
沿著一個指定的trace流一步步進行符號執(zhí)行(angr引擎,采用driller_core技術),如果發(fā)現(xiàn)新的路徑則記錄下來。
#/driller/driller/driller_main.py
def _drill_input(self):
"""
Symbolically step down a path with a tracer, trying to concretize inputs for unencountered
state transitions.
"""
# initialize the tracer
r = tracer.qemu_runner.QEMURunner(self.binary, self.input, argv=self.argv)
p = angr.Project(self.binary)
for addr, proc in self._hooks.items():
p.hook(addr, proc)
l.debug("Hooking %#x -> %s...", addr, proc.display_name)
if p.loader.main_object.os == 'cgc':
p.simos.syscall_library.update(angr.SIM_LIBRARIES['cgcabi_tracer'])
s = p.factory.entry_state(stdin=angr.SimFileStream, flag_page=r.magic)
else:
s = p.factory.full_init_state(stdin=angr.SimFileStream)
s.preconstrainer.preconstrain_file(self.input, s.posix.stdin, True)
simgr = p.factory.simulation_manager(s, save_unsat=True, hierarchy=False, save_unconstrained=r.crash_mode)
t = angr.exploration_techniques.Tracer(trace=r.trace, crash_addr=r.crash_addr, copy_states=True)
self._core = angr.exploration_techniques.DrillerCore(trace=r.trace)
simgr.use_technique(t)
simgr.use_technique(angr.exploration_techniques.Oppologist())
simgr.use_technique(self._core)
self._set_concretizations(simgr.one_active)
l.debug("Drilling into %r.", self.input)
l.debug("Input is %r.", self.input)
while simgr.active and simgr.one_active.globals['trace_idx'] < len(r.trace) - 1:
simgr.step()
# Check here to see if a crash has been found.
if self.redis and self.redis.sismember(self.identifier + '-finished', True):
return
if 'diverted' not in simgr.stashes:
continue
while simgr.diverted:
state = simgr.diverted.pop(0)
l.debug("Found a diverted state, exploring to some extent.")
w = self._writeout(state.history.bbl_addrs[-1], state)
if w is not None:
yield w
for i in self._symbolic_explorer_stub(state):
yield i
參考:
Driller工具分析—https://blog.csdn.net/Chen_zju/article/details/80791281
Driller安裝教程—https://blog.csdn.net/xiaosatianyu/article/details/60874004
Driller的安裝與使用—http://m.itdecent.cn/p/6e3943474f41
Driller論文—https://sites.cs.ucsb.edu/~vigna/publications/2016_NDSS_Driller.pdf
angr論文—https://sites.cs.ucsb.edu/~vigna/publications/2016_SP_angrSoK.pdf