前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Cutter和Radare2对APT32恶意程序流程图进行反混淆处理

使用Cutter和Radare2对APT32恶意程序流程图进行反混淆处理

作者头像
FB客服
发布2019-07-22 16:25:31
7820
发布2019-07-22 16:25:31
举报
文章被收录于专栏:FreeBuf
Ocean Lotus Group,也被称之为APT32,这个黑客组织此前主要的攻击目标以越南、老挝和菲律宾等东亚国家为主,虽然私营企业是该组织的主要目标,但外国政府、政治活动家和新闻记者也是他们的攻击目标之一。

APT32的攻击工具非常多样化,从Mimikatz和Cobalt Strike这样的高级定制工具,到ShellCode以及后门等等,应有尽有。而且他们所使用的很多代码都经过了高度模糊处理或混淆处理,并使用了不同的技术来提升检测和分析的难度,导致研究人员更加难以对它们进行逆向分析。

在这篇文章中,我们将介绍该组织所使用的其中一种代码混淆技术,而这种技术也被APT32广泛应用到了他们的后门代码中。反混淆处理的过程中需要使用到Cutter以及官方开源逆向工程框架-radare2,还请各位同学自行搜索下载。

下载和安装Cutter

Cutter目前支持Linux、macOS和Windows。

Cutter下载地址:【点击底部阅读原文获取】

Cutter基础教程:【点击底部阅读原文获取】

后门分析

我们的样本(486be6b1ec73d98fdd3999abe2fa04368933a2ec)是多级感染链中的一部分,而且APT32在多个活动中都使用到了这个后门,例如恶意文件样本(115f3cb5bdfb2ffe5168ecb36b9aed54)。这个文档声称自己来自于360,但是其中包含了一个恶意VBA宏,这个恶意宏会向rundll32.exe注入恶意Shellcode。Shellcode中包含了解密代码,可以直接对恶意代码进行解密并将相应的DLL加载进内存,而DLL包含的就是后门逻辑。

首先,后门会解密一个配置文件,其中存储的信息包含C2服务器基础信息在内。接下来,代码会尝试使用自定义PE加载器向内存中加载恶意DLL。这个DLL会被HTTPProv.dll调用,并能够与C2服务器通信。后门还可以从C2服务器接收十几种不同的指令,包括Shellcode执行、新进程创建以及文件和目录修改等操作。

该组织所使用的很多混淆技术其目的就是要增加逆向分析的难度,而且其二进制代码中使用了大量的垃圾代码,这些垃圾代码会增加样本的体积和复杂性,以分散研究人员的注意力。而且,其中的代码集经常会与堆栈指针一起使用,而普通的反编译工具无法对这种情况进行有效处理。

混淆技术

APT32在进行代码混淆处理时,大量使用了控制流混淆,并且向函数流中注入了大量垃圾代码块。这些垃圾代码块不会实现任何功能,只是为了混淆视听而已。

大家可以从上图中看到,其中包含了大量垃圾代码块。仔细分析后我们会发现,所有需要跳转到这些代码段的条件判断结果都为False,而且都是以条件跳转结束的,跟之前的条件判断正好相反。比如说,垃圾代码段之前的条件判断为jo ,那么垃圾代码很有可能以jno结束。如果之前的代码段以jne 结束,那么垃圾代码段就会以je 结束。

这样一来,我们就可以对这些垃圾代码段定性了。第一种特性:出现两个连续的垃圾代码块,以相反的条件跳转到相同的目标地址并结束。第二种特性:要求第二个块不包含有意义的指令,如字符串引用或代码调用等等。

当满足这两个特征时,我们可以说第二个块很可能是垃圾代码块。这样,我们就可以将垃圾块从图表中删除了,并使用无条件跳转来修补源代码。

编写核心类

首先,我们要创建一个Python类作为我们的核心类,这个类需要包含查找和移除垃圾代码块的逻辑。先定义init函数,该函数可以接收管道消息,可以是来自redare2的r2pipe对象(importr2pipe),也可以是来自Cutter的cutter对象(import cutter)。

代码语言:javascript
复制
class GraphDeobfuscator:   def __init__(self, pipe):       """an initializationfunction for the class             Arguments:           pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper       """       self.pipe = pipe

现在我们就可以使用这个管道来执行radare2命令了。这个管道对象包含两种执行r2命令的方式。第一种为pipe.cmd(),它能够以字符串的形式返回命令执行结果。第二种为pipe.cmdj(j),它你能够根据radare2命令的输出结果返回解析后的JSON对象。

接下来就是从当前函数中获取所有的代码块,然后进行迭代。这里可以使用afbj米工龄来获取函数中所有代码块的JSON对象。

