使用Cutter和Radare2对APT32恶意程序流程图进行反混淆处理

Ocean Lotus Group,也被称之为APT32,这个黑客组织此前主要的攻击目标以越南、老挝和菲律宾等东亚国家为主,虽然私营企业是该组织的主要目标,但外国政府、政治活动家和新闻记者也是他们的攻击目标之一。

APT32的攻击工具非常多样化,从Mimikatz和Cobalt Strike这样的高级定制工具,到ShellCode以及后门等等,应有尽有。而且他们所使用的很多代码都经过了高度模糊处理或混淆处理,并使用了不同的技术来提升检测和分析的难度,导致研究人员更加难以对它们进行逆向分析。

在这篇文章中,我们将介绍该组织所使用的其中一种代码混淆技术,而这种技术也被APT32广泛应用到了他们的后门代码中。反混淆处理的过程中需要使用到Cutter以及官方开源逆向工程框架-radare2,还请各位同学自行搜索下载。

下载和安装Cutter

Cutter目前支持Linux、macOS和Windows。

Cutter下载地址:【点击底部阅读原文获取】

Cutter基础教程:【点击底部阅读原文获取】

后门分析

我们的样本(486be6b1ec73d98fdd3999abe2fa04368933a2ec)是多级感染链中的一部分,而且APT32在多个活动中都使用到了这个后门,例如恶意文件样本(115f3cb5bdfb2ffe5168ecb36b9aed54)。这个文档声称自己来自于360,但是其中包含了一个恶意VBA宏,这个恶意宏会向rundll32.exe注入恶意Shellcode。Shellcode中包含了解密代码,可以直接对恶意代码进行解密并将相应的DLL加载进内存,而DLL包含的就是后门逻辑。

首先,后门会解密一个配置文件,其中存储的信息包含C2服务器基础信息在内。接下来,代码会尝试使用自定义PE加载器向内存中加载恶意DLL。这个DLL会被HTTPProv.dll调用,并能够与C2服务器通信。后门还可以从C2服务器接收十几种不同的指令,包括Shellcode执行、新进程创建以及文件和目录修改等操作。

该组织所使用的很多混淆技术其目的就是要增加逆向分析的难度,而且其二进制代码中使用了大量的垃圾代码,这些垃圾代码会增加样本的体积和复杂性,以分散研究人员的注意力。而且,其中的代码集经常会与堆栈指针一起使用,而普通的反编译工具无法对这种情况进行有效处理。

混淆技术

APT32在进行代码混淆处理时,大量使用了控制流混淆,并且向函数流中注入了大量垃圾代码块。这些垃圾代码块不会实现任何功能,只是为了混淆视听而已。

大家可以从上图中看到,其中包含了大量垃圾代码块。仔细分析后我们会发现,所有需要跳转到这些代码段的条件判断结果都为False,而且都是以条件跳转结束的,跟之前的条件判断正好相反。比如说,垃圾代码段之前的条件判断为jo ,那么垃圾代码很有可能以jno结束。如果之前的代码段以jne 结束,那么垃圾代码段就会以je 结束。

这样一来,我们就可以对这些垃圾代码段定性了。第一种特性:出现两个连续的垃圾代码块,以相反的条件跳转到相同的目标地址并结束。第二种特性:要求第二个块不包含有意义的指令,如字符串引用或代码调用等等。

当满足这两个特征时,我们可以说第二个块很可能是垃圾代码块。这样,我们就可以将垃圾块从图表中删除了,并使用无条件跳转来修补源代码。

编写核心类

首先,我们要创建一个Python类作为我们的核心类,这个类需要包含查找和移除垃圾代码块的逻辑。先定义init函数,该函数可以接收管道消息,可以是来自redare2的r2pipe对象(importr2pipe),也可以是来自Cutter的cutter对象(import cutter)。

class GraphDeobfuscator:   def __init__(self, pipe):       """an initializationfunction for the class             Arguments:           pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper       """       self.pipe = pipe

现在我们就可以使用这个管道来执行radare2命令了。这个管道对象包含两种执行r2命令的方式。第一种为pipe.cmd(),它能够以字符串的形式返回命令执行结果。第二种为pipe.cmdj(j),它你能够根据radare2命令的输出结果返回解析后的JSON对象。

接下来就是从当前函数中获取所有的代码块,然后进行迭代。这里可以使用afbj米工龄来获取函数中所有代码块的JSON对象。

