5.RDC Profiler


自动化截帧/自动化回放分析

截帧 (capture.py)


# _*_ encoding:utf-8 _*_
from rdc_mudule import *

rd = IfImportRdcPackage()

if __name__ == '__main__':
    # Capture script: connect to the first available device, launch the app
    # with RenderDoc injected, trigger one frame capture and print where the
    # capture was written.
    status, protocol_to_use = SelectSupportedDeviceProtocols()
    status, protocol_controller, URL = GetDeviceProtocolController(protocol_to_use)

    CheckRdcRemoteEnv(protocol_controller, URL)
    status, remote_server = ConnectRemoteServer(URL, protocol_controller)

    killEvt = None
    th = None
    captureController = None
    try:
        # Replace the two placeholders with your own package name and launch activity.
        executeResult = ExecuteAndInjectPackage(remote_server, '包名', '启动的Activity名')
        killEvt, th = PingRemoteServer(remote_server)

        # Arguments: URL, ident, ClientName, ForceConnect.
        # The ident comes from the launch result returned above.
        captureController = rd.CreateTargetControl(URL, executeResult.ident, 'auto py script', True)

        if not captureController:
            raise RuntimeError(f"无法连接调试 app")

        logger.info(f"-------------- 启动调试 成功 --------------")

        # Trigger the capture of one frame.
        msg = TriggerCapture(captureController, 1)

        # TriggerCapture gives up after its timeout and then returns whatever
        # message it saw last, so make sure a capture was really produced.
        if msg is None or msg.type != rd.TargetControlMessageType.NewCapture:
            raise RuntimeError("Timed out waiting for a new capture")

        cap_path = msg.newCapture.path
        cap_id = msg.newCapture.captureId
        print(f"Got new capture at {cap_path} which is frame {msg.newCapture.frameNumber} with {msg.newCapture.api}")
    finally:
        # Tear down in the same order as a successful run: target control,
        # ping thread, then the remote server connection.
        if captureController:
            captureController.Shutdown()
            captureController = None

        if th is not None:
            KillPingRemoteServer(killEvt, th)

        ShutdownServerAndConnection(remote_server)
        

回放(replay.py)

# _*_ encoding:utf-8 _*_
from rdc_mudule import *

rd = IfImportRdcPackage()

class ReplayContext(object):
    """Wrap a replay controller and cache the drawcall list it reports."""

    def __init__(self, replayController):
        """Store the controller; the drawcall list is fetched on first use."""
        self.controller = replayController
        # None means "not fetched yet"; filled in by GetDrawcalls().
        self._drawcalls = None

    def GetDrawcalls(self):
        """Return the controller's drawcalls, querying the controller only once.

        The result of ``self.controller.GetDrawcalls()`` is cached, so later
        calls return the same object without touching the controller again.
        """
        if self._drawcalls is None:
            self._drawcalls = self.controller.GetDrawcalls()
        return self._drawcalls


if __name__ == '__main__':
    # Replay script: connect to the first available device, open a capture
    # through the remote server and inspect it.
    import sys

    # The capture path (as printed by the capture script) is taken from the
    # first command-line argument.
    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} <cap_path>")
    cap_path = sys.argv[1]

    rd.InitialiseReplay(rd.GlobalEnvironment(), [])
    try:
        status, protocol_to_use = SelectSupportedDeviceProtocols()
        status, protocol_controller, URL = GetDeviceProtocolController(protocol_to_use)

        CheckRdcRemoteEnv(protocol_controller, URL)
        status, remote_server = ConnectRemoteServer(URL, protocol_controller)
        try:
            status, replayController = remote_server.OpenCapture(rd.RemoteServer.NoPreference, cap_path, rd.ReplayOptions(), None)

            if status != rd.ReplayStatus.Succeeded:
                raise RuntimeError(f"Couldn't open {cap_path}, got error {str(status)}")

            try:
                # Analysis entry point: extend this section with your own
                # inspection of the opened capture.
                context = ReplayContext(replayController)
                drawcalls = context.GetDrawcalls()
                print(f"Opened {cap_path} with {len(drawcalls)} root drawcalls")
            finally:
                # The capture was opened through the remote server, so it is
                # closed through it as well rather than by calling
                # replayController.Shutdown() directly.
                remote_server.CloseCapture(replayController)
        finally:
            # No ping thread is started in this script, so there is nothing
            # to stop before closing the connection.
            ShutdownServerAndConnection(remote_server)
    finally:
        rd.ShutdownReplay()
        

辅助部分(rdc_mudule.py)

# _*_ encoding:utf-8 _*_

# Locations used to find the RenderDoc python module and its native libraries.
# Adjust renderdoc_home if RenderDoc lives somewhere else.
renderdoc_home  = 'C:/Program Files/RenderDoc'
renderdoc_pymodule  = renderdoc_home + '/pymodules'
scripts_home = renderdoc_home +'/rdc_pys'


