自动化截帧/自动化回放分析
截帧 (capture.py)
# _*_ encoding:utf-8 _*_
from rdc_mudule import *
rd = IfImportRdcPackage()
if __name__ == '__main__':
    # Capture driver: pick a device, connect to its remote server, launch the
    # target app with injection, trigger one capture and print where it went.
    status, protocol_to_use = SelectSupportedDeviceProtocols()
    status, protocol_controller, URL = GetDeviceProtocolController(protocol_to_use)
    CheckRdcRemoteEnv(protocol_controller, URL)
    status, remote_server = ConnectRemoteServer(URL, protocol_controller)
    try:
        # Fill in your own package name and launch Activity name.
        executeResult = ExecuteAndInjectPackage(remote_server, '包名', '启动的Activity名')
        # Start the background thread that pings the remote server.
        killEvt, th = PingRemoteServer(remote_server)
        try:
            # Arguments: URL, ident, ClientName, ForceConnect.
            # The ident comes from the launch result returned above.
            captureController = rd.CreateTargetControl(URL, executeResult.ident, 'auto py script', True)
            if not captureController:
                raise RuntimeError(f"无法连接调试 app")
            try:
                logger.info(f"-------------- 启动调试 成功 --------------")
                # Start the frame capture.
                msg = TriggerCapture(captureController, 1)
                cap_path = msg.newCapture.path
                cap_id = msg.newCapture.captureId
                print(f"Got new capture at {cap_path} which is frame {msg.newCapture.frameNumber} with {msg.newCapture.api}")
            finally:
                # End the capture session even if triggering failed.
                captureController.Shutdown()
                captureController = None
        finally:
            # Always stop the ping thread before tearing down the connection.
            KillPingRemoteServer(killEvt, th)
    finally:
        ShutdownServerAndConnection(remote_server)
回放(replay.py)
# _*_ encoding:utf-8 _*_
from rdc_mudule import *
rd = IfImportRdcPackage()
class ReplayContext(object):
    """Wrap a replay controller and cache the drawcall list it returns."""

    def __init__(self, replayController):
        """Store the controller; the drawcall cache starts out empty."""
        self.controller = replayController
        # Filled on the first GetDrawcalls() call; None means "not fetched yet".
        self._drawcalls = None

    def GetDrawcalls(self):
        """Return the controller's drawcalls, querying the controller only once."""
        if self._drawcalls is None:
            self._drawcalls = self.controller.GetDrawcalls()
        return self._drawcalls
if __name__ == '__main__':
    # Replay driver: connect to the device's remote server, open a capture
    # there, run the analysis on it, then release everything in reverse order.
    rd.InitialiseReplay(rd.GlobalEnvironment(), [])
    try:
        status, protocol_to_use = SelectSupportedDeviceProtocols()
        status, protocol_controller, URL = GetDeviceProtocolController(protocol_to_use)
        CheckRdcRemoteEnv(protocol_controller, URL)
        status, remote_server = ConnectRemoteServer(URL, protocol_controller)
        try:
            # Fill in your own: path of the capture to open (for example the
            # path printed by the capture script).
            cap_path = 'path/to/capture.rdc'
            status, replayController = remote_server.OpenCapture(rd.RemoteServer.NoPreference, cap_path, rd.ReplayOptions(), None)
            if status != rd.ReplayStatus.Succeeded:
                raise RuntimeError(f"Couldn't open {cap_path}, got error {str(status)}")
            try:
                # NOTE(review): sampleCode is not defined in the visible
                # source; it must be supplied as the analysis entry point.
                sampleCode(replayController)
            finally:
                # The capture was opened through the remote server, so it is
                # closed through the remote server as well.
                remote_server.CloseCapture(replayController)
        finally:
            # No ping thread is started in this script, so there is nothing
            # to stop before shutting the connection down.
            ShutdownServerAndConnection(remote_server)
    finally:
        rd.ShutdownReplay()
辅助部分(rdc_mudule.py)
# _*_ encoding:utf-8 _*_
# Root of the local RenderDoc installation; the paths below derive from it.
renderdoc_home = 'C:/Program Files/RenderDoc'
renderdoc_pymodule = renderdoc_home + '/pymodules'
scripts_home = renderdoc_home +'/rdc_pys'
import sys, os, time, threading
# Make the RenderDoc Python module and the helper scripts importable.
sys.path.append(renderdoc_pymodule)
sys.path.append(scripts_home)
os.environ["PATH"] += os.pathsep + os.path.abspath(renderdoc_home)
# os.add_dll_directory was added in Python 3.8 (Windows only). Compare the
# whole version tuple rather than only the minor number.
if sys.platform == 'win32' and sys.version_info >= (3, 8):
    os.add_dll_directory(renderdoc_home)