代码语言:javascript
复制
def clean_junk_blocks(self):       """Search a givenfunction for junk blocks, remove them and fix the flow.       """       # Get all the basic blocks of thefunction       blocks = self.pipe.cmdj("afbj @$F")       if not blocks:           print("[X] No blocks found. Is it afunction?")           return       modified = False       # Iterate over all the basic blocks ofthe function       for block in blocks:           # do something

针对每一个块,根据之前的判断条件进行分析,获取候选垃圾代码块:

代码语言:javascript
复制
def get_fail_block(self, block):       """Return the block towhich a block branches if the condition is fails             Arguments:           block {block_context} -- A JSONrepresentation of a block             Returns:           block_context -- The block to whichthe branch fails. If not exists, returns None       """       # Get the address of the"fail" branch       fail_addr = self.get_fail(block)       if not fail_addr:           return None       # Get a block context of the failaddress       fail_block = self.get_block(fail_addr)       return fail_block if fail_block elseNone   def is_successive_fail(self, block_A,block_B):       """Check if the endaddress of block_A is the start of block_B       Arguments:           block_A {block_context} -- A JSONobject to represent the first block           block_B {block_context} -- A JSONobject to represent the second block             Returns:           bool -- True if block_B comes immediatelyafter block_A, False otherwise       """      return ((block_A["addr"] +block_A["size"]) == block_B["addr"])
代码语言:javascript
复制
接下来,我们要判断候选垃圾代码段是否包含无效指令:
代码语言:javascript
复制
def contains_meaningful_instructions (self,block):       '''Check if a block contains meaningfulinstructions (references, calls, strings,...)             Arguments:           block {block_context} -- A JSONobject which represents a block             Returns:           bool -- True if the block containsmeaningful instructions, False otherwise       '''       # Get summary of block - strings, calls,references       summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))       return summary != ""
代码语言:javascript
复制
最后,枚举出所有对立的跳转条件:
代码语言:javascript
复制
jmp_pairs = [       ['jno', 'jo'],       ['jnp', 'jp'],       ['jb', 'jnb'],       ['jl', 'jnl'],       ['je', 'jne'],       ['jns', 'js'],       ['jnz', 'jz'],       ['jc', 'jnc'],       ['ja', 'jbe'],       ['jae', 'jb'],       ['je', 'jnz'],       ['jg', 'jle'],       ['jge', 'jl'],       ['jpe', 'jpo'],       ['jne', 'jz']]   def is_opposite_conditional(self, cond_A,cond_B):       """Check if two operandsare opposite conditional jump operands             Arguments:           cond_A {string} -- the conditionaljump operand of the first block           cond_B {string} -- the conditionaljump operand of the second block             Returns:           bool -- True if the operands areopposite, False otherwise       """       sorted_pair = sorted([cond_A, cond_B])       for pair in self.jmp_pairs:           if sorted_pair == pair:               return True       return False

将上述所有代码整合到cleanjunkblocks()函数中:

代码语言:javascript
复制
def clean_junk_blocks(self):       """Search a givenfunction for junk blocks, remove them and fix the flow.       """       # Get all the basic blocks of thefunction       blocks = self.pipe.cmdj("afbj @$F")       if not blocks:           print("[X] No blocks found. Isit a function?")           return       modified = False       # Iterate over all the basic blocks ofthe function       for block in blocks:           fail_block =self.get_fail_block(block)           if not fail_block or \           not self.is_successive_fail(block,fail_block) or \          self.contains_meaningful_instructions(fail_block) or \           notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):               continue

使用Radare2

代码语言:javascript
复制
if__name__ == "__main__":   graph_deobfuscator = GraphDeobfuscator(pipe)   graph_deobfuscator.clean_graph()

使用Cutter

代码语言:javascript
复制
ifcutter_available:   # This part will be executed only if Cutteris available.   # This will create the cutter plugin and UIobjects for the plugin   classGraphDeobfuscatorCutter(cutter.CutterPlugin):       name = "APT32 GraphDeobfuscator"       description = "Graph Deobfuscatorfor APT32 Samples"       version = "1.0"       author = "Itay Cohen(@Megabeets_)"       def setupPlugin(self):           pass       def setupInterface(self, main):           pass      def create_cutter_plugin():       return GraphDeobfuscatorCutter()

为了保证插件正常运行,我们还需要增加一个菜单入口来触发反混淆功能:

