【java实战】虚拟机执行cmd命令并且通过线程读取执行记录
【java实战】虚拟机执行cmd命令并且通过线程读取执行记录一、使用ProcessBuilder执行cmd命令使用ProcessBuilder来执行cmd命令进行声明:ProcessBuilder pb = new ProcessBuilder(cmds);cmds代表需要执行的命令行信息之后执行开始命令:Process p = pb.start();之后确认命令是否执行成功:int exitVa
·
【java实战】虚拟机执行cmd命令并且通过线程读取执行记录
一、使用ProcessBuilder执行cmd命令
使用ProcessBuilder来执行cmd命令
进行声明:
ProcessBuilder pb = new ProcessBuilder(cmds);
其中 cmds 代表需要执行的命令行信息(由命令及其参数组成的字符串列表)。
之后执行开始命令:
Process p = pb.start();
之后等待命令执行结束,并确认命令是否执行成功:
int exitValue = p.waitFor();
通过判断 exitValue 来确定命令是否执行成功:
exitValue 为 0 表示执行成功,非 0 则表示命令执行出错。
二、使用线程在cmd命令执行过程中监听执行记录
Future<String> errorFuture = executor.submit(new ReadTask(type,clusterId, p.getErrorStream(),expandId)); Future<String> resFuture = executor.submit(new ReadTask(type,clusterId, p.getInputStream(),expandId));
三、完整代码
package com.awifi.cloudnative.container.kubetelecom.provider.service.impl;

import com.alibaba.fastjson.JSON;
import com.awifi.cloudnative.container.common.basic.ResponseJson;
import com.awifi.cloudnative.container.kubetelecom.provider.entity.vo.ClusterInitLogVO;
import com.awifi.cloudnative.container.kubetelecom.provider.entity.vo.ClusterInitLogVO.Message;
import com.awifi.cloudnative.container.kubetelecom.provider.service.CommandService;
import com.awifi.cloudnative.container.kubetelecom.provider.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Runs a shell command through {@link ProcessBuilder} and, while the command is
 * running, reads its stdout and stderr on pool threads, forwarding every line to
 * {@link MessageService}.
 *
 * <p>File name: CommandServiceImpl.java
 * <br>Version: v1.0
 *
 * @date 2022/1/6
 */
@Service
@Slf4j
public class CommandServiceImpl implements CommandService, InitializingBean {

    /** Prefix of the reader thread names. */
    @Value("${cmd.threadname:cmd-executor}")
    private String threadName;

    /** Capacity of the bounded queue that holds pending read tasks. */
    @Value("${cmd.taskQueueMaxStorage:20}")
    private Integer taskQueueMaxStorage;

    @Value("${cmd.corePoolSize:4}")
    private Integer corePoolSize;

    @Value("${cmd.maximumPoolSize:8}")
    private Integer maximumPoolSize;

    @Value("${cmd.keepAliveSeconds:15}")
    private Integer keepAliveSeconds;

    @Autowired
    private MessageService messageService;

    /** Pool that runs the stream readers; created in {@link #afterPropertiesSet()}. */
    private ThreadPoolExecutor executor;

    private static final String BASH = "sh";

    private static final String BASH_PARAM = "-c";

