I need to sort by ROWTIME within a 1-second window, but I get the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:118)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlan(StreamExecSort.scala:59)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:91)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
at com.dtstack.flink.sql.exec.FlinkSQLExec.sqlUpdate(FlinkSQLExec.java:94)
at com.dtstack.flink.sql.exec.ExecuteProcessHelper.sqlTranslation(ExecuteProcessHelper.java:235)
at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:169)
at com.dtstack.flink.sql.Main.main(Main.java:41)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:140)
My script is as follows:
CREATE TABLE MyTable(
name varchar,
after varchar,
jstime bigint,
WATERMARK FOR jstime AS withOffset(jstime,1000)
)WITH(
type ='kafka10',
bootstrapServers ='pro1:9092',
kafka.auto.offset.reset ='latest',
topic ='test1',
parallelism ='1',
sourcedatatype ='json'
);
CREATE TABLE result_user_info(
name varchar,
after varchar,
jstime bigint,
ROWTIME datetime
)WITH(
type ='console',
parallelism ='1'
);
insert into result_user_info(name, after, jstime, ROWTIME)
(select name, after, jstime, ROWTIME
 from MyTable
 group by name, after, jstime, TUMBLE(ROWTIME, INTERVAL '1' SECOND), ROWTIME
 order by ROWTIME)
Kafka message:
{"name":"testname","jstime":3119988231,"after":"{"khh":"我是khh0003"}"}
I'd like to ask whether my syntax is wrong, or whether there is another way to do this kind of sorting. Thanks.
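For reference, here is a minimal sketch of how this kind of windowed sort is usually written in standard Flink SQL (blink planner), where a streaming ORDER BY is only accepted when the leading sort key is an ascending time attribute; TUMBLE_ROWTIME carries the window's rowtime into the outer query so it can be sorted on. This has not been verified against flinkStreamSQL's dialect, and rowtime_col, cnt, win_rowtime and t are hypothetical names:

SELECT name, cnt, win_rowtime
FROM (
    SELECT
        name,
        COUNT(*) AS cnt,
        -- TUMBLE_ROWTIME keeps the result a time attribute; grouping by the column itself does not
        TUMBLE_ROWTIME(rowtime_col, INTERVAL '1' SECOND) AS win_rowtime
    FROM MyTable
    GROUP BY name, TUMBLE(rowtime_col, INTERVAL '1' SECOND)
) t
ORDER BY win_rowtime;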