
Algorithm negotiation fail saving csv to SFTP from Azure Databricks #75

Open
AndyAldersley opened this issue Apr 17, 2020 · 11 comments

@AndyAldersley

I'm receiving the above error when trying to write a csv file to SFTP from Azure Databricks.

Code snippet:
df.write
  .format("com.springml.spark.sftp")
  .option("host", "sftp_host")
  .option("username", "username")
  .option("password", "pwd")
  .option("fileType", "csv")
  .option("tempLocation", "/dbfs/mnt/sandbox/test/")
  .option("hdfsTempLocation", "/mnt/sandbox/test/")
  .save("/test.csv")

Which is producing the following error log:
com.jcraft.jsch.JSchException: Algorithm negotiation fail
  at com.jcraft.jsch.Session.receive_kexinit(Session.java:582)
  at com.jcraft.jsch.Session.connect(Session.java:320)
  at com.jcraft.jsch.Session.connect(Session.java:183)
  at com.springml.sftp.client.SFTPClient.createSFTPChannel(SFTPClient.java:275)
  at com.springml.sftp.client.SFTPClient.copyToFTP(SFTPClient.java:102)
  at com.springml.spark.sftp.DefaultSource.upload(DefaultSource.scala:160)
  at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:126)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:118)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:112)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:171)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:17)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:88)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:90)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw.<init>(command-2234296783096833:92)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw.<init>(command-2234296783096833:94)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw.<init>(command-2234296783096833:96)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read.<init>(command-2234296783096833:98)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$.<init>(command-2234296783096833:102)
  at line8d8b6a0390294c468569f4d9bc480baa67.$read$.<clinit>(command-2234296783096833)
  at line8d8b6a0390294c468569f4d9bc480baa67.$eval$.$print$lzycompute(<notebook>:7)
  at line8d8b6a0390294c468569f4d9bc480baa67.$eval$.$print(<notebook>:6)
  at line8d8b6a0390294c468569f4d9bc480baa67.$eval.$print(<notebook>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
  at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
  at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:699)
  at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:652)
  at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
  at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:385)
  at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:362)
  at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:251)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:246)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
  at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:288)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
  at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:362)
  at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
  at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
  at scala.util.Try$.apply(Try.scala:192)
  at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
  at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
  at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
  at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
  at java.lang.Thread.run(Thread.java:748)

I followed previous instructions to add the temp and HDFS location options to the write call; without them I get a NullPointerException. Is this error a bug, or can someone advise? I'm using spark-sftp version 1.1.3, as available through the Databricks package installer's link to Maven.
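For context, JSch raises "Algorithm negotiation fail" when, during the SSH handshake, the client's proposal list and the server's list share no common algorithm in some category (key exchange, cipher, or MAC); older JSch builds lack modern key-exchange algorithms such as curve25519-sha256, so a hardened server can reject them outright. A minimal sketch of the negotiation rule, with algorithm names purely illustrative and not read from any real server:

```scala
// Simplified model of SSH algorithm negotiation (RFC 4253, section 7.1):
// the first client-proposed algorithm that the server also supports wins.
// If there is none, the connection fails -- which JSch reports as
// "Algorithm negotiation fail".
object KexNegotiation {
  def negotiate(client: Seq[String], server: Seq[String]): Option[String] =
    client.find(server.contains)

  def main(args: Array[String]): Unit = {
    // Illustrative lists: an old client proposing only legacy kex
    // against a server that accepts only modern algorithms.
    val oldClientKex = Seq("diffie-hellman-group1-sha1", "diffie-hellman-group14-sha1")
    val modernServerKex = Seq("curve25519-sha256", "ecdh-sha2-nistp256", "diffie-hellman-group14-sha256")

    println(negotiate(oldClientKex, modernServerKex))                                   // None: handshake fails
    println(negotiate(oldClientKex :+ "diffie-hellman-group14-sha256", modernServerKex)) // overlap: handshake succeeds
  }
}
```

If this is the cause, upgrading the jsch jar on the cluster (or relaxing the server's allowed algorithms) is the direction to look in, independent of the temp-location options.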

@nonsignificantp

I've got the same error. I tried it on local Spark and it works flawlessly, so I'm guessing it's an issue with how Databricks manages temp directories. Has anybody found a workaround?

@nonsignificantp

nonsignificantp commented Apr 17, 2020

An update to my previous message: the following works for me:

 val df = spark.read.format("com.springml.spark.sftp")
  .option("host", "HOST")
  .option("username", "USER")
  .option("password", "PASS")
  .option("tempLocation", "/dbfs/tmp/")
  .option("hdfsTempLocation", "/dbfs/dbfs/tmp/")
  .option("fileType", "csv")
  .option("createDF", "true")
  .load("FILEPATH")

Using Apache Spark 2.4.3, Scala 2.11 and com.springml:spark-sftp_2.11:1.1.3 on Azure Databricks

EDIT:
OK, after many hours of trying different solutions and reading the source code and stack traces, I found this trick to be a solution for now. Before using it, make sure to create dbfs:/dbfs/tmp using:

dbutils.fs.mkdirs("dbfs:/dbfs/tmp/")

The main problem seems to be how Databricks manages tmp. Creating a folder called /dbfs/ patches the issue, but keep in mind that this is NOT a real tmp folder, so you should probably delete its contents manually with dbutils at the end of the application.
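On Databricks itself, that manual cleanup could be done with dbutils.fs.rm("dbfs:/dbfs/tmp/", true). For testing the same flow outside Databricks, where dbutils is unavailable, a plain-JVM stand-in (the path is illustrative) might look like:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

object TmpCleanup {
  // Recursively delete a directory tree, deepest entries first, so each
  // directory is empty by the time Files.delete reaches it. This is a
  // local-filesystem stand-in for dbutils.fs.rm(path, recurse = true).
  def deleteRecursively(root: Path): Unit =
    if (Files.exists(root)) {
      val walk = Files.walk(root)
      try walk.sorted(Comparator.reverseOrder[Path]()).forEach(p => Files.delete(p))
      finally walk.close()
    }
}
```

Running it against a leftover temp directory removes the whole tree, mirroring what the comment above suggests doing at the end of the application.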

@eyalshafran

eyalshafran commented May 7, 2020

@nonsignificantp - I created a tmp folder and modified my code as you suggested (just with write instead of read) and I still got an error.

My code:

data.write.
      format("com.springml.spark.sftp").
      option("host", HOST).
      option("username", USER).
      option("pem", PEM_PATH).
      option("tempLocation", "/dbfs/tmp/").
      option("hdfsTempLocation", "/dbfs/dbfs/tmp/").
      option("fileType", "csv").
      option("createDF", "true").
      save("test.csv")

EDIT:
I'm using Apache Spark 2.4.5, Scala 2.11, com.springml:spark-sftp_2.11:1.1.5 on AWS Databricks

@alexandrugrigore

I had the same problem with a custom library that uses the same SFTP client used here, com.jcraft.jsch. The problem is that on the executors the static initializer block in com.jcraft.jsch.JSch does not run correctly, so the static java.util.Hashtable config = new java.util.Hashtable(); needs to be populated on each executor.
On top of that, the classes needed by some encryption algorithms are not available on the executors, so the jar needs to be passed to spark-submit using --jars and --conf spark.executor.extraClassPath=
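A hedged sketch of the spark-submit invocation described above; the jar names, versions, and paths are placeholders, so substitute the jars actually attached to your cluster:

```shell
# Illustrative only: jar names/paths below are hypothetical.
# --jars ships the jars to the executors; extraClassPath makes the
# executor JVMs load them, so JSch's static config and the cipher
# classes are available cluster-wide, not just on the driver.
spark-submit \
  --jars /libs/jsch-0.1.54.jar,/libs/spark-sftp_2.11-1.1.3.jar \
  --conf spark.executor.extraClassPath=/libs/jsch-0.1.54.jar:/libs/spark-sftp_2.11-1.1.3.jar \
  my_job.jar
```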

@ezra-at-lumedic

ezra-at-lumedic commented Jul 24, 2020

Hi @alexandrugrigore, could you share further details? I'm getting the same error and think it has the same cause: missing encryption algorithms on the Spark nodes. I tried the temp-location fixes suggested by @nonsignificantp, but that didn't resolve the issue. Thx!

@ashitabh

> I had the same problem with a custom library, but using the same sftp used here com.jcraft.jsch. The problem is that on the executors, the static block in com.jcraft.jsch.JSch is not initialized correctly, so the static java.util.Hashtable config=new java.util.Hashtable(); needs to be populated on each executor. On top of that, the classes needed by some encryption algorithms are not available on executors, so the jar needs to be passed to spark-submit using --jars and --conf spark.executor.extraClassPath=

@alexandrugrigore Please let me know which additional jars I need to add

@nova-jj

nova-jj commented Jan 15, 2024

January 2024 and this is still an issue within Azure Databricks.

@vijaythamalla

did anyone have a solution that works?

@nova-jj

nova-jj commented Feb 2, 2024

> did anyone have a solution that works?

I've given up on this library after realizing it was only compatible with Spark v2, and our Databricks clusters are now using Spark 3.3+. I don't expect this to ever work for newer Spark projects unless SpringML picks it up again or someone forks and updates it.

@vijaythamalla

> did anyone have a solution that works?
>
> I've given up on this library once realizing it was only compatible with Spark v2 and our Databricks clusters are now using Spark 3.3+. I don't expect this to ever work moving forward for newer spark projects unless SpringML picks it up again or someone forks and updates it.

Yeah, that's so sad. This is the only library I could find that loads the data directly into a DataFrame. I was able to use pysftp, but I hoped this could work. What other library did you choose for SFTP from Databricks?

@nova-jj

nova-jj commented Feb 2, 2024

> What other library did you choose to sftp from databricks?

We didn't. We ended up building an ADF pipeline to sync from our SFTP storage container into our data lake. There's a big functionality gap here, in my opinion, on the Databricks-SFTP connectivity side. It's not really Databricks' problem, but it would be an easy value-add if they recognized how much batch interchange still takes place in the world.
