Error using the MemSQL Spark connector

2016-01-24

I have an application that needs to read data from MemSQL and load it into a DataFrame. I am using the memsql-spark-connector for this communication, but unfortunately I am stuck at the point where I try to connect to my MemSQL master node. What is wrong with the way I am connecting to the master node? I did try logging into the MemSQL master node with the MySQL client from my local machine, and that worked, so I suspect the problem is not on the server side.

Here is the exception stack trace:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.) 
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294) 
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039) 
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533) 
at com.memsql.spark.connector.MemSQLConnectionPool$.connect(MemSQLConnectionPool.scala:34) 
at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38) 
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26) 
at com.memsql.spark.connector.MemSQLCluster$$anonfun$withAggregatorConn$1.apply(MemSQLCluster.scala:26) 
at org.apache.spark.sql.memsql.MemSQLCatalog.getDBTablePairs(MemSQLCatalog.scala:64) 
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupTable(MemSQLCatalog.scala:58) 
at org.apache.spark.sql.memsql.MemSQLCatalog.lookupRelation(MemSQLCatalog.scala:24) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:279) 
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264) 
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
at scala.collection.immutable.List.foldLeft(List.scala:84) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916) 
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916) 
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914) 
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) 
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) 
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) 
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:48) 
at org.apache.spark.sql.memsql.MemSQLContext.sql(MemSQLContext.scala:39) 
at MemsqlSparkLoader.load(MemsqlSparkLoader.scala:19) 
at MemsqlSparkLoaderTest$$anonfun$1.apply$mcV$sp(MemsqlSparkLoaderTest.scala:20) 
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14) 
at MemsqlSparkLoaderTest$$anonfun$1.apply(MemsqlSparkLoaderTest.scala:14) 
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure 

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377) 
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1036) 
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:338) 
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2232) 
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265) 
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064) 
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790) 
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377) 
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395) 
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325) 
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39) 
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:256) 
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2304) 
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290) 
... 126 more 
Caused by: java.net.SocketTimeoutException: connect timed out 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:579) 
at com.mysql.jdbc.StandardSocketFactory.connect(StandardSocketFactory.java:213) 
at com.mysql.jdbc.MysqlIO.<init>(MysqlIO.java:297) 
... 142 more  

Below is the code snippet; the exception is thrown at its last line:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.memsql.MemSQLContext

val masterHost: String = "XXX"
val masterPort: Int = 3307
val defaultDBName: String = "mydbtest"
val user: String = "user"
val passwd: String = "passwd"

val query: String = "select * from transactions where block_height <= 1000"

// Point the connector at the master aggregator.
val conf: SparkConf = new SparkConf().setAppName("MemsqlSparkLoaderTest").setMaster("local")
conf.set("memsql.host", masterHost)
conf.set("memsql.port", masterPort.toString)
conf.set("memsql.defaultDatabase", defaultDBName)
conf.set("memsql.user", user)
conf.set("memsql.password", passwd)

val sc: SparkContext = new SparkContext(conf)
val msc: MemSQLContext = new MemSQLContext(sc)

// The exception is thrown on this line.
val df = msc.sql(query)
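
As a driver-side sanity check, it may help to open a plain JDBC connection to the master aggregator before involving the connector at all; the stack trace shows the MySQL JDBC driver is already on the classpath. A minimal sketch, reusing the placeholder values above:

import java.sql.DriverManager

// Plain JDBC probe against the master aggregator, bypassing Spark entirely.
// Host, port, and credentials are the same placeholders as above.
val url = s"jdbc:mysql://$masterHost:$masterPort/$defaultDBName?connectTimeout=5000"
val conn = DriverManager.getConnection(url, user, passwd)
try {
  val rs = conn.createStatement().executeQuery("SELECT 1")
  rs.next()
  println(s"Probe succeeded: SELECT 1 returned ${rs.getInt(1)}")
} finally {
  conn.close()
}

If this throws the same CommunicationsException, the failure is between the JVM and the database, not in the connector itself.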

The memsql.cnf configuration file on my master node is as follows:

[server] 
basedir = . 
bind_address = 0.0.0.0 
core_file 
durability = on 
lc_messages_dir = ./share 
lock_wait_timeout = 60 
max_connections = 100000 
plan_expiration_minutes = 180 
redundancy_level = 2 
skip_name_resolve = on 
snapshot_trigger_size = 256m 
socket = memsql.sock 
ssl_cert = /var/lib/memsql/certs/server-cert.pem 
ssl_key = /var/lib/memsql/certs/server-key.pem 
tmpdir = . 
transaction_buffer = 64m 
; ------------------------------------------------------------------------ 
; MEMSQL OPS VARIABLES 
; 
; Variables below this header are controlled by MemSQL Ops. 
; Please do not edit any of these values directly. 
; ------------------------------------------------------------------------ 
master_aggregator 
port = 3307 

Could you post your MemSQL node settings (the memsql.cnf configuration)? –


@莱特-猎鹰 Posted it in the body of my question. This is the memsql.cnf file on the master node of my MemSQL cluster. –


Check http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql –

Answers

0

It looks like your application cannot connect to the database. Your application may not have network access to the database server, or a firewall may be blocking a port it needs. Also, try using the master's IP address instead of its hostname. I think you are right that the problem is not on the server side.
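
A quick way to tell a blocked port apart from a JDBC-level problem is a raw TCP probe with an explicit timeout, independent of any driver. A minimal sketch; the host and port are the question's placeholder values:

import java.net.{InetSocketAddress, Socket}

// Raw TCP probe: returns true only if something is listening on host:port
// and no firewall drops the connection within the timeout.
def canReach(host: String, port: Int, timeoutMs: Int = 5000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

println(canReach("XXX", 3307))

If this prints false from the machine running the Spark driver, the failure is at the network layer, which matches the SocketTimeoutException at the bottom of the stack trace.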

Please let me know if this solves your problem. :)


I am using the IP address rather than the hostname. But when I run 'mysql -h -P -u -p' I can connect to the MemSQL database successfully. Given that, do you still think it is a network access issue? The exception does at least look network-related. –

2

I had the same problem.

My MySQL client could connect to the database server from my Spark master, but my Spark slaves could not connect to it.

I was using AWS RDS. I added an authorization for the ElasticMapReduce-slave security group to the RDS security group, and my problem was solved.
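
This generalizes beyond RDS: Spark runs its tasks on the worker (slave) machines, so every worker needs network access to the database, not just the machine you happen to test from. One way to confirm which machines can reach the database is to run a socket probe inside a Spark job, so it executes on the executors rather than the driver. A minimal sketch, reusing the sc from the question and its placeholder host and port:

// Probe the database from each executor; a worker-side firewall or
// security-group gap then shows up directly in the results.
// "XXX" and 3307 are the question's placeholders.
val results = sc.parallelize(1 to 8).map { _ =>
  val worker = java.net.InetAddress.getLocalHost.getHostName
  val socket = new java.net.Socket()
  val reachable =
    try {
      socket.connect(new java.net.InetSocketAddress("XXX", 3307), 5000)
      true
    } catch {
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  (worker, reachable)
}.collect()

results.foreach { case (worker, ok) => println(s"$worker -> $ok") }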