Unicorn 调用SO之加载模块

Unicorn 调用SO之加载模块

本文为看雪论坛精华文章

看雪论坛作者ID: 无名侠

Unicorn 调用SO之加载模块

Android 是基于Linux开发的, Android Native 原生库是 ELF 文件格式。 Unicorn 并不能加载ELF文件,所以我们要自己将ELF文件加载到 Unicorn 虚拟机的内存中去。加载ELF 文件是一个很复杂的过程,涉及到ELF文件解析、重定位、符号解析、依赖库加载等。

python 可以使用elftools库解析ELF 文件。

elftools 安装

pip install pyelftools

映射ELF 文件

ELF 文件有两种视图,链接视图和执行视图。 elftools 是基于链接视图解析 ELF 格式的,然而现在有一些ELF文件的section信息是被抹掉的,elftools就无法正常工作,我也没时间重写一个 elf loader ,就只能凑合用一 下elftools

我已经在前面一篇文章介绍了内存分配方面的东西,加载ELF文件第一步需要将ELF文件映射到内存。如何映射呢?只需要找到类型为 PT_LOAD的segment ,按照segment的信息映射即可。

代码如下:

load_segments = [x for x in elf.iter_segments() if x.header.p_type == ‘PT_LOAD’ ]

for segment in load_segments:

prot = UC_PROT_ALL

self .emu.memory.mem_map(load_base + segment.header.p_vaddr, segment.header.p_memsz, prot)

self

.emu.memory.mem_write(load_base + segment.header.p_vaddr, segment.data())

解析 init_array

ELF 的有一个列表,用于存储初始化函数地址,在动态链接的时候,linker会依次调用init_array中的每一个函数。 init_array 中的函数一般用于初始化程序,偶尔也有ELF外壳程序在init_array中添加自解密代码,另外有一些字符串解密也是在init_array中完成的。想要模拟native程序,必然需要调用init_array 中的函数。

init_array 是一个数组, 一般情况下,每一项都是函数入口偏移, 然而也有为0的情况。因为init_array实际解析时机在重定位完成之后, init_array 也可能被重定位。所以要解析 init_array 的时候还需要判断重定位表。

我的策略是,当读出 init_array 中为0的条目的时候就去重定位表中查找重定位值。

for _ in range ( int (init_array_size / 4 )):

# covert va to file offset

for seg in load_segment s:

if seg.header.p_vaddr <= init_array_offset < seg.header.p_vaddr + seg.header.p_memsz:

init_array_foffset = init_array_offset – seg.header.p_vaddr + seg.header.p_offset

fstream.seek(init_array_foffset)

data = fstream. read ( 4 )

fun_ptr = struct.unpack( ‘I’ , data)[ 0 ]

if fun_ptr != 0 :

# fun_ptr += load_base

init_array. append (fun_ptr + load_base)

print ( "find init array for :%s %x" % (filename, fun_ptr))

else :

# search in reloc

for rel in rel_section.iter_relocations():

rel_info_type = rel[ ‘r_info_type’ ]

rel_addr = rel[ ‘r_offset’ ]

if rel_info_type == arm.R_ARM_ABS32 and rel_addr == init_array_offse t:

sym = dynsym.get_symbol(rel[ ‘r_info_sym’ ])

sym_value = sym[ ‘st_value’ ]

init_array. append (load_base + sym_value)

print ( "find init array for :%s %x" % (filename, sym_value))

break

4

解析符号

32位ELF文件 symbol table entry 的定义如下:

typedef struct {

Elf32_Word st_name;

Elf32_Addr st_value;

Elf32_Word st_size;

unsigned char st_info;

unsigned char st_other;

Elf32_Half st_shndx;

} Elf32_Sym;

当st_shndx字段的值为 SHN_UNDEF 时,表明该符号在当前模块没有定义,是一个导入符号,要去其它模块查找。

为了便于管理已经加载模块的符号地址,应该用一个 map ,将 name address 映射起来。

其它情况,简单起见,均看成导出符号,将地址重定位后加入到管理符号map。

# Resolve all symbols.

symbols_resolved = dict()

for section in elf.iter_sections():

if not isinstance(section, SymbolTableSection):

continue

itersymbols = section.iter_symbols()

next(itersymbols) # Skip first symbol which is always NULL.

for symbol in itersymbols:

symbol_address = self._elf_get_symval(elf, load_base, symbol)

if symbol_address is not None :

symbols_resolved[symbol.name] = SymbolResolved(symbol_address, symbol)

def _elf_get_symval (self, elf, elf_base, symbol) :

if symbol.name in self.symbol_hooks:

return self.symbol_hooks[symbol.name]

if symbol[ ‘st_shndx’ ] == ‘SHN_UNDEF’ : # 外部符号

# External symbol, lookup value.

target = self._elf_lookup_symbol(symbol.name)

if target is None :

# Extern symbol not found

if symbol[ ‘st_info’ ][ ‘bind’ ] == ‘STB_WEAK’ :

# Weak symbol initialized as 0

return 0

else :

logger.error( ‘=> Undefined external symbol: %s’ % symbol.name)

return None

else :

return target

elif symbol[ ‘st_shndx’ ] == ‘SHN_ABS’ :

# Absolute symbol.

return elf_base + symbol[ ‘st_value’ ]

else :

# Internally defined symbol.

return

‘st_value’

]

重定位

# Relocate.

for section in elf.iter_sections():

if not isinstance(section, RelocationSection):