def clean_junk_blocks(self):       """Search a givenfunction for junk blocks, remove them and fix the flow.       """       # Get all the basic blocks of thefunction       blocks = self.pipe.cmdj("afbj @$F")       if not blocks:           print("[X] No blocks found. Is it afunction?")           return       modified = False       # Iterate over all the basic blocks ofthe function       for block in blocks:           # do something

针对每一个块,根据之前的判断条件进行分析,获取候选垃圾代码块:

def get_fail_block(self, block):       """Return the block towhich a block branches if the condition is fails             Arguments:           block {block_context} -- A JSONrepresentation of a block             Returns:           block_context -- The block to whichthe branch fails. If not exists, returns None       """       # Get the address of the"fail" branch       fail_addr = self.get_fail(block)       if not fail_addr:           return None       # Get a block context of the failaddress       fail_block = self.get_block(fail_addr)       return fail_block if fail_block elseNone   def is_successive_fail(self, block_A,block_B):       """Check if the endaddress of block_A is the start of block_B       Arguments:           block_A {block_context} -- A JSONobject to represent the first block           block_B {block_context} -- A JSONobject to represent the second block             Returns:           bool -- True if block_B comes immediatelyafter block_A, False otherwise       """      return ((block_A["addr"] +block_A["size"]) == block_B["addr"])
接下来,我们要判断候选垃圾代码段是否包含无效指令:
def contains_meaningful_instructions (self,block):       '''Check if a block contains meaningfulinstructions (references, calls, strings,...)             Arguments:           block {block_context} -- A JSONobject which represents a block             Returns:           bool -- True if the block containsmeaningful instructions, False otherwise       '''       # Get summary of block - strings, calls,references       summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))       return summary != ""
最后,枚举出所有对立的跳转条件:
jmp_pairs = [       ['jno', 'jo'],       ['jnp', 'jp'],       ['jb', 'jnb'],       ['jl', 'jnl'],       ['je', 'jne'],       ['jns', 'js'],       ['jnz', 'jz'],       ['jc', 'jnc'],       ['ja', 'jbe'],       ['jae', 'jb'],       ['je', 'jnz'],       ['jg', 'jle'],       ['jge', 'jl'],       ['jpe', 'jpo'],       ['jne', 'jz']]   def is_opposite_conditional(self, cond_A,cond_B):       """Check if two operandsare opposite conditional jump operands             Arguments:           cond_A {string} -- the conditionaljump operand of the first block           cond_B {string} -- the conditionaljump operand of the second block             Returns:           bool -- True if the operands areopposite, False otherwise       """       sorted_pair = sorted([cond_A, cond_B])       for pair in self.jmp_pairs:           if sorted_pair == pair:               return True       return False

将上述所有代码整合到cleanjunkblocks()函数中:

def clean_junk_blocks(self):       """Search a givenfunction for junk blocks, remove them and fix the flow.       """       # Get all the basic blocks of thefunction       blocks = self.pipe.cmdj("afbj @$F")       if not blocks:           print("[X] No blocks found. Isit a function?")           return       modified = False       # Iterate over all the basic blocks ofthe function       for block in blocks:           fail_block =self.get_fail_block(block)           if not fail_block or \           not self.is_successive_fail(block,fail_block) or \          self.contains_meaningful_instructions(fail_block) or \           notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):               continue

使用Radare2

if__name__ == "__main__":   graph_deobfuscator = GraphDeobfuscator(pipe)   graph_deobfuscator.clean_graph()

使用Cutter

ifcutter_available:   # This part will be executed only if Cutteris available.   # This will create the cutter plugin and UIobjects for the plugin   classGraphDeobfuscatorCutter(cutter.CutterPlugin):       name = "APT32 GraphDeobfuscator"       description = "Graph Deobfuscatorfor APT32 Samples"       version = "1.0"       author = "Itay Cohen(@Megabeets_)"       def setupPlugin(self):           pass       def setupInterface(self, main):           pass      def create_cutter_plugin():       return GraphDeobfuscatorCutter()

为了保证插件正常运行,我们还需要增加一个菜单入口来触发反混淆功能:

