I wrote a first version, which is now running in production. The goal is to record the YARN application information for every merge.

The idea is simply to spawn a new thread that polls the YARN logs while the query runs.

The core code is as follows:

    /**
     * Execute the merge SQL statement.
     */
    private String execute(IntegrationMergeLog mergeLog, String sql) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            Class.forName(hiveDriver);
            conn = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
            pstmt = conn.prepareStatement(sql);
            Thread logThread = new Thread(new LogRunnable((HiveStatement) pstmt, mergeLog));
            logThread.setDaemon(true);
            logThread.start();
            pstmt.execute();
            return "";
        } catch (Exception e) {
            log.error(sql, e);
            return e.getMessage();
        } finally {
            if (pstmt != null) {
                try {
                    pstmt.close();
                } catch (Exception e) {
                    log.error("Failed to close statement", e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    log.error("Failed to close connection", e);
                }
            }
        }
    }


    /**
     * @author Bob
     * @description Polling thread that reads query progress logs
     * @date 2020/7/23
     */
    static class LogRunnable implements Runnable {
        private final HiveStatement hiveStatement;
        private final IntegrationMergeLog mergeLog;

        LogRunnable(HiveStatement hiveStatement, IntegrationMergeLog mergeLog) {
            this.hiveStatement = hiveStatement;
            this.mergeLog = mergeLog;
        }

        private void updateQueryLog() {
            try {
                List<String> queryLogs = hiveStatement.getQueryLog();
                for (String queryLog : queryLogs) {
                    log.info("{} progress-->{}", Thread.currentThread().getName(), queryLog);
                    // Compute the offset from the marker itself instead of a magic number
                    String marker = "INFO  : The url to track the job:";
                    int idx = queryLog.indexOf(marker);
                    if (idx >= 0) {
                        String job = queryLog.substring(idx + marker.length()).trim();
                        if (mergeLog.getYarnApplication() == null) {
                            mergeLog.setYarnApplication(job);
                        } else if (!mergeLog.getYarnApplication().contains(job)) {
                            mergeLog.setYarnApplication(mergeLog.getYarnApplication() + "\n" + job);
                        }
                    }
                }
            } catch (Exception e) {
                // Don't fail silently; keep polling but leave a trace
                log.warn("Failed to fetch hive query log", e);
            }
        }

        @Override
        public void run() {
            try {
                while (hiveStatement.hasMoreLogs()) {
                    updateQueryLog();
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of discarding it
                Thread.currentThread().interrupt();
            }
        }
    }
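
The accumulation logic above (first URL wins, later distinct URLs are appended on new lines) can be pulled out into a pure helper, which makes it easy to unit-test without a live HiveStatement. A minimal sketch; the class and method names are illustrative, not part of the original code:

```java
/** Accumulates distinct tracking URLs, newline-separated (illustrative helper). */
public class YarnApplicationAccumulator {

    /**
     * Returns the updated accumulated value: the new job URL if nothing was
     * recorded yet, the old value unchanged if the URL is already present,
     * otherwise the old value with the new URL appended on a new line.
     */
    public static String appendUnique(String existing, String job) {
        if (existing == null) {
            return job;
        }
        if (existing.contains(job)) {
            return existing;
        }
        return existing + "\n" + job;
    }

    public static void main(String[] args) {
        String acc = null;
        acc = appendUnique(acc, "http://rm:8088/proxy/application_1_0001/");
        acc = appendUnique(acc, "http://rm:8088/proxy/application_1_0001/"); // duplicate, ignored
        acc = appendUnique(acc, "http://rm:8088/proxy/application_1_0002/");
        System.out.println(acc);
    }
}
```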

Result:

2020-07-23 16:48:06:060|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Query ID = hive_20200723164848_901487c3-d93a-4c65-b556-08870deb809f
2020-07-23 16:48:06:060|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Total jobs = 1
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Launching Job 1 out of 1
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Starting task [Stage-1:MAPRED] in serial mode
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Number of reduce tasks not specified. Estimated from input data size: 1
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : In order to change the average load for a reducer (in bytes):
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  :   set hive.exec.reducers.bytes.per.reducer=<number>
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : In order to limit the maximum number of reducers:
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  :   set hive.exec.reducers.max=<number>
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : In order to set a constant number of reducers:
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  :   set mapreduce.job.reduces=<number>
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : number of splits:1
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Submitting tokens for job: job_1595332030231_0061
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : The url to track the job: http://yy-t-bigdata1.xxx.com:8088/proxy/application_1595332030231_0061/
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Starting Job = job_1595332030231_0061, Tracking URL = http://yy-t-bigdata1.xxx.com:8088/proxy/application_1595332030231_0061/
2020-07-23 16:48:06:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/hadoop/bin/hadoop job  -kill job_1595332030231_0061
2020-07-23 16:48:11:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2020-07-23 16:48:11:061|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : 2020-07-23 16:48:33,715 Stage-1 map = 0%,  reduce = 0%
2020-07-23 16:48:21:070|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : 2020-07-23 16:48:40,006 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.75 sec
2020-07-23 16:48:26:074|INFO |Thread-6||com.xxx.dataintegration.tools.service.impl.ViewCreateServiceImpl-277| - Thread-6 progress-->INFO  : 2020-07-23 16:48:48,353 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 7.67 sec
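
Incidentally, the bare application id (e.g. `application_1595332030231_0061`) can be pulled out of those tracking-URL lines with a regex rather than a fixed substring offset, which is less brittle if the log prefix ever changes. A small sketch; the class name is illustrative:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts a YARN application id from a Hive query-log line (illustrative). */
public class YarnAppIdExtractor {
    // YARN ids have the form application_<clusterTimestamp>_<sequenceNumber>
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    public static String extract(String logLine) {
        Matcher m = APP_ID.matcher(logLine);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        String line = "INFO  : The url to track the job: "
                + "http://yy-t-bigdata1.xxx.com:8088/proxy/application_1595332030231_0061/";
        System.out.println(extract(line)); // application_1595332030231_0061
    }
}
```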

References:

https://blog.csdn.net/supperman_009/article/details/77508354

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-HiveServer2Logging

 