continue

#for relsection in elf.get_dynmic_rel():

for rel in section.iter_relocations():

sym = dynsym.get_symbol(rel[ ‘r_info_sym’ ])

sym_value = sym[ ‘st_value’ ]

rel_addr = load_base + rel[ ‘r_offset’ ] # Location where relocation should happen

rel_info_type = rel[ ‘r_info_type’ ]

# Relocation table for ARM

if rel_info_type == arm.R_ARM_ABS32:

# Create the new value.

value = load_base + sym_value

# Write the new value

self.emu.mu.mem_write(rel_addr, value.to_bytes( 4 , byteorder= ‘little’ ))

elif rel_info_type == arm.R_ARM_GLOB_DAT or /

rel_info_type == arm.R_ARM_JUMP_SLOT or /

rel_info_type == arm.R_AARCH64_GLOB_DAT or /

rel_info_type == arm.R_AARCH64_JUMP_SLOT:

# Resolve the symbol.

if sym.name in symbols_resolved:

value = symbols_resolved[sym.name].address

# Write the new value

self.emu.mu.mem_write(rel_addr, value.to_bytes( 4 , byteorder= ‘little’ ))

elif rel_info_type == arm.R_ARM_RELATIVE or /

rel_info_type == arm.R_AARCH64_RELATIVE:

if sym_value == 0 :

# Load address at which it was linked originally.

value_orig_bytes = self.emu.mu.mem_read(rel_addr, 4 )

value_orig = int.from_bytes(value_orig_bytes, byteorder= ‘little’ )

# Create the new value

value = load_base + value_orig

# Write the new value

self.emu.mu.mem_write(rel_addr, value.to_bytes( 4 , byteorder= ‘little’ ))

else :

raise NotImplementedError()

else :

"Unhandled relocation type %i."

% rel_info_type)

Unicorn 调用SO之内存管理

Unicorn 采用虚拟内存机制,使得虚拟CPU的内存与真实CPU的内存隔离。

Unicorn 使用如下API来操作内存:

  • uc_mem_map

  • uc_mem_read

  • uc_mem_write

使用uc_mem_map映射内存的时候,address 与 size 都需要与0x1000对齐,也就是0x1000的整数倍,否则会报UC_ERR_ARG 异常。

内存对齐

UC_MEM_ALIGN = 0x1000

Unicorn map的内存基地址和长度都需要对齐到0x1000,对齐函数如下:

# Thansk to http s: //github. com /lunixbochs/usercorn/blob/master/ go /mem. go

def align(addr, size, growl):

to = ctypes.c_uint64(UC_MEM_ALIGN).value

mask = ctypes.c_uint64( 0 xFFFFFFFFFFFFFFFF).value ^ ctypes.c_uint64( to1 ).value

right = addr + size

right = ( right + to1 ) & mask

addr &= mask

size = right – addr

if grow l:

size = (size + to1 ) & mask

return

addr, size

在 Unicorn 虚拟机中分配内存(用于加载模块)。

Memory类用于分配内存

class Memory :

""

"

:type emu androidemu.emulator.Emulator


""

def __init__ ( self , emu) :

self .emu = emu

self .counter_memory = config.BASE_ADDR

self .counter_stack = config.STACK_ADDR + config.STACK_SIZE

def mem_reserve ( self , size) :

( _ , size_aligned) = align( 0 , size, True)

ret = self .counter_memory

self .counter_memory += size_aligned

return ret

def mem_map ( self , address, size, prot) :

(address, size) = align(address, size, True)

self .emu.mu.mem_map(address, size, prot)

logger.debug( "=> Mapping memory page 0x%08x – 0x%08x, size 0x%08x, prot %s" % (address, address + size, size,

prot))

def mem_write ( self , address, data) :

self .emu.mu.mem_write(address, data)

def mem_read ( self , address, size) :

return

self

.emu.mu.mem_read(address, size)

heap 的实现

为了对malloc和free等函数提供支持,需要实现heap。可喜可贺, Afl-Unicorn 开源项目中已经有一个heap例子,可以直接使用。

heap 实现代码

from unicorn import *

from unicorn.arm64_const import *

# Page size required by Unicorn

UNICORN_PAGE_SIZE = 0x1000

# Max allowable segment size (1G)

MAX_ALLOWABLE_SEG_SIZE = 1024 * 1024 * 1024

ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE – 1 )

ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE – 1 ) & ~(UNICORN_PAGE_SIZE -1 )

# Implementation from

class UnicornSimpleHeap :

