Stream #1
Could you provide a list of dependencies that work? I tried to install the latest libraries. When I tried to install org.camunda.bpm.extension.feel.scala:feel-engine-factory:1.10.1, I got the error below, which points to a fundamental Scala package class. Error message:
Hi @eyankovsky, if you are executing this code in a scenario with more than one node, I think that may be the cause. You might try the following:
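For instance, here is a minimal sketch of the usual pattern for non-serializable engines on a multi-node cluster (the Engine class below is a stand-in, not part of dmn4spark; the DMNExecutor shown later in this thread uses the same @transient lazy val idea):

// Stand-in for a non-serializable engine such as the Camunda DMN engine.
class Engine { def evaluate(s: String): String = s.toUpperCase }

class Wrapper extends Serializable {
  // @transient lazy: the engine is not serialized with the closure;
  // each executor builds its own instance on first use.
  @transient lazy val engine: Engine = new Engine
  def run(s: String): String = engine.evaluate(s)
}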
Finally, regarding the list of dependencies that work, I use the ones below:
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.8</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-reflect</artifactId>
  <version>2.12.8</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-compiler</artifactId>
  <version>2.12.8</version>
</dependency>
<dependency>
  <groupId>com.thoughtworks.paranamer</groupId>
  <artifactId>paranamer</artifactId>
  <version>2.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.extension.feel.scala/feel-engine -->
<dependency>
  <groupId>org.camunda.bpm.extension.feel.scala</groupId>
  <artifactId>feel-engine</artifactId>
  <version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.extension.feel.scala/feel-engine-factory -->
<dependency>
  <groupId>org.camunda.bpm.extension.feel.scala</groupId>
  <artifactId>feel-engine-factory</artifactId>
  <version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.dmn/camunda-engine-dmn -->
<dependency>
  <groupId>org.camunda.bpm.dmn</groupId>
  <artifactId>camunda-engine-dmn</artifactId>
  <version>7.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine-plugin-spin -->
<dependency>
  <groupId>org.camunda.bpm</groupId>
  <artifactId>camunda-engine-plugin-spin</artifactId>
  <version>7.14.0</version>
</dependency>
Hope this helps you.
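A quick way to verify that these versions resolve together is a small classpath smoke test (standard camunda-engine-dmn API only; a sketch, not part of dmn4spark):

import org.camunda.bpm.dmn.engine.DmnEngineConfiguration

// If the Camunda and Scala jars above are compatible on the classpath,
// building a default DMN engine succeeds without NoClassDefFoundError.
val dmnEngine = DmnEngineConfiguration.createDefaultDmnEngineConfiguration().buildEngine()
println(dmnEngine.getClass.getName)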
Alvaro, thank you for your reply. I followed up on your suggestions and found the following:
These incompatibilities between older and newer packages resulted in an error at the first step, importing the packages. Error message while installing library org.camunda.bpm:camunda-engine-plugins:7.14.0: Library resolution failed. Cause: java.lang.RuntimeException: org.camunda.bpm:camunda-engine-plugins download failed.
Hi,
We are trying to implement dmn4spark in our Azure Databricks workspaces and are running into problems even with a simple decision rule applied to a single string field. We put the engine and all of your code inside a Databricks notebook and got it to run without compilation errors. However, the NullPointerException raised during execution, with Stream(Decision_Reporting, ?) in the trace below, suggests that the code cannot read the input data properly and produces null. Could you share a simple end-to-end example, or a hint on how to resolve the errors with streaming the input data? Thank you in advance, Eugene
[Attachment: CTBRules_New.dmn, a decision table with a single rule mapping ReportingUnit "1" to true. Notebook: DMN_Test_KS_EY (Scala).]
package es.us.idea.dmn4spark.dmn.engine

import org.camunda.bpm.dmn.feel.impl.FeelException
import org.camunda.bpm.engine.variable.context.VariableContext
import org.camunda.feel.integration.CamundaValueMapper
import org.camunda.feel.interpreter.{RootContext, ValueMapper, VariableProvider}
import org.camunda.feel.spi.SpiServiceLoader

/** Camunda FeelEngine implementation backed by the Scala FEEL engine. */
class SafeCamundaFeelEngine extends org.camunda.bpm.dmn.feel.impl.FeelEngine {

  private lazy val engine =
    new org.camunda.feel.FeelEngine(
      valueMapper = new CamundaValueMapper,
      functionProvider = SpiServiceLoader.loadFunctionProvider
    )

  // Adapts Camunda's VariableContext to the FEEL engine's VariableProvider,
  // resolving missing variables to None instead of failing.
  private def asVariableProvider(ctx: VariableContext,
                                 valueMapper: ValueMapper): VariableProvider = (name: String) => {
    if (ctx.containsVariable(name)) {
      Some(valueMapper.toVal(ctx.resolve(name).getValue))
    } else {
      None
    }
  }

  override def evaluateSimpleExpression[T](expression: String,
                                           ctx: VariableContext): T = {
    val context = new RootContext(
      variableProvider = asVariableProvider(ctx, engine.valueMapper))
    engine.evalExpression(expression, context) match {
      case Right(value) => value.asInstanceOf[T]
      case Left(failure) => throw new FeelException(failure.message)
    }
  }

  override def evaluateSimpleUnaryTests(expression: String,
                                        inputVariable: String,
                                        ctx: VariableContext): Boolean = {
    val context = new RootContext(
      Map(RootContext.inputVariableKey -> inputVariable),
      variableProvider = asVariableProvider(ctx, engine.valueMapper))
    // The evaluation call was missing from the pasted code, which left the method
    // without a Boolean result; restored here mirroring evaluateSimpleExpression.
    engine.evalUnaryTests(expression, context) match {
      case Right(value) => value
      case Left(failure) => throw new FeelException(failure.message)
    }
  }
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
package es.us.idea.dmn4spark.dmn.engine

import org.camunda.bpm.dmn.feel.impl.FeelEngine
import org.camunda.feel.integration.CamundaFeelEngineFactory

// Factory that makes the Camunda DMN engine use SafeCamundaFeelEngine.
class SafeCamundaFeelEngineFactory extends CamundaFeelEngineFactory {
  override def createInstance(): FeelEngine = new SafeCamundaFeelEngine
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
package es.us.idea.dmn4spark.dmn
import java.io.ByteArrayInputStream
import es.us.idea.dmn4spark.dmn.engine.SafeCamundaFeelEngineFactory
import org.camunda.bpm.dmn.engine.impl.DefaultDmnEngineConfiguration
import org.camunda.bpm.dmn.engine.{DmnDecision, DmnEngine, DmnEngineConfiguration}
import org.camunda.bpm.model.dmn.{Dmn, DmnModelInstance}
import scala.collection.JavaConverters
class DMNExecutor(input: Array[Byte], selectedDecisions: Seq[String]) extends Serializable {

  /** DMN engine configured with the SafeCamundaFeelEngineFactory. Marked
    * @transient lazy so the non-serializable engine is rebuilt on each executor
    * instead of being shipped with the closure. */
  @transient lazy val dmnEngine: DmnEngine = {
    val dmnEngineConfig: DefaultDmnEngineConfiguration =
      DmnEngineConfiguration.createDefaultDmnEngineConfiguration.asInstanceOf[DefaultDmnEngineConfiguration]
    dmnEngineConfig.setFeelEngineFactory(new SafeCamundaFeelEngineFactory)
    dmnEngineConfig.setDefaultOutputEntryExpressionLanguage("feel")
    dmnEngineConfig.buildEngine
  }

  /** The DMN model parsed from the raw bytes, plus the decisions it contains. */
  @transient lazy val dmnModelInstance: DmnModelInstance =
    Dmn.readModelFromStream(new ByteArrayInputStream(input))
  @transient lazy val decisions: Seq[DmnDecision] =
    JavaConverters.collectionAsScalaIterableConverter(dmnEngine.parseDecisions(dmnModelInstance)).asScala.toSeq
  @transient lazy val decisionKeys: Seq[String] = decisions.map(_.getKey)

  // Evaluates every decision table against the given variable map and returns
  // the first result entry of each decision as a string.
  def getDecisionsResults(map: java.util.HashMap[String, AnyRef]): Seq[String] = {
    decisions.map(d => dmnEngine.evaluateDecisionTable(d, map).getFirstResult.getEntry(d.getKey).toString)
  }
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
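To rule the Spark plumbing in or out when chasing the NullPointerException, DMNExecutor can also be exercised directly on the driver. A minimal sketch, assuming the attached CTBRules_New.dmn defines a Decision_ReportingUnit decision with a ReportingUnit input:

import java.nio.file.{Files, Paths}
import es.us.idea.dmn4spark.dmn.DMNExecutor

// Read the DMN model from DBFS and evaluate a single row's variables by hand.
val dmnBytes = Files.readAllBytes(Paths.get("/dbfs/FileStore/tables/CTBRules_New.dmn"))
val executor = new DMNExecutor(dmnBytes, Seq("Decision_ReportingUnit"))

val vars = new java.util.HashMap[String, AnyRef]()
vars.put("ReportingUnit", "1")

// Per the decision table above this should print true; if this call already
// fails, the problem is in the DMN model or engine setup, not in Spark.
println(executor.getDecisionsResults(vars))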
package es.us.idea.dmn4spark.spark
import org.apache.spark.sql.types.{DataType, DataTypes}
object Utils {
case class ColumnInfo(name: String, dataTypeWithAssignedValue: DataTypeWithAssignedValue)
case class DataTypeWithAssignedValue(dataType: DataType, establishedValue: Option[Boolean] = None) extends Serializable
def createStructType(fieldNames: Seq[String]) = {
DataTypes.createStructType(fieldNames.map(DataTypes.createStructField(_, DataTypes.StringType, true)).toArray)
}
/**
  * @param str string containing the part of the rule consequence which includes the second argument of the
  *            insertResult function
  * @return the data type of the second argument of the insertResult function and, if it is Boolean, its
  *         associated value
  */
def inferSecondArgumentType(str: String): DataTypeWithAssignedValue = {
  str.split(')').headOption match {
    case Some(x) =>
      if (x.contains("\"")) DataTypeWithAssignedValue(DataTypes.StringType)
      else {
        val candidate = x.split("\\s+").mkString
        if (candidate == "true") DataTypeWithAssignedValue(DataTypes.BooleanType, Some(true))
        else if (candidate == "false") DataTypeWithAssignedValue(DataTypes.BooleanType, Some(false))
        else if (candidate.matches("^(\\d)$")) DataTypeWithAssignedValue(DataTypes.IntegerType)
        else if (candidate.matches("^(\\d*)L$")) DataTypeWithAssignedValue(DataTypes.LongType)
        else if (candidate.matches("^(\\d+\\.\\d*|\\.?\\d+)$")) DataTypeWithAssignedValue(DataTypes.DoubleType)
        else if (candidate.matches("^(\\d+\\.\\d*|\\.?\\d+)f$")) DataTypeWithAssignedValue(DataTypes.FloatType)
        else DataTypeWithAssignedValue(DataTypes.StringType)
      }
    // Keeps the match exhaustive; headOption of split is in practice always Some.
    case None => DataTypeWithAssignedValue(DataTypes.StringType)
  }
}
def unwrap(a: Any): Any = {
a match {
case Some(x) => unwrap(x)
case null => None
case _ => a
}
}
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
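For reference, a few illustrative calls to inferSecondArgumentType (the consequence fragments are hypothetical, each ending at the closing parenthesis):

import es.us.idea.dmn4spark.spark.Utils

println(Utils.inferSecondArgumentType("true)"))     // BooleanType with value Some(true)
println(Utils.inferSecondArgumentType("\"EMEA\")")) // StringType (the fragment contains a quote)
println(Utils.inferSecondArgumentType("7)"))        // IntegerType (single-digit pattern)
println(Utils.inferSecondArgumentType("3.14)"))     // DoubleType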
package es.us.idea.dmn4spark.spark
import java.util
import es.us.idea.dmn4spark.spark.Utils.ColumnInfo
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import scala.collection.JavaConverters._
import scala.collection.mutable
object SparkDataConversor {

  // Converts a Spark Row into a Java map keyed by column name. The original body
  // was omitted in the paste; this is a minimal sketch that copies each top-level
  // column as-is, using the row's own schema when none is supplied.
  def spark2javamap(row: Row, dsSchema: Option[org.apache.spark.sql.types.StructType] = None): java.util.HashMap[String, AnyRef] = {
    val schema = dsSchema.getOrElse(row.schema)
    val map = new java.util.HashMap[String, AnyRef]()
    schema.fields.foreach(f => map.put(f.name, row.getAs[AnyRef](f.name)))
    map
  }

  // Reads each expected column back out of the Java map, unwrapping nested Options
  // and converting present values to strings; absent values become None.
  def javamap2Spark(map: java.util.Map[String, AnyRef], columnsInfo: List[ColumnInfo]) = {
    columnsInfo.map(ci => {
      val value = Utils.unwrap(map.getOrDefault(ci.name, None))
      value match {
        case None => None
        case x => Some(x.toString)
      }
    })
  }
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
package es.us.idea.dmn4spark.dmn
import java.io.{File, FileInputStream, IOException, InputStream}
import java.net.{MalformedURLException, URI}
import es.us.idea.dmn4spark.spark.{SparkDataConversor, Utils}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.spark.sql.api.java.{UDF0, UDF1}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
class DMNSparkEngine(df: DataFrame, selectedDecisions: Seq[String] = Seq()) {

  def setDecisions(decisions: String*): DMNSparkEngine = new DMNSparkEngine(df, decisions)

  def loadFromLocalPath(path: String) = execute(IOUtils.toByteArray(new FileInputStream(path)))

  // The original body was omitted in the paste; a minimal sketch: open the file
  // on HDFS, hand its bytes to execute, and close the stream afterwards.
  def loadFromHDFS(uri: String, configuration: Configuration = new Configuration()) = {
    val fs = FileSystem.get(URI.create(uri), configuration)
    val is: FSDataInputStream = fs.open(new Path(uri))
    try execute(IOUtils.toByteArray(is)) finally is.close()
  }

  def loadFromURL(url: String) = {
    val content = Source.fromURL(url)
    val bytes = content.mkString.getBytes
    execute(bytes)
  }

  def loadFromInputStream(is: InputStream) = execute(IOUtils.toByteArray(is))

  private def execute(input: Array[Byte]) = {
    val tempColumn = s"__${System.currentTimeMillis().toHexString}"
    // ... (remainder truncated in the paste: wraps DMNExecutor in a UDF and
    // appends the selected decision results as new columns)
  }
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
package es.us.idea.dmn4spark
import es.us.idea.dmn4spark.dmn.DMNSparkEngine
import org.apache.spark.sql.DataFrame
object implicits {
implicit class DMN4Spark(df: DataFrame) {
def dmn: DMNSparkEngine = new DMNSparkEngine(df)
}
}
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
import es.us.idea.dmn4spark.implicits._
import org.apache.spark.sql.functions._
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.load("/FileStore/tables/TrialBalance2.csv")
.limit(1)
.select("ReportingUnit")
/* val df1 = df.withColumn("Decision", lit(null: String)) */
import es.us.idea.dmn4spark.implicits._
import org.apache.spark.sql.functions._
df: org.apache.spark.sql.DataFrame = [ReportingUnit: string]
df.show()
+-------------+
|ReportingUnit|
+-------------+
| 1|
+-------------+
val dmn_output = df.dmn
.setDecisions("Decision_ReportingUnit")
.loadFromLocalPath("/dbfs/FileStore/tables/CTBRules_New.dmn")
/* .loadFromLocalPath("/dbfs/FileStore/tables/CTBRules_New-1.dmn") */
dmn_output: org.apache.spark.sql.DataFrame = [ReportingUnit: string, Decision_ReportingUnit: string]
display(dmn_output)
/* dmn_output.show() */
SparkException: Failed to execute user defined function(functions$$$Lambda$5872/1957788276: (struct<ReportingUnit:string>) => struct<Decision_ReportingUnit:string>)
Caused by: NullPointerException:
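One thing worth checking when this NullPointerException appears is whether the input column actually reaches the engine with non-null values; a small hedged check, followed by a possible workaround:

import org.apache.spark.sql.functions._

// Count null and empty ReportingUnit values; rows like these would make the
// UDF wrapping DMNExecutor dereference a null.
df.select(
  count(when(col("ReportingUnit").isNull, 1)).as("nulls"),
  count(when(trim(col("ReportingUnit")) === "", 1)).as("empties")
).show()

// If nulls are present, substituting a default before invoking the engine
// is one way to sidestep the NPE while debugging:
val dfClean = df.na.fill("", Seq("ReportingUnit"))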
dmn_output.write.mode("overwrite").parquet("dbfs:/user/hive/warehouse/dmn_output_pqt")
Command skipped
val data = sqlContext.read.parquet("dbfs:/user/hive/warehouse/dmn_output_pqt")
display(data)
Command skipped