-
Notifications
You must be signed in to change notification settings - Fork 97
/
ida_example.py
391 lines (366 loc) · 14.5 KB
/
ida_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
"""
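A simple example script demonstrating the use of FunctionSimSearch via the IDA
Python API.
Registers the following hotkeys:
Shift-S -- hash the current function, print the hash, and add it to the
           search index ("simhash.index" in the chosen data directory).
Shift-L -- hash the current function, search for the best hits in the data-
           base, and display the best guesses along with their similarity
           score.
Shift-H -- print the size, branching-node count, JSON form and hash of the
           current function's flowgraph.
Shift-A -- add every function of the database to the search index.
Shift-M -- search the index for every function of the database.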
"""
import functionsimsearch
import idaapi
import base64
import os
def parse_function_meta_data(filename):
    """Parse a function metadata file into a lookup dictionary.

    Every non-empty line is split on whitespace and read as
    ``<exe id (hex)> <file name> <address (hex)> <base64 function name>``;
    any tokens after the fourth are ignored. Because of the whitespace split,
    a file name containing spaces cannot be represented in this format.

    Args:
        filename: Path of the metadata file to read.

    Returns:
        A dictionary mapping (exe_id, address) tuples to
        (file_name, function_name) tuples.

    Raises:
        IndexError: If a non-empty line has fewer than four tokens.
        ValueError: If an id or address token is not valid hexadecimal.
    """
    result = {}
    # The context manager closes the file even when a line fails to parse.
    with open(filename, "rt") as meta_file:
        for line in meta_file:
            tokens = line.split()
            if not tokens:
                # Tolerate blank lines instead of failing with IndexError.
                continue
            exe_id = int(tokens[0], 16)
            file_name = tokens[1]
            address = int(tokens[2], 16)
            # b64decode is available on Python 2 and 3 alike.
            function_name = base64.b64decode(tokens[3])
            if not isinstance(function_name, str):
                # Python 3 yields bytes; callers treat the name as text.
                function_name = function_name.decode("utf-8", "replace")
            result[(exe_id, address)] = (file_name, function_name)
    return result
def split_instruction_list(instructions, split_mnemonic):
    """
    Splits a sequence of instructions after every occurrence of a mnemonic.

    Takes a sequence of instructions of the form:
      [ (address, "mnem", ("op1", ..., "opN")), ... ]
    and returns a list of lists of such instructions, where a new list is
    started after each instruction whose "mnem" equals split_mnemonic.

    Can be re-used for other disassemblers that do not split basic blocks
    after a CALL.

    Args:
        instructions: Sequence of (address, mnemonic, operands) tuples.
        split_mnemonic: Mnemonic after which a new list is started.

    Returns:
        A list of non-empty instruction lists, in the original order. An empty
        input yields an empty list.
    """
    results = [[]]
    for instruction in instructions:
        # The instruction always lands in the list currently being filled.
        results[-1].append(instruction)
        if instruction[1] == split_mnemonic:
            # Time to split: the next instruction starts a fresh list.
            results.append([])
    # A trailing empty list is left over when the input was empty or ended on
    # the split mnemonic; drop it so callers only ever see non-empty lists.
    if not results[-1]:
        results.pop()
    return results
def get_flowgraph_from(address, ignore_instructions=False):
    """
    Generates a flowgraph object that can be fed into FunctionSimSearch from a
    given address in IDA.

    Every IDA basic block is split further after each call instruction (see
    split_instruction_list), and the pieces are chained with edges.

    Args:
        address: An address inside the function of interest. A falsy value
            (None or 0) selects the address returned by here().
        ignore_instructions: If True, only nodes and edges are added and no
            instructions are attached to the nodes.

    Returns:
        The populated functionsimsearch.FlowgraphWithInstructions object.
    """
    # Bare reference: raises NameError right away if the module-level call
    # mnemonic has not been set up.
    call_instruction_string
    if not address:
        address = here()
    ida_flowgraph = idaapi.FlowChart(idaapi.get_func(address))
    flowgraph = functionsimsearch.FlowgraphWithInstructions()
    for block in ida_flowgraph:
        # Add all the ida-flowgraph-basic blocks. We do this up-front so we can
        # more easily add edges later, and adding a node twice does not hurt.
        flowgraph.add_node(block.start_ea)
    for block in ida_flowgraph:
        # There seems to be no good way to get operands without IDA substituting
        # local variable names etc., so this is a very ugly hack to deal with
        # that.
        # TODO(thomasdullien): IDA 7.2 will provide a way to perform
        # print_operand without replacement (by providing empty type
        # arguments?), replace the hack here with a "proper" solution.
        instructions = [
            (ea, GetMnem(ea), (print_operand(ea, 0).replace("+var_", "-0x"),
                               print_operand(ea, 1).replace("+var_", "-0x")))
            for ea in Heads(block.start_ea, block.end_ea)]
        small_blocks = split_instruction_list(instructions,
                                              call_instruction_string)
        if not small_blocks:
            # No instruction heads were found in this block, so there is no
            # sub-block to attach instructions or outgoing edges to. Indexing
            # small_blocks[-1] below would raise IndexError.
            continue
        for small_block in small_blocks:
            small_block_start = small_block[0][0]
            flowgraph.add_node(small_block_start)
            if not ignore_instructions:
                # Strip the address; only (mnemonic, operands) is attached.
                flowgraph.add_instructions(
                    small_block_start,
                    tuple(instruction[1:] for instruction in small_block))
        # Chain the pieces of the split basic block together.
        for current, following in zip(small_blocks, small_blocks[1:]):
            flowgraph.add_edge(current[0][0], following[0][0])
        # The outgoing edges of the IDA block leave from its last piece.
        for successor_block in block.succs():
            flowgraph.add_edge(small_blocks[-1][0][0], successor_block.start_ea)
    return flowgraph
def export_idb_as_json(output_prefix):
    """
    Writes the flowgraphs of all functions in the IDB into one big JSON file.

    The output goes to output_prefix + ".json" in the form
    { "flowgraphs" : [ <graph>, <graph>, ... ]}. A file named
    output_prefix + ".meta" is created (or truncated) as well, but this
    function writes nothing into it.

    Args:
        output_prefix: Path prefix for the ".json" and ".meta" output files.
    """
    # Both files are managed by the with-statement so they are closed even if
    # generating one of the flowgraphs raises.
    with open(output_prefix + ".json", "wt") as output_file, \
            open(output_prefix + ".meta", "wt"):
        output_file.write("{ \"flowgraphs\" : [ ")
        for position, function in enumerate(Functions(MinEA(), MaxEA())):
            if position:
                # Separator between consecutive entries, none after the last.
                output_file.write(",\n")
            # Obtain and write the flowgraph.
            flowgraph_json = get_flowgraph_from(address=function).to_json()
            print("Writing flowgraph for function %lx (%d bytes)..." % (
                function, len(flowgraph_json)))
            output_file.write(flowgraph_json)
        output_file.write("]}")
    print("Done.")
def save_all_functions():
    """
    Feeds every function of the database to save_function, stopping at the
    first call that reports failure by returning a falsy value.
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    sim_hasher
    for function_ea in Functions(MinEA(), MaxEA()):
        saved = save_function(function_address=function_ea)
        if not saved:
            return
def print_hash():
    """
    Prints the size, the number of branching nodes, the JSON form and the
    hash of the flowgraph for the function at the current cursor address.
    """
    # Bare reference: raises NameError early if the hasher is missing.
    sim_hasher
    graph = get_flowgraph_from(here(), False)
    for describe in (graph.size, graph.number_of_branching_nodes,
                     graph.to_json):
        print(describe())
    function_hashes = sim_hasher.calculate_hash(graph)
    message = "Hash for flowgraph at %lx is %lx %lx" % (
        here(), function_hashes[0], function_hashes[1])
    print(message)
def save_function(function_address=None):
    """
    Hashes one function and adds it to the search index and metadata file.

    Functions with fewer than 5 branching nodes are skipped.

    Args:
        function_address: An address inside the function to save. None
            selects the address returned by here().

    Returns:
        False if the index has fewer than 1024 bytes of free space left and
        nothing was attempted; True otherwise (also when the function was
        skipped for being too small).
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    sim_hasher
    if search_index.get_free_size() < 1024:
        print("Fewer than 1024 bytes in the index file left. Resize it please.")
        return False
    if function_address is None:
        function_address = here()
    flowgraph = get_flowgraph_from(function_address)
    branching_nodes = flowgraph.number_of_branching_nodes()
    if branching_nodes < 5:
        print("%lx: has only %d (< 5) branching nodes, ignoring." % (
            function_address,
            branching_nodes))
        return True
    hashes = sim_hasher.calculate_hash(flowgraph)
    # The executable id is the first 16 characters of the input file's
    # SHA256 as returned by IDA, parsed as a hexadecimal number.
    executable_id = int(ida_nalt.retrieve_input_file_sha256()[0:16], 16)
    address = idaapi.get_func(function_address).start_ea
    search_index.add_function(hashes[0], hashes[1], executable_id, address)
    print("%lx:%lx %lx-%lx Added function to search index." % (executable_id,
        address, hashes[0], hashes[1]))
    # Add the metadata of the function.
    metadata_file
    function_name = Name(address)
    if not isinstance(function_name, bytes):
        # b64encode needs bytes; on Python 2 a str already is bytes.
        function_name = function_name.encode("utf-8")
    encoded_name = base64.b64encode(function_name)
    if not isinstance(encoded_name, str):
        # Python 3 returns bytes; the record below is written as text.
        encoded_name = encoded_name.decode("ascii")
    # The with-statement guarantees the appended record is flushed and the
    # handle is released instead of lingering until garbage collection.
    with open(metadata_file, "a") as metafile:
        metafile.write("%16.16lx %s %16.16lx %s false\n" % (
            executable_id, ida_nalt.get_input_file_path(), address,
            encoded_name))
    return True
def search_function(function_address, minimum=0, minsize=6):
    """
    Hashes the function at function_address and queries the search index.

    Returns (False, []) when the flowgraph has fewer than minsize branching
    nodes. Otherwise returns (hashes, results), where results is whatever
    search_index.query_top_N returns for N = 5. The minimum argument is
    accepted but not used by this function.
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    sim_hasher
    meta_data
    graph = get_flowgraph_from(function_address)
    if graph.number_of_branching_nodes() < minsize:
        return False, []
    function_hashes = sim_hasher.calculate_hash(graph)
    top_hits = search_index.query_top_N(
        function_hashes[0], function_hashes[1], 5)
    return function_hashes, top_hits
def print_results(executable_id, address, hashes, results, minimum=0):
    """
    Prints one line per search result whose score reaches the minimum.

    Args:
        executable_id: Id of the executable the queried function belongs to.
        address: Address of the queried function.
        hashes: Indexable pair of hash values of the queried function.
        results: Iterable of results, each indexable as
            (same_bits, executable id, address).
        minimum: Results with same_bits below this value are skipped.

    If at least one result was printed, a separator line is printed at the
    end. When the result is found in the global meta_data mapping, its file
    name and function name are included in the output.
    """
    print_separator = False
    for result in results:
        same_bits = result[0]
        if same_bits < minimum:
            continue
        print_separator = True
        result_exe_id = result[1]
        result_address = result[2]
        odds = search_index.odds_of_random_hit(same_bits)
        key = (result_exe_id, result_address)
        # "in" works on Python 2 and 3; dict.has_key exists on Python 2 only.
        if key not in meta_data:
            print("%lx:%lx %lx-%lx Result is %f - %lx:%lx (1 in %f searches)" %
                  (executable_id, address, hashes[0], hashes[1],
                   same_bits, result_exe_id, result_address, odds))
        else:
            filename, functionname = meta_data[key]
            # Keep every result on a single output line.
            functionname = functionname.replace('\n', '').replace('\r', '')
            print("%lx:%lx %lx-%lx Result is %f - %lx:%lx %s '%s' (1 in %f searches)" %
                  (executable_id, address, hashes[0], hashes[1],
                   same_bits, result_exe_id, result_address, filename,
                   functionname, odds))
    if print_separator:
        print("--------------------------------------")
def load_function(function_address=None, minimum=0, minsize=6):
    """
    Search for a function in the search index, and output the best matches to
    the log window.

    Args:
        function_address: An address inside the function to look up. None
            selects the address returned by here().
        minimum: Lowest score a result needs in order to be printed.
        minsize: Forwarded to search_function, which skips functions with
            fewer branching nodes than this.
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    meta_data
    if function_address is None:
        function_address = here()
    hashes, results = search_function(function_address, minimum, minsize)
    if not hashes:
        # search_function skipped the function as too small; there is nothing
        # to print, so avoid computing the executable id and function start.
        return
    executable_id = int(ida_nalt.retrieve_input_file_sha256()[0:16], 16)
    address = int(idaapi.get_func(function_address).start_ea)
    if not results:
        print("%lx:%lx %lx-%lx No search results" %
              (executable_id, address, hashes[0], hashes[1]))
    else:
        print_results(executable_id, address, hashes, results, minimum=minimum)
def match_all_functions():
    """
    Walks over all functions of the database and looks each one up on its own
    via load_function with a minimum score of 105, i.e. without taking any
    surrounding context into account.
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    sim_hasher
    every_function = Functions(MinEA(), MaxEA())
    for function_ea in every_function:
        load_function(function_address=function_ea, minimum=105)
def match_all_functions_require_consecutive_n(n = 2, minimum_score = 95):
    """
    Scans through all functions and records the strong matches in a file.

    Every function is looked up with search_function. For each function whose
    best result has a score above 104, one line "<index> <address> <score>" is
    written to /tmp/res.txt, where <index> is the position of the function in
    enumeration order.

    The intent is to only report results if N consecutive functions obtain a
    similarity score in excess of minimum_score, which helps control false
    positives; that filtering is not implemented here.

    Args:
        n: Intended number of consecutive matches; currently unused.
        minimum_score: Forwarded to search_function as its minimum argument.
    """
    # Bare references: raise NameError early if the globals are missing.
    search_index
    sim_hasher
    results_map = {}
    function_addresses = []
    for function in Functions(MinEA(), MaxEA()):
        hashes, results = search_function(function_address=function,
                                          minimum=minimum_score)
        results_map[function] = results
        function_addresses.append(function)
    # The with-statement closes the file even if writing a line fails.
    with open("/tmp/res.txt", "wt") as outfile:
        # NOTE(review): idx is taken to count every function, not only the
        # ones written out -- confirm against the consumer of this file.
        for idx, f in enumerate(function_addresses):
            res = results_map[f]
            # The first result's score is used; no results counts as 0.
            sim = res[0][0] if len(res) > 0 else 0.0
            if sim > 104:
                outfile.write("%d %lx %d\n" % (idx, f, sim))
# Script body: sets up the call mnemonic, toggles the hotkeys, and opens the
# search index.
# NOTE(review): block nesting cannot be read off this copy of the script (the
# statements carry no indentation); confirm which statements belong to the
# except branch before restructuring anything here.
import ida_idp, idc
# Mnemonic that get_flowgraph_from passes to split_instruction_list, looked up
# by the processor name IDA reports. A processor name that is not a key of
# this dictionary raises KeyError.
# NOTE(review): "FOO" for "arm" looks like a placeholder -- confirm.
processor_to_call_instructions = { "arm" : "FOO", "pc" : "call" }
call_instruction_string = processor_to_call_instructions[ida_idp.get_idp_name()]
# Attempt to de-register the hotkeys. This will throw an exception if they
# are not registered yet, and then register them.
try:
# Evaluating the bare name raises NameError when it is not defined; the same
# pattern is repeated for each of the five hotkey contexts below.
hotkey_context_S
if idaapi.del_hotkey(hotkey_context_S):
print("FunctionSimSearch: Hotkey S unregistered.")
del hotkey_context_S
else:
print("FunctionSimSearch: Failed to unregister hotkey S.")
hotkey_context_L
if idaapi.del_hotkey(hotkey_context_L):
print("FunctionSimSearch: Hotkey L unregistered.")
del hotkey_context_L
else:
print("FunctionSimSearch: Failed to unregister hotkey L.")
hotkey_context_H
if idaapi.del_hotkey(hotkey_context_H):
print("FunctionSimSearch: Hotkey H unregistered.")
del hotkey_context_H
else:
print("FunctionSimSearch: Failed to unregister hotkey H.")
hotkey_context_A
if idaapi.del_hotkey(hotkey_context_A):
print("FunctionSimSearch: Hotkey A unregistered.")
del hotkey_context_A
else:
print("FunctionSimSearch: Failed to unregister hotkey A.")
hotkey_context_M
if idaapi.del_hotkey(hotkey_context_M):
print("FunctionSimSearch: Hotkey M unregistered.")
del hotkey_context_M
else:
print("FunctionSimSearch: Failed to unregister hotkey M.")
# Same NameError check for the index and the hasher before deleting them.
search_index
sim_hasher
# Deleting the search index and simhashes is necessary to make sure the C++
# destructors get called and the memory-mapped file that provides the search
# index gets unmapped cleanly. This is mainly useful to make sure that the
# DB can be safely unmapped from IDA (and then modified / grown).
del search_index
del sim_hasher
# Bare except: any exception raised above ends up here, including the
# NameError from a name that is not defined yet.
except:
print(idc.ARGV)
# Batch mode: with "export" as first script argument, the second argument is
# used as the data directory instead of asking the user.
if len(idc.ARGV) > 2 and idc.ARGV[1] == "export":
data_directory = idc.ARGV[2]
if not os.path.exists(data_directory):
print("Could not find database to export to!")
# NOTE(review): qexit is not defined or imported in this file; presumably it
# is provided by the IDA scripting environment -- confirm.
qexit(1)
else:
# Ask the user for a directory. The code expects the directory to contain
# a search index ("simhash.index"), a metadata file ("simhash.meta"), and
# optionally a feature weights file ("simhash.weights").
# NOTE(review): AskStr is not defined or imported in this file; presumably it
# is provided by the IDA scripting environment -- confirm.
data_directory = AskStr("/var/tmp/",
"Please enter a directory for the search index. If no index is found, it will be created.")
# Keep asking until the entered path exists.
while not os.path.exists(data_directory):
data_directory = AskStr("/var/tmp/",
"Please enter an EXISTING data directory.")
# Paths of the three files inside the data directory.
index_file = os.path.join(data_directory, "simhash.index")
metadata_file = os.path.join(data_directory, "simhash.meta")
weights_file = os.path.join(data_directory, "simhash.weights")
# Register the hotkeys for the plugin.
hotkey_context_S = idaapi.add_hotkey("Shift-S", save_function)
hotkey_context_L = idaapi.add_hotkey("Shift-L", load_function)
hotkey_context_H = idaapi.add_hotkey("Shift-H", print_hash)
hotkey_context_A = idaapi.add_hotkey("Shift-A", save_all_functions)
hotkey_context_M = idaapi.add_hotkey("Shift-M", match_all_functions)
# A None context is treated as a failed registration; in that case all five
# context names are deleted again.
if None in [hotkey_context_S, hotkey_context_L, hotkey_context_H,
hotkey_context_A, hotkey_context_M]:
print("FunctionSimSearch: Failed to register hotkeys.")
del hotkey_context_S
del hotkey_context_L
del hotkey_context_H
del hotkey_context_A
del hotkey_context_M
else:
print("FunctionSimSearch: Hotkeys registered:")
print("Shift-S : save_function")
print("Shift-L : load_function")
print("Shift-H : print_hash")
print("Shift-A : save_all_functions")
print("Shift-M : match_all_functions")
# The index is only created when no index file exists yet.
create_index = True
if os.path.isfile(index_file):
create_index = False
# Load existing metadata if there is any; otherwise start with an empty map.
if os.path.isfile(metadata_file):
print("Parsing meta_data from file %s" % metadata_file)
meta_data = parse_function_meta_data(metadata_file)
print("Parsed meta_data.")
else:
meta_data = {}
number_of_buckets = 50
print("Calling functionsimsearch.SimHashSearchIndex(\"%s\", %s, %d)" % (
index_file, create_index, number_of_buckets))
try:
search_index = functionsimsearch.SimHashSearchIndex(index_file,
create_index, number_of_buckets)
# Use the weights file when present, otherwise a default-constructed hasher.
if os.path.isfile(weights_file):
print("Calling functionsimsearch.SimHasher(\"%s\")" % weights_file)
sim_hasher = functionsimsearch.SimHasher(weights_file)
else:
sim_hasher = functionsimsearch.SimHasher()
# Batch mode: save every function to the index, then call qexit(0).
if len(idc.ARGV) > 2 and idc.ARGV[1] == "export":
save_all_functions()
qexit(0)
# Bare except: every failure in the try body above is reported with its
# traceback rather than propagated.
except:
import traceback
tb = traceback.format_exc()
print("Failure to create/open the search index!")
print(tb)