""" Use this class to provide a simple heap implementation. This should

be used if malloc/free calls break things during emulation.

# Helper data-container used to track chunks

class HeapChunk (object) :

def __init__ (self, data_addr, data_size) :

self.data_addr = data_addr

self.data_size = data_size

def is_buffer_in_chunk (self, addr, size) :

if addr >= self.data_addr and ((addr + size) <= (self.data_addr + self.data_size)):

return True

else :

return False

_uc = None # Unicorn engine instance to interact with

_chunks = [] # List of all known chunks

_debug_print = False # True to print debug information

def __init__ (self, uc, heap_min_addr, heap_max_addr, debug_print=False) :

self._uc = uc

self._heap_min_addr = heap_min_addr

self._heap_max_addr = heap_max_addr

self._debug_print = debug_print

def malloc (self, size, prot=UC_PROT_READ | UC_PROT_WRITE) :

# Figure out the overall size to be allocated/mapped

# – Allocate at least 1 4k page of memory to make Unicorn happy

data_size = ALIGN_PAGE_UP(size)

# Gross but efficient way to find space for the chunk:

chunk = None

for addr in range(self._heap_min_addr, self._heap_max_addr, UNICORN_PAGE_SIZE):

try :

self._uc.mem_map(addr, data_size, prot)

chunk = self.HeapChunk(addr, data_size)

if self._debug_print:

print( "Allocating 0x{0:x}-byte chunk @ 0x{1:016x}" .format(chunk.data_size, chunk.data_addr))

break

except UcError as e:

continue

# Something went very wrong

if chunk is None :

raise Exception( "Oh no." )

self._chunks.append(chunk)

return chunk.data_addr

def calloc (self, size, count) :

# Simple wrapper around malloc with calloc() args

return self.malloc(size * count)

def realloc (self, ptr, new_size) :

if self._debug_print:

print( "Reallocating chunk @ 0x{0:016x} to be 0x{1:x} bytes" .format(ptr, new_size))

old_chunk = None

for chunk in self._chunks:

if chunk.data_addr == ptr:

old_chunk = chunk

new_chunk_addr = self.malloc(new_size)

if old_chunk is not None :

self._uc.mem_write(new_chunk_addr, str(self._uc.mem_read(old_chunk.data_addr, old_chunk.data_size)))

self.free(old_chunk.data_addr)

return new_chunk_addr

def protect (self, addr, len_in, prot) :

for chunk in self._chunks:

if chunk.is_buffer_in_chunk(addr, len_in):

# self._uc.mem_protect(chunk.data_addr, chunk.data_size, perms=prot)

return True

return False

def free (self, addr) :

for chunk in self._chunks:

if chunk.is_buffer_in_chunk(addr, 1 ):

if self._debug_print:

print( "Freeing 0x{0:x}-byte chunk @ 0x{0:016x}" .format(chunk.data_addr, chunk.data_size))

self._uc.mem_unmap(chunk.data_addr, chunk.data_size)

self._chunks.remove(chunk)

return True

return

False

处理内存管理方面的syscall

native程序可能直接通过syscall调用mmap2 映射内存。libc等库最底层也一定会调用mmap2分配内存。

self ._syscall_handler.set_handler( 0x5B , "munmap" , 2 , self ._handle_munmap)

self ._syscall_handler.set_handler( 0x7D , "mprotect" , 3 , self ._handle_mprotect)

self ._syscall_handler.set_handler( 0xC0 , "mmap2" , 6 , self ._handle_mmap2)

self ._syscall_handler.set_handler(
0xDC ,
"madvise" ,
3

self

._handle_madvise)

mmap2 实际并不是一个纯粹的内存管理函数,它还能映射文件到内存,这就涉及到文件系统的支持。我在原作者的基础上进行了修改。

def _handle_mmap2 (self, mu, addr, length, prot, flags, fd, offset) :

"""

void *mmap2(void *addr, size_t length, int prot, int flags, int fd, off_t pgoffset);

# MAP_FILE 0

# MAP_SHARED 0x01

# MAP_PRIVATE 0x02

# MAP_FIXED 0x10

# MAP_ANONYMOUS 0x20

prot = UC_PROT_ALL

addr = self._heap.malloc(length, prot)

if fd != 0xffffffff : # 如果有fd

if fd <= 2 :

raise NotImplementedError( "Unsupported read operation for file descriptor %d." % fd)

if fd not in self._file_system._file_descriptors:

# TODO: Return valid error.

raise NotImplementedError()

file = self._file_system._file_descriptors[fd]

data = open(file.name_virt, ‘rb’ ).read(length)

self._mu.mem_write(addr, data)

return

addr

hook libc中的内存管理函数。

既然我们的框架支持加载多个lib库,直接加载libc就可以了,为何还需要hook libc中的这些符号呢?

实际上,目前的模拟框架并不完善,很多系统信息模拟不到位,libc初始化不完善,使用libc的malloc经常会出现异常。所以直接接替为内置的heap 管理器。

#memory

modules.add_symbol_hook( ‘malloc’ , hooker.write_function( self .malloc) + 1 )

modules.add_symbol_hook( ‘free’ , hooker.write_function( self .free) + 1 )

