[2023]现代AI杀毒引擎原理+部分代码 huoji 杀毒,AI,人工智能,鸭鸭杀毒,杀毒引擎 2023-07-19 1111 次浏览 60 次点赞 之前的鸭鸭引擎当时说要开源了,只不过工作忙一直没空管.这几天闲了一会想找找源码,发现模型都丢了.源码发不出来了,但是没关系,还是能说一下当时怎么做这款引擎的 鸭鸭杀毒一共迭代了3个版本,容我慢慢说 ### v1.0 所以鸭鸭杀毒是怎么做的,这还得在2019年说起,那会无聊,做了一个基于unicorn-engine的脱壳机,用来自动脱UPX、ASP与VMP3.x这种壳的程序 做完后,发现可以更进一步做更多有意思的事情,于是产生了想法,做一款杀毒引擎. 最初杀毒引擎的设计思想是,先通过PEID模块判断有没有壳,如果有壳,跑unicorn-engine进行脱壳. 流程如下: 1. 是否有壳,有壳放入虚拟机进行脱壳 2. 提取关键PE信息,如代码熵、区段大小、字符串、导入表作为AI的input 3. 利用xgboost,对样本进行判黑判白操作 这部分python代码如下: 脱壳的: ```cpp def scan_file_by_path(file_path): global log_import_dlls pe_obj = None with open(file_path, "rb") as f: try: pe_obj = pefile.PE(data=f.read()) except: return 0 if pe_obj is None: return 0 # 进行一波脱壳操作 matches = g_pe_signatures.match(pe_obj, ep_only=True) import_dlls = get_import_dlls(pe_obj) clean_pe = pe_obj pe_buffer = pe_obj.get_memory_mapped_image() if matches is not None and matches[0].lower().find('c++') == -1 and matches[0].lower().find('.net') != -1: g_obj_unpacker.init_file(pe_obj) g_obj_unpacker.log_import_dlls = import_dlls g_obj_unpacker.start() # 脱壳成功 if g_obj_unpacker.status == True: import_dlls = g_obj_unpacker.log_import_dlls # pe_obj.close() clean_pe = g_obj_unpacker.clean_pe_object pe_buffer = g_obj_unpacker.uc_buffer else: import_dlls = get_import_dlls(pe_obj) g_obj_unpacker.free() else: import_dlls = get_import_dlls(pe_obj) # print('import dlls:') # print(import_dlls) result = ai_engine.scan_file(clean_pe, pe_obj, pe_buffer, import_dlls) pe_obj.close() if clean_pe != pe_obj: clean_pe.close() return result g_pe_signatures = peutils.SignatureDatabase('./userdb.txt') ``` 部分脱壳代码,没什么好说的,模拟PEB、API一把嗦.基本操作了: ```cpp import pickle from time import sleep from head import * import dumper import _thread STACK_BASE_64 = 0x7ffffffde000 STACK_SIZE_64 = 0x40000 STACK_BASE_32 = 0xfffdd000 STACK_SIZE_32 = 0x21000 HEAP_ADDRESS_64 = 0x500000000 HEAP_SIZE_64 = 0x5000000 HEAP_ADDRESS_32 = 0x5000000 HEAP_SIZE_32 = 0x5000000 THREAD_ID = 0x1337 PROCESS_ID = 0x1337 hook_arg_table = { 'GetSystemTimeAsFileTime': 1, 'GetCurrentThreadId': 0, 'GetCurrentProcessId': 0, 'QueryPerformanceCounter': 1, '__acrt_iob_func': 1, 'LoadLibraryA': 1, 'GetProcAddress': 2, 'VirtualProtect': 4, 'LocalAlloc': 2, 'GetModuleHandleA': 1, 'GetModuleFileNameW': 3, } def align(value, page_size=4096): m = value % page_size f = page_size - m aligned_size = value + f return aligned_size class unpack(object): enivronment_var = { 'stack_address': 0x0, 'stack_size': 0x0, # 'stack_start': 0, 'ntdll_base': 0x77400000, 'kernel32_base': 0x755D0000, 'kernelbase_base': 0x73D00000, 'teb_base': 0, 'peb_base': 0, 'api_handle_address': 0, 'org_api_handle_address': 0, 'api_handle_size': 0, 'heap': [], # (address, size, isfree) } sample_var = { 'path': "", 'pe_obj': None, 'memeory_map': None, 'virtual_memory_size': 0, 'is_x64': False, 'entry_point': 0, 'image_base': 0, } start_time = 0 clean_pe_object = None hook_table = {} dll_load_table = {} export_to_name_table = {} log_import_function_name = [] log_import_dlls = [] sections_read = {} sections_written = {} write_targets = [] allowed_sections = [] allowed_addr_ranges = [] uc_buffer = None # Dict Address to Name: (StartVAddr, EndVAddr) -> Name address_to_name = {} # Dict Name to Protection Tupel: Name -> (Execute, Read, Write) name_to_protection = {} last_run_address = 0 last_section_name = '' uc_engine_x64 = Uc(UC_ARCH_X86, UC_MODE_64) uc_engine_x32 = Uc(UC_ARCH_X86, UC_MODE_32) capstone_x64 = Cs(CS_ARCH_X86, CS_MODE_64) capstone_x32 = Cs(CS_ARCH_X86, CS_MODE_32) uc_engine = None capstone = None status = False is_exits = False def __init__(self): pass # self.enivronment_var['stack_start'] = self.enivronment_var['stack_address'] + \ # self.enivronment_var['stack_size'] def free(self): # if self.sample_var['pe_obj'] is not None: # self.sample_var['pe_obj'].close() self.sample_var['memeory_map'] = None self.sample_var['virtual_memory_size'] = 0 self.enivronment_var['peb_base'] = 0 self.enivronment_var['teb_base'] = 0 self.enivronment_var['api_handle_address'] = 0 self.enivronment_var['heap'] = [] self.uc_engine = None self.uc_engine_x64 = None self.uc_engine_x32 = None self.uc_engine_x64 = Uc(UC_ARCH_X86, UC_MODE_64) self.uc_engine_x32 = Uc(UC_ARCH_X86, UC_MODE_32) self.capstone = None self.last_run_address = 0 self.hook_table = {} self.dll_load_table = {} self.start_time = 0 self.is_exits = False self.log_import_function_name = [] self.log_import_dlls = [] self.sections_read = {} self.sections_written = {} self.write_targets = [] self.allowed_sections = [] self.allowed_addr_ranges = [] self.uc_buffer = None self.last_section_name = '' self.status = False self.clean_pe_object = None def merge(self, ranges): if not ranges: return [] saved = list(ranges[0]) for lower, upper in sorted([sorted(t) for t in ranges]): if lower <= saved[1] + 1: saved[1] = max(saved[1], upper) else: yield tuple(saved) saved[0] = lower saved[1] = upper yield tuple(saved) def get_virtual_memory_size(self, pe_obj): sections = pe_obj.sections min_offset = sys.maxsize total_size = 0 for sec in sections: if sec.VirtualAddress < min_offset: min_offset = sec.VirtualAddress total_size += sec.Misc_VirtualSize total_size += min_offset return total_size def alloc_heap_memory(self, size): heap_alloc_memory_address = 0 for iter in self.enivronment_var['heap']: if iter.isfree and iter.size >= size: iter.isfree = False heap_alloc_memory_address = iter.address return heap_alloc_memory_address if heap_alloc_memory_address == 0: for iter in self.enivronment_var['heap']: if heap_alloc_memory_address < iter.address + iter.size: heap_alloc_memory_address = iter.address + iter.size break if heap_alloc_memory_address == 0: heap_alloc_memory_address = HEAP_ADDRESS_64 if self.sample_var[ 'is_x64'] else HEAP_ADDRESS_32 # 记得对其 heap_alloc_memory_address = align( heap_alloc_memory_address, 0x1000) heap_end = (HEAP_ADDRESS_64 if self.sample_var[ 'is_x64'] else HEAP_ADDRESS_32) + (HEAP_SIZE_64 if self.sample_var[ 'is_x64'] else HEAP_SIZE_32) if heap_alloc_memory_address + size > heap_end: print("[!] OverHeap because Heap is full 0x%x" % size) return 0 return heap_alloc_memory_address def api_GetCurrentThreadId(self): payload = THREAD_ID if self.sample_var['is_x64']: self.uc_engine.reg_write( UC_X86_REG_RAX, payload) else: self.uc_engine.reg_write( UC_X86_REG_EAX, payload) def api_QueryPerformanceCounter(self): payload = struct.pack("> 32 payload = bytes(SYS_FILETIME( dwLowDateTime, dwHighDateTime, )) if self.sample_var['is_x64'] == False: esp_address = self.uc_engine.reg_read(UC_X86_REG_ESP) + 0x4 self.uc_engine.mem_write(esp_address, payload) else: self.uc_engine.mem_write(self.uc_engine.reg_read( UC_X86_REG_RCX), payload) # 这里有问题 不过不管了 def api_acrt_iob_func(self): payload = struct.pack(" input_size: self.uc_engine.mem_write( out_address, out_data) if self.sample_var['is_x64']: self.uc_engine.reg_write(UC_X86_REG_RAX, len(out_data)) else: self.uc_engine.reg_write(UC_X86_REG_EAX, len(out_data)) def api_VirtualProtect(self): address = 0 size = 0 new_protect = 0 out_protect_address = 0 if self.sample_var['is_x64']: address = self.uc_engine.reg_read(UC_X86_REG_RCX) size = self.uc_engine.reg_read(UC_X86_REG_RDX) new_protect = self.uc_engine.reg_read(UC_X86_REG_R8) out_protect_address = self.uc_engine.reg_read(UC_X86_REG_R9) # out_protect_address = self.uc_engine.mem_read( # out_protect_address, 0x8) else: address = self.uc_engine.reg_read(UC_X86_REG_ESP) + 0x4 size = self.uc_engine.reg_read(UC_X86_REG_ESP) + 0x8 new_protect = self.uc_engine.reg_read(UC_X86_REG_ESP) + 0xC out_protect_address = self.uc_engine.reg_read( UC_X86_REG_ESP) + 0x10 address = self.uc_engine.mem_read(address, 4) size = self.uc_engine.mem_read(size, 4) new_protect = self.uc_engine.mem_read(new_protect, 4) out_protect_address = self.uc_engine.mem_read( out_protect_address, 4) address = int.from_bytes( address[::-1], byteorder='big', signed=False) size = int.from_bytes( size[::-1], byteorder='big', signed=False) new_protect = int.from_bytes( new_protect[::-1], byteorder='big', signed=False) out_protect_address = int.from_bytes( out_protect_address[::-1], byteorder='big', signed=False) result = 0x1 memory_protection = { # Tupel Format: (Execute, Read, Write) 0x01: (False, False, False), # 0x01 PAGE_NOACCESS 0x02: (False, True, False), # 0x02 PAGE_READONLY 0x04: (False, True, True), # 0x04 PAGE_READWRITE 0x08: (False, True, True), # 0x08 PAGE_WRITECOPY 0x10: (True, False, False), # 0x10 PAGE_EXECUTE 0x20: (True, True, False), # 0x20 PAGE_EXECUTE_READ 0x40: (True, True, True), # 0x40 PAGE_EXECUTE_READWRITE 0x80: (True, True, True), # 0x80 PAGE_EXECUTE_WRITECOPY } for saddr, eaddr in self.address_to_name.keys(): if (address <= saddr <= address + size or address <= eaddr <= address + size) and new_protect in memory_protection: name = self.address_to_name[(saddr, eaddr)] self.name_to_protection[name] = memory_protection[new_protect] if self.sample_var['is_x64']: self.uc_engine.reg_write(UC_X86_REG_RAX, result) # bool else: self.uc_engine.reg_write(UC_X86_REG_EAX, result) # bool self.uc_engine.mem_write( out_protect_address, struct.pack(" self.enivronment_var['org_api_handle_address'] + self.enivronment_var['api_handle_size'] or address < self.enivronment_var['org_api_handle_address']: curr_section = self.get_section_by_address(address) if curr_section != None: curr_section_name = convert_to_string(curr_section.Name) if self.last_section_name != curr_section_name: print("[*] Section changed: %s" % curr_section_name) self.last_section_name = curr_section_name if any(lower <= address <= upper for (lower, upper) in sorted(self.write_targets)): print("[+] Write target hit at 0x%x" % address) self.dump_pe() elif not self.is_allowed(address) and ( address < self.sample_var['image_base'] or address > self.sample_var['image_base'] + 0x1000): print("[-] Address 0x%x not allowed sec name %s" % (address, curr_section_name)) self.allow(address) self.dump_pe() # 执行dump pass else: print("current address : 0x%x" % address) self.stop() pass def memory_access_hook(self, uc, access, address, size, value, user_data): curr_section = self.get_section_by_address(address) if curr_section is not None: curr_section_name = convert_to_string(curr_section.Name) if access == UC_MEM_READ: if curr_section_name not in self.sections_read: self.sections_read[curr_section_name] = 1 else: self.sections_read[curr_section_name] += 1 elif access == UC_MEM_WRITE: self.write_targets = list( self.merge(self.write_targets + [(address, address + size)])) if curr_section_name not in self.sections_written: self.sections_written[curr_section_name] = 1 else: self.sections_written[curr_section_name] += 1 if access == UC_MEM_READ: # 查看是否手工读取导入表 if address in self.hook_table: if self.hook_table[address] not in self.log_import_function_name: self.log_import_function_name.append( self.hook_table[address]) def memory_invalid_hook(self, uc, access, address, size, value, user_data): current_rip = uc.reg_read( UC_X86_REG_RIP) if self.sample_var['is_x64'] else uc.reg_read(UC_X86_REG_EIP) print("[!] Invalid memory access at 0x%x Last address at 0x%x" % (current_rip, self.last_run_address)) print("[!] Access: %u, Address: 0x%x, Size: %u, Value: 0x%x" % (access, address, size, value)) def interrupt_hook(self, uc, value, user_data): if self.last_run_address not in self.hook_table: print("[!] Unknown Interrupt: %u at %x" % (value, self.last_run_address)) self.stop() return # print("[!] handle api call %s" % # self.hook_table[self.last_run_address]) call_api = self.hook_table[self.last_run_address] if call_api == 'GetSystemTimeAsFileTime': self.api_GetSystemTimeAsFileTime() elif call_api == 'GetCurrentThreadId': self.api_GetCurrentThreadId() elif call_api == 'GetCurrentProcessId': self.api_GetCurrentProcessId() elif call_api == 'QueryPerformanceCounter': self.api_QueryPerformanceCounter() elif call_api == 'LoadLibraryA': self.api_LoadLibraryA() elif call_api == 'GetProcAddress': self.api_GetProcAddress() elif call_api == 'VirtualProtect': self.api_VirtualProtect() elif call_api == 'LocalAlloc': self.api_LocalAlloc() elif call_api == 'GetModuleHandleA': self.api_GetModuleHandleA() elif call_api == 'GetModuleFileNameW': self.api_GetModuleFileNameW() elif call_api == '_initterm_e' or call_api == '_initterm' or call_api == '_get_initial_narrow_environment' or call_api == '__p___argv' or call_api == '__p___argc' or call_api == '__acrt_iob_func': if call_api == '__p___argc' or call_api == '__p___argv': self.api_acrt_iob_func() pass else: print("[!] Unknown api call %s" % call_api) self.stop() return pass def watch_dog(self, nonuse): while True: if self.status or self.is_exits: break current_time = time.time() if current_time - self.start_time > 30: #print("[!] Timeout") self.stop() break sleep(1) def stop(self): if self.uc_engine != None: print("[!] Stopping") self.uc_engine.emu_stop() def start(self): self.start_time = time.time() _thread.start_new_thread(self.watch_dog, (self,)) try: self.uc_engine.emu_start( self.sample_var['entry_point'], sys.maxsize) except UcError as e: print(f"Error: {e}") finally: self.is_exits = True self.uc_engine.emu_stop() def load_dll(self, path_dll, start_addr): filename = os.path.splitext(os.path.basename(path_dll))[0] file_path = f"{os.path.dirname(__file__)}/x64_dll/{filename}.ldll" if self.sample_var[ 'is_x64'] else f"{os.path.dirname(__file__)}/x32_dll/{filename}.ldll" pick_save_path = f"{os.path.dirname(__file__)}/x64_dll/{filename}.pickle" if self.sample_var[ 'is_x64'] else f"{os.path.dirname(__file__)}/x32_dll/{filename}.pickle" if not os.path.exists(file_path): with open(path_dll, "rb") as f: dll = pefile.PE(data=f.read()) # 解析导出表 dll.parse_data_directories() export_data = {} # name <-> offset for entry in dll.DIRECTORY_ENTRY_EXPORT.symbols: export_data[entry.name] = entry.address with open(pick_save_path, 'wb') as pick_file_handle: pickle.dump(export_data, pick_file_handle) self.export_to_name_table[filename] = export_data loaded_dll = dll.get_memory_mapped_image(ImageBase=start_addr) with open(file_path, 'wb') as f: f.write(loaded_dll) self.uc_engine.mem_map(start_addr, align(len(loaded_dll) + 0x1000)) self.uc_engine.mem_write(start_addr, loaded_dll) dll.close() # self.resolve_dll_export_table(loaded_dll) else: with open(file_path, 'rb') as dll: loaded_dll = dll.read() # self.resolve_dll_export_table(loaded_dll) self.uc_engine.mem_map( start_addr, align((len(loaded_dll) + 0x1000))) self.uc_engine.mem_write(start_addr, loaded_dll) with open(pick_save_path, 'rb') as pick_file_handle: self.export_to_name_table[filename] = pickle.load( pick_file_handle) def set_int3_hook(self, address, name): hook_addr = self.enivronment_var['api_handle_address'] payload = b'\xCC\xC3\x00\xC3' # print("set up %s hook at 0x%x" % (name, hook_addr)) if name in hook_arg_table: payload = payload[:2] + struct.pack( ' 0: self.allowed_sections += [convert_to_string(s.Name)] self.allowed_addr_ranges = self.get_allowed_addr_ranges() # self.allowed_sections = [s.Name.decode('utf-8') for s in pe_obj.sections if # s.VirtualAddress + self.sample_var['image_base'] <= self.sample_var['entry_point'] < s.VirtualAddress + s.Misc_VirtualSize + self.sample_var['image_base']] # self.allowed_addr_ranges = [] # 保存保护属性,用于脱壳后重建PE def prot_val(x, y): return True if x & y != 0 else False for s in pe_obj.sections: self.address_to_name[( s.VirtualAddress + self.sample_var['image_base'], s.VirtualAddress + self.sample_var['image_base'] + s.Misc_VirtualSize)] = convert_to_string(s.Name) self.name_to_protection[convert_to_string(s.Name)] = ( prot_val(s.Characteristics, 0x20000000), prot_val( s.Characteristics, 0x40000000), prot_val(s.Characteristics, 0x80000000)) ``` 这部分说一下为什么不进行完全模拟(实现脱VMP那种效果),因为后来发现,实在是太慢了,python的效率30分钟都跑不完VMP,拿来杀毒纯属扯淡,因此阉割了这个简单虚拟机的代码. 之后将干净的无壳代码送入xgboost,这部分代码丢了,字面意义的丢了,但是还记得训练过程: 1. 先去下样本,有很多国外免费/收费(不贵)的地方可以下样本,样本和白样本数量最好是1:1 2. 聚类差不多的样本,比如一个家族的,我们就只看10个样本,防止一类样本过多导致过拟合.我当时写了一个LSH算法,通过文件相似度,对样本进行聚类 3. 提特征,一般是比如代码段比率、代码熵、字符串、导入表、代码段名字、PE的头字节、PDB路径等等,我那会取了大概120个特征 4. 丢xgboost训练,为什么是xgboost因为他小、推算快、分类任务做的非常好.而且有C++版本模型可以直接换成C++ 之后用脱壳后代码,丢这个xgboost进行分类,我那会定义了三个类别,一个是没问题,一个是病毒,另外一个是PUA(你下的样本里面大概率有很多PUA而不是真正的病毒),当然现代杀软启发引擎做得好的会有很多分类,如hacktools、ransomware、Trojan等但是我这边人力有限,就三类 之后就是取最高分数了.当时效果准确率个人评估有70-80%左右.但是缺点很明显,慢、还是不够准确,接近70%的查杀率与50%的误报率,而且通过shape看模型权重,大部分集中在代码段比率与熵那块,也就是说,加壳了就肯定报,导入表有异常就报,跟早期诺顿或者defender没啥区别(这里只针对PE文件,其他的非PE就不说了).而且python的内存占用和推理速度,实在是不敢恭维.即便是把虚拟机脱壳模块给重写了,也是慢. ### v2.0 根据1.0的经验,我又花了点时间改进这破东西,首先第一步,我决定在原有脱壳机基础上,把熵、代码段比率这块的权重降低,并且让他更智能一点,如何做到?我的想法是人可以看代码,为什么AI不能看代码.但是AI不能直接看代码,因为他不懂汇编.(包括现在那个很火的chatgpt对汇编也是很不懂),因此我们要给代码打tag,要打tag首先就要分析PE里面有什么代码,这是我那会用的取程序funciton的例子,原理是判0xcc和0x90 ``` def get_capstone(pData, pIsX64, pPe): return_result = { 'op_code': {}, 'active': [] } capstone_handle = None if pIsX64: capstone_handle = g_capstone_handle_x64 else: capstone_handle = g_capstone_handle_x32 capstone_handle.detail = True sizeof_code = pPe.OPTIONAL_HEADER.SizeOfCode baseof_code = pPe.OPTIONAL_HEADER.BaseOfCode backtrack_code = [] isEnterFunction = False current_function_size = 0 current_fucntion_address = 0 function_size = [] # print("get_capstone") is_first = True for code in capstone_handle.disasm(pData[baseof_code:baseof_code + sizeof_code], 0x00000000): if len(backtrack_code) > 3: backtrack_code.pop(0) backtrack_code.append(code.mnemonic) if (code.mnemonic != 'int3' and code.mnemonic != 'nop') and (backtrack_code[0] == 'int3' or backtrack_code[0] == 'nop') and (backtrack_code[1] == 'int3' or backtrack_code[1] == 'nop') and (backtrack_code[2] == 'int3' or backtrack_code[2] == 'nop'): # print("进入函数") backtrack_code = [] isEnterFunction = True current_fucntion_address = code.address elif (code.mnemonic == 'int3' or code.mnemonic == 'nop') and (backtrack_code[0] == 'int3' or backtrack_code[0] == 'nop') and isEnterFunction: # print("退出函数") isEnterFunction = False function_size.append({ 'start_address': current_fucntion_address, 'end_address': code.address, 'size': current_function_size }) is_first = False current_function_size = 0 current_fucntion_address = 0 continue current_function_size = current_function_size + 1 if is_first: function_size.append({ 'start_address': 0, 'end_address': sizeof_code, 'size': sizeof_code }) function_size.sort(reverse=True, key=lambda x: x['size']) func_num = 0 for func in function_size: if func_num > 100: break # print(func) for code in capstone_handle.disasm(pData[baseof_code + func['start_address']:baseof_code + func['end_address']], 0x00000000): str_code = "{}-{}".format(code.mnemonic, get_opcode_type(code)) # str_code = code.mnemonic return_result['op_code'][str_code] = 1 return_result['active'].append(str_code) func_num = func_num + 1 return return_result ``` 解决代码这个问题后,我们就可以给代码打tag了,原理就是取相同,试想100个程序用同一个汇编代码,不用想,肯定是某些for或者if或者某些stl函数,把这些汇编给组合在一起就行 同理病毒也是,100个病毒用这一段代码,肯定是某些bypass AV手段或者作恶手段 这是当时打的一些tag: ``` {"inc-1 add-3": 819, "add-3 add-3": 285722, "add-3 add-3 add-3": 177864, "add-3 add-3 add-3 add-3": 131152, "add-3 add-3 add-3 add-3 add-3": 52358, "add-3 add-3 add-3 add-3 add-3 add-3": 43450, "add-3 add-3 add-3 add-3 add-3 add-3 add-3": 37200, "add-3 add-3 add-3 add-3 add-3 add-3 add-3 add-3": 32491, "mov-2 push-1": 0, "push-1 push-0": 2, "push-0 call-0": 30, "push-0 call-0 add-1": 4, "push-0 call-0 add-1 mov-7": 0, "call-0 add-1": 11, "call-0 add-1 mov-7": 0, "add-1 mov-7": 16, "xor-2 mov-7": 0, "mov-7 mov-3": 12, "mov-7 mov-3 and-1": 0, "mov-7 mov-3 and-1 mov-7": 0, "mov-7 mov-3 and-1 mov-7 mov-3": 0, "mov-3 and-1": 0, "mov-3 and-1 mov-7": 0, "mov-3 and-1 mov-7 mov-3": 0, "and-1 mov-7": 0, "and-1 mov-7 mov-3": 0, "mov-7 mov-3 xor-2": 0, "mov-3 xor-2": 0, "mov-7 add-1": 9, "mov-3 mov-7": 4, "mov-3 mov-7 mov-7": 0, "mov-3 mov-7 mov-7 mov-7": 0, "mov-3 mov-7 mov-7 mov-7 mov-7": 0, "mov-7 mov-7": 65, "mov-7 mov-7 mov-7": 0, "mov-7 mov-7 mov-7 mov-7": 0, "mov-7 mov-7 mov-6": 0, "mov-7 mov-6": 6, "push-0 push-0": 38, "push-0 push-0 mov-7": 0, "push-0 push-0 mov-7 push-1": 0, "push-0 push-0 mov-7 push-1 mov-7": 0, "push-0 push-0 mov-7 push-1 mov-7 push-1": 0, "push-0 push-0 mov-7 push-1 mov-7 push-1 call-0": 0, "push-0 mov-7": 0, "push-0 mov-7 push-1": 0, "push-0 mov-7 push-1 mov-7": 0, "push-0 mov-7 push-1 mov-7 push-1": 0, "push-0 mov-7 push-1 mov-7 push-1 call-0": 0, "push-0 mov-7 push-1 mov-7 push-1 call-0 add-1": 0, "mov-7 push-1": 0, "mov-7 push-1 mov-7": 0, "mov-7 push-1 mov-7 push-1": 0, "mov-7 push-1 mov-7 push-1 call-0": 0, "mov-7 push-1 mov-7 push-1 call-0 add-1": 0, "mov-7 push-1 mov-7 push-1 call-0 add-1 pop-1": 0, "push-1 mov-7": 1, "push-1 mov-7 push-1": 0, "push-1 mov-7 push-1 call-0": 0, "push-1 mov-7 push-1 call-0 add-1": 0, "push-1 mov-7 push-1 call-0 add-1 pop-1": 0, "mov-7 push-1 call-0": 0, "mov-7 push-1 call-0 add-1": 0, "mov-7 push-1 call-0 add-1 pop-1": 0, "push-1 call-0": 0, "push-1 call-0 add-1": 0, "push-1 call-0 add-1 pop-1": 0, "call-0 add-1 pop-1": 0, "add-1 pop-1": 0, "mov-2 pop-1": 10, "mov-2 pop-1 ret-0": 5, "mov-2 pop-1 ret-0 int3-0": 0, "mov-2 pop-1 ret-0 int3-0 int3-0": 0, "mov-2 pop-1 ret-0 int3-0 int3-0 int3-0": 0, ``` 比如 push call add mov 这应该是函数传参、call完后给某个东西add,再移动什么东西. 总之搜集了非常多的此类代码tag(这里跑了3月,用于用的算法和python的问题,非常慢..),搜集了大概300W的tags 然后就是传统路线,虚拟机脱壳->送入xgboost,但是xgboost中加入这些程序的tags命中情况以及顺序. 最终效果只能说提升一般般,看分布发现大部分tag被命中的疑似是security cookie和各种壳的自解压代码(因为我那虚拟机只能脱一些压缩壳,还是有很多壳脱不了),似乎对整体帮助不算大 ### V3.0 终于,在看了defender的实现后,我决定彻底改进这个破烂虚拟机,当时的想法是: 1. 不管什么程序直接跑虚拟机 2. 在虚拟执行与性能中寻找一个平衡点(虚拟执行不能执行太久,要不然非常慢) 3. 在虚拟执行过程中记录代码控制流、函数API调用、字符串、以及代码tags 4. 虚拟执行一旦因为各种情况而退出,保存(3)记录的东西 5. 送入xgboost,结合之前的有用的信息,抛弃之前一些AI压根不关注的features 于是,终于在各种努力下,有一个看起来像那么回事的杀毒引擎了,只不过这个杀毒引擎是python实现的、性能有点拉、没有白名单会误报之外,我个人认为在没白名单和特征库的情况下,已经达到我心目中的水平了. 后续复盘发现是,比如一些程序,混淆的,控制流会非常难看,代码tags接近混淆那段控制流,所以AI权重就偏向那边,而病毒喜欢内存加载、反虚拟机之类的,tags就会偏向病毒这类.而正常程序往往会因为调用一些不在API模拟列表里面的API而推出模拟,此类除了部分奇葩东西之外,控制流往往是接近正常程序这一类,这也是为什么这次效果看起来比较好. ###不足 当然,这也是有不足的,这是不足的地方: 1. 误报还是太大,这其实并不是我的问题,而是所有启发杀毒引擎的问题,而杀软厂商的解决误报方式非常简单,加白,加签名白.只是不在白名单+没有签名的文件才过这个所谓的启发引擎.而本人没这个功夫,毕竟做着玩.所以就这样了.不要管误报率 2. 还是太占性能,这一点没有办法,这一点是由虚拟机执行的天然特性决定的,当然还可以极限压缩,就是不用unicorn-engine,自己手搓虚拟机,但是我懒得了. 3. 对非PE文件、无文件攻击、.net、browser base(aka electron)程序完全无能为力.这也是现代杀毒引擎对此类文件无能为力的地方,我也没办法解决.对于高级威胁,建议用EDR而不是杀毒引擎 4. 有些病毒文件,比如非常简单联网C2,就是不报,这个没办法解决,除非MD5加黑,否则你的代码跟正常程序长一样是不会报的.这也是此类杀毒引擎的天然缺陷,即你的feature表现为正常,你没办法让他认为是黑的,认为是黑的其他的程序就会误报,所以AI的局限性就在这. 希望能帮助到想做杀毒引擎的朋友.最后打个小广告,推广一下QQ群:  本文由 huoji 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。 点赞 60
defender我没看过,但感觉你再弄弄就相当于跟eset一个模式了
只能看后人了
111