کد:
# patcher.py
# handles patching and unpatching of process memory.
# public domain code.
from ctypes import *
from win32api import *
from pytcc import pytcc
from struct import pack, unpack, calcsize
from win32gui import PyGetString, PySetMemory, PySetString
from win32con import MEM_COMMIT, MEM_RESERVE, PAGE_EXECUTE_READWRITE, PROCESS_ALL_ACCESS
from distorm import Decode
DEBUG = True
def DB (msg):
global DEBUG
if DEBUG: print (msg)
def OpenProcess (pid=GetCurrentProcessId()):
"""Opens a process by pid."""
DB ("[openProcess] pid:%s."%pid)
phandle = windll.kernel32.OpenProcess (\
PROCESS_ALL_ACCESS,
False,
pid )
assert phandle, "Failed to open process!\n%s" % WinError (GetLastError ()) [1]
return phandle
def readMemory (phandle, address, size):
"""readMemory (address, size, phandle):"""
cbuffer = c_buffer (size)
success = windll.kernel32.ReadProcessMemory (\
phandle,
address,
cbuffer,
size,
0 )
assert success, "Failed to read memory!\n%s" % WinError (GetLastError()) [1]
return cbuffer.raw
def writeMemory (phandle, address=None, data=None):
"""Writes data to memory and returns the address."""
assert data
size = len (data) if isinstance (data, str) else sizeof (data)
cdata = c_buffer (data) if isinstance (data, str) else byref (data)
if not address: address = allocate (size, phandle)
success = windll.kernel32.WriteProcessMemory (\
phandle,
address,
cdata,
size,
0 )
assert success, "Failed to write process memory!\n%s" % WinError (GetLastError()) [1]
DB ("[write memory] :%s OK." % address)
return address
def allocate (size, phandle):
"""Allocates memory of size in phandle."""
address = windll.kernel32.VirtualAllocEx (\
phandle,
0,
size,
MEM_RESERVE | MEM_COMMIT,
PAGE_EXECUTE_READWRITE )
assert address, "Failed to allocate memory!\n%s" % WinError(GetLastError()) [1]
DB ("[memory allocation] :%s" % address)
return address
def releaseMemory (address, size, phandle):
"""Releases memory by address."""
return windll.kernel32.VirtualFreeEx (\
phandle,
address,
size,
MEM_RELEASE )
assert success, "Failed to read process memory!\n%s" % WinError(GetLastError()) [1]
return cbuffer.raw
def transport (data, phandle):
size = len (data)
memory = allocate (size, phandle)
writeMemory (phandle, memory, data)
return memory
def get_patch (destination, params_size=0):
"""mov eax, destination
call eax
retn params_size
"""
if isinstance (destination, (int,long)): destination = pack ("i", destination)
if isinstance (params_size, (int,long)): params_size = pack ("h", params_size)
return '\xb8%s\xff\xd0\xc2%s' % (destination, params_size)
def get_cparams_size (cparams):
if not cparams: return 0
s = ''
for param in cparams:
s += "size += sizeof (%s);\n" % param
c_code = """
int getsize ()
{
int size = 0;
%s
return size;
}""" % s
#DB (c_code)
ccompiler = pytcc ()
ccompiler.compile (c_code)
ccompiler.relocate ()
getsize = ccompiler.get_function ("getsize")
size = getsize ()
# ccompiler.delete ()
return size
def get_cparams_size_b (cparams):
return sum (map (calcsize, [param._type_ for param in cparams]))
def find_good_spot_to_patch (apiaddress, needed_size, maxscan=4000):
"""find_good_spot_to_patch (apiaddress, needed_size, maxscan=4000):
Searches the instructions inside an API for a good place to patch."""
# DEBUG
if DEBUG == 2:
bytes = PyGetString (apiaddress, needed_size * 2)
dprint (apiaddress, bytes)
# # # #
aoffset = 0
found_space = 0
position = apiaddress
while found_space < needed_size:
bytes = PyGetString (position, 24)
# DB ("found_space: %s. aoffset: %s. apiaddress: %s." % (found_space, aoffset, hex(position)))
# if does_code_end_function (bytes): raise "Function end found before enough space was found!"
offset, size, instruction, hexstr = Decode (position, bytes) [0]
if "ret" in instruction.lower (): raise "Function end found before enough space was found!"
if not ....... (lambda x:x.lower() in instruction.lower(), ["call", "jmp"]):
found_space += size
else:
found_space = 0
aoffset += size
if aoffset >= maxscan: raise "Maxscan exceeded while searching for a good spot to patch!"
position += size
return apiaddress + (aoffset - found_space)
class patcher:
source = None
destination = None
jmp_asm = None
original_bytes = None
params_size = 0
pid = None
phandle = None
duplicate_api = None
original_api = None
def __init__ (self,
source=None,
destination=None,
params_size=0,
pid=GetCurrentProcessId () ):
self.set_pid (pid)
self.set_source (source)
self.set_destination (destination)
self.set_params_size (params_size)
def set_pid (self, pid):
self.close ()
self.phandle = OpenProcess (pid)
self.pid = pid
def set_source (self, source): self.source = source
def set_destination (self, destination): self.destination = destination
def set_params_size (self, size): self.params_size = size
def set_source_as_api (self, apiname, dllname="kernel32.dll", free=True):
module = LoadLibrary (dllname)
procedure = GetProcAddress (module, apiname)
if free: FreeLibrary (module)
assert procedure
self.original_api = eval ("windll.%s.%s" % (dllname.strip(".dll"), apiname))
self.source = find_good_spot_to_patch (procedure, len (get_patch (0, self.params_size)))
if DEBUG: DB ("found good spot to patch: %s %s. Offset from original api address: %s." \
%(self.source, hex (self.source), self.source - procedure))
def patch (self):
assert all ((self.phandle, self.source, self.destination)), "Patch source or destination not set!"
assert not self.original_bytes, "Already patched!"
self.jmp_asm = get_patch (self.destination, self.params_size)
jmp_asm_size = len (self.jmp_asm)
self.original_bytes = PyGetString (self.source, jmp_asm_size)
assert self.original_bytes, "Failed to capture original_bytes."
writeMemory (\
phandle=self.phandle,
address=self.source,
data=self.jmp_asm)
msg = "[jmp_asm]:%s\n[jmp_asm_size]:%s\n[original_bytes]:%s\n" \
% (repr (self.jmp_asm), jmp_asm_size, repr (self.original_bytes))
DB (msg)
def unpatch (self):
if not self.original_bytes: raise "Not patched!"
assert all ((self.phandle, self.source, self.destination)), "Not initialized!"
writeMemory (\
phandle=self.phandle,
address=self.source,
data=self.original_bytes )
self.original_bytes = None
def close (self):
if self.phandle:
windll.kernel32.CloseHandle (self.phandle)
self.phandle = None
def release (self):
if self.phandle and self.duplicate_api:
releaseMemory (self.duplicate_api, 0, self.phandle)
def call_original_api (self, *args, **kwargs): return self.original_api (*args, **kwargs)
def call_duplicate_api (self, types, *args, **kwargs):
return WINFUNCTYPE (c_void_p, types) (self.duplicate_api) (*args, **kwargs)
def __del__ (self):
try:self.unpatch ()
except:pass
try:self.release ()
except:pass
try:self.close ()
except:pass
def dprint (a, c):
"""Pretty prints disassembled bytes. dprint (offset, bytes)."""
x = Decode (a, c)
print "[deci addr : hexi addr] [size] instruction\n"
for offset, size, instruction, hexstr in x:
print "[%s : %s] [%s] %s" % (a,hex (a), size, instruction)
a += size
print
#cadCode:
# tramper