import sys, os, time, threading
# Make the renderdoc python module and the helper scripts importable.
sys.path.append(renderdoc_pymodule)
sys.path.append(scripts_home)
# Let the loader find the native libraries next to the python module.
os.environ["PATH"] += os.pathsep + os.path.abspath(renderdoc_home)
# os.add_dll_directory only exists on Windows from Python 3.8 on; compare the
# full (major, minor) pair so the check does not depend on the minor number alone.
if sys.platform == 'win32' and sys.version_info >= (3, 8):
    os.add_dll_directory(renderdoc_home)


# ------------------------------- env -------------------------------

# import rdc_package 
def IfImportRdcPackage():
    """Return the renderdoc module, importing it only if it is not loaded yet.

    If ``renderdoc`` (or ``_renderdoc``) is already present in ``sys.modules``
    that module object is returned; otherwise ``renderdoc`` is imported.
    Calling this function repeatedly is safe and always yields a module.

    Raises:
        ImportError: if the module is not loaded and cannot be imported.
    """
    # Look the module up explicitly: a function-local ``import`` that is
    # skipped would leave the local name unbound on the already-loaded path.
    for module_name in ('renderdoc', '_renderdoc'):
        loaded = sys.modules.get(module_name)
        if loaded is not None:
            return loaded

    import renderdoc
    return renderdoc


# ------------------------------- logic -------------------------------

rd = IfImportRdcPackage()
from collections import namedtuple
import logging as logger


# namedtuple('ProtocolSelection', ['protocols', devices])

def SelectSupportedDeviceProtocols()->(bool, str):
    """Choose the device protocol to connect with.

    Queries ``rd.GetSupportedDeviceProtocols()`` and picks the first entry.

    Returns:
        ``(True, protocol)`` when at least one protocol is reported,
        ``(False, None)`` (after logging an error) when none are.
    """
    supported = rd.GetSupportedDeviceProtocols()
    if len(supported) > 0:
        # Several protocols may be offered; the first one is used by default.
        chosen = supported[0]
        logger.info(f'多种方式默认连接第一个:{chosen}')
        return True, chosen

    logger.error('请检查android ndk, sdk, adb环境')
    return False, None

def GetDeviceProtocolController(protocol_to_use:str = 'adb')->(bool, rd.DeviceProtocolController, str):
    """Get the protocol controller and build the URL of the first device.

    Args:
        protocol_to_use: protocol name passed to
            ``rd.GetDeviceProtocolController``.

    Returns:
        ``(True, controller, URL)`` where URL is
        ``<protocol name>://<first device>``, or ``(False, controller, None)``
        (after logging an error) when the controller lists no devices.
    """
    controller = rd.GetDeviceProtocolController(protocol_to_use)
    attached = controller.GetDevices()

    if len(attached) != 0:
        # Several devices may be attached; the first one is used by default.
        first_device = attached[0]
        friendly_name = controller.GetFriendlyName(first_device)
        logger.info(f'多个设备默认连接第一个:{first_device} - named {friendly_name}')
        return True, controller, controller.GetProtocolName() + "://" + first_device

    logger.error(f"没有 {protocol_to_use} 设备连接,请检查设备是否连接,以及调试模式是否开启")
    return False, controller, None

def CheckRdcRemoteEnv(protocol_controller:rd.DeviceProtocolController, URL:str):
    """Check that the device behind URL can be used for capture/replay.

    Args:
        protocol_controller: controller used to query the device.
        URL: device URL, as built by GetDeviceProtocolController.

    Returns:
        False when ``protocol_controller.IsSupported(URL)`` is false,
        True otherwise. A target that is already running on a device
        without multi-program support is only logged, not closed.
    """
    if not protocol_controller.IsSupported(URL):
        logger.error(f"{URL} 不支持 'capture/replay' - 可能版本太低?")
        return False

    if not protocol_controller.SupportsMultiplePrograms(URL):
        # Only one program can run here, so look for an existing target.
        ident = rd.EnumerateRemoteTargets(URL, 0)
        if ident != 0:
            logger.error(f"{URL} 已经有一个rdc 程序正在运行,默认强制关闭 {ident}")
            # TODO: force-close the running target.

    return True
        