代码语言:javascript
复制
ifcutter_available:   # This part will be executed only if Cutteris available. This will   # create the cutter plugin and UI objectsfor the plugin   classGraphDeobfuscatorCutter(cutter.CutterPlugin):       name = "APT32 GraphDeobfuscator"       description = "Graph Deobfuscatorfor APT32 Samples"       version = "1.0"       author = "Megabeets"       def setupPlugin(self):           pass       def setupInterface(self, main):           # Create a new action (menu item)           action = QAction("APT32 GraphDeobfuscator", main)           action.setCheckable(False)          # Connect the action to a function - cleaner.           # A click on this action willtrigger the function          action.triggered.connect(self.cleaner)           # Add the action to the"Windows -> Plugins" menu           pluginsMenu =main.getMenuByType(main.MenuType.Plugins)           pluginsMenu.addAction(action)       def cleaner(self):           graph_deobfuscator =GraphDeobfuscator(pipe)           graph_deobfuscator.clean_graph()           cutter.refresh()   def create_cutter_plugin():       return GraphDeobfuscatorCutter()

接下来,我们就可以看到图形化的分析结果了:

移除垃圾代码段之后的结果图如下所示:

对比图如下:

样本SHA256值

代码语言:javascript
复制
Be6d5973452248cb18949711645990b6a56e7442dc30cc48a607a2afe7d8ec668d74d544396b57e6faa4f8fdf96a1a5e30b196d56c15f7cf05767a406708a6b2

APT32图形化反混淆工具-完整源代码