modules.add_symbol_hook(
‘calloc’ , hooker.write_function(
self

1

)

内存访问属性

我们知道,常见的内存权限有可读可写可执行,一般可以调用mprotect 函数修改指定内存页面的权限。为了方便,我们将所有内存设置为可读可写可执行。

Unicorn 调用SO之函数级Hook

很多情况下,有一些系统API无法直接调用,就需要手动实现它,比如以下应用场景:

1、在libc没有完全初始化的情况下,直接调用malloc 可能会崩溃,为了使程序更加稳定,就需要自己实现malloc和free。

2、实现dlopen,就可以使用框架的模块管理器来加载模块,而不用linker。

3、打出dlopen、dlsym等函数的日志,还可以分析native程序运行过程中会调用的API。

4、360加固,在反调试完成后,它会调用dlopen加载libz.so,并调用uncompress函数解压数据。hook uncompress 就可以拿到解压的数据。

5、目前,许多加固的整体加固会调用art中的api来加载dex文件,如果这些函数被Hook,那么就可以直接拿到dex文件,岂不美哉?

6、 JNI Functions 实现,需要Hook 技术支撑。

不幸的是, Unicorn 内部并没有函数的概念,它只是一个单纯的CPU,没有 HOOK_FUNCTION的callback , Hook 函数看上去困难重重。 AndroidNativeEmu 为我们提供了一个很好的Hook思路。

AndroidNativeEmu 中的函数级Hook 并不是真正意义上的Hook,它不仅能Hook存在的函数,还能Hook不存在的函数。 AndroidNativeEmu 使用这种技术实现了JNI函数Hook、库函数Hook。Jni函数是不存的,Hook它只是为了能够用Python 实现 Jni Functions 。有一些库函数是存在的,Hook只是为了重新实现它。

用 Python 实现 Unicorn 虚拟机内部的函数

用 Python 实现 Unicorn 虚拟机内部的函数,首先要解决 Unicorn 虚拟机内部如何与外部交互。AndroidNativeEmu 的实现类似于系统调用,它会为每一个Hook函数实现一个stub函数,stub函数中有一条“陷阱”指令,当虚拟CPU执行这一条”陷阱“指令的时候就会被 HOOK_CODE 捕获,然后通过R4寄存器的值确定Python 的处理函数。

stub 函数代码如下:

PUSH {R4,LR}

MOV R4, Number

IT AL

POP {R4,PC}

HOOK_CODE callback 直接检测PC寄存器指向内存的字节数据是否为"/xE8/xBF"来判断IT AL指令。如果是IT AL指令,则根据R4寄存器的值确定Python 回调的函数。

Native 程序中可能会有许多IT AL指令,会误判吗?

HOOK_CODE 会trace每一条指令,这种方式效率岂不是很低?

这两个问题都很好解决,因为 HOOK_CODE 是有作用范围的,如果开辟一段空间,完全用于存放stub,然后将该callback设置在这个空间范围内,就可以很好的避免了冲突和效率问题!

def _hook ( self , mu, address, size, user_data) :

# Check if instruction is "IT AL"

if size != 2 or self ._emu.mu.mem_read(address, size) != b "/xE8/xBF" :

return

# Find hook.

hook_id = self ._emu.mu.reg_read(UC_ARM_REG_R4)

hook_func = self ._hooks[hook_id]

# Call hook.

try:

hook_func( self ._emu)

except:

# Make sure we catch exceptions inside hooks and stop emulation.

mu.emu_stop()

raise

使用 keystone 动态编译 stub

Keystone 是一款很牛逼的汇编框架, Unicorn 的兄弟!

使用pip安装

pip install keystone

keystone 使用方法十分简单,不需要额外学习即可理解接下来的代码。write_function函数将python的func函数映射到Unicorn虚拟机,并返回虚拟机中的函数地址, 如果在虚拟机中调用该地址就会被Python捕获并调用func函数。

def write_function ( self , func) :

# Get the hook id.

hook_id = self ._get_next_id()

hook_addr = self ._hook_current

# Create the ARM assembly code.

# Make sure to update STACK_OFFSET if you change the PUSH/POP.

asm = "PUSH {R4,LR}/n" /

"MOV R4, #" + hex(hook_id) + "/n" /

"IT AL/n" /

"POP {R4,PC}"

asm_bytes_list, asm_count = self ._keystone.asm(bytes(asm, encoding= ‘ascii’ ))

if asm_count != 4 :

raise ValueError( "Expected asm_count to be 4 instead of %u." % asm_count)

# Write assembly code to the emulator.

self ._emu.mu.mem_write(hook_addr, bytes(asm_bytes_list))

# Save results.

self ._hook_current += len(asm_bytes_list)

self ._hooks[hook_id] = func

return

hook_addr

symbol hook

上一个小结中讲到如何实现stub,那么如何hook 呢? AndroidNativeEmu 实现了 Symbol Hook 。这种HOOK与平时常见的 IAT HOOK、GOT Hook 原理是一样的。调用add_symbol_hook函数,可以将符号和新地址关联起来, 模块加载的时候,查找符号优先从该表中获取地址。

这种实现方式有点bug,比如有一些Native 程序,自身会got hook,那么这种方式可能就会出问题。

例子:使用Hook 实现 AAssetManager_open

modules.add_symbol_hook( ‘AAssetManager_open’ , hooker.write_function( self .AAssetManager_open) + 1 )

@native_method

def AAssetManager_open ( self , mu, mgr, filename_ptr, mode) :

filename = memory_helpers.read_utf8(mu, filename_ptr)

filename = "assert/" + filename

logger.info( "AAssetManager_open(%s,%d)" % (filename, mode))

fd = self ._open_file(filename, 0 , mode)

if fd == – 1 :

fd = 0

return

fd

python 实现 Native 函数修饰 @native_method

大家可能对Python 装饰器的语法不太熟悉,可以参考廖雪峰老师博客 Python 装饰器。

简单的讲,装饰器可以自定义修改函数,为目标函数套一层外壳。

native_method主要功能是处理参数数据,这样就能很优雅地编写Native 函数,不用再函数中冗杂地调用寄存器读写函数。

def native_method(func):

def native_method_wrapper(* argv ):

"" "

: type self

: type emu androidemu.emulator.Emulator

: type mu Uc

"" "

emu = argv [ 1 ] if len ( argv ) == 2 else argv [ 0 ]

mu = emu.mu

args = inspect.getfullargspec(func). args # 判断参数个数

args_count = len ( args ) – ( 2 if ‘self’ in args else 1 )

if args_count < 0 :

raise RuntimeError( "NativeMethod accept at least (self, mu) or (mu)." )

native_args = []

if args_count >= 1 : # 从寄存器取参数

native_args. append (mu.reg_read(UC_ARM_REG_R0))

if args_count >= 2 :

native_args. append (mu.reg_read(UC_ARM_REG_R1))

if args_count >= 3 :

native_args. append (mu.reg_read(UC_ARM_REG_R2))

if args_count >= 4 :

native_args. append (mu.reg_read(UC_ARM_REG_R3))

sp = mu.reg_read(UC_ARM_REG_SP) # 参数大于 4 个,从栈中取参数

sp = sp + STACK_OFFSET # Need to offset by 4 because our hook pushes one register on the stack.

if args_count >= 5 :

for x in range ( 0 , args_count – 4 ):

native_args. append ( int .from_bytes(mu.mem_read( sp + ( x * 4 ), 4 ), byteorder= ‘little’ ))

if len ( argv ) == 1 :

result = func(mu, *native_args)

else :

result = func( argv [ 0 ], mu, *native_args)

if result is not None:

native_write_arg_register(emu, UC_ARM_REG_R0, result)

else :

mu.reg_write(UC_ARM_REG_R0, JNI_ERR)

return

native_method_wrapper

小结

这篇文章讲了如何实现 Unicorn 内部的函数级Hook,原理是在模块加载重定位阶段,填充stub函数地址到目标函数的 got表。stub函数是Unicorn 内部环境与外部环境的一个桥梁,使用小巧的IT AL和R4寄存器实现交互,像极了系统调用。使用Python的装饰器,简化了Python 编写Hook 函数的难度。

Unicorn 调用SO之系统调用

系统调用是操作系统给应用程序提供的最底层的基础接口。应用程序读写文件、访问网络等操作都需要操作系统支持。

Unicorn 拦截系统调用只需要添加UC_HOOK_INTR的callback,该callback的参数定义如下:

typedef void (* uc_cb_hookintr_t ) (uc_engine *uc, uint32_t intno, void *user_data) ;

  • intno: 中断号

  • user_data: user data

我们只需要用hook_add 添加一个 UC_HOOK_INTR 的callback 就能够处理中断了。根据intno 中断号分发不同的中断处理函数。

实现getpid

self ._syscall_handler.set_handler( 0xe0 , "getpid" , 0 , self ._getPid)

def _getpid ( self , mu) :

return

0x1122

实现文件系统需要拦截的syscall

我们将在文件系统章节详细讨论如何实现文件系统。

syscall_handler.set_handler( 0x3 , "read" , 3 , self ._handle_read)

syscall_handler.set_handler( 0x4 , "write" , 3 , self ._handle_write)

syscall_handler.set_handler( 0x5 , "open" , 3 , self ._handle_open)

syscall_handler.set_handler( 0x6 , "close" , 1 , self ._handle_close)

syscall_handler.set_handler( 0x92 , "writev" , 3 , self ._handle_writev)

syscall_handler.set_handler( 0xC5 , "fstat64" , 2 , self ._handle_fstat64)

syscall_handler.set_handler( 0x142 , "openat" , 4 , self ._handle_openat)

syscall_handler.set_handler( 0x147 , "fstatat64" , 4 , self ._handle_fstatat64)

syscall_handler.set_handler( 0x14e , "faccessat" , 4 , self ._faccessat)

syscall_handler.set_handler( 0x14d , "fchmodat" , 4 , self ._fchmodat)

syscall_handler.set_handler( 0x8c , "_llseek" , 5 , self ._llseek)

syscall_handler.set_handler(
0x13 ,
"lseek" ,
3

self

._lseek)

实现内存管理的syscall

self ._syscall_handler.set_handler( 0x5B , "munmap" , 2 , self ._handle_munmap)

self ._syscall_handler.set_handler( 0x7D , "mprotect" , 3 , self ._handle_mprotect)

self ._syscall_handler.set_handler( 0xC0 , "mmap2" , 6 , self ._handle_mmap2)

self ._syscall_handler.set_handler(
0xDC ,
"madvise" ,
3

self

._handle_madvise)

其它杂项syscall

这些实现都非常的简单,就不过多展开了。

self ._syscall_handler.set_handler( 0x4E , "gettimeofday" , 2 , self ._handle_gettimeofday)

self ._syscall_handler.set_handler( 0xAC , "prctl" , 5 , self ._handle_prctl)

self ._syscall_handler.set_handler( 0xF0 , "futex" , 6 , self ._handle_futex)

self ._syscall_handler.set_handler( 0x107 , "clock_gettime" , 2 , self ._handle_clock_gettime)

self ._syscall_handler.set_handler( 0x119 , "socket" , 3 , self ._socket)

self ._syscall_handler.set_handler( 0x11b , "connect" , 3 , self ._connect)

self ._syscall_handler.set_handler( 0x159 , "getcpu" , 3 , self ._getcpu)

self ._syscall_handler.set_handler( 0x14e , "faccessat" , 4 , self ._faccessat)

self ._syscall_handler.set_handler( 0x14 , "getpid" , 0 , self ._getpid)

self ._syscall_handler.set_handler( 0xe0 , "gettid" , 0 , self ._gettid)

self ._syscall_handler.set_handler( 0x180 , "null1" , 0 , self ._null)

self ._syscall_handler.set_handler( 0x7e , "sigprocmask" , 0 , self ._null)

self ._syscall_handler.set_handler( 0xaf , "rt_sigprocmask" , 0 , self ._null)

self ._syscall_handler.set_handler( 0x10c , "sigaction" , 0 , self ._tgkill)

self ._syscall_handler.set_handler( 0x43 , "sigaction" , 0 , self ._sigaction)

self ._syscall_handler.set_handler( 0xf8 , "exit" , 0 , self ._null)

self ._syscall_handler.set_handler(
0x16e ,
"accept4" ,
0

self

.accept4)

Unicorn 调用SO之文件系统

如果想尽可能完美的模拟Android Native程序,那就必须加入虚拟文件系统的支持。虚拟文件系统可以将Unicorn 虚拟机内部访问文件的操作映射到主机

> > > >

思路

Hook 拦截文件操作相关的系统调用,转换成对主机的文件操作。

为了安全性,我们要在主机上划分一个用于专门虚拟文件系统的目录。 处理系统调用的时候,将路径转换成虚拟文件系统目录的路径。

> > > >

实现

需要处理的syscall已经在上一篇文章介绍过了。

syscall_handler.set_handler( 0x3 , "read" , 3 , self ._handle_read)

syscall_handler.set_handler( 0x4 , "write" , 3 , self ._handle_write)

syscall_handler.set_handler( 0x5 , "open" , 3 , self ._handle_open)

syscall_handler.set_handler( 0x6 , "close" , 1 , self ._handle_close)

syscall_handler.set_handler( 0x92 , "writev" , 3 , self ._handle_writev)

syscall_handler.set_handler( 0xC5 , "fstat64" , 2 , self ._handle_fstat64)

syscall_handler.set_handler( 0x142 , "openat" , 4 , self ._handle_openat)

syscall_handler.set_handler( 0x147 , "fstatat64" , 4 , self ._handle_fstatat64)

syscall_handler.set_handler( 0x14e , "faccessat" , 4 , self ._faccessat)

syscall_handler.set_handler( 0x14d , "fchmodat" , 4 , self ._fchmodat)

syscall_handler.set_handler( 0x8c , "_llseek" , 5 , self ._llseek)

syscall_handler.set_handler(
0x13 ,
"lseek" ,
3

self

._lseek)

_handle_open 处理 open syscall

def _handle_open (self, mu, filename_ptr, flags, mode) :

"""

int open(const char *pathname, int flags, mode_t mode);

return the new file descriptor, or -1 if an error occurred (in which case, errno is set appropriately).

filename = memory_helpers.read_utf8(mu, filename_ptr)

return

self._open_file(filename, flags, mode)

进一步分析_open_file

这个函数先对路径进行一些简单检验,判断是否为特殊文件,如果不是,则调用translate_path转换安全路径,并调用os.open打开本地文件,最后调用_store_fd转换文件句柄。

def _open_file ( self , filename, flags, mode) :

# Special cases, such as /dev/urandom.

orig_filename = filename

if filename == ‘/dev/urandom’ :

logger.info( "File opened ‘%s’" % filename)

return self ._store_fd( ‘/dev/urandom’ , None, ‘urandom’ )

file_path = self .translate_path(filename)

if os.path.isfile(file_path):

logger.info( "File opened ‘%s’" % orig_filename)

flags = os.O_RDWR

if hasattr(os, "O_BINARY" ):

flags = os.O_BINARY

return self ._store_fd(orig_filename, file_path, os.open(file_path, flags=flags))

else:

logger.warning( "File does not exist ‘%s’" % orig_filename)

return

1

读文件分析

def _handle_read (self, mu, fd, buf_addr, count) :

"""

ssize_t read(int fd, void *buf, size_t count);

On files that support seeking, the read operation commences at the current file offset, and the file offset

is incremented by the number of bytes read. If the current file offset is at or past the end of file,

no bytes are read, and read() returns zero.

If count is zero, read() may detect the errors described below. In the absence of any errors, or if read()

does not check for errors, a read() with a count of 0 returns zero and has no other effects.

If count is greater than SSIZE_MAX, the result is unspecified.

if fd <= 2 :

raise NotImplementedError( "Unsupported read operation for file descriptor %d." % fd)

if fd not in self._file_descriptors:

# TODO: Return valid error.

raise NotImplementedError()

file = self._file_descriptors[fd]

if file.descriptor == ‘urandom’ :

if OVERRIDE_URANDOM:

buf = OVERRIDE_URANDOM_BYTE * count

else :

buf = os.urandom(count)

else :

buf = os.read(file.descriptor, count)

result = len(buf)

mu.mem_write(buf_addr, buf)

return

result

Unicorn 调用SO之实现 JNI Functions

调用JNI是工程量最大的一部分,不仅需要实现JNI Functions,还要模拟JNI Env和Java类似的引用管理。

在前面的文章中已经学习了如何在虚拟机中调用主机的Python函数,接下来就学习如何实现JNI Functions里面的所有函数。

> > > >

模拟实现 Jni Function Table

可以参考 Jni Functions

Jni Function Table 是一个函数地址表,里面记录了Jni 函数地址。

( self .address_ptr, self .address) = hooker.write_function_table({

4 : self .get_version,

5 : self .define_class,

6 : self .find_class,

7 : self .from_reflected_method,

8 : self .from_reflected_field,

9 : self .to_reflected_method,

10 : self .get_superclass,

11 : self .is_assignable_from,

12 : self .to_reflected_field,

13 : self . throw ,

14 : self .throw_new,

15 : self .exception_occurred,

16 : self .exception_describe,

17 : self .exception_clear,

18 : self .fatal_error,

19 : self .push_local_frame,

20 : self .pop_local_frame,

21 : self .new_global_ref,

22 : self .delete_global_ref,

…………………..

232 : self .get_object_ref_type

})

write_function_table 函数实现了创建一个Function 地址表。

实现如下:

def write_function_table ( self , table) :

if not isinstance(table, dict):

raise ValueError( "Expected a dictionary for the function table." )

index_max = int(max(table, key=int)) + 1

# First, we write every function and store its result address.

hook_map = dict()

for index, func in table.items():

hook_map[index] = self .write_function(func)

# Then we write the function table.

table_bytes = b ""

table_address = self ._hook_current

for index in range( 0 , index_max):

address = hook_map[index] if index in hook_map else 0

table_bytes += int(address + 1 ).to_bytes( 4 , byteorder= ‘little’ ) # + 1 because THUMB.

self ._emu.mu.mem_write(table_address, table_bytes)

self ._hook_current += len(table_bytes)

# Then we write the a pointer to the table.

ptr_address = self ._hook_current

self ._emu.mu.mem_write(ptr_address, table_address.to_bytes( 4 , byteorder= ‘little’ ))

self ._hook_current += 4

return

ptr_address, table_address

Jni Functions Table 中有 200 多个函数,全部实现的话工作量十分巨大。默认实现如下:

@native_method

def call_boolean_method_a (self, mu, env) :

raise

NotImplementedError()

抛出异常可以保证当Native调用一个没有实现的JNI函数时能够及时发现,并实现它。

Classloader

AndroidNativeEmu 支持使用Python类来代替 Jvm 中Java类,这是如何实现的呢?

class android_content_context

(metaclass=JavaClassDef, jvm_name= "android/content/Context" ,

( "CONNECTIVITY_SERVICE" , "Ljava/lang/String;" , True, "" )

:

def __init__ (self) :

pass

@java_method_def(native=False, name="getPackageName", signature="()Ljava/lang/String;")

def getPackageName (self, *args, **kwargs) :

return "com.test"

# load class

jvm_supe r 指定父类

jvm_name 定义对应的类名

jvm_fields 定义字段,是一个列表,每一项都是由JavaFieldDef定义的字段, JavaFieldDef(name, signature, is_static, static_value=None)

如果不是static字段,则还需要在init 中创建这个这个私有的成员变量。

metaclass=JavaClassDef 是Python的元类机制,参考廖雪峰老师文章

元类可真的是魔法!它可以动态修改类的定义,比如给类增加成员变量,增加方法等。

JavaClassDef 将class 定义指定的 jvm_name jvm_fields 保存到成员变量,并添加了 find_method、find_method_by_id、find_field 等函数,用于实现JNI。

注册方法

# Register all defined Java methods.

for func in inspect.getmembers(cls, predicate=inspect.isfunction):

if hasattr ( func[ 1 ], ‘jvm_method’ ):

= func[
1 ].jvm_method

method.jvm_id = next(JavaClassDef.next_jvm_method_id)

cls.jvm_methods[method.jvm_id] = method

注册字段

# Register all defined Java fields.

if jvm_fields is not None :

for jvm_field in jvm_fields:

jvm_field.jvm_id = next(JavaClassDef.next_jvm_field_id)

cls.jvm_fields[jvm_field.jvm_id] = jvm_field

查找字段的函数

支持类继承,Java是单继承,查找的时候先从基类开始递归查找。

def find_field_by_id (cls, jvm_id) :

try :

if cls.jvm_super is not None :

find = cls.jvm_super.find_field_by_id(jvm_id)

if find is not None :

return find

except KeyError:

pass

return

cls.jvm_fields[jvm_id]

metaclass 修饰了定义的java类,隐藏了类解析背后的细节,使得添加一个java类很方便。java类定义后,还需要添加模拟器的class管理。

def add_class ( self , clazz) :

if not isinstance(clazz, JavaClassDef):

raise ValueError( ‘Expected a JavaClassDef.’ )

if clazz.jvm_name in self . class_by_name:

raise KeyError( ‘The class /’%s/’ is already registered.’ % clazz.jvm_name)

self .class_by_id[clazz.jvm_id] = clazz

self

.class_by_name[clazz.jvm_name] = clazz

保存的是类的定义而不是实例

> > > >

引用管理

目前实现两种类型的引用,一种 jobject 和jclass。jobejct是用来引用实例对象,jclass用来引用类。

Python 实现java的类,如果返回一个 String ,那么就会自动创建一个 String 引用,然后把引用id返回给Native 函数。Native 函数再调用 GetStringUtfChars 获取引用的字符串。 GetStringUtfChars 的实现如下。

@native_method

def get_string_utf_chars ( self , mu, env, string, is_copy_ptr) :

logger.debug( "JNIEnv->GetStringUtfChars(%u, %x) was called" % (string, is_copy_ptr))

if is_copy_ptr != 0 :

raise NotImplementedError()

str_ref = self .get_reference(string) #通过引用ID获取引用对象

str_val = str_ref.value # 获取该引用的值

str_ptr = self ._emu.native_memory.allocate(len(str_val) + 1 )

logger.debug( "=> %s" % str_val)

memory_helpers.write_utf8(mu, str_ptr, str_val)

return

str_ptr

引用还分为局部引用和全局引用。局部引用的生命周期是进入native 函数到native 函数返回。 全局引用则作用于模拟器整个生命周期。

self._locals = ReferenceTable( start = 1 , max_entries= 2048 )

self._globals = ReferenceTable(
start =
4096

512000

)

> > > >

实现 GetEnv

class JavaVM :

""

"

:type class_loader JavaClassLoader

:type hooker Hooker


""

def __init__ ( self , emu, class_loader, hooker) :

( self .address_ptr, self .address) = hooker.write_function_table({

3 : self .destroy_java_vm,

4 : self .attach_current_thread,

5 : self .detach_current_thread,

6 : self .get_env,

7 : self .attach_current_thread

})

self .jni_env = JNIEnv(emu, class_loader, hooker)

@native_method

def get_env ( self , mu, java_vm, env, version) :

logger.debug( "java_vm: 0x%08x" % java_vm)

logger.debug( "env: 0x%08x" % env)

logger.debug( "version: 0x%08x" % version)

mu.mem_write(env, self .jni_env.address_ptr.to_bytes( 4 , byteorder= ‘little’ ))

logger.debug( "JavaVM->GetENV() was called!" )

return

JNI_OK

> > > >

Native / Java 函数调用

我们目前已经实现了Java 调用native 的过程,native再回调java是否可以实现呢?实际上,在 class 定义的时候就解析了所有带@ java_method_def 装饰器的成员函数。n ative 通过JNI调用java函数,我们可以模拟对应的JNI函数,使其在所有加载的类中查找是否有对应签名的函数,如果有则直接调用。

@java_method_def 装饰器不仅描述了函数的签名,还转换函数的参数数据和返回值信息。

这个修饰器的定义如下:

def java_method_def (name, signature, native=False, args_list=None, modifier=None, ignore=False) :

def java_method_def_real (func) :

def native_wrapper ( self , emulator, *argv) : # native 方法,进入虚拟机

return emulator.call_native(

native_wrapper.jvm_method.native_addr,

emulator.java_vm.jni_env.address_ptr, # JNIEnv*

self , # this

# method has been declared in

*argv # Extra args.

)

def normal_wrapper (*args, **kwargs) : # 普通方法,直接调用

result = func(*args, **kwargs)

return result

wrapper = native_wrapper if native else normal_wrapper # 判断是否没native函数,分别进入不同的wrapper

wrapper.jvm_method = JavaMethodDef(func.__name_ _ , wrapper, name, signature, native,

args_list=args_list,

modifier=modifier,

ignore=ignore)

return wrapper

return

java_method_def_real

> > > >

参数转换

Java 调用 Native 的时候要把非数字类型转换为对象引用。

Native 调用 Java 或者 返回的时候要把引用转换成对象,转换方法实现如下:

def native_write_args (emu, *argv) :

amount = len(argv)

if amount == 0 :

return

if amount >= 1 :

native_write_arg_register(emu, UC_ARM_REG_R0, argv[ 0 ])

if amount >= 2 :

native_write_arg_register(emu, UC_ARM_REG_R1, argv[ 1 ])

if amount >= 3 :

native_write_arg_register(emu, UC_ARM_REG_R2, argv[ 2 ])

if amount >= 4 :

native_write_arg_register(emu, UC_ARM_REG_R3, argv[ 3 ])

if amount >= 5 :

sp_start = emu.mu.reg_read(UC_ARM_REG_SP)

sp_current = sp_start – STACK_OFFSET # Need to offset because our hook pushes one register on the stack.

for arg in argv[ 4 :]:

emu.mu.mem_write(sp_current – STACK_OFFSET, native_translate_arg(emu, arg).to_bytes( 4 , byteorder= ‘little’ ))

sp_current = sp_current – 4

emu.mu.reg_write(UC_ARM_REG_SP, sp_current)

def native_translate_arg (emu, val) :

if isinstance(val, int):

return val

elif isinstance(val, str):

return emu.java_vm.jni_env.add_local_reference(jstring(val))

elif isinstance(val, list):

return emu.java_vm.jni_env.add_local_reference(jobjectArray(val))

elif isinstance(val, bytearray):

return emu.java_vm.jni_env.add_local_reference(jbyteArray(val))

elif isinstance(type(val), JavaClassDef):

# TODO: Look into this, seems wrong..

return emu.java_vm.jni_env.add_local_reference(jobject(val))

elif isinstance(val, JavaClassDef):

return emu.java_vm.jni_env.add_local_reference(jobject(val))

else :

raise NotImplementedError( "Unable to write response ‘%s’ type ‘%s’ to emulator." % (str(val), type(val)))

def native_write_arg_register (emu, reg, val) :

emu.mu.reg_write(reg, native_translate_arg(emu, val))

## 返回值转换

result_idx = self.mu.reg_read(UC_ARM_REG_R0)

result = self.java_vm.jni_env.get_local_reference(result_idx)

附录

Unicorn 优秀项目: http://www.unicorn-engine.org/showcase/

- End -

Unicorn 调用SO之加载模块

看雪ID 无名侠

https://bbs.pediy.com/user-617255.htm

*本文由看雪论坛无名侠原创, 转载请注明来自看雪社区

原文 

http://mp.weixin.qq.com/s?__biz=MjM5NTc2MDYxMw==&mid=2458298594&idx=1&sn=4feaaf7959a89c3299fdd569535a60f6

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Unicorn 调用SO之加载模块

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址