    /**
     * Executes {@code cmd} via {@code sh -c} and waits for it to finish.
     *
     * <p>Two {@link ReadTask}s are submitted to the pool, one per output stream, so
     * that both streams are drained while the process runs. The error code of the
     * result is {@code "-1"} when the process exits with a non-zero status or when
     * any exception occurs; otherwise it is {@code CODE_OK}. The error message is
     * the last line read from stderr (failure), the last line read from stdout
     * (success), or the exception message.
     *
     * @param clusterId cluster identifier attached to every forwarded log line
     * @param cmd       command line handed to {@code sh -c}
     * @param type      log type attached to every forwarded log line
     * @param expandId  expand identifier attached to every forwarded log line
     * @return the result carrying the error code and message described above
     */
    @Override
    public ResponseJson executeCmd(String clusterId, String cmd, String type, String expandId) {
        ResponseJson result = new ResponseJson();
        Process p = null;
        String res;
        log.debug("命令行信息 : {}", cmd);
        try {
            // need to pass command as bash's param,
            // so that we can compatible with commands: "echo a >> b.txt" or "bash a && bash b"
            List<String> cmds = new ArrayList<>();
            cmds.add(BASH);
            cmds.add(BASH_PARAM);
            cmds.add(cmd);

            // Start the command.
            ProcessBuilder pb = new ProcessBuilder(cmds);
            p = pb.start();

            // Drain stderr and stdout concurrently.
            Future<String> errorFuture =
                    executor.submit(new ReadTask(type, clusterId, p.getErrorStream(), expandId));
            Future<String> resFuture =
                    executor.submit(new ReadTask(type, clusterId, p.getInputStream(), expandId));

            int exitValue = p.waitFor();
            // By convention only 0 means normal termination; a negative exit value
            // is a failure too, so compare against 0 rather than "> 0".
            if (exitValue != 0) {
                res = errorFuture.get();
                log.info("执行cmd命令错误: {} ", res);
                result.setErrorCode("-1");
            } else {
                log.info("执行cmd命令成功 ");
                res = resFuture.get();
                result.setErrorCode(ResponseJson.EnumCode.CODE_OK.getCode());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
            // Pass the throwable as the last argument so the stack trace is kept.
            log.error("执行cmd命令错误: {} ", e.getMessage(), e);
            res = e.getMessage();
            result.setErrorCode("-1");
        } finally {
            if (p != null) {
                p.destroy();
            }
        }

        // remove System.lineSeparator() (actually it's '\n') in the end of res if exists
        String separator = System.lineSeparator();
        if (StringUtils.isNotBlank(res) && res.endsWith(separator)) {
            res = res.substring(0, res.length() - separator.length());
        }
        result.setErrorMsg(res);
        return result;
    }

    /**
     * Builds the thread pool used to read process output. The pool uses a bounded
     * queue and {@link ThreadPoolExecutor.AbortPolicy}, so a submission is rejected
     * with an exception once both the pool and the queue are full.
     */
    @Override
    public void afterPropertiesSet() {
        executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(taskQueueMaxStorage),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, threadName + r.hashCode());
                    }
                },
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Reads one process stream line by line, logs and forwards each line, and
     * returns the last line read (an empty string if the stream had no lines).
     */
    class ReadTask implements Callable<String> {
        String clusterId;
        InputStream is;
        String type;
        String expandId;

        /**
         * @param type      log type attached to every forwarded line
         * @param clusterId cluster identifier attached to every forwarded line
         * @param is        process stream to read until end of stream
         * @param expandId  expand identifier attached to every forwarded line
         */
        ReadTask(String type, String clusterId, InputStream is, String expandId) {
            this.clusterId = clusterId;
            this.is = is;
            this.type = type;
            this.expandId = expandId;
        }

        @Override
        public String call() throws Exception {
            String finalLine = "";
            // try-with-resources closes the reader (and the wrapped stream) even if
            // forwarding a line fails. The platform default charset is kept on purpose,
            // matching the original decoding of the process output.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
                String line;
                while ((line = br.readLine()) != null) {
                    log.info(line);
                    // Hand the output line to the message queue (Kafka, per the original note).
                    saveLogs(clusterId, type, line, false, false, expandId);
                    finalLine = line;
                }
            }
            return finalLine;
        }
    }

    /**
     * Wraps one log line in a {@link ClusterInitLogVO}, stamps it with the current
     * time, serializes it to JSON and passes it to {@link MessageService#sendLog}.
     *
     * @param clusterId    cluster identifier stored on the record
     * @param type         log type stored on the record
     * @param log          the log line text
     * @param isRancherLog value stored in the record's {@code isRancherLog} field
     * @param isEnd        value stored in the record's {@code isEnd} field
     * @param expandId     expand identifier stored on the record
     */
    @Override
    public void saveLogs(String clusterId, String type, String log, Boolean isRancherLog,
                         Boolean isEnd, String expandId) {
        // Build the payload.
        Message message = new Message();
        message.setTimestamp(System.currentTimeMillis());
        message.setLog(log);

        ClusterInitLogVO clusterInitLog = new ClusterInitLogVO();
        clusterInitLog.setClusterId(clusterId);
        clusterInitLog.setMessage(message);
        clusterInitLog.setIsEnd(isEnd);
        clusterInitLog.setType(type);
        clusterInitLog.setExpandId(expandId);
        clusterInitLog.setIsRancherLog(isRancherLog);

        // Send it (to Kafka, per the original note).
        messageService.sendLog(JSON.toJSONString(clusterInitLog));
    }
}
更多推荐


所有评论(0)