代码语言:javascript
复制
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* """A plugin for Cutter and Radare2 to deobfuscate APT32 flow graphsThisis a python plugin for Cutter that is compatible as an r2pipe script forradare2as well. The plugin will help reverse engineers to deobfuscate and removejunkblocks from APT32 (Ocean Lotus) samples."""__author__  = "Itay Cohen, aka @megabeets_"__company__= "Check Point Software Technologies Ltd"#Check if we're running from cuttertry:    import cutter    from PySide2.QtWidgets import QAction    pipe = cutter    cutter_available = True# Ifno, assume running from radare2except:    import r2pipe    pipe = r2pipe.open()    cutter_available = FalseclassGraphDeobfuscator:    # A list of pairs of opposite conditionaljumps    jmp_pairs = [        ['jno', 'jo'],        ['jnp', 'jp'],        ['jb', 'jnb'],        ['jl', 'jnl'],        ['je', 'jne'],        ['jns', 'js'],        ['jnz', 'jz'],        ['jc', 'jnc'],        ['ja', 'jbe'],        ['jae', 'jb'],        ['je', 'jnz'],        ['jg', 'jle'],        ['jge', 'jl'],        ['jpe', 'jpo'],       ['jne', 'jz']]    def __init__(self, pipe, verbose=False):        """an initializationfunction for the class                Arguments:            pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper                Keyword Arguments:            verbose {bool} -- if True willprint logs to the screen (default: {False})        """        self.pipe = pipe        self.verbose = verbose    def is_successive_fail(self, block_A,block_B):        """Check if the endaddress of block_A is the start of block_B        Arguments:            block_A {block_context} -- A JSONobject to represent the first block            block_B {block_context} -- A JSONobject to represent the second block                Returns:            bool -- True if block_B comesimmediately after block_A, False otherwise        """        return ((block_A["addr"] +block_A["size"]) == block_B["addr"])    def is_opposite_conditional(self, cond_A,cond_B):        """Check if two operandsare opposite conditional jump operands                Arguments:            cond_A {string} -- the conditionaljump operand of the first block            cond_B {string} -- the conditionaljump operand of the second block        Returns:            bool -- True if the operands areopposite, False otherwise        """        sorted_pair = sorted([cond_A, cond_B])        for pair in self.jmp_pairs:            if sorted_pair == pair:                return True        return False    defcontains_meaningful_instructions (self, block):        '''Check if a block contains meaningfulinstructions (references, calls, strings,...)                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            bool -- True if the block containsmeaningful instructions, False otherwise        '''        # Get summary of block - strings,calls, references        summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))        return summary != ""    def get_block_end(self, block):        """Get the address ofthe last instruction in a given block                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            The address of the last instructionin the block        """        # save current seek        self.pipe.cmd("s{addr}".format(addr=block['addr']))        # This will return the address of ablock's last instruction        block_end = self.pipe.cmd("?v $@B:-1")        return block_end    def get_last_mnem_of_block(self, block):        """Get the mnemonic ofthe last instruction in a block                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            string -- the mnemonic of the lastinstruction in the given block        """        inst_info = self.pipe.cmdj("aoj @{addr}".format(addr=self.get_block_end(block)))[0]        return inst_info["mnemonic"]    def get_jump(self, block):        """Get the address towhich a block jumps                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            addr -- the address to which theblock jumps to. If such address doesn't exist, returns False         """        return block["jump"] if"jump" in block else None    def get_fail_addr(self, block):        """Get the address towhich a block fails                Arguments:            block {block_context} -- A JSONobject which represents a block                Returns:            addr -- the address to which theblock fail-branches to. If such address doesn't exist, returns False         """        return block["fail"] if"fail" in block else None    def get_block(self, addr):        """Get the block contextin a given address                Arguments:            addr {addr} -- An address in ablock                Returns:            block_context -- the block to whichthe address belongs        """        block = self.pipe.cmdj("abj. @{offset}".format(offset=addr))        return block[0] if block else None    def get_fail_block(self, block):        """Return the block towhich a block branches if the condition is fails                Arguments:            block {block_context} -- A JSONrepresentation of a block                Returns:            block_context -- The block to whichthe branch fails. If not exists, returns None        """        # Get the address of the"fail" branch        fail_addr = self.get_fail_addr(block)        if not fail_addr:            return None        # Get a block context of the failaddress        fail_block = self.get_block(fail_addr)        return fail_block if fail_block elseNone    def reanalize_function(self):        """Re-Analyze a functionat a given address                Arguments:            addr {addr} -- an address of afunction to be re-analyze        """        # Seek to the function's start        self.pipe.cmd("s $F")        # Undefine the function in this address        self.pipe.cmd("af- $")        # Define and analyze a function in thisaddress        self.pipe.cmd("afr @ $")           def overwrite_instruction(self, addr):        """Overwrite aconditional jump to an address, with a JMP to it                Arguments:            addr {addr} -- address of aninstruction to be overwritten        """        jump_destination =self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0])        if (jump_destination):            self.pipe.cmd("wai jmp0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr))    def get_current_function(self):        """Return the startaddress of the current function        Return Value:            The address of the currentfunction. None if no function found.        """        function_start =int(self.pipe.cmd("?vi $FB"))        return function_start if function_start!= 0 else None    def clean_junk_blocks(self):        """Search a givenfunction for junk blocks, remove them and fix the flow.        """        # Get all the basic blocks of thefunction        blocks = self.pipe.cmdj("afbj @$F")        if not blocks:            print("[X] No blocks found. Isit a function?")            return        # Have we modified any instruction inthe function?        # If so, a reanalyze of the function isrequired        modified = False        # Iterate over all the basic blocks ofthe function        for block in blocks:            fail_block =self.get_fail_block(block)            # Make validation checks            if not fail_block or \            not self.is_successive_fail(block,fail_block) or \           self.contains_meaningful_instructions(fail_block) or \            notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):                continue            if self.verbose:                print ("Potential junk:0x{junk_block:x}(0x{fix_block:x})".format(junk_block=fail_block["addr"],fix_block=block["addr"]))           self.overwrite_instruction(self.get_block_end(block))            modified = True        if modified:            self.reanalize_function()            def clean_graph(self):        """the initial functionof the class. Responsible to enable cache and start the cleaning        """        # Enable cache writing mode. changeswill only take place in the session and        # will not override the binary        self.pipe.cmd("eio.cache=true")        self.clean_junk_blocks()        ifcutter_available:    # This part will be executed only if Cutteris available. This will    # create the cutter plugin and UI objectsfor the plugin    classGraphDeobfuscatorCutter(cutter.CutterPlugin):        name = "APT32 GraphDeobfuscator"        description = "Graph Deobfuscatorfor APT32 Samples"        version = "1.0"        author = "Itay Cohen(@Megabeets_)"        def setupPlugin(self):            pass        def setupInterface(self, main):            # Create a new action (menu item)            action = QAction("APT32 GraphDeobfuscator", main)            action.setCheckable(False)            # Connect the action to a function- cleaner.            # A click on this action willtrigger the function            action.triggered.connect(self.cleaner)            # Add the action to the"Windows -> Plugins" menu            pluginsMenu =main.getMenuByType(main.MenuType.Plugins)            pluginsMenu.addAction(action)        def cleaner(self):            graph_deobfuscator =GraphDeobfuscator(pipe)            graph_deobfuscator.clean_graph()            cutter.refresh()    def create_cutter_plugin():        return GraphDeobfuscatorCutter()if__name__ == "__main__":    graph_deobfuscator =GraphDeobfuscator(pipe)graph_deobfuscator.clean_graph()
*/
代码语言:javascript
复制
*本文作者:checkpoint,转载请注明来自FreeBuf.COM
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FreeBuf 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 下载和安装Cutter
  • 后门分析
  • 混淆技术
  • 编写核心类
  • 使用Radare2
  • 使用Cutter
    • 样本SHA256值
      • APT32图形化反混淆工具-完整源代码
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档