From 37f0138e62fe98b7968c70c1b59fbbbd9d4465e0 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Mon, 29 Apr 2024 19:29:43 +0800 Subject: [PATCH] fix: flink job error --- .../scaleph/api/config/RedissionConfig.java | 5 +- .../resource/handler/FileFetcherHandler.java | 3 +- .../handler/FileSystemPluginHandler.java | 3 +- .../flink/resource/handler/MinioUtil.java | 104 ------- ...FlinkKubernetesJobInstanceServiceImpl.java | 42 ++- .../sliew/scaleph/common/util/NetUtils.java | 281 ++++++++++++++++++ .../scaleph/ds/gravitino/GravitinoConfig.java | 5 +- .../file/fetcher/config/RedissionConfig.java | 38 +++ .../system/snowflake/utils/NetUtils.java | 217 -------------- .../worker/DisposableWorkerIdAssigner.java | 2 +- scaleph-ui-react/src/constants/enum.ts | 25 ++ .../src/locales/zh-CN/pages/project.ts | 1 + .../Engine/Compute/Flink/Job/Detail/index.tsx | 21 +- 13 files changed, 404 insertions(+), 343 deletions(-) delete mode 100644 scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/MinioUtil.java create mode 100644 scaleph-common/src/main/java/cn/sliew/scaleph/common/util/NetUtils.java create mode 100644 scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/config/RedissionConfig.java delete mode 100644 scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java index b30db0cbe..b9c61e2b2 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/RedissionConfig.java @@ -18,7 +18,9 @@ package cn.sliew.scaleph.api.config; +import cn.sliew.scaleph.common.util.NetUtils; import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; import 
org.redisson.spring.starter.RedissonAutoConfiguration; import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer; import org.springframework.boot.autoconfigure.AutoConfigureBefore; @@ -30,6 +32,7 @@ public class RedissionConfig implements RedissonAutoConfigurationCustomizer { @Override public void customize(Config config) { - + SingleServerConfig singleServerConfig = config.useSingleServer(); + singleServerConfig.setAddress(NetUtils.replaceLocalhost(singleServerConfig.getAddress())); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileFetcherHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileFetcherHandler.java index 6bbff0d47..4ed01930a 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileFetcherHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileFetcherHandler.java @@ -21,6 +21,7 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; +import cn.sliew.scaleph.common.util.NetUtils; import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames; import cn.sliew.scaleph.config.storage.S3FileSystemProperties; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; @@ -163,7 +164,7 @@ private List buildEnvs() { if (s3FileSystemProperties != null) { EnvVarBuilder builder = new EnvVarBuilder(); builder.withName(ENDPOINT); - builder.withValue(MinioUtil.replaceLocalhost(s3FileSystemProperties.getEndpoint())); + builder.withValue(NetUtils.replaceLocalhost(s3FileSystemProperties.getEndpoint())); return Arrays.asList(builder.build()); } return Collections.emptyList(); diff --git 
a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java index c26fa9450..446218c09 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/FileSystemPluginHandler.java @@ -20,6 +20,7 @@ import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.common.util.NetUtils; import cn.sliew.scaleph.config.kubernetes.resource.ResourceNames; import cn.sliew.scaleph.config.storage.S3FileSystemProperties; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; @@ -70,7 +71,7 @@ private void handlePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builde private void addFileSystemConfigOption(Map flinkConfiguration) { if (s3FileSystemProperties != null) { - flinkConfiguration.put(S3_ENDPOINT, MinioUtil.replaceLocalhost(s3FileSystemProperties.getEndpoint())); + flinkConfiguration.put(S3_ENDPOINT, NetUtils.replaceLocalhost(s3FileSystemProperties.getEndpoint())); flinkConfiguration.put(S3_ACCESS_KEY, s3FileSystemProperties.getAccessKey()); flinkConfiguration.put(S3_SECRET_KEY, s3FileSystemProperties.getSecretKey()); flinkConfiguration.put(S3_PATH_STYLE_ACCESS, "true"); // container diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/MinioUtil.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/MinioUtil.java deleted file mode 100644 index e65ff00e8..000000000 --- 
a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/MinioUtil.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.application.flink.resource.handler; - -import cn.sliew.scaleph.system.snowflake.utils.NetUtils; -import lombok.extern.slf4j.Slf4j; - -import java.net.InetAddress; -import java.net.SocketException; -import java.net.URI; - -@Slf4j -public enum MinioUtil { - ; - - /* The common local hosts and ips */ - private static final String[] LOCAL_IPS = { - "localhost", "127.0.0.1", "0.0.0.0" - }; - - /** - * If the endpoint pointing to a local address, - * we should replace the local address with PUBLIC address - * to make sure all pods/container can access to the minio. - * - *

- * Generally, the endpoint should be a uri. - * So we firstly parse it to a uri and get the host. - * Then check if the host is a local address. - *

- *

- * If exception occurs in parsing uri, - * we replace the endpoint in hard coded ways. - *

- * - * @param endpoint The minio endpoint, Generally in uri format - * @return Replaced endpoint. - */ - public static String replaceLocalhost(String endpoint) { - String result = endpoint; - try { - URI uri = URI.create(endpoint); - String host = uri.getHost(); - InetAddress inetAddress = InetAddress.getByName(host); - if (inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress()) { - log.debug("Host {}, address = {} is a local address", host, inetAddress); - InetAddress publicAddress = NetUtils.getLocalInetAddress(); - log.debug("Public address is {}", publicAddress); - if (validateInetAddress(publicAddress)) { - log.debug("Public address {} is valid!", publicAddress); - URI res = new URI(uri.getScheme(), - uri.getUserInfo(), - publicAddress.getHostAddress(), - uri.getPort(), - uri.getPath(), - uri.getQuery(), - uri.getFragment()); - result = res.toString(); - } - } - } catch (Exception e) { - log.error(e.getLocalizedMessage(), e); - try { - InetAddress publicAddress = NetUtils.getLocalInetAddress(); - log.debug("Public address is {}", publicAddress); - if (validateInetAddress(publicAddress)) { - log.debug("Public address {} is valid!", publicAddress); - for (String localIp : LOCAL_IPS) { - if (result.contains(localIp)) { - log.debug("Endpoint {} contains local ip {}", result, localIp); - result = result.replace(localIp, publicAddress.getHostAddress()); - } - } - } - } catch (SocketException ex) { - log.error(ex.getLocalizedMessage(), ex); - } - } - log.debug("Final endpoint is {}", result); - return result; - } - - private static boolean validateInetAddress(InetAddress inetAddress) { - return !inetAddress.isLinkLocalAddress() - && !inetAddress.isLoopbackAddress() - && !inetAddress.isAnyLocalAddress(); - } -} diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java 
b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 0bda8707b..a695a4006 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -20,23 +20,13 @@ import cn.sliew.milky.common.exception.Rethrower; import cn.sliew.milky.common.util.JacksonUtil; -import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceDeployParam; -import cn.sliew.scaleph.common.dict.flink.FlinkJobState; -import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; -import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; -import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; -import cn.sliew.scaleph.common.util.UUIDUtil; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; -import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper; -import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.application.flink.operator.spec.JobState; import cn.sliew.scaleph.application.flink.operator.status.FlinkDeploymentStatus; import cn.sliew.scaleph.application.flink.operator.status.JobStatus; import cn.sliew.scaleph.application.flink.operator.status.Savepoint; import cn.sliew.scaleph.application.flink.operator.status.SavepointInfo; +import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import 
cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory; import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler; import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService; @@ -47,10 +37,20 @@ import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO; +import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceDeployParam; import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceListParam; import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceSavepointListParam; import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceShutdownParam; import cn.sliew.scaleph.application.flink.watch.FlinkDeploymentWatchCallbackHandler; +import cn.sliew.scaleph.common.dict.flink.FlinkJobState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; +import cn.sliew.scaleph.common.util.UUIDUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper; import cn.sliew.scaleph.kubernetes.Constant; import cn.sliew.scaleph.kubernetes.watch.watch.WatchCallbackHandler; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -342,16 +342,22 @@ public int updateStatus(Long id, FlinkDeploymentStatus status) { if (status == 
null) { return -1; } - WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance(); - record.setId(id); + WsFlinkKubernetesJobInstance record = WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDo(selectOne(id)); record.setState(EnumUtils.getEnum(ResourceLifecycleState.class, status.getLifecycleState().name())); if (status.getJobStatus() != null) { JobStatus jobStatus = status.getJobStatus(); if (jobStatus.getState() != null) { record.setJobState(FlinkJobState.of(jobStatus.getState())); } + if (record.getJobState() == FlinkJobState.FAILED + || record.getJobState() == FlinkJobState.CANCELED + || record.getJobState() == FlinkJobState.FINISHED + || record.getJobState() == FlinkJobState.SUSPENDED) { + record.setEndTime(new Date(Long.parseLong(jobStatus.getUpdateTime()))); + } if (jobStatus.getStartTime() != null) { record.setStartTime(new Date(Long.parseLong(jobStatus.getStartTime()))); + record.setDuration(Long.parseLong(jobStatus.getUpdateTime()) - Long.parseLong(jobStatus.getStartTime())); } } record.setError(status.getError()); @@ -396,12 +402,18 @@ private void updateSavepoint(Long id, FlinkDeploymentStatus status) { @Override public int clearStatus(Long id) { - WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance(); - record.setId(id); + WsFlinkKubernetesJobInstance record = WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDo(selectOne(id)); record.setState(null); record.setError(null); record.setClusterInfo(null); record.setTaskManagerInfo(null); + if (record.getStartTime() != null && record.getEndTime() == null && (record.getJobState() == FlinkJobState.FAILED + || record.getJobState() == FlinkJobState.CANCELED + || record.getJobState() == FlinkJobState.FINISHED + || record.getJobState() == FlinkJobState.SUSPENDED)) { + record.setEndTime(new Date()); + record.setDuration(System.currentTimeMillis() - record.getStartTime().getTime()); + } return wsFlinkKubernetesJobInstanceMapper.updateById(record); } } diff --git 
a/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/NetUtils.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/NetUtils.java
new file mode 100644
index 000000000..21695d711
--- /dev/null
+++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/util/NetUtils.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.common.util;
+
+import cn.sliew.milky.common.exception.Rethrower;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Network helpers: picks a usable local address and rewrites loopback endpoints to that address.
+ */
+
+@Slf4j
+public enum NetUtils {
+    ;
+
+    private static final String ETHERNET_PREFIX = "eth";
+    private static final String MAC_ETHERNET_PREFIX = "en";
+    private static final String WIRELESS_LAN_PREFIX = "wlp";
+    private static final String[] LOCAL_IPS = {
+            "localhost", "127.0.0.1", "0.0.0.0"
+    };
+
+    /**
+     * Pre-loaded local address, resolved once by the static initializer below
+     */
+    public static InetAddress localAddress;
+
+    static {
+        try {
+            localAddress = getLocalInetAddress();
+        } catch (SocketException e) {
+            log.error("fail to get local ip.", e);
+            throw new RuntimeException("fail to get local ip.", e);
+        }
+    }
+
+    /**
+     * Retrieve the first validated local ip address (cached in {@link #localAddress} once resolved).
+     *
+     * @return the local address, or the result of {@code InetAddress.getLocalHost()} as a last resort
+     * @throws SocketException the socket exception
+     */
+    public static InetAddress getLocalInetAddress() throws SocketException {
+        if (localAddress != null) {
+            return localAddress;
+        }
+        // List of all network interfaces
+        List<NetworkInterface> networkInterfaces = getNetworkInterfaces();
+        log.info("Trying to find ethernet...");
+        NetworkInterface ni = getByEthPrefix(networkInterfaces)
+                .or(() -> getByEnPrefix(networkInterfaces))
+                .or(() -> getByWlpPrefix(networkInterfaces))
+                .or(() -> getByFirst(networkInterfaces))
+                .orElse(null); // no usable interface: fall through to the localhost fallback below
+        if (ni != null) {
+            log.info("Found network interface: index = {}, name = {}, mac = {}, ",
+                    ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress()));
+            Enumeration<InetAddress> addressEnumeration = ni.getInetAddresses();
+            while (addressEnumeration.hasMoreElements()) {
+                InetAddress address = addressEnumeration.nextElement();
+                // ignores all invalidated addresses
+                if (address.isLinkLocalAddress() || address.isLoopbackAddress() || address.isAnyLocalAddress()) {
+                    log.info("Invalid address: {}", address);
+                    continue;
+                }
+                try {
+                    if (address.isSiteLocalAddress() && address.isReachable(1000)) {
+                        return address;
+                    }
+                    log.warn("Ip {} of network interface {} is not reachable", address, ni.getName());
+                } catch (IOException e) {
+                    log.warn(e.getLocalizedMessage(), e);
+                }
+            }
+        }
+
+        // If no available interface found, return the localhost.
+        try {
+            InetAddress localHost = InetAddress.getLocalHost();
+            log.warn("No available interface found, use localhost: {}", localHost);
+            return localHost;
+        } catch (Exception e) {
+            throw new RuntimeException("No validated local address!", e);
+        }
+    }
+
+    private static Optional<NetworkInterface> getByEthPrefix(List<NetworkInterface> networkInterfaces) {
+        return networkInterfaces.stream()
+                // Find NetworkInterfaces named eth{n}, This means wired network
+                .filter(networkInterface -> networkInterface.getName().startsWith(ETHERNET_PREFIX))
+                .findAny();
+    }
+
+    private static Optional<NetworkInterface> getByEnPrefix(List<NetworkInterface> networkInterfaces) {
+        return networkInterfaces.stream()
+                // Find NetworkInterfaces named en{n}, This means wired network
+                .filter(networkInterface -> networkInterface.getName().startsWith(MAC_ETHERNET_PREFIX))
+                .sorted(Comparator.comparingLong(NetworkInterface::getIndex).reversed())
+                .findFirst();
+    }
+
+    private static Optional<NetworkInterface> getByWlpPrefix(List<NetworkInterface> networkInterfaces) {
+        return networkInterfaces
+                .stream()
+                // Then try to find NetworkInterfaces named wlp{n}s{n}, This means wireless network
+                .filter(networkInterface -> networkInterface.getName().startsWith(WIRELESS_LAN_PREFIX))
+                .findAny();
+    }
+
+    private static Optional<NetworkInterface> getByFirst(List<NetworkInterface> networkInterfaces) {
+        return networkInterfaces
+                // Then find NetworkInterfaces by its index.
+                // NOTE(review): wired/wireless interfaces are said to come ahead of docker/minikube ones,
+                // yet the sort below is reversed and picks the HIGHEST index - confirm which is intended.
+                .stream()
+                .sorted(Comparator.comparingLong(NetworkInterface::getIndex).reversed())
+                .findFirst()
+                .filter(Objects::nonNull)
+                // Finally get the first network interface in the list
+                .or(() -> {
+                    log.info("No network interface by index, trying to get the first one or return null");
+                    return networkInterfaces
+                            .stream()
+                            .findAny();
+                });
+    }
+
+    private static List<NetworkInterface> getNetworkInterfaces() throws SocketException {
+        List<NetworkInterface> networkInterfaces = new ArrayList<>();
+        Enumeration<NetworkInterface> enu = NetworkInterface.getNetworkInterfaces();
+        while (enu.hasMoreElements()) {
+            NetworkInterface ni = enu.nextElement();
+            // Skip the interface which is loopback or is down or is virtual
+            if (ni.isLoopback() || !ni.isUp() || ni.isVirtual()) {
+                continue;
+            }
+            networkInterfaces.add(ni);
+            log.debug("Valid network interface: index = {}, name = {}, mac = {}",
+                    ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress()));
+        }
+        if (networkInterfaces.isEmpty()) {
+            log.warn("No validated local address!");
+        }
+        return networkInterfaces;
+    }
+
+    /**
+     * Retrieve local address
+     *
+     * @return the pre-loaded local address in textual form
+     */
+    public static String getLocalAddress() {
+        return localAddress.getHostAddress();
+    }
+
+    public static String getLocalIP() {
+        try {
+            return InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+            Rethrower.throwAs(e);
+            return null;
+        }
+    }
+
+    public static String getLocalHost() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            Rethrower.throwAs(e);
+            return null;
+        }
+    }
+
+    /**
+     * Convert mac bytes to a lower-case, colon-separated hex string
+     *
+     * @param bytes Mac in bytes
+     * @return Mac in string, or {@code null} when {@code bytes} is null or empty
+     */
+    private static String bytesToMac(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        final char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+        StringJoiner stringJoiner = new StringJoiner(":");
+        for (byte aByte : bytes) {
+            char left = chars[(aByte & 0xF0) >> 4];
+            char right = chars[aByte & 0x0F];
+            stringJoiner.add(new String(new char[]{left, right}));
+        }
+        return stringJoiner.toString();
+    }
+
+    /**
+     * If the endpoint points to a local address,
+     * we replace the local address with the PUBLIC address of this host
+     * to make sure all pods/containers can reach the service behind it.
+     *
+     * <p>
+     * Generally, the endpoint should be a uri.
+     * So we first parse it as a uri and get the host.
+     * Then check if the host is a local address.
+     * </p>
+     * <p>
+     * If parsing fails, or the uri has no host component (e.g. {@code localhost:9000}),
+     * we fall back to plain text replacement of the well-known local names.
+     * </p>
+     *
+     * @param endpoint The endpoint, generally in uri format; {@code null} is returned unchanged
+     * @return Replaced endpoint, or the input itself when nothing had to be replaced.
+     */
+    public static String replaceLocalhost(String endpoint) {
+        String result = endpoint;
+        try {
+            URI uri = URI.create(endpoint);
+            String host = uri.getHost(); // null for scheme-less endpoints such as "localhost:9000"
+            InetAddress inetAddress = InetAddress.getByName(Objects.requireNonNull(host, "endpoint has no host component"));
+            if (inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress()) {
+                log.debug("Host {}, address = {} is a local address", host, inetAddress);
+                InetAddress publicAddress = NetUtils.getLocalInetAddress();
+                log.debug("Public address is {}", publicAddress);
+                if (validateInetAddress(publicAddress)) {
+                    log.debug("Public address {} is valid!", publicAddress);
+                    URI res = new URI(uri.getScheme(),
+                            uri.getUserInfo(),
+                            publicAddress.getHostAddress(),
+                            uri.getPort(),
+                            uri.getPath(),
+                            uri.getQuery(),
+                            uri.getFragment());
+                    result = res.toString();
+                }
+            }
+        } catch (Exception e) {
+            log.error(e.getLocalizedMessage(), e);
+            try {
+                InetAddress publicAddress = NetUtils.getLocalInetAddress();
+                log.debug("Public address is {}", publicAddress);
+                if (validateInetAddress(publicAddress)) {
+                    log.debug("Public address {} is valid!", publicAddress);
+                    for (String localIp : LOCAL_IPS) {
+                        if (result != null && result.contains(localIp)) {
+                            log.debug("Endpoint {} contains local ip {}", result, localIp);
+                            result = result.replace(localIp, publicAddress.getHostAddress());
+                        }
+                    }
+                }
+            } catch (SocketException ex) {
+                log.error(ex.getLocalizedMessage(), ex);
+            }
+        }
+        log.debug("Final endpoint is {}", result);
+        return result;
+    }
+
+    private static boolean validateInetAddress(InetAddress inetAddress) {
+        return !inetAddress.isLinkLocalAddress()
+                && !inetAddress.isLoopbackAddress()
+                && !inetAddress.isAnyLocalAddress();
+    }
+}
diff --git a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java
index 42093f52f..90da3c7bd 100644
---
a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java +++ b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.ds.gravitino; +import cn.sliew.scaleph.common.util.NetUtils; import com.datastrato.gravitino.client.GravitinoAdminClient; import com.datastrato.gravitino.client.GravitinoClient; import org.springframework.beans.factory.annotation.Autowired; @@ -34,7 +35,7 @@ public class GravitinoConfig { @Bean public GravitinoAdminClient gravitinoAdminClient() { - return GravitinoAdminClient.builder(properties.getUrl()) + return GravitinoAdminClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) .withSimpleAuth() .build(); } @@ -43,7 +44,7 @@ public GravitinoAdminClient gravitinoAdminClient() { * fixme 必须添加 metalakeName */ public GravitinoClient gravitinoClient() { - return GravitinoClient.builder(properties.getUrl()) + return GravitinoClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) .withSimpleAuth() .build(); } diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/config/RedissionConfig.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/config/RedissionConfig.java new file mode 100644 index 000000000..e555316ad --- /dev/null +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/config/RedissionConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.file.fetcher.config; + +import cn.sliew.scaleph.common.util.NetUtils; +import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; +import org.redisson.spring.starter.RedissonAutoConfiguration; +import org.redisson.spring.starter.RedissonAutoConfigurationCustomizer; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.context.annotation.Configuration; + +@Configuration +@AutoConfigureBefore(RedissonAutoConfiguration.class) +public class RedissionConfig implements RedissonAutoConfigurationCustomizer { + + @Override + public void customize(Config config) { + SingleServerConfig singleServerConfig = config.useSingleServer(); + singleServerConfig.setAddress(NetUtils.replaceLocalhost(singleServerConfig.getAddress())); + } +} diff --git a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java b/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java deleted file mode 100644 index 613e0b307..000000000 --- a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/utils/NetUtils.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.system.snowflake.utils; - -import cn.sliew.milky.common.exception.Rethrower; -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.List; -import java.util.Objects; -import java.util.StringJoiner; - -/** - * NetUtils - */ - -@Slf4j -public class NetUtils { - - private static final String ETHERNET_PREFIX = "eth"; - private static final String WIRELESS_LAN_PREFIX = "wlp"; - - /** - * Pre-loaded local address - */ - public static InetAddress localAddress; - - static { - try { - localAddress = getLocalInetAddress(); - } catch (SocketException e) { - throw new RuntimeException("fail to get local ip."); - } - } - - private NetUtils() { - } - - /** - * Retrieve the first validated local ip address(the Public and LAN ip addresses are validated). 
- * - * @return the local address - * @throws SocketException the socket exception - */ - public static InetAddress getLocalInetAddress() throws SocketException { - if (localAddress != null) { - return localAddress; - } - // List of all network interfaces - List networkInterfaces = getNetworkInterfaces(); - log.info("Trying to find ethernet..."); - NetworkInterface ni = networkInterfaces.stream() - // Find NetworkInterfaces named eth{n}, This means wired network - .filter(networkInterface -> networkInterface.getName().startsWith(ETHERNET_PREFIX)) - .findAny() - .orElseGet(() -> { - log.info("No network interface name starts with {}, trying to find wireless...", ETHERNET_PREFIX); - return networkInterfaces - .stream() - // Then try to find NetworkInterfaces named wlp{n}s{n}, This means wireless network - .filter(networkInterface -> networkInterface.getName().startsWith(WIRELESS_LAN_PREFIX)) - .findAny() - .orElseGet(() -> { - log.info("No network interface name starts with {}, trying to find by index...", WIRELESS_LAN_PREFIX); - return networkInterfaces - // Then find NetworkInterfaces by its index - // Generally the wired and wireless network are ahead of those - // who are created by docker/minikube and other softwares. 
- .stream() - .mapToInt(NetworkInterface::getIndex) - .sorted() - .findFirst() - .stream() - .mapToObj(index -> { - try { - NetworkInterface byIndex = NetworkInterface.getByIndex(index); - log.info("Index {} with interface name {}", index, byIndex.getName()); - return byIndex; - } catch (SocketException e) { - log.error(e.getLocalizedMessage(), e); - } - return null; - }) - .filter(Objects::nonNull) - .findFirst() - // Finally get the first network interface in the list - .orElseGet(() -> { - log.info("No network interface by index, trying to get the first one or return null"); - return networkInterfaces - .stream() - .findAny() - .orElse(null); - }); - }); - }); - if (ni != null) { - log.info("Found network interface: index = {}, name = {}, mac = {}", - ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress())); - Enumeration addressEnumeration = ni.getInetAddresses(); - while (addressEnumeration.hasMoreElements()) { - InetAddress address = addressEnumeration.nextElement(); - // ignores all invalidated addresses - if (address.isLinkLocalAddress() || address.isLoopbackAddress() || address.isAnyLocalAddress()) { - log.info("Invalid address: {}", address); - continue; - } - try { - if (!address.isReachable(1000)) { - log.warn("Ip {} of network interface {} is not reachable", address, ni.getName()); - } - } catch (IOException e) { - log.warn(e.getLocalizedMessage(), e); - } - return address; - } - } - - // If no available interface found, return the localhost. 
- try { - InetAddress localHost = InetAddress.getLocalHost(); - log.warn("No available interface found, use localhost: {}", localHost); - return localHost; - } catch (Exception e) { - throw new RuntimeException("No validated local address!"); - } - } - - private static List getNetworkInterfaces() throws SocketException { - List networkInterfaces = new ArrayList<>(); - Enumeration enu = NetworkInterface.getNetworkInterfaces(); - while (enu.hasMoreElements()) { - NetworkInterface ni = enu.nextElement(); - // Skip the interface which is loopback or is down or is virtual - if (ni.isLoopback() || !ni.isUp() || ni.isVirtual()) { - log.info("Invalid network interface: index = {}, name = {}, mac = {}", - ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress())); - continue; - } - networkInterfaces.add(ni); - log.debug("Valid network interface: index = {}, name = {}, mac = {}", - ni.getIndex(), ni.getName(), bytesToMac(ni.getHardwareAddress())); - } - if (networkInterfaces.isEmpty()) { - log.warn("No validated local address!"); - } - return networkInterfaces; - } - - /** - * Retrieve local address - * - * @return the string local address - */ - public static String getLocalAddress() { - return localAddress.getHostAddress(); - } - - public static String getLocalIP() { - try { - return InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - Rethrower.throwAs(e); - return null; - } - } - - public static String getLocalHost() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - Rethrower.throwAs(e); - return null; - } - } - - /** - * Convert mac bytes to string - * - * @param bytes Mac in bytes - * @return Mac in string - */ - private static String bytesToMac(byte[] bytes) { - if (bytes == null || bytes.length == 0) { - return null; - } - final char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'd', 'f'}; - StringJoiner stringJoiner = new StringJoiner(":"); - 
for (byte aByte : bytes) { - char left = chars[(aByte & 0xF0) >> 4]; - char right = chars[aByte & 0x0F]; - stringJoiner.add(new String(new char[]{left, right})); - } - return stringJoiner.toString(); - } -} diff --git a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/worker/DisposableWorkerIdAssigner.java b/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/worker/DisposableWorkerIdAssigner.java index 93e21d297..ac2357925 100644 --- a/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/worker/DisposableWorkerIdAssigner.java +++ b/scaleph-support/scaleph-system/src/main/java/cn/sliew/scaleph/system/snowflake/worker/DisposableWorkerIdAssigner.java @@ -19,11 +19,11 @@ package cn.sliew.scaleph.system.snowflake.worker; import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.util.NetUtils; import cn.sliew.scaleph.dao.DataSourceConstants; import cn.sliew.scaleph.dao.entity.master.snowflake.SnowflakeWorkerNode; import cn.sliew.scaleph.dao.mapper.master.snowflake.SnowflakeWorkerNodeMapper; import cn.sliew.scaleph.system.snowflake.utils.DockerUtils; -import cn.sliew.scaleph.system.snowflake.utils.NetUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; import org.springframework.transaction.annotation.Transactional; diff --git a/scaleph-ui-react/src/constants/enum.ts b/scaleph-ui-react/src/constants/enum.ts index da06acd56..fcabf8d62 100644 --- a/scaleph-ui-react/src/constants/enum.ts +++ b/scaleph-ui-react/src/constants/enum.ts @@ -14,3 +14,28 @@ export enum FlinkJobType { SEATUNNEL = '2', FLINK_CDC = '3' } + +export enum ResourceLifecycleState { + CREATED = 'CREATED', + SUSPENDED = 'SUSPENDED', + UPGRADING = 'UPGRADING', + DEPLOYED = 'DEPLOYED', + STABLE = 'STABLE', + ROLLING_BACK = 'ROLLING_BACK', + ROLLED_BACK = 'ROLLED_BACK', + FAILED = 'FAILED' +} + +export enum FlinkJobState { + INITIALIZING = 'INITIALIZING', + CREATED = 
'CREATED', + RUNNING = 'RUNNING', + FAILING = 'FAILING', + FAILED = 'FAILED', + CANCELLING = 'CANCELLING', + CANCELED = 'CANCELED', + FINISHED = 'FINISHED', + RESTARTING = 'RESTARTING', + SUSPENDED = 'SUSPENDED', + RECONCILING = 'RECONCILING', +} diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index a3e1c3512..9b1060d5f 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -1018,6 +1018,7 @@ export default { 'pages.project.flink.kubernetes.job.detail.overview.artifact': 'Artifact', 'pages.project.flink.kubernetes.job.detail.overview.resource': '资源详情', 'pages.project.flink.kubernetes.job.detail.overview.configuration': 'Flink 配置', + 'pages.project.flink.kubernetes.job.detail.triggerSavepoint': 'Savepoint', 'pages.project.flink.kubernetes.job.detail.savepoint': '状态管理', 'pages.project.flink.kubernetes.job.detail.savepoint.timeStamp': 'TimeStamp', 'pages.project.flink.kubernetes.job.detail.savepoint.formatType': 'Format', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx index 7bd38f07d..1774f36a2 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/index.tsx @@ -22,6 +22,7 @@ import FlinkKubernetesJobDetailInstanceListWeb from "@/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/InstanceList"; import FlinkKubernetesJobDetailSavepointWeb from "@/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/Savepoint"; import FlinkKubernetesJobDetailOverviewWeb from "@/pages/Project/Workspace/Engine/Compute/Flink/Job/Detail/Overview"; +import {FlinkJobState, ResourceLifecycleState} from "@/constants/enum"; const FlinkKubernetesJobDetailWeb: React.FC = (props: any) 
=> { const intl = useIntl(); @@ -110,6 +111,7 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { @@ -137,6 +143,7 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { , @@ -169,6 +180,14 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => {