After installing Oracle, create a test table and some test data:
```sql
create table test(
  username varchar2(32) primary key,
  password varchar2(32)
);
insert into test values('John','1234');
insert into test values('Mike','1234');
insert into test values('Jim','1234');
insert into test values('Ana','1234');
insert into test values('Ennerson','1234');
commit;
```
1. Establish a JDBC connection and read the data
```java
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[6]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:oracle:thin:@192.168.168.100:1521/orcl");
options.put("user", "flume");
options.put("password", "1234");
// read the test table
options.put("dbtable", "test");
Dataset<Row> df = sqlContext.read().format("jdbc").options(options).load();
df.show();
/*
+--------+--------+
|USERNAME|PASSWORD|
+--------+--------+
|    John|    1234|
|    Mike|    1234|
|     Jim|    1234|
|     Ana|    1234|
|Ennerson|    1234|
+--------+--------+
*/
```
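As an aside, Spark 2.x also offers the SparkSession entry point, which wraps SQLContext. A minimal sketch of the same read using SparkSession, with the connection details copied from the example above:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSessionJdbcRead {
    public static void main(String[] args) {
        // SparkSession replaces SQLContext as the entry point in Spark 2.x
        SparkSession spark = SparkSession.builder()
                .appName("JavaSparkSQL")
                .master("local[6]")
                .getOrCreate();

        // same JDBC options as above, passed one option at a time
        Dataset<Row> df = spark.read()
                .format("jdbc")
                .option("url", "jdbc:oracle:thin:@192.168.168.100:1521/orcl")
                .option("user", "flume")
                .option("password", "1234")
                .option("dbtable", "test")
                .load();

        df.show();
        spark.stop();
    }
}
```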
2. Iterate over the Dataset&lt;Row&gt;
```java
// collect the Dataset<Row> to the driver as a List
List<Row> list = df.collectAsList();
// print the username column of the test table
for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i).<String>getAs("USERNAME"));
}
/*
John
Mike
Jim
Ana
Ennerson
*/
```
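Note that collectAsList() pulls every row back to the driver, which is fine for a five-row test table but not for large ones. A sketch of a distributed alternative using Dataset.foreach (on a cluster, the println output lands in the executor logs rather than the driver console):

```java
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

// process rows on the executors instead of collecting them to the driver
df.foreach((ForeachFunction<Row>) row ->
        System.out.println(row.<String>getAs("USERNAME")));
```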
3. Execute a SQL statement
```java
// execute a SQL statement
// df.createOrReplaceTempView("test") is required; without it you get:
// "Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: test; line 1 pos 0"
df.createOrReplaceTempView("test");
sqlContext.sql("insert into test values('Obama','6666')");
```
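Rows can also be written back to Oracle without going through a temp view, via the DataFrameWriter JDBC API. A minimal sketch, assuming the same connection settings as above and a Dataset&lt;Row&gt; named df whose schema matches the test table:

```java
import java.util.Properties;
import org.apache.spark.sql.SaveMode;

Properties props = new Properties();
props.setProperty("user", "flume");
props.setProperty("password", "1234");

// append df's rows to the Oracle table "test"
df.write()
  .mode(SaveMode.Append)
  .jdbc("jdbc:oracle:thin:@192.168.168.100:1521/orcl", "test", props);
```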
4. Add the spark-sql dependency
Add the Spark SQL dependency to the pom.xml file (note: the runtime scope shown below keeps the jar off the compile classpath; if the code above fails to compile against the Spark SQL classes, remove the scope element so the default compile scope applies):
```xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
    <scope>runtime</scope>
</dependency>
```
Problems encountered:
1. Running in Eclipse throws Exception in thread "main" java.sql.SQLException: No suitable driver:
```
Exception in thread "main" java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(DriverManager.java:315)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:83)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
	at com.spark.test.JavaSparkSQL.main(JavaSparkSQL.java:26)
```
The cause is that the Oracle JDBC driver is not on the classpath. Add it to pom.xml as follows:
```xml
<!-- Oracle JDBC driver -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc5</artifactId>
    <version>11.2.0.1.0</version>
    <scope>runtime</scope>
</dependency>
```
Because of Oracle's licensing, Maven Central does not provide the Oracle JDBC driver; to use it in a Maven project you must add it to your local repository manually, as sketched below.
For details, see: adding the Oracle JDBC dependency to Maven.
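A typical way to do the manual install, assuming the driver jar has already been downloaded (the file path is a placeholder; the coordinates match the pom.xml snippet above):

```bash
mvn install:install-file \
  -Dfile=/path/to/ojdbc5.jar \
  -DgroupId=com.oracle \
  -DartifactId=ojdbc5 \
  -Dversion=11.2.0.1.0 \
  -Dpackaging=jar
```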
2. Running on the Spark cluster throws Exception in thread "main" java.sql.SQLException: No suitable driver:
```
Exception in thread "main" java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(DriverManager.java:315)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$7.apply(JDBCOptions.scala:84)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:83)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
	at com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber.main(JavaLocalDirectKafkaSparkSQLCarNumber.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
The job was submitted with:
```bash
cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7;
bin/spark-submit \
  --master spark://master:7077 \
  --class "com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber" \
  myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
```
On the Spark cluster you must point the driver at the JDBC jar explicitly with --driver-class-path myApp/ojdbc5.jar:
```bash
cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7;
bin/spark-submit \
  --driver-class-path myApp/ojdbc5.jar \
  --master spark://master:7077 \
  --class "com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber" \
  myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
```
Run it again, and it succeeds.
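One caveat: --driver-class-path only puts the jar on the driver's classpath. If the executors open JDBC connections themselves (as they do when Spark reads a JDBC table in parallel), the driver jar must reach them too; shipping it with --jars in addition is a common variant of the command above:

```bash
bin/spark-submit \
  --driver-class-path myApp/ojdbc5.jar \
  --jars myApp/ojdbc5.jar \
  --master spark://master:7077 \
  --class "com.spark.main.JavaLocalDirectKafkaSparkSQLCarNumber" \
  myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
```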