Being new to the big-data stack, I hit an exception when reading a Hive table from Spark. After a fair amount of searching and experimenting I got it resolved; the error looked like this:

Caused by: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [demo.demo.id, demo.demo.name, demo.demo.birthday];
'Project ['id, 'name, 'age]
+- SubqueryAlias demo
   +- Relation[demo.id#0, demo.name#1, demo.birthday#2] JDBCRelation(demo) [numPartitions=1]
   ...

The code that produced the error:

	public static void main(String[] args) {
		String url = "";
		String username = "";
		String password = "";
		String driver = "";
		String path = "path";
		List<String> tables = Arrays.asList("demo");
		String sql = "select id,name,birthday from demo";
		SparkSession spark = SparkSession.builder().appName("hiveDb").getOrCreate();
		Properties properties = new Properties();
		properties.setProperty("user", username);
		properties.setProperty("password", password);
		for (String table : tables) {
			Dataset<Row> dataset = spark.read().option("driver", driver).jdbc(url, table, properties);
			dataset.createOrReplaceTempView(table);
		}
		// the error is thrown when sql(sql) is executed
		spark.sql(sql).write().option("header", "true").mode("Append").csv(path);
	}

Looking at the error, the column is 'demo.demo.id', which was baffling — where did that name come from? So I used spark-shell to check how the column names are actually produced:

scala> spark.read.jdbc(hiveUrl, "demo", connectionProperties).show() // connectionProperties is the same as properties in the Java code above

Running this returns:

+-------+---------+-------------+
|demo.id|demo.name|demo.birthday|
+-------+---------+-------------+
+-------+---------+-------------+

The printed header shows that when a Hive table is read over JDBC, each result column is named as table name plus column name ('demo.id'). Since our temp view is registered under the same name as the table, resolving `id` against view `demo` produces the three-part name 'demo.demo.id' seen in the error above.
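The renaming that fixes this is just string surgery on each column label: drop everything up to the last dot. A minimal, Spark-free sketch of that rule (the class and method names here are illustrative, not from the original code):

```java
import java.util.Arrays;

public class ColumnNames {
	// Strips a leading "table." qualifier from a JDBC column label,
	// e.g. "demo.id" -> "id"; labels without a dot pass through unchanged.
	static String stripTablePrefix(String column) {
		int dot = column.lastIndexOf('.');
		return dot < 0 ? column : column.substring(dot + 1);
	}

	public static void main(String[] args) {
		String[] columns = {"demo.id", "demo.name", "demo.birthday"};
		String[] renamed = Arrays.stream(columns)
				.map(ColumnNames::stripTablePrefix)
				.toArray(String[]::new);
		System.out.println(Arrays.toString(renamed)); // [id, name, birthday]
	}
}
```

Using `lastIndexOf` rather than a plain `split` also keeps labels without a dot intact, which matters if some sources in the loop are not Hive.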

How do we fix this? Only a small adjustment to the code above is needed:

	public static void main(String[] args) {
		String url = "";
		String username = "";
		String password = "";
		String driver = "";
		String path = "path";
		List<String> tables = Arrays.asList("demo");
		String sql = "select id,name,birthday from demo";
		SparkSession spark = SparkSession.builder().appName("hiveDb").getOrCreate();
		Properties properties = new Properties();
		properties.setProperty("user", username);
		properties.setProperty("password", password);
		for (String table : tables) {
			Dataset<Row> dataset = spark.read().option("driver", driver).jdbc(url, table, properties);
			//----------------- added code: begin ------------------
			// Hive's JDBC driver returns columns as "table.column";
			// strip the prefix so the temp view exposes plain names.
			if (url.toLowerCase().startsWith("jdbc:hive2")) {
				for (String column : dataset.columns()) {
					String[] arr = column.split("\\.");
					dataset = dataset.withColumnRenamed(column, arr[arr.length - 1]);
				}
			}
			//----------------- added code: end --------------------
			dataset.createOrReplaceTempView(table);
		}
		// sql(sql) now resolves id, name and birthday correctly
		spark.sql(sql).write().option("header", "true").mode("Append").csv(path);
	}
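As a variant on renaming column by column, Spark's `Dataset.toDF(String...)` replaces the whole header in one call; computing the new names is again plain string work. A sketch under the same assumptions (the `plainNames` helper is hypothetical, not part of the original code):

```java
import java.util.Arrays;

public class RenameAll {
	// Builds the replacement header for Dataset.toDF(...) by stripping
	// the "table." qualifier from every JDBC column label.
	static String[] plainNames(String[] columns) {
		return Arrays.stream(columns)
				// lastIndexOf returns -1 when there is no dot, so
				// substring(0) leaves unqualified names unchanged
				.map(c -> c.substring(c.lastIndexOf('.') + 1))
				.toArray(String[]::new);
	}

	public static void main(String[] args) {
		String[] renamed = plainNames(new String[] {"demo.id", "demo.name", "demo.birthday"});
		System.out.println(Arrays.toString(renamed)); // [id, name, birthday]
		// With a live Dataset this would be:
		// dataset = dataset.toDF(plainNames(dataset.columns()));
	}
}
```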