# ------------------------------- env -------------------------------
# import rdc_package
def IfImportRdcPackage():
    """Return the renderdoc module, importing it only if it is not loaded yet.

    Safe to call repeatedly: an already-loaded module is returned from
    ``sys.modules`` instead of relying on a local name that is only bound
    when the import statement actually runs.

    Raises:
        ImportError: if the module is not loaded and cannot be imported.
    """
    loaded = sys.modules.get('renderdoc')
    if loaded is not None:
        return loaded
    # NOTE(review): assumes a preloaded '_renderdoc' exposes the same API as
    # 'renderdoc' (embedded-interpreter case) - confirm.
    loaded = sys.modules.get('_renderdoc')
    if loaded is not None:
        return loaded
    import renderdoc
    return renderdoc
# ------------------------------- logic -------------------------------
rd = IfImportRdcPackage()
from collections import namedtuple
import logging as logger
# namedtuple('ProtocolSelection', ['protocols', devices])
def SelectSupportedDeviceProtocols()->(bool, str):
    """Pick the first device protocol reported by RenderDoc.

    Returns:
        ``(True, protocol)`` with the first supported protocol, or
        ``(False, None)`` after logging an error when none is reported.
    """
    available = rd.GetSupportedDeviceProtocols()
    if available:
        chosen = available[0]
        logger.info(f'多种方式默认连接第一个:{chosen}')
        return True, chosen
    logger.error('请检查android ndk, sdk, adb环境')
    return False, None
def GetDeviceProtocolController(protocol_to_use:str = 'adb')->(bool, rd.DeviceProtocolController, str):
    """Get the controller for a protocol and build a URL for its first device.

    Args:
        protocol_to_use: protocol name passed to
            ``rd.GetDeviceProtocolController``.

    Returns:
        ``(True, controller, URL)`` where URL is
        ``<protocol name>://<first device>``, or ``(False, controller, None)``
        after logging an error when the controller lists no devices.
    """
    controller = rd.GetDeviceProtocolController(protocol_to_use)
    found = controller.GetDevices()
    if found:
        first = found[0]
        friendly = controller.GetFriendlyName(first)
        logger.info(f'多个设备默认连接第一个:{first} - named {friendly}')
        return True, controller, controller.GetProtocolName() + "://" + first
    logger.error(f"没有 {protocol_to_use} 设备连接,请检查设备是否连接,以及调试模式是否开启")
    return False, controller, None
def CheckRdcRemoteEnv(protocol_controller:rd.DeviceProtocolController, URL:str):
    """Check that the device behind ``URL`` can be used for capture/replay.

    Args:
        protocol_controller: controller used to query the device.
        URL: device URL to check.

    Returns:
        False when the controller reports the URL as unsupported, True
        otherwise. An already running target is only logged (see TODO).
    """
    if not protocol_controller.IsSupported(URL):
        # Only the URL is known in this function, so it identifies the device.
        logger.error(f"{URL} 不支持 'capture/replay' - 可能版本太低?")
        return False
    if not protocol_controller.SupportsMultiplePrograms(URL):
        ident = rd.EnumerateRemoteTargets(URL, 0)
        # A non-zero ident is treated as "a target is already running".
        if ident != 0:
            logger.error(f"{URL} 已经有一个rdc 程序正在运行,默认强制关闭 {ident}")
            # TODO: force-close the running target; it is not closed yet.
    return True