ifcutter_available:   # This part will be executed only if Cutteris available. This will   # create the cutter plugin and UI objectsfor the plugin   classGraphDeobfuscatorCutter(cutter.CutterPlugin):       name = "APT32 GraphDeobfuscator"       description = "Graph Deobfuscatorfor APT32 Samples"       version = "1.0"       author = "Megabeets"       def setupPlugin(self):           pass       def setupInterface(self, main):           # Create a new action (menu item)           action = QAction("APT32 GraphDeobfuscator", main)           action.setCheckable(False)          # Connect the action to a function - cleaner.           # A click on this action willtrigger the function          action.triggered.connect(self.cleaner)           # Add the action to the"Windows -> Plugins" menu           pluginsMenu =main.getMenuByType(main.MenuType.Plugins)           pluginsMenu.addAction(action)       def cleaner(self):           graph_deobfuscator =GraphDeobfuscator(pipe)           graph_deobfuscator.clean_graph()           cutter.refresh()   def create_cutter_plugin():       return GraphDeobfuscatorCutter()

接下来,我们就可以看到图形化的分析结果了:

移除垃圾代码段之后的结果图如下所示:

对比图如下:

样本SHA256值

Be6d5973452248cb18949711645990b6a56e7442dc30cc48a607a2afe7d8ec668d74d544396b57e6faa4f8fdf96a1a5e30b196d56c15f7cf05767a406708a6b2

APT32图形化反混淆工具-完整源代码

