【java实战】虚拟机执行cmd命令并且通过线程读取执行记录

一、使用ProcessBuilder执行cmd命令

使用ProcessBuilder来执行cmd命令

进行声明:

ProcessBuilder pb = new ProcessBuilder(cmds);

cmds代表需要执行的命令行信息

之后执行开始命令:

Process p = pb.start();

之后确认命令是否执行成功:

int exitValue = p.waitFor();

通过判断exitValue来进行确定是否执行成功

exitValue 为 0 表示命令执行成功,不为 0 则表示命令执行出错

二、使用线程在cmd命令执行过程中监听执行记录
Future<String> errorFuture = executor.submit(new ReadTask(type,clusterId, p.getErrorStream(),expandId));
Future<String> resFuture = executor.submit(new ReadTask(type,clusterId, p.getInputStream(),expandId));
三、完整代码
package com.awifi.cloudnative.container.kubetelecom.provider.service.impl;

import com.alibaba.fastjson.JSON;
import com.awifi.cloudnative.container.common.basic.ResponseJson;
import com.awifi.cloudnative.container.kubetelecom.provider.entity.vo.ClusterInitLogVO;
import com.awifi.cloudnative.container.kubetelecom.provider.entity.vo.ClusterInitLogVO.Message;
import com.awifi.cloudnative.container.kubetelecom.provider.service.CommandService;
import com.awifi.cloudnative.container.kubetelecom.provider.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
* 功能:
* 版权所有: 
*
* @author: 
* @date: 2022/1/6
* 文件名称: CommandServiceImpl.java
* 版本: v1.0
* 修改记录:
*/
@Service
@Slf4j
public class CommandServiceImpl implements CommandService, InitializingBean {

   @Value("${cmd.threadname:cmd-executor}")
   private String threadName;

   @Value("${cmd.taskQueueMaxStorage:20}")
   private Integer taskQueueMaxStorage;

   @Value("${cmd.corePoolSize:4}")
   private Integer corePoolSize;

   @Value("${cmd.maximumPoolSize:8}")
   private Integer maximumPoolSize;

   @Value("${cmd.keepAliveSeconds:15}")
   private Integer keepAliveSeconds;

   @Autowired
   private MessageService messageService;

   private ThreadPoolExecutor executor;

   private static final String BASH = "sh";
   private static final String BASH_PARAM = "-c";

   @Override
   public ResponseJson executeCmd(String clusterId, String cmd, String type, String expandId) {
       ResponseJson result = new ResponseJson();
       Process p = null;
       String res;
       log.debug("命令行信息 : {}", cmd);
       try {
           // need to pass command as bash's param,
           // so that we can compatible with commands: "echo a >> b.txt" or "bash a && bash b"
           List<String> cmds = new ArrayList<>();
           cmds.add(BASH);
           cmds.add(BASH_PARAM);
           cmds.add(cmd);
           // 定义cmd命令执行
           ProcessBuilder pb = new ProcessBuilder(cmds);
           p = pb.start();
			
           // 监听错误
           Future<String> errorFuture = executor.submit(new ReadTask(type,clusterId, p.getErrorStream(),expandId));
           // 监听成功
           Future<String> resFuture = executor.submit(new ReadTask(type,clusterId, p.getInputStream(),expandId));
           int exitValue = p.waitFor();
           if (exitValue > 0) {
               log.info("执行cmd命令错误: {} ", errorFuture.get());
               res = errorFuture.get();
               result.setErrorCode("-1");
//                throw new RuntimeException(res);
           } else {
               log.info("执行cmd命令成功 ");
               res = resFuture.get();
               result.setErrorCode(ResponseJson.EnumCode.CODE_OK.getCode());
           }
       } catch (Exception e) {
           log.info("执行cmd命令错误: {} ", e.getMessage());
           res = e.getMessage();
           result.setErrorCode("-1");
//            throw new RuntimeException(res);
       } finally {
           if (p != null) {
               p.destroy();
           }
       }
       // remove System.lineSeparator() (actually it's '\n') in the end of res if exists
       if (StringUtils.isNotBlank(res) && res.endsWith(System.lineSeparator())) {
           res = res.substring(0, res.lastIndexOf(System.lineSeparator()));
       }
       result.setErrorMsg(res);
       return result;
   }

   // 使用线程池读取数据流
   @Override
   public void afterPropertiesSet() {
       // 定义线程池
       executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
               new ArrayBlockingQueue<Runnable>(taskQueueMaxStorage),
               new ThreadFactory() {
                   @Override
                   public Thread newThread(Runnable r) {
                       return new Thread(r, threadName + r.hashCode());
                   }
               },
               new ThreadPoolExecutor.AbortPolicy());
   }

   // 定义线程
   class ReadTask implements Callable<String> {
       String clusterId;
       InputStream is;
       String type;
       String expandId;
		
       // 定义线程读取任务,传入参数
       ReadTask(String type,String clusterId, InputStream is,String expandId) {
           this.clusterId = clusterId;
           this.is = is;
           this.type = type;
           this.expandId = expandId;
       }

       @Override
       public String call() throws Exception {
           BufferedReader br = new BufferedReader(new InputStreamReader(is));
           String finalLine = "";
           String line;
           while ((line = br.readLine()) != null) {
               log.info(line);
               // 将输出信息放入队列(kafka)
               saveLogs(clusterId,type, line, false, false,expandId);
               finalLine = line;
           }
           return finalLine;
       }
   }

   @Override
   public void saveLogs(String clusterId,String type, String log, Boolean isRancherLog, Boolean isEnd , String expandId) {
       // 构造存储数据结构
       Message message = new Message();
       message.setTimestamp(System.currentTimeMillis());
       message.setLog(log);

       ClusterInitLogVO clusterInitLog = new ClusterInitLogVO();
       clusterInitLog.setClusterId(clusterId);
       clusterInitLog.setMessage(message);
       clusterInitLog.setIsEnd(isEnd);
       clusterInitLog.setType(type);
       clusterInitLog.setExpandId(expandId);
       clusterInitLog.setIsRancherLog(isRancherLog);
       // 发送kafka
       messageService.sendLog(JSON.toJSONString(clusterInitLog));
   }

}

点击阅读全文
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