def ConnectRemoteServer(URL:str, protocol_controller:rd.DeviceProtocolController)->(bool, rd.RemoteServer):
    """Connect to the remote server at URL, starting it first if necessary.

    A first connection attempt is made directly. Only when that attempt
    reports ``NetworkIOFailed`` and a protocol controller is available is the
    remote server started through the controller and the connection retried;
    each of those two phases retries up to 20 times.

    Returns:
        ``(status, remote_server)`` from the last connection attempt. Any
        other initial failure status is returned to the caller unchanged.

    Raises:
        TimeoutError: when starting the server or connecting to it still
            fails after the retries are used up.
    """
    status, remote_server = rd.CreateRemoteServerConnection(URL)

    needs_start = status == rd.ReplayStatus.NetworkIOFailed and protocol_controller is not None
    if not needs_start:
        return status, remote_server

    retry_limit = 20

    # Phase 1: keep asking the controller to start the remote server.
    attempt = 1
    status = protocol_controller.StartRemoteServer(URL)
    while status != rd.ReplayStatus.Succeeded:
        if attempt > retry_limit:
            raise TimeoutError('启动rdc remote server fail')
        logger.error(f"启动远程 rdc server error: {str(status)}, 重新尝试启动 第{attempt} 次")
        attempt += 1
        time.sleep(0.5)
        status = protocol_controller.StartRemoteServer(URL)

    # Phase 2: keep trying to connect to the server that was just started.
    attempt = 1
    status, remote_server = rd.CreateRemoteServerConnection(URL)
    while status != rd.ReplayStatus.Succeeded:
        if attempt > retry_limit:
            raise TimeoutError('连接rdc remote server fail')
        logger.error(f"连接 rdc server error: {str(status)}, 重新尝试启动 第{attempt} 次")
        attempt += 1
        time.sleep(0.1)
        status, remote_server = rd.CreateRemoteServerConnection(URL)

    return status, remote_server

def PingRemoteServer(remote_server:rd.RemoteServer)->(threading.Event, threading.Thread):
    """Start a background thread that pings the remote server once a second.

    The thread stops when a ping fails or when the returned event is set.

    Returns:
        ``(kill_event, thread)``: set the event and join the thread to stop
        the pinging.
    """
    def _keep_alive(server, stop_event):
        while not stop_event.is_set():
            alive = server.Ping()
            time.sleep(1)
            if not alive:
                # A failed ping ends the loop without re-checking the event.
                break

    stop_event = threading.Event()
    worker = threading.Thread(target=_keep_alive, args=(remote_server, stop_event))
    worker.start()
    return stop_event, worker

def ExecuteAndInjectPackage(remote_server:rd.RemoteServer, package:str, activity:str):
    """Launch ``package/activity`` on the remote server via ExecuteAndInject.

    The entries of the remote home folder are logged first. Validation of the
    package and activity arguments against that listing is intentionally
    disabled, so the launch is attempted regardless of its contents.

    A failed launch is retried up to 3 more times, 0.5 s apart.

    Returns:
        The result object of the successful ``ExecuteAndInject`` call.

    Raises:
        Exception: when the launch still fails after the retries.
    """
    def _launch():
        return remote_server.ExecuteAndInject(packageFullName, '', '', [], rd.GetDefaultCaptureOptions())

    # Log what the remote home folder contains (informational only).
    home_folder = remote_server.GetHomeFolder()
    entry_names = [entry.filename for entry in remote_server.ListFolder(home_folder)]
    for entry_name in entry_names:
        logger.info(f"  --  {entry_name}")

    packageFullName = package + '/' + activity

    retry_limit = 3
    attempt = 1
    launch_result = _launch()
    while launch_result.status != rd.ReplayStatus.Succeeded:
        if attempt > retry_limit:
            raise Exception('启动app fail, 请检查连接以及开发者模式等选项')

        logger.error(f"启动app 失败 {packageFullName}, error {str(launch_result.status)}, 重试启动 第{attempt}次")
        launch_result = _launch()
        attempt += 1
        time.sleep(0.5)

    return launch_result

def TriggerCapture(targetController: rd.TargetControl, count: int = 1)->rd.TargetControlMessage:
    """Trigger a capture on the target and wait for its NewCapture message.

    Args:
        targetController: target control connection to the running app.
        count: value passed to ``targetController.TriggerCapture``.

    Returns:
        The first message of type ``NewCapture``. If none arrives within
        about 30 seconds, the last message received is returned instead, so
        callers should check ``msg.type`` before reading ``msg.newCapture``.
    """
    targetController.TriggerCapture(count)

    # time.clock() was removed in Python 3.8; a monotonic clock is the right
    # tool for measuring a timeout.
    deadline = time.monotonic() + 30
    msg = None
    while msg is None or msg.type != rd.TargetControlMessageType.NewCapture:
        msg = targetController.ReceiveMessage(None)
        if time.monotonic() > deadline:
            break
    return msg


def KillPingRemoteServer(evt: threading.Event, th: threading.Thread):
    """Stop the background ping thread.

    Args:
        evt: the event the ping thread watches; setting it asks the thread
            to stop.
        th: the ping thread itself; joined so it has finished on return.
    """
    evt.set()
    th.join()

def ShutdownServerAndConnection(remote_server:rd.RemoteServer):
    """Shut down the remote server and the connection to it.

    Does nothing when ``remote_server`` is falsy (for example ``None``).
    """
    if remote_server:
        remote_server.ShutdownServerAndConnection()

文章作者: lyg
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 lyg !
  目录