def ConnectRemoteServer(URL:str, protocol_controller:rd.DeviceProtocolController)->(bool, rd.RemoteServer):
    '''
    Connect to the remote server at URL.

    If the first connection attempt fails with NetworkIOFailed and a protocol
    controller is given, the remote server is started through the controller
    and the connection is retried. Each phase logs and retries up to 20 times
    before raising TimeoutError. Any other first-attempt status is returned
    to the caller unchanged.

    Returns the status of the last connection attempt and the remote server.
    '''
    status, remote_server = rd.CreateRemoteServerConnection(URL)
    if not (status == rd.ReplayStatus.NetworkIOFailed and protocol_controller is not None):
        return status, remote_server

    retry_limit = 20

    # Phase 1: keep asking the controller to start the remote server.
    failures = 0
    while True:
        status = protocol_controller.StartRemoteServer(URL)
        if status == rd.ReplayStatus.Succeeded:
            break
        failures += 1
        if failures > retry_limit:
            raise TimeoutError('启动rdc remote server fail')
        logger.error(f"启动远程 rdc server error: {str(status)}, 重新尝试启动 第{failures} 次")
        time.sleep(0.5)

    # Phase 2: keep trying to connect to the freshly started server.
    failures = 0
    while True:
        status, remote_server = rd.CreateRemoteServerConnection(URL)
        if status == rd.ReplayStatus.Succeeded:
            break
        failures += 1
        if failures > retry_limit:
            raise TimeoutError('连接rdc remote server fail')
        logger.error(f"连接 rdc server error: {str(status)}, 重新尝试启动 第{failures} 次")
        time.sleep(0.1)

    return status, remote_server
def PingRemoteServer(remote_server:rd.RemoteServer)->(threading.Event, threading.Thread):
    """Start a background thread that pings the remote server once per second.

    The thread stops when a ping reports failure or when the returned event
    is set.

    Returns:
        ``(stop_event, thread)`` - set the event and join the thread to stop
        pinging.
    """
    stop_event = threading.Event()

    def ping_remote(server, stop):
        while True:
            if stop.is_set():
                return
            alive = server.Ping()
            time.sleep(1)
            if not alive:
                return

    worker = threading.Thread(target=ping_remote, args=(remote_server, stop_event))
    worker.start()
    return stop_event, worker
def ExecuteAndInjectPackage(remote_server:rd.RemoteServer, package:str, activity:str):
    """Launch ``package/activity`` on the remote server with injection.

    The entries of the server's home folder are logged first. Validation of
    the package/activity arguments against that listing is deliberately
    disabled. A failed launch is logged and retried up to 3 times with a
    0.5 s pause after each retry.

    Returns:
        The result object of the successful ExecuteAndInject call.

    Raises:
        Exception: when the launch still fails after the retries.
    """
    home_folder = remote_server.GetHomeFolder()
    installed = [entry.filename for entry in remote_server.ListFolder(home_folder)]
    for installed_name in installed:
        logger.info(f" -- {installed_name}")

    launch_target = '/'.join((package, activity))

    def _launch():
        # Launch with default capture options and no extra arguments.
        return remote_server.ExecuteAndInject(launch_target, '', '', [], rd.GetDefaultCaptureOptions())

    retry_limit = 3
    attempt = 1
    outcome = _launch()
    while outcome.status != rd.ReplayStatus.Succeeded:
        if attempt > retry_limit:
            raise Exception('启动app fail, 请检查连接以及开发者模式等选项')
        logger.error(f"启动app 失败 {launch_target}, error {str(outcome.status)}, 重试启动 第{attempt}次")
        outcome = _launch()
        attempt += 1
        time.sleep(0.5)
    return outcome
def TriggerCapture(targetController: rd.TargetControl, count: int = 1)->rd.TargetControlMessage:
    """Trigger a capture and wait for the first NewCapture message.

    Args:
        targetController: target control connection to the running app.
        count: number of frames passed to ``TriggerCapture``.

    Returns:
        The first message whose type is NewCapture. If none arrives within
        about 30 seconds, the last message received is returned instead, so
        callers should check ``msg.type`` before reading the capture fields.
    """
    targetController.TriggerCapture(count)
    msg = None
    # time.clock() was removed in Python 3.8; a monotonic clock is also
    # unaffected by system clock changes.
    deadline = time.monotonic() + 30
    while msg is None or msg.type != rd.TargetControlMessageType.NewCapture:
        msg = targetController.ReceiveMessage(None)
        if time.monotonic() > deadline:
            break
    return msg
def KillPingRemoteServer(evt: threading.Event, th: threading.Thread):
    """Stop the ping thread: set its stop event, then wait for it to exit.

    Args:
        evt: stop event returned by the function that started the thread.
        th: the ping thread to join.
    """
    evt.set()
    th.join()
def ShutdownServerAndConnection(remote_server:rd.RemoteServer):
    '''
    Disconnect from the remote server; does nothing when given a falsy server.
    '''
    if remote_server:
        remote_server.ShutdownServerAndConnection()