Sometimes we need to use the YARN REST API to get information about applications. The main use cases include:

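  1. Data quality management, e.g. checking whether the primary keys or record counts of query results look abnormal
  2. Job monitoring, e.g. getting a job's submitting user, run time, requested resources, and so on
  3. A unified data management platform exposed to external users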

Below is a brief record of the tests I ran, for future reference.

Interface types

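Hadoop 3.0.1 is used here. Query parameters and response fields may differ between versions; see the official documentation for details.
There are two sets of REST APIs that expose YARN application status: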

All applications in the YARN cluster: ResourceManagerRest

Cluster Information API
Cluster Metrics API
Cluster Scheduler API
Cluster Applications API
Cluster Application Statistics API
Cluster Application API
Cluster Application Attempts API
Cluster Nodes API
Cluster Node API
Cluster Writeable APIs
Cluster New Application API
Cluster Applications API (Submit Application)
Cluster Application State API
Cluster Application Queue API
Cluster Application Priority API
Cluster Delegation Tokens API
Cluster Reservation API List
Cluster Reservation API Create
Cluster Reservation API Submit
Cluster Reservation API Update
Cluster Reservation API Delete
Cluster Application Timeouts API
Cluster Application Timeout API
Cluster Application Timeout Update API
Scheduler Configuration Mutation API
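The read-only endpoints in this list all sit under the ResourceManager's /ws/v1/cluster path. The following is a minimal sketch assuming the RM web address 10.12.0.74:8088 from the demo below (8088 is the default RM web port). The hypothetical helper RmEndpoints builds URLs for several of them. Check the ResourceManagerRest page for your Hadoop version before relying on a path.

```scala
// Hypothetical helper: URLs of some read-only ResourceManager REST endpoints.
// rmHttpAddress is "host:port" of the RM web app (default port 8088).
final class RmEndpoints(rmHttpAddress: String) {
  private val base = s"http://$rmHttpAddress/ws/v1/cluster"

  def info: String = s"$base/info"                   // Cluster Information API
  def metrics: String = s"$base/metrics"             // Cluster Metrics API
  def scheduler: String = s"$base/scheduler"         // Cluster Scheduler API
  def apps: String = s"$base/apps"                   // Cluster Applications API
  def appStatistics: String = s"$base/appstatistics" // Cluster Application Statistics API
  def app(appId: String): String = s"$base/apps/$appId"                     // Cluster Application API
  def appAttempts(appId: String): String = s"$base/apps/$appId/appattempts" // Cluster Application Attempts API
  def appState(appId: String): String = s"$base/apps/$appId/state"          // Cluster Application State API
  def nodes: String = s"$base/nodes"                 // Cluster Nodes API
  def node(nodeId: String): String = s"$base/nodes/$nodeId" // Cluster Node API
}
```

For example, new RmEndpoints("10.12.0.74:8088").apps is exactly the URL queried in the demo.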

Applications on a single NodeManager: NodeManagerRest

NodeManager Information API
Applications API
Application API
Containers API
Container API
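NodeManager endpoints live under /ws/v1/node on each node's own web address, so you first need to know which node to ask. One source of that address is the amHostHttpAddress field returned by the RM, which gives the AM's node (i-22ljqs7v:8042 in the sample result). NmEndpoints is a hypothetical helper sketched under that assumption; 8042 is the default NodeManager web port.

```scala
// Hypothetical helper: URLs of the NodeManager REST endpoints on one node.
// nmHttpAddress is "host:port" of that node's web app (default port 8042).
final class NmEndpoints(nmHttpAddress: String) {
  private val base = s"http://$nmHttpAddress/ws/v1/node"

  def info: String = s"$base/info"                      // NodeManager Information API
  def apps: String = s"$base/apps"                      // Applications API
  def app(appId: String): String = s"$base/apps/$appId" // Application API
  def containers: String = s"$base/containers"          // Containers API
  def container(containerId: String): String =          // Container API
    s"$base/containers/$containerId"
}
```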

Below we briefly test the Cluster Applications API; the other endpoints are used in much the same way.

I. Demo usage

Dependencies (Maven):
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.8.0</version>
</dependency>

1. Get all apps on the RM

When querying, you can also filter by the application's submitting user, the YARN queue it runs in, the application type, and so on.
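Here is a sketch of how those filters map onto query parameters of the Cluster Applications API. The parameters user, queue, states, applicationTypes and limit are documented for this endpoint. The AppsQuery object and its build method are hypothetical names used for illustration, and the RM address is taken from the demo. Multi-valued filters such as states are passed as comma-separated lists.

```scala
import java.net.URLEncoder

// Hypothetical helper: adds optional filters to the Cluster Applications API URL.
object AppsQuery {
  // RM address taken from the demo; adjust for your cluster.
  val appsUrl = "http://10.12.0.74:8088/ws/v1/cluster/apps"

  def build(
      user: Option[String] = None,
      queue: Option[String] = None,
      states: Seq[String] = Nil,           // e.g. RUNNING, FINISHED
      applicationTypes: Seq[String] = Nil, // e.g. MAPREDUCE
      limit: Option[Int] = None): String = {
    // Keep only the filters that were set, in a fixed order.
    val params: Seq[(String, String)] = Seq(
      user.map(u => "user" -> u),
      queue.map(q => "queue" -> q),
      Some(states).filter(_.nonEmpty).map(s => "states" -> s.mkString(",")),
      Some(applicationTypes).filter(_.nonEmpty).map(t => "applicationTypes" -> t.mkString(",")),
      limit.map(n => "limit" -> n.toString)
    ).flatten
    if (params.isEmpty) appsUrl
    else {
      // URL-encode each value so queue names and lists are transmitted safely.
      val query = params.map { case (k, v) => k + "=" + URLEncoder.encode(v, "UTF-8") }.mkString("&")
      appsUrl + "?" + query
    }
  }
}
```

For example, AppsQuery.build(user = Some("admin"), states = Seq("FINISHED")) yields .../apps?user=admin&states=FINISHED. You can pass that string to new HttpGet(...) in place of the unfiltered URL.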

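import java.nio.charset.StandardCharsets
import com.alibaba.fastjson.{JSON, JSONArray}
import org.apache.commons.io.IOUtils
import org.apache.http.HttpStatus
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder

val rsUrl = "http://10.12.0.74:8088/ws/v1/cluster/apps"
val client = HttpClientBuilder.create().build()
val httpGet = new HttpGet(rsUrl)
httpGet.setHeader("Accept", "application/json") // ask the RM for JSON rather than XML
val response = client.execute(httpGet)
try {
  if (HttpStatus.SC_OK == response.getStatusLine.getStatusCode) {
    val appsStr = IOUtils.toString(response.getEntity.getContent, StandardCharsets.UTF_8)
    // get all apps; "apps"/"app" may be missing or null when there are no applications
    val apps: JSONArray = Option(JSON.parseObject(appsStr).getJSONObject("apps"))
      .flatMap(a => Option(a.getJSONArray("app")))
      .getOrElse(new JSONArray())
    // print every application as a JSON string
    for (i <- 0 until apps.size()) {
      println(apps.getJSONObject(i).toJSONString)
    }
  }
} finally {
  response.close()
  client.close()
}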

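Result (the RM returns JSON or XML depending on the request's Accept header; the XML form is shown here):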

<apps>
<app>
    <id>application_1613812954227_0011</id>
    <user>admin</user>
    <name>-- select 1,"jack",10 sel...bigint(30000)) (Stage-1)</name>
    <queue>root.users.admin</queue>
    <state>FINISHED</state>
    <finalStatus>SUCCEEDED</finalStatus>
    <progress>100.0</progress>
    <trackingUI>History</trackingUI>
    <trackingUrl>http://i-jpbgvaro:8088/proxy/application_1613812954227_0011/</trackingUrl>
    <diagnostics/>
    <clusterId>1613812954227</clusterId>
    <applicationType>MAPREDUCE</applicationType>
    <applicationTags/>
    <priority>0</priority>
    <startedTime>1613957940049</startedTime>
    <launchTime>1613957940672</launchTime>
    <finishedTime>1613957981096</finishedTime>
    <elapsedTime>41047</elapsedTime>
    <amContainerLogs>http://i-22ljqs7v:8042/node/containerlogs/container_1613812954227_0011_01_000001/admin</amContainerLogs>
    <amHostHttpAddress>i-22ljqs7v:8042</amHostHttpAddress>
    <amRPCAddress>i-22ljqs7v:45211</amRPCAddress>
    <allocatedMB>-1</allocatedMB>
    <allocatedVCores>-1</allocatedVCores>
    <reservedMB>-1</reservedMB>
    <reservedVCores>-1</reservedVCores>
    <runningContainers>-1</runningContainers>
    <memorySeconds>81889</memorySeconds>
    <vcoreSeconds>79</vcoreSeconds>
    <queueUsagePercentage>0.0</queueUsagePercentage>
    <clusterUsagePercentage>0.0</clusterUsagePercentage>
    <resourceSecondsMap>
        <entry>
            <key>memory-mb</key>
            <value>81889</value>
        </entry>
        <entry>
            <key>vcores</key>
            <value>79</value>
        </entry>
    </resourceSecondsMap>
    <preemptedResourceMB>0</preemptedResourceMB>
    <preemptedResourceVCores>0</preemptedResourceVCores>
    <numNonAMContainerPreempted>0</numNonAMContainerPreempted>
    <numAMContainerPreempted>0</numAMContainerPreempted>
    <preemptedMemorySeconds>0</preemptedMemorySeconds>
    <preemptedVcoreSeconds>0</preemptedVcoreSeconds>
    <preemptedResourceSecondsMap/>
    <logAggregationStatus>SUCCEEDED</logAggregationStatus>
    <unmanagedApplication>false</unmanagedApplication>
    <amNodeLabelExpression/>
    <timeouts>
        <timeout>
            <type>LIFETIME</type>
            <expiryTime>UNLIMITED</expiryTime>
            <remainingTimeInSeconds>-1</remainingTimeInSeconds>
        </timeout>
    </timeouts>
</app>
</apps>
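For the job-monitoring use case, a few of these fields are enough. The sketch below is an illustration only. AppSummary is a hypothetical case class filled in by hand with the values from the sample above; it is not something fastjson or YARN provides. It also confirms that elapsedTime equals finishedTime minus startedTime (1613957981096 minus 1613957940049 is 41047 ms). Because this app has already finished, allocatedMB, allocatedVCores and runningContainers are -1. For a completed job, memorySeconds (MB-seconds) and vcoreSeconds are therefore the fields that describe resource usage.

```scala
// Hypothetical summary of one entry of the "app" array (values copied by hand here).
final case class AppSummary(
    id: String,
    user: String,
    queue: String,
    finalStatus: String,
    startedTime: Long,   // epoch milliseconds
    finishedTime: Long,  // epoch milliseconds
    elapsedTime: Long,   // milliseconds
    memorySeconds: Long, // MB-seconds summed over the app's containers
    vcoreSeconds: Long   // vcore-seconds summed over the app's containers
) {
  // Run time recomputed from the two timestamps.
  def durationMs: Long = finishedTime - startedTime

  // Rough average memory (MB) allocated to the app over its elapsed time.
  def avgAllocatedMB: Double = memorySeconds / (elapsedTime / 1000.0)

  // One line per app, e.g. for a monitoring log.
  def report: String =
    s"$id user=$user queue=$queue status=$finalStatus durationS=${elapsedTime / 1000} " +
      s"memoryMBs=$memorySeconds vcoreS=$vcoreSeconds"
}

object AppSummary {
  // The application shown in the sample result above.
  val sample: AppSummary = AppSummary(
    "application_1613812954227_0011", "admin", "root.users.admin", "SUCCEEDED",
    1613957940049L, 1613957981096L, 41047L, 81889L, 79L)
}
```

For this sample, avgAllocatedMB comes to roughly 1995 MB over a 41-second run.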
2. View an application's logs

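Among the fields returned by the existing YARN API, the one related to logs is amContainerLogs, but this URL must be completed before use.
A container writes several log files, such as stdout, stderr, and syslog. Here we want the complete log, so we choose syslog.
Append the suffix /syslog?start=0 to the URL. start=0 requests the file from its first byte; without it, the NodeManager log page shows only the tail of the file.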
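The URL completion described above can be wrapped in a small hypothetical helper, AmLogUrl. It assumes that amContainerLogs can be null or empty, for example before the AM has started, and returns None in that case. logFileUrl applies the same pattern to the other files mentioned, such as stdout or stderr.

```scala
// Hypothetical helper for completing the amContainerLogs URL returned by the RM.
object AmLogUrl {
  // URL of one log file of the AM container; start=0 asks for the file from its beginning.
  // Returns None when amContainerLogs is null or blank (assumed: AM not started yet).
  def logFileUrl(amContainerLogs: String, fileName: String): Option[String] =
    Option(amContainerLogs)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(base => s"${base.stripSuffix("/")}/$fileName?start=0")

  // The case used in the text: the full syslog.
  def syslogUrl(amContainerLogs: String): Option[String] = logFileUrl(amContainerLogs, "syslog")
}
```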

Code example:

import java.nio.charset.StandardCharsets
import scala.collection.mutable
import com.alibaba.fastjson.{JSON, JSONArray}
import org.apache.commons.io.IOUtils
import org.apache.http.HttpStatus
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder

val rsUrl = "http://10.12.0.74:8088/ws/v1/cluster/apps"
val appLogSuffix = "/syslog?start=0"
val client = HttpClientBuilder.create().build()
try {
  val httpGet = new HttpGet(rsUrl)
  httpGet.setHeader("Accept", "application/json") // request JSON from the RM
  // applicationId -> full syslog URL of the application's AM container
  val appMap = mutable.LinkedHashMap[String, String]()
  val response = client.execute(httpGet)
  try {
    if (HttpStatus.SC_OK == response.getStatusLine.getStatusCode) {
      val appsStr = IOUtils.toString(response.getEntity.getContent, StandardCharsets.UTF_8)
      // get all apps; "apps"/"app" may be missing or null when there are no applications
      val apps: JSONArray = Option(JSON.parseObject(appsStr).getJSONObject("apps"))
        .flatMap(a => Option(a.getJSONArray("app")))
        .getOrElse(new JSONArray())
      for (i <- 0 until apps.size()) {
        val app = apps.getJSONObject(i)
        val applicationId = app.getString("id")
        val amLogs = app.getString("amContainerLogs")
        // skip applications that do not (yet) report an AM log URL
        if (amLogs != null && amLogs.nonEmpty) {
          val logUrl = amLogs + appLogSuffix
          println("id: " + applicationId + " logUrl: " + logUrl)
          appMap.put(applicationId, logUrl)
        }
      }
    }
  } finally {
    response.close()
  }
  // Fetch each AM syslog. This is the NodeManager's log web page,
  // so the body may contain HTML around the log text.
  appMap.foreach { case (id, url) =>
    val logResponse = client.execute(new HttpGet(url))
    try {
      val log = IOUtils.toString(logResponse.getEntity.getContent, StandardCharsets.UTF_8)
      println("==== " + id + " ====")
      println(log)
    } finally {
      logResponse.close()
    }
  }
} finally {
  client.close()
}

Retrieved log:

2021-02-20 17:58:01,523 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1613812954227_0007_000001
2021-02-20 17:58:01,617 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: 
/************************************************************
[system properties]
os.name: Linux
os.version: 3.10.0-862.el7.x86_64
java.home: /usr/java/jdk1.8.0_241/jre
java.runtime.version: 1.8.0_241-b07
java.vendor: Oracle Corporation
java.version: 1.8.0_241
java.vm.name: Java HotSpot(TM) 64-Bit Server VM ...
user.dir: /mnt/data/yarn/nm/usercache/admin/appcache/application_1613812954227_0010/container_1613812954227_0010_01_000001
user.name: yarn
************************************************************/
...

II. Unfinished work

  1. Parse the logs to extract the content we want, such as user.name.
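Here is a minimal sketch of that parsing step. It assumes the log text has the layout of the excerpt above: a [system properties] marker, then key: value lines, then a closing line of asterisks. It also assumes any HTML has already been stripped. SyslogProps is a hypothetical name. In the sample, user.name is yarn, which is the account the container process runs as. The submitting user, admin, appears in the RM's user field instead.

```scala
// Hypothetical parser for the "[system properties]" block of an MRAppMaster syslog.
object SyslogProps {
  // "key: value", where the key has no spaces or colons (e.g. "user.name: yarn").
  private val Prop = """([^:\s]+):\s*(.*)""".r

  def systemProperties(log: String): Map[String, String] = {
    val lines = log.split("\r?\n").map(_.trim)
    val start = lines.indexWhere(_ == "[system properties]")
    if (start < 0) Map.empty
    else
      lines
        .drop(start + 1)
        .takeWhile(line => !line.startsWith("****")) // the banner closes with a row of asterisks
        .collect { case Prop(key, value) => key -> value.trim }
        .toMap
  }

  def userName(log: String): Option[String] = systemProperties(log).get("user.name")
}
```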