
Commit

fix: flink job error
kalencaya committed Apr 29, 2024
1 parent 0484885 commit 37f0138
Showing 13 changed files with 404 additions and 343 deletions.
@@ -18,7 +18,9 @@

package cn.sliew.scaleph.api.config;

import cn.sliew.scaleph.common.util.NetUtils;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.spring.starter.RedissonAutoConfiguration;
import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
@@ -30,6 +32,7 @@ public class RedissionConfig implements RedissonAutoConfigurationCustomizer {

@Override
public void customize(Config config) {

SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress(NetUtils.replaceLocalhost(singleServerConfig.getAddress()));
}
}
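Note: the shared NetUtils helper that takes over from the deleted MinioUtil is not part of the hunks shown in this commit. Below is a minimal sketch of what NetUtils.replaceLocalhost presumably does, based only on its call sites here; the class body is an assumption, not the project's actual implementation.

package cn.sliew.scaleph.common.util;

import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch only: rewrites loopback hosts so that containers/pods can
// reach services that were configured against "localhost" on the host machine.
public final class NetUtils {

    private NetUtils() {
    }

    public static String replaceLocalhost(String address) {
        if (address == null || address.isEmpty()) {
            return address;
        }
        String routableHost = resolveRoutableHost();
        return address
                .replace("localhost", routableHost)
                .replace("127.0.0.1", routableHost);
    }

    private static String resolveRoutableHost() {
        try {
            // Assumed strategy; the real helper may resolve a different address.
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "localhost";
        }
    }
}

With a helper like this, a Redisson single-server address such as redis://localhost:6379 would be rewritten to a host reachable from inside the cluster before the client connects.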
@@ -21,6 +21,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.common.util.NetUtils;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames;
import cn.sliew.scaleph.config.storage.S3FileSystemProperties;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
@@ -163,7 +164,7 @@ private List<EnvVar> buildEnvs() {
if (s3FileSystemProperties != null) {
EnvVarBuilder builder = new EnvVarBuilder();
builder.withName(ENDPOINT);
builder.withValue(MinioUtil.replaceLocalhost(s3FileSystemProperties.getEndpoint()));
builder.withValue(NetUtils.replaceLocalhost(s3FileSystemProperties.getEndpoint()));
return Arrays.asList(builder.build());
}
return Collections.emptyList();
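For context, the hunk above builds the S3 endpoint environment variable with fabric8's EnvVarBuilder. A self-contained illustration of the same pattern; the variable name and endpoint value are placeholders, not taken from this repository.

import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;

public class EnvVarEndpointExample {
    public static void main(String[] args) {
        // Placeholder endpoint; in the patched buildEnvs() it comes from S3FileSystemProperties.
        String endpoint = "http://localhost:9000";
        EnvVar envVar = new EnvVarBuilder()
                .withName("MINIO_ENDPOINT") // assumed name; the patch uses the ENDPOINT constant
                .withValue(endpoint)        // the patch wraps this in NetUtils.replaceLocalhost(...)
                .build();
        System.out.println(envVar.getName() + "=" + envVar.getValue());
    }
}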
@@ -20,6 +20,7 @@

import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.common.util.NetUtils;
import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames;
import cn.sliew.scaleph.config.storage.S3FileSystemProperties;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
@@ -70,7 +71,7 @@ private void handlePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builde

private void addFileSystemConfigOption(Map<String, String> flinkConfiguration) {
if (s3FileSystemProperties != null) {
flinkConfiguration.put(S3_ENDPOINT, MinioUtil.replaceLocalhost(s3FileSystemProperties.getEndpoint()));
flinkConfiguration.put(S3_ENDPOINT, NetUtils.replaceLocalhost(s3FileSystemProperties.getEndpoint()));
flinkConfiguration.put(S3_ACCESS_KEY, s3FileSystemProperties.getAccessKey());
flinkConfiguration.put(S3_SECRET_KEY, s3FileSystemProperties.getSecretKey());
flinkConfiguration.put(S3_PATH_STYLE_ACCESS, "true"); // container
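The same endpoint rewrite feeds the Flink configuration map above. A hedged illustration of what the resulting entries could look like for a local MinIO setup, assuming the S3_* constants correspond to Flink's standard s3.* option keys; the host, credentials, and port are made up.

import java.util.HashMap;
import java.util.Map;

public class S3ConfigExample {
    public static void main(String[] args) {
        Map<String, String> flinkConfiguration = new HashMap<>();
        // Hypothetical values: the rewritten host and the credentials are not from this commit.
        flinkConfiguration.put("s3.endpoint", "http://192.168.1.10:9000"); // after replaceLocalhost
        flinkConfiguration.put("s3.access-key", "minioadmin");
        flinkConfiguration.put("s3.secret-key", "minioadmin");
        flinkConfiguration.put("s3.path.style.access", "true"); // MinIO-style buckets need path-style access
        flinkConfiguration.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}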

This file was deleted.

@@ -20,23 +20,13 @@

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceDeployParam;
import cn.sliew.scaleph.common.dict.flink.FlinkJobState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType;
import cn.sliew.scaleph.common.util.UUIDUtil;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.operator.spec.JobState;
import cn.sliew.scaleph.application.flink.operator.status.FlinkDeploymentStatus;
import cn.sliew.scaleph.application.flink.operator.status.JobStatus;
import cn.sliew.scaleph.application.flink.operator.status.Savepoint;
import cn.sliew.scaleph.application.flink.operator.status.SavepointInfo;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler;
import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService;
@@ -47,10 +37,20 @@
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceDeployParam;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceListParam;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceSavepointListParam;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceShutdownParam;
import cn.sliew.scaleph.application.flink.watch.FlinkDeploymentWatchCallbackHandler;
import cn.sliew.scaleph.common.dict.flink.FlinkJobState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType;
import cn.sliew.scaleph.common.util.UUIDUtil;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper;
import cn.sliew.scaleph.kubernetes.Constant;
import cn.sliew.scaleph.kubernetes.watch.watch.WatchCallbackHandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -342,16 +342,22 @@ public int updateStatus(Long id, FlinkDeploymentStatus status) {
if (status == null) {
return -1;
}
WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance();
record.setId(id);
WsFlinkKubernetesJobInstance record = WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDo(selectOne(id));
record.setState(EnumUtils.getEnum(ResourceLifecycleState.class, status.getLifecycleState().name()));
if (status.getJobStatus() != null) {
JobStatus jobStatus = status.getJobStatus();
if (jobStatus.getState() != null) {
record.setJobState(FlinkJobState.of(jobStatus.getState()));
}
if (record.getJobState() == FlinkJobState.FAILED
|| record.getJobState() == FlinkJobState.CANCELED
|| record.getJobState() == FlinkJobState.FINISHED
|| record.getJobState() == FlinkJobState.SUSPENDED) {
record.setEndTime(new Date(Long.parseLong(jobStatus.getUpdateTime())));
}
if (jobStatus.getStartTime() != null) {
record.setStartTime(new Date(Long.parseLong(jobStatus.getStartTime())));
record.setDuration(Long.parseLong(jobStatus.getUpdateTime()) - Long.parseLong(jobStatus.getStartTime()));
}
}
record.setError(status.getError());
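The new terminal-state handling above derives endTime and duration from the operator-reported timestamps, which arrive as epoch-millisecond strings. A small standalone illustration of that arithmetic; the timestamp values are made up.

import java.util.Date;

public class DurationExample {
    public static void main(String[] args) {
        // Hypothetical operator-reported values (epoch millis as strings).
        String startTime = "1714350000000";
        String updateTime = "1714353600000";

        Date start = new Date(Long.parseLong(startTime));
        Date end = new Date(Long.parseLong(updateTime));
        long durationMillis = Long.parseLong(updateTime) - Long.parseLong(startTime);

        System.out.println("start=" + start + ", end=" + end + ", duration=" + durationMillis + "ms"); // 3600000ms
    }
}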
@@ -396,12 +402,18 @@ private void updateSavepoint(Long id, FlinkDeploymentStatus status) {

@Override
public int clearStatus(Long id) {
WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance();
record.setId(id);
WsFlinkKubernetesJobInstance record = WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDo(selectOne(id));
record.setState(null);
record.setError(null);
record.setClusterInfo(null);
record.setTaskManagerInfo(null);
if (record.getStartTime() != null && record.getEndTime() == null && (record.getJobState() == FlinkJobState.FAILED
|| record.getJobState() == FlinkJobState.CANCELED
|| record.getJobState() == FlinkJobState.FINISHED
|| record.getJobState() == FlinkJobState.SUSPENDED)) {
record.setEndTime(new Date());
record.setDuration(System.currentTimeMillis() - record.getStartTime().getTime());
}
return wsFlinkKubernetesJobInstanceMapper.updateById(record);
}
}
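Similarly, clearStatus now backfills endTime and duration from the wall clock when a job in a terminal state was never stamped with an end time. A hedged sketch of that fallback in isolation; the record type below is a stand-in, not the project's entity class.

import java.util.Date;

public class ClearStatusFallbackExample {

    // Minimal stand-in for the persisted job instance record.
    static class JobRecord {
        Date startTime;
        Date endTime;
        Long duration;
    }

    static void backfillEndTime(JobRecord record, boolean terminalState) {
        if (record.startTime != null && record.endTime == null && terminalState) {
            record.endTime = new Date();
            record.duration = System.currentTimeMillis() - record.startTime.getTime();
        }
    }

    public static void main(String[] args) {
        JobRecord record = new JobRecord();
        record.startTime = new Date(System.currentTimeMillis() - 60_000L); // started a minute ago
        backfillEndTime(record, true);
        System.out.println("duration=" + record.duration + "ms"); // roughly 60000ms
    }
}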
