#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Copyright (C) 2016-2019 Zhiyang Liu in Software Development of NIO/NEXTEV) All rights reserved.
Author: zhiyang.liu.o@nio.com
Date: 2019-05-20

History:
---------------------------------------------
Modified            Author              Content
"""
import os

from ...config.laputa_config import source_path, run_path

temp_path = source_path + 'test_report/ios'
get_log = True if os.getenv('get_log') else False  # 执行文件为test_run.py时开启抓取手机日志功能
if os.getenv('no_install') is None:
    os.environ['no_install'] = 'true'


class Cmd:

    def get_cmd_return(self, cmd):
        return os.popen(cmd).readlines()

    # 获取设备id
    def get_devices(self, cmd='idevice_id -l'):
        content = self.get_cmd_return(cmd=cmd)
        device_list = list()
        ignore_list = [
            '8056ca675ee0f32cf0bdae6bcbaeda80eb41e688',  # 7
            '00008020-001045490238002E',  # xs
            '08feda8b7b9e76e22a244d8f90de2f9d01e178de'  # 7
        ]
        if 'test_run' in run_path:
            from ..init_project import module_list
        else:
            module_list = None
        for d in content:
            udid = d.split('\n')[0]
            if module_list == ['Build_check']:
                if udid in ignore_list and (os.getenv('no_install') == 'true' or os.getenv('%s_install' % udid) == 'success'):
                    device_list.append(udid)
            elif os.getenv('no_install') == 'true' or os.getenv('%s_install' % udid) == 'success':
                device_list.append(udid)
        if os.getenv('print_cmd') is None:
            print('可用设备列表:', sorted(device_list))
            os.environ['print_cmd'] = '1'
            if len(device_list) is 0:
                raise Exception('无可用设备,请在控制台输入命令「%s」确认连接设备信息' % cmd)
        return sorted(device_list)

    # 获取系统版本
    def get_system_version(self, udid):
        os_system_version = os.popen('ideviceinfo -u %s -k ProductVersion' % udid).readlines()  # 适配ios
        os_system_version = os_system_version[0].split('\n')[0]  # 适配ios
        return os_system_version

    # 获取手机型号
    def get_phone_hw_info(self, udid):
        product_type = os.popen('ideviceinfo -u %s -k ProductType' % udid).readlines()  # 适配ios
        product_type = product_type[0].split('\n')[0]
        return product_type
        # for key in iphone_info:
        #     phone_info_value = iphone_info[key]
        #     phone_type_list = phone_info_value['type']
        #     if product_type in phone_type_list:
        #         return phone_info_value

    # 获取手机名字
    def get_device_name(self, udid):
        device_name = os.popen('ideviceinfo -u %s -k DeviceName' % udid).readlines()  # 适配ios
        device_name = device_name[0].split('\n')[0]
        if os.getenv('print_%s_name' % udid) is None:
            print(udid, 'name', device_name)
            os.environ['print_%s_name' % udid] = '1'
        return device_name

    # 获取手机品牌
    def get_phone_brand(self, udid):
        phone_brand = os.popen('ideviceinfo -u %s -k ProductName' % udid).readlines()
        phone_brand = phone_brand[0].split(' ')[0]
        return phone_brand

    def kill_port(self, port):
        from time import sleep
        t = 0
        while True:
            if os.name == 'nt':
                pid_line = os.popen('netstat -ano|findstr 127.0.0.1:%s' % port).readlines()
                if len(pid_line) > 0 and t < 3:
                    t += 1
                    for i in pid_line:
                        if 'LISTEN' in i:
                            i = i.strip('\n').strip(' ')
                            line = i.split(' ')[-1]
                            os.system('tskill %s' % line)
                else:
                    return
                os.system('taskkill /f /im cmd.exe /fi "windowtitle eq appium"')  # win10 appium结束后遗留窗口
                os.system('taskkill /f /im cmd.exe /fi "windowtitle eq 管理员: appium"')  # win7 appium结束后遗留窗口
            else:
                while True:

                    pid_line = os.popen("lsof -i:%s|grep node | awk '{print $2}'" % port).readline().strip('\n')
                    if len(pid_line) > 0 and t < 3:
                        t += 1
                        os.system('kill -9 %s' % pid_line)
                        sleep(1)
                    else:
                        return


    def create_log_path(self, path):
        """
        创建log存储目录,并将目录保存到环境变量中
        """
        dir_path = os.path.abspath(path + '/log/')
        os.makedirs(dir_path, exist_ok=True)
        os.environ['log_path'] = dir_path

    def start_syslog(self, udid):
        """
        logcat日志临时存放在手机中,测试完成后取出
        """
        if get_log:
            dir_path = os.getenv('log_path')
            print(dir_path)
            syslog_path = dir_path + '/%s.log' % udid
            start_syslog_cmd = 'nohup idevicesyslog -u %s  > %s &' % (udid, syslog_path)
            os.system(start_syslog_cmd)
            return syslog_path

    def end_syslog(self, udid, case_id=None, result=None):
        """
        报告路径从提前写入到手机中,用例结束后将logcat压缩取出更名存入到报告目录下
        :param udid: 通过设备id指定设备执行
        :param case_id: 脚本名
        :return: 返回服务器log文件链接
        """
        if get_log:
            check_pid_command = "ps -le| grep 'idevicesyslog -u %s'|sed -n '1p'|awk '{print $2}'" % udid
            pid_list = os.popen(check_pid_command).readline()
            os.system('kill -9 %s' % pid_list)
            if case_id is None:
                return
            dir_path = os.getenv('log_path') + '/%s.log' % udid
            gzip_command = 'gzip %s' % dir_path
            os.system(gzip_command)
            if os.path.exists(dir_path):
                os.remove(dir_path)
            dir_path_new = os.getenv('log_path')
            os.system('mv {0}/{1}.log.gz {0}/{2}.log.gz'.format(dir_path_new, udid, case_id))
            if result == 'Pass':
                os.remove('%s.log.gz' % (dir_path_new + '/' + case_id))

    def end_all_log(self):
        pid_lines = os.popen('ps -le| grep \'idevicesyslog -u\'').readlines()
        for i in pid_lines:
            if 'grep idevicesyslog' not in i:
                i = i.replace('  ', ' ')
                pid = i.split(' ')[1]
                os.system('kill -9 %s' % pid)

    def clear_cache(self):
        import getpass
        from laputa_auto.util.init_project import module_list
        if module_list == ['Build_check']:
            return
        wda_cache_path = '/Users/%s/Library/Developer/Xcode/DerivedData/WebDriverAgent*' % getpass.getuser()
        os.system('rm -rf %s' % wda_cache_path)
        content = self.get_cmd_return(cmd='idevice_id -l')
        for d in content:
            udid = d.split('\n')[0]
            wda = os.popen('ios-deploy --id %s --list_bundle_id | grep WebDriverAgent' % udid).readline().strip()
            if 'WebDriverAgent' in wda:
                os.system('ios-deploy -i %s -9 -1 %s' % (udid, wda))


if __name__ == "__main__":
    pass
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