"""A plugin for Cutter and Radare2 to deobfuscate APT32 flow graphsThisis a python plugin for Cutter that is compatible as an r2pipe script forradare2as well. The plugin will help reverse engineers to deobfuscate and removejunkblocks from APT32 (Ocean Lotus) samples."""__author__  = "Itay Cohen, aka @megabeets_"__company__= "Check Point Software Technologies Ltd"#Check if we're running from cuttertry:    import cutter    from PySide2.QtWidgets import QAction    pipe = cutter    cutter_available = True# Ifno, assume running from radare2except:    import r2pipe    pipe = r2pipe.open()    cutter_available = FalseclassGraphDeobfuscator:    # A list of pairs of opposite conditionaljumps    jmp_pairs = [        ['jno', 'jo'],        ['jnp', 'jp'],        ['jb', 'jnb'],        ['jl', 'jnl'],        ['je', 'jne'],        ['jns', 'js'],        ['jnz', 'jz'],        ['jc', 'jnc'],        ['ja', 'jbe'],        ['jae', 'jb'],        ['je', 'jnz'],        ['jg', 'jle'],        ['jge', 'jl'],        ['jpe', 'jpo'],       ['jne', 'jz']]    def __init__(self, pipe, verbose=False):        """an initializationfunction for the class                Arguments:            pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper                Keyword Arguments:            verbose {bool} -- if True willprint logs to the screen (default: {False})        """        self.pipe = pipe        self.verbose = verbose    def is_successive_fail(self, block_A,block_B):        """Check if the endaddress of block_A is the start of block_B        Arguments:            block_A {block_context} -- A JSONobject to represent the first block            block_B {block_context} -- A JSONobject to represent the second block                Returns:            bool -- True if block_B comesimmediately after block_A, False otherwise        """        return ((block_A["addr"] +block_A["size"]) == block_B["addr"])    def is_opposite_conditional(self, cond_A,cond_B):        """Check if two operandsare opposite conditional jump operands                Arguments:            cond_A {string} -- the conditionaljump operand of the first block            cond_B {string} -- the conditionaljump operand of the second block        Returns:            bool -- True if the operands areopposite, False otherwise        """        sorted_pair = sorted([cond_A, cond_B])        for pair in self.jmp_pairs:            if sorted_pair == pair:                return True        return False    defcontains_meaningful_instructions (self, block):        '''Check if a block contains meaningfulinstructions (references, calls, strings,...)                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            bool -- True if the block containsmeaningful instructions, False otherwise        '''        # Get summary of block - strings,calls, references        summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))        return summary != ""    def get_block_end(self, block):        """Get the address ofthe last instruction in a given block                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            The address of the last instructionin the block        """        # save current seek        self.pipe.cmd("s{addr}".format(addr=block['addr']))        # This will return the address of ablock's last instruction        block_end = self.pipe.cmd("?v $@B:-1")        return block_end    def get_last_mnem_of_block(self, block):        """Get the mnemonic ofthe last instruction in a block                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            string -- the mnemonic of the lastinstruction in the given block        """        inst_info = self.pipe.cmdj("aoj @{addr}".format(addr=self.get_block_end(block)))[0]        return inst_info["mnemonic"]    def get_jump(self, block):        """Get the address towhich a block jumps                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            addr -- the address to which theblock jumps to. If such address doesn't exist, returns False         """        return block["jump"] if"jump" in block else None    def get_fail_addr(self, block):        """Get the address towhich a block fails                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            addr -- the address to which theblock fail-branches to. If such address doesn't exist, returns False         """        return block["fail"] if"fail" in block else None    def get_block(self, addr):        """Get the block contextin a given address                Arguments:            addr {addr} -- An address in ablock                Returns:            block_context -- the block to whichthe address belongs        """        block = self.pipe.cmdj("abj. @{offset}".format(offset=addr))        return block[0] if block else None    def get_fail_block(self, block):        """Return the block towhich a block branches if the condition is fails                Arguments:            block {block_context} -- A JSONrepresentation of a block                Returns:            block_context -- The block to whichthe branch fails. If not exists, returns None        """        # Get the address of the"fail" branch        fail_addr = self.get_fail_addr(block)        if not fail_addr:            return None        # Get a block context of the failaddress        fail_block = self.get_block(fail_addr)        return fail_block if fail_block elseNone    def reanalize_function(self):        """Re-Analyze a functionat a given address                Arguments:            addr {addr} -- an address of afunction to be re-analyze        """        # Seek to the function's start        self.pipe.cmd("s $F")        # Undefine the function in this address        self.pipe.cmd("af- $")        # Define and analyze a function in thisaddress        self.pipe.cmd("afr @ $")           def overwrite_instruction(self, addr):        """Overwrite aconditional jump to an address, with a JMP to it                Arguments:            addr {addr} -- address of aninstruction to be overwritten        """        jump_destination =self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0])        if (jump_destination):            self.pipe.cmd("wai jmp0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr))    def get_current_function(self):        """Return the startaddress of the current function        Return Value:            The address of the currentfunction. None if no function found.        """        function_start =int(self.pipe.cmd("?vi $FB"))        return function_start if function_start!= 0 else None    def clean_junk_blocks(self):        """Search a givenfunction for junk blocks, remove them and fix the flow.        """        # Get all the basic blocks of thefunction        blocks = self.pipe.cmdj("afbj @$F")        if not blocks:            print("[X] No blocks found. Isit a function?")            return        # Have we modified any instruction inthe function?        # If so, a reanalyze of the function isrequired        modified = False        # Iterate over all the basic blocks ofthe function        for block in blocks:            fail_block =self.get_fail_block(block)            # Make validation checks            if not fail_block or \            not self.is_successive_fail(block,fail_block) or \           self.contains_meaningful_instructions(fail_block) or \            notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):                continue            if self.verbose:                print ("Potential junk:0x{junk_block:x}(0x{fix_block:x})".format(junk_block=fail_block["addr"],fix_block=block["addr"]))           self.overwrite_instruction(self.get_block_end(block))            modified = True        if modified:            self.reanalize_function()            def clean_graph(self):        """the initial functionof the class. Responsible to enable cache and start the cleaning        """        # Enable cache writing mode. changeswill only take place in the session and        # will not override the binary        self.pipe.cmd("eio.cache=true")        self.clean_junk_blocks()        ifcutter_available:    # This part will be executed only if Cutteris available. This will    # create the cutter plugin and UI objectsfor the plugin    classGraphDeobfuscatorCutter(cutter.CutterPlugin):        name = "APT32 GraphDeobfuscator"        description = "Graph Deobfuscatorfor APT32 Samples"        version = "1.0"        author = "Itay Cohen(@Megabeets_)"        def setupPlugin(self):            pass        def setupInterface(self, main):            # Create a new action (menu item)            action = QAction("APT32 GraphDeobfuscator", main)            action.setCheckable(False)            # Connect the action to a function- cleaner.            # A click on this action willtrigger the function            action.triggered.connect(self.cleaner)            # Add the action to the"Windows -> Plugins" menu            pluginsMenu =main.getMenuByType(main.MenuType.Plugins)            pluginsMenu.addAction(action)        def cleaner(self):            graph_deobfuscator =GraphDeobfuscator(pipe)            graph_deobfuscator.clean_graph()            cutter.refresh()    def create_cutter_plugin():        return GraphDeobfuscatorCutter()if__name__ == "__main__":    graph_deobfuscator =GraphDeobfuscator(pipe)graph_deobfuscator.clean_graph()
*本文作者:checkpoint,转载请注明来自FreeBuf.COM

原文发布于微信公众号 - FreeBuf(freebuf)

原文发表时间:2019-07-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券