diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index d69209920..bf147270e 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -20,11 +20,25 @@ export interface C2DClusterInfo { tempFolder?: string } -export type ComputeResourceType = 'cpu' | 'memory' | 'storage' +export type ComputeResourceType = 'cpu' | 'ram' | 'disk' | any export interface ComputeResourcesPricingInfo { - type: ComputeResourceType - price: number + id: ComputeResourceType + price: number // price per unit +} + +export interface ComputeResource { + id: ComputeResourceType + type?: string + kind?: string + total: number // total number of specific resource + min: number // min number of resource needed for a job + max: number // max number of resource for a job + inUse?: number // for display purposes +} +export interface ComputeResourceRequest { + id: string + amount: number } export interface ComputeEnvFees { @@ -32,44 +46,38 @@ export interface ComputeEnvFees { prices: ComputeResourcesPricingInfo[] } export interface ComputeEnvFeesStructure { - [chainId: string]: ComputeEnvFees + [chainId: string]: ComputeEnvFees[] } export interface RunningPlatform { architecture: string - os: string + os?: string +} + +export interface ComputeEnvironmentFreeOptions { + // only if a compute env exposes free jobs + storageExpiry?: number + maxJobDuration?: number + maxJobs?: number // maximum number of simultaneous free jobs + resources?: ComputeResource[] } export interface ComputeEnvironmentBaseConfig { - // cpuNumber: number - // ramGB: number - // diskGB: number - description: string // v1 - // maxJobs: number - storageExpiry: number // v1 - maxJobDuration: number // v1 max seconds for a job - // chainId?: number - // feeToken: string - // priceMin: number - totalCpu: number // total cpu available for jobs - maxCpu: number // max cpu for a single job. Imagine a K8 cluster with two nodes, each node with 10 cpus. 
Total=20, but at most you can allocate 10 cpu for a job - totalRam: number // total gb of RAM - maxRam: number // max allocatable GB RAM for a single job. - maxDisk: number // max GB of disck allocatable for a single job + description?: string // v1 + storageExpiry?: number // amount of seconds for storage + minJobDuration?: number // min billable seconds for a paid job + maxJobDuration?: number // max duration in seconds for a paid job + maxJobs?: number // maximum number of simultaneous paid jobs fees: ComputeEnvFeesStructure + resources?: ComputeResource[] + free?: ComputeEnvironmentFreeOptions + platform: RunningPlatform } export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig { id: string // v1 - // arch: string => part of platform bellow - // cpuType?: string - // gpuNumber?: number - // gpuType?: string - chainId?: number // it can be useful to keep the chain id (optional) - currentJobs: number + runningJobs: number + runningfreeJobs?: number consumerAddress: string // v1 - // lastSeen?: number - free: boolean - platform?: RunningPlatform[] // array due to k8 support } export interface C2DDockerConfig { @@ -80,12 +88,12 @@ export interface C2DDockerConfig { caPath: string certPath: string keyPath: string - environments: ComputeEnvironment[] - freeComputeOptions?: ComputeEnvironment -} - -export interface ComputeEnvByChain { - [chainId: number]: ComputeEnvironment[] + storageExpiry?: number + maxJobDuration?: number + maxJobs?: number + fees: ComputeEnvFeesStructure + resources?: ComputeResource[] // optional, owner can overwrite + free?: ComputeEnvironmentFreeOptions } export type ComputeResultType = @@ -165,6 +173,8 @@ export interface DBComputeJob extends ComputeJob { isRunning: boolean isStarted: boolean containerImage: string + resources: ComputeResourceRequest[] + isFree: boolean } // make sure we keep them both in sync diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 212accf0a..1501b4902 100644 --- 
a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -1,7 +1,12 @@ import { ValidateParams } from '../components/httpRoutes/validateCommands.js' import { DDO } from './DDO/DDO' import { P2PCommandResponse } from './OceanNode' -import type { ComputeAsset, ComputeAlgorithm, ComputeOutput } from './C2D/C2D.js' +import type { + ComputeAsset, + ComputeAlgorithm, + ComputeOutput, + ComputeResourceRequest +} from './C2D/C2D.js' import { ArweaveFileObject, FileObjectType, @@ -166,14 +171,18 @@ export interface ComputeStartCommand extends Command { algorithm: ComputeAlgorithm datasets?: ComputeAsset[] output?: ComputeOutput + resources?: ComputeResourceRequest[] + chainId?: number // network used by payment } export interface FreeComputeStartCommand extends Command { consumerAddress: string signature: string nonce: string + environment: string algorithm: ComputeAlgorithm datasets?: ComputeAsset[] output?: ComputeOutput + resources?: ComputeResourceRequest[] } export interface ComputeStopCommand extends Command { diff --git a/src/OceanNode.ts b/src/OceanNode.ts index bea6ddbb7..da6daab66 100644 --- a/src/OceanNode.ts +++ b/src/OceanNode.ts @@ -79,6 +79,7 @@ export class OceanNode { return } this.c2dEngines = new C2DEngines(_config, this.db.c2d) + await this.c2dEngines.startAllEngines() } } diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 68d1d3245..6f7835da8 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -6,14 +6,20 @@ import type { ComputeAsset, ComputeJob, ComputeOutput, + ComputeResourceRequest, + ComputeResourceType, + ComputeResource, DBComputeJob } from '../../@types/C2D/C2D.js' import { C2DClusterType } from '../../@types/C2D/C2D.js' +import { C2DDatabase } from '../database/C2DDatabase.js' export abstract class C2DEngine { private clusterConfig: C2DClusterInfo - public constructor(cluster: C2DClusterInfo) { + public db: C2DDatabase + public 
constructor(cluster: C2DClusterInfo, db: C2DDatabase) { this.clusterConfig = cluster + this.db = db } getC2DConfig(): C2DClusterInfo { @@ -31,12 +37,12 @@ export abstract class C2DEngine { // overwritten by classes for start actions public start(): Promise { - throw new Error('Method not implemented.') + return null } // overwritten by classes for cleanup public stop(): Promise { - throw new Error('Method not implemented.') + return null } public abstract startComputeJob( @@ -47,7 +53,8 @@ export abstract class C2DEngine { owner?: string, validUntil?: number, chainId?: number, - agreementId?: string + agreementId?: string, + resources?: ComputeResourceRequest[] ): Promise public abstract stopComputeJob( @@ -123,57 +130,162 @@ export abstract class C2DEngine { } return null } -} -export class C2DEngineLocal extends C2DEngine { - public getComputeEnvironments(chainId?: number): Promise { - throw new Error('Method not implemented.') - } + /* Returns ComputeResources for a specific resource + */ + public getMaxMinResource( + id: ComputeResourceType, + env: ComputeEnvironment, + isFree: boolean + ): ComputeResource { + const paid = this.getResource(env.resources, id) + let free = null + if (isFree && 'free' in env && 'resources' in env.free) { + free = this.getResource(env.free.resources, id) + if (!free) { + // this resource is not listed under free, so it's not available + return { + id, + total: 0, + max: 0, + min: 0 + } + } + } + const total = 'total' in paid ? paid.total : 0 + const max = 'max' in paid ? paid.max : 0 + const min = 'min' in paid ? paid.min : 0 + const ret: ComputeResource = { + id, + total: free && 'total' in free ? free.total : total, + max: free && 'max' in free ? free.max : max, + min: free && 'min' in free ? 
free.min : min + } - public startComputeJob( - assets: ComputeAsset[], - algorithm: ComputeAlgorithm, - output: ComputeOutput, - environment: string, - owner?: string, - validUntil?: number, - chainId?: number, - agreementId?: string - ): Promise { - throw new Error('Method not implemented.') + return ret } - public stopComputeJob( - jobId: string, - owner: string, - agreementId?: string - ): Promise { - throw new Error('Method not implemented.') + // make sure that all requests have cpu, ram, storage + // eslint-disable-next-line require-await + public async checkAndFillMissingResources( + resources: ComputeResourceRequest[], + env: ComputeEnvironment, + isFree: boolean + ): Promise { + if (isFree && !('free' in env)) throw new Error('This env does not support free jobs') + const properResources: ComputeResourceRequest[] = [] + const elements: string[] = [] + + for (const res of env.free.resources) elements.push(res.id) + for (const res of env.resources) if (!elements.includes(res.id)) elements.push(res.id) + + /* if (isFree && 'free' in env && 'resources' in env.free) { + for (const res of env.free.resources) elements.push(res.id) + } else for (const res of env.resources) elements.push(res.id) + */ + for (const device of elements) { + let desired = this.getResourceRequest(resources, device) + const minMax = this.getMaxMinResource(device, env, isFree) + if (!desired && minMax.min > 0) { + // it's required + desired = minMax.min + } else { + if (desired < minMax.min) desired = minMax.min + if (desired > minMax.max) { + throw new Error( + 'Not enough ' + + device + + ' resources. 
Requested ' + + desired + + ', but max is ' + + minMax.max + ) + } + } + properResources.push({ id: device, amount: minMax.min }) + } + + return properResources } - public getComputeJobStatus( - consumerAddress?: string, - agreementId?: string, - jobId?: string - ): Promise { - throw new Error('Method not implemented.') + public async getUsedResources(env: ComputeEnvironment): Promise { + const usedResources: { [x: string]: any } = {} + const usedFreeResources: { [x: string]: any } = {} + const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) + let totalJobs = 0 + let totalFreeJobs = 0 + for (const job of jobs) { + if (job.environment === env.id) { + totalJobs++ + if (job.isFree) totalFreeJobs++ + + for (const resource of job.resources) { + if (!(resource.id in usedResources)) usedResources[resource.id] = 0 + usedResources[resource.id] += resource.amount + if (job.isFree) { + if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0 + usedFreeResources[resource.id] += resource.amount + } + } + } + } + return { totalJobs, totalFreeJobs, usedResources, usedFreeResources } } - public getComputeJobResult( - consumerAddress: string, - jobId: string, - index: number - ): Promise { - throw new Error('Method not implemented.') + // overridden by each engine if required + // eslint-disable-next-line require-await + public async checkIfResourcesAreAvailable( + resourcesRequest: ComputeResourceRequest[], + env: ComputeEnvironment, + isFree: boolean + ) { + for (const request of resourcesRequest) { + let envResource = this.getResource(env.resources, request.id) + if (!envResource) throw new Error(`No such resource ${request.id}`) + if (envResource.total - envResource.inUse < request.amount) + throw new Error(`Not enough available ${request.id}`) + if (isFree) { + if (!env.free) throw new Error(`No free resources`) + envResource = this.getResource(env.free.resources, request.id) + if (!envResource) throw new Error(`No such free resource 
${request.id}`) + if (envResource.total - envResource.inUse < request.amount) + throw new Error(`Not enough available ${request.id} for free`) + } + } + if ('maxJobs' in env && env.maxJobs && env.runningJobs + 1 > env.maxJobs) { + throw new Error(`Too many running jobs `) + } + if ( + isFree && + 'free' in env && + `maxJobs` in env.free && + env.free.maxJobs && + env.runningfreeJobs + 1 > env.free.maxJobs + ) { + throw new Error(`Too many running free jobs `) + } } - public cleanupExpiredStorage(job: DBComputeJob): Promise { - throw new Error('Method not implemented.') + public getResource(resources: ComputeResource[], id: ComputeResourceType) { + if (!resources) return null + for (const resource of resources) { + if (resource.id === id) { + return resource + } + } + return null } - // eslint-disable-next-line no-useless-constructor - public constructor(clusterConfig: C2DClusterInfo) { - super(clusterConfig) + public getResourceRequest( + resources: ComputeResourceRequest[], + id: ComputeResourceType + ) { + if (!resources) return null + for (const resource of resources) { + if (resource.id === id) { + return resource.amount + } + } + return null } - // not implemented yet } diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 10856317e..082339d1e 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1,6 +1,6 @@ /* eslint-disable security/detect-non-literal-fs-filename */ import { Readable } from 'stream' -import { C2DClusterType, C2DStatusNumber, C2DStatusText } from '../../@types/C2D/C2D.js' +import { C2DStatusNumber, C2DStatusText } from '../../@types/C2D/C2D.js' import type { C2DClusterInfo, ComputeEnvironment, @@ -10,7 +10,9 @@ import type { ComputeOutput, DBComputeJob, ComputeResult, - RunningPlatform + RunningPlatform, + ComputeEnvFeesStructure, + ComputeResourceRequest } from '../../@types/C2D/C2D.js' import { getConfiguration } from 
'../../utils/config.js' import { C2DEngine } from './compute_engine_base.js' @@ -32,7 +34,6 @@ import { import { pipeline } from 'node:stream/promises' import { CORE_LOGGER } from '../../utils/logging/common.js' import { generateUniqueID } from '../database/sqliteCompute.js' -import { Blockchain } from '../../utils/blockchain.js' import { AssetUtils } from '../../utils/asset.js' import { FindDdoHandler } from '../core/handler/ddoHandler.js' import { OceanNode } from '../../OceanNode.js' @@ -40,18 +41,16 @@ import { Service } from '../../@types/DDO/Service.js' import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js' import * as drc from 'docker-registry-client' import { ValidateParams } from '../httpRoutes/validateCommands.js' -import { convertGigabytesToBytes } from '../../utils/util.js' -import os from 'os' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] - protected db: C2DDatabase + public docker: Dockerode private cronTimer: any private cronTime: number = 2000 public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase) { - super(clusterConfig) - this.db = db + super(clusterConfig, db) + this.docker = null if (clusterConfig.connection.socketPath) { try { @@ -84,13 +83,89 @@ export class C2DEngineDocker extends C2DEngine { 'Could not create Docker container temporary folders: ' + e.message ) } + // envs are build on start function + } - if (clusterConfig.connection?.environments) { - this.envs = clusterConfig.connection.environments + public override async start() { + // let's build the env. 
Swarm and k8 will build multiple envs, based on arhitecture + const config = await getConfiguration() + const envConfig = await this.getC2DConfig().connection + const sysinfo = await this.docker.info() + // console.log(sysinfo) + let fees: ComputeEnvFeesStructure = null + + const supportedChains: number[] = [] + for (const chain of Object.keys(config.supportedNetworks)) { + supportedChains.push(parseInt(chain)) } - // only when we got the first request to start a compute job, - // no need to start doing this right away - // this.setNewTimer() + for (const feeChain of Object.keys(envConfig.fees)) { + // for (const feeConfig of envConfig.fees) { + // console.log(feeChain) + if (supportedChains.includes(parseInt(feeChain))) { + if (fees === null) fees = {} + if (!(feeChain in fees)) fees[feeChain] = [] + fees[feeChain].push(envConfig.fees[feeChain]) + } + + /* for (const chain of Object.keys(config.supportedNetworks)) { + const chainId = parseInt(chain) + if (task.chainId && task.chainId !== chainId) continue + result[chainId] = await computeEngines.fetchEnvironments(chainId) + } */ + } + this.envs.push({ + id: '', // this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[i])), + runningJobs: 0, + consumerAddress: config.keys.ethAddress, + platform: { + architecture: sysinfo.Architecture, + os: sysinfo.OperatingSystem + }, + fees + }) + if (`storageExpiry` in envConfig) this.envs[0].storageExpiry = envConfig.storageExpiry + if (`maxJobDuration` in envConfig) + this.envs[0].maxJobDuration = envConfig.maxJobDuration + if (`maxJobs` in envConfig) this.envs[0].maxJobs = envConfig.maxJobs + // let's add resources + this.envs[0].resources = [] + this.envs[0].resources.push({ + id: 'cpu', + total: sysinfo.NCPU, + max: sysinfo.NCPU, + min: 1 + }) + this.envs[0].resources.push({ + id: 'ram', + total: sysinfo.MemTotal, + max: sysinfo.MemTotal, + min: 1e9 + }) + if (envConfig.resources) { + for (const res of envConfig.resources) { + // allow user to add other 
resources + if (res.id !== 'cpu' && res.id !== 'ram') { + if (!res.max) res.max = res.total + if (!res.min) res.min = 0 + this.envs[0].resources.push(res) + } + } + } + // limits for free env + if ('free' in envConfig) { + this.envs[0].free = {} + if (`storageExpiry` in envConfig.free) + this.envs[0].free.storageExpiry = envConfig.free.storageExpiry + if (`maxJobDuration` in envConfig.free) + this.envs[0].free.maxJobDuration = envConfig.free.maxJobDuration + if (`maxJobs` in envConfig.free) this.envs[0].free.maxJobs = envConfig.free.maxJobs + if ('resources' in envConfig.free) { + // TO DO - check if resource is also listed in this.envs[0].resources, if not, ignore it + this.envs[0].free.resources = envConfig.free.resources + } + } + this.envs[0].id = + this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0])) } // eslint-disable-next-line require-await @@ -98,43 +173,37 @@ export class C2DEngineDocker extends C2DEngine { chainId?: number ): Promise { /** - * Returns all cluster's compute environments for a specific chainId. Env's id already contains the cluster hash + * Returns all cluster's compute environments, filtered by a specific chainId if needed. 
Env's id already contains the cluster hash */ if (!this.docker) return [] - - if (chainId) { - const config = await getConfiguration() - const supportedNetwork = config.supportedNetworks[chainId] - if (supportedNetwork) { - const blockchain = new Blockchain( - supportedNetwork.rpc, - supportedNetwork.network, - chainId, - supportedNetwork.fallbackRPCs - ) - - // write the consumer address (compute env address) - const consumerAddress = await blockchain.getWalletAddress() - const filteredEnvs = [] - for (const computeEnv of this.envs) { - if ( - (computeEnv.fees && Object.hasOwn(computeEnv.fees, String(chainId))) || - computeEnv.chainId === chainId // not mandatory - ) { - // was: (computeEnv.chainId === chainId) { - computeEnv.consumerAddress = consumerAddress - // also set the chain id (helpful) - computeEnv.chainId = chainId - filteredEnvs.push(computeEnv) + const filteredEnvs = [] + for (const computeEnv of this.envs) { + if ( + !chainId || + (computeEnv.fees && Object.hasOwn(computeEnv.fees, String(chainId))) + ) { + const { totalJobs, totalFreeJobs, usedResources, usedFreeResources } = + await this.getUsedResources(computeEnv) + computeEnv.runningJobs = totalJobs + computeEnv.runningfreeJobs = totalFreeJobs + for (let i = 0; i < computeEnv.resources.length; i++) { + if (computeEnv.resources[i].id in usedResources) + computeEnv.resources[i].inUse = usedResources[computeEnv.resources[i].id] + else computeEnv.resources[i].inUse = 0 + } + if (computeEnv.free && computeEnv.free.resources) { + for (let i = 0; i < computeEnv.free.resources.length; i++) { + if (computeEnv.free.resources[i].id in usedFreeResources) + computeEnv.free.resources[i].inUse = + usedFreeResources[computeEnv.free.resources[i].id] + else computeEnv.free.resources[i].inUse = 0 } } - return filteredEnvs + filteredEnvs.push(computeEnv) } - // no compute envs or network is not supported - CORE_LOGGER.error(`There are no free compute environments for network ${chainId}`) - return [] } - return 
this.envs + + return filteredEnvs } /** @@ -203,10 +272,11 @@ export class C2DEngineDocker extends C2DEngine { owner?: string, validUntil?: number, chainId?: number, - agreementId?: string + agreementId?: string, + resources?: ComputeResourceRequest[] ): Promise { if (!this.docker) return [] - + const isFree: boolean = !agreementId const jobId = generateUniqueID() // C2D - Check image, check arhitecture, etc @@ -227,10 +297,7 @@ export class C2DEngineDocker extends C2DEngine { environment ) - const validation = await C2DEngineDocker.checkDockerImage( - image, - env.platform && env.platform.length > 0 ? env.platform[0] : null - ) + const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) if (!validation.valid) throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`) @@ -255,7 +322,9 @@ export class C2DEngineDocker extends C2DEngine { outputsURL: null, stopRequested: false, isRunning: true, - isStarted: false + isStarted: false, + resources, + isFree } await this.makeJobFolders(job) // make sure we actually were able to insert on DB @@ -544,15 +613,16 @@ export class C2DEngineDocker extends C2DEngine { // create the volume & create container // TO DO C2D: Choose driver & size // get env info - const environment = await this.getJobEnvironment(job) + // const environment = await this.getJobEnvironment(job) const volume: VolumeCreateOptions = { Name: job.jobId + '-volume' } // volume - if (environment != null) { + const diskSize = this.getResourceRequest(job.resources, 'disk') + if (diskSize && diskSize > 0) { volume.DriverOpts = { - size: environment.maxDisk > 0 ? 
`${environment.maxDisk}G` : '1G' + o: 'size=' + String(diskSize) } } @@ -568,7 +638,7 @@ export class C2DEngineDocker extends C2DEngine { // create the container const mountVols: any = { '/data': {} } - let hostConfig: HostConfig = { + const hostConfig: HostConfig = { Mounts: [ { Type: 'volume', @@ -578,17 +648,25 @@ export class C2DEngineDocker extends C2DEngine { } ] } - if (environment != null) { - // storage (container) + // disk + if (diskSize && diskSize > 0) { hostConfig.StorageOpt = { - size: environment.maxDisk > 0 ? `${environment.maxDisk}G` : '1G' - } - hostConfig = { - ...hostConfig, - ...(await buildCPUAndMemoryConstraints(environment, this.docker)) + size: String(diskSize) } } - + // ram + const ramSize = this.getResourceRequest(job.resources, 'ram') + if (ramSize && ramSize > 0) { + hostConfig.Memory = ramSize + // set swap to same memory value means no swap (otherwise it use like 2X mem) + hostConfig.MemorySwap = hostConfig.Memory + } + const cpus = this.getResourceRequest(job.resources, 'cpu') + if (cpus && cpus > 0) { + const systemInfo = this.docker ? 
await this.docker.info() : null + hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default + hostConfig.CpuQuota = (cpus / systemInfo.NCPU) * hostConfig.CpuPeriod + } const containerInfo: ContainerCreateOptions = { name: job.jobId + '-algoritm', Image: job.containerImage, @@ -609,7 +687,8 @@ export class C2DEngineDocker extends C2DEngine { ) containerInfo.Entrypoint = newEntrypoint.split(' ') } - + console.log('CREATING CONTAINER') + console.log(containerInfo) const container = await this.createDockerContainer(containerInfo, true) if (container) { console.log('container: ', container) @@ -657,11 +736,22 @@ export class C2DEngineDocker extends C2DEngine { return } catch (e) { // container failed to start + try { + const algoLogFile = + this.getC2DConfig().tempFolder + + '/' + + job.jobId + + '/data/logs/algorithm.log' + writeFileSync(algoLogFile, String(e.message)) + } catch (e) { + console.log('Failed to write') + console.log(e) + } console.error('could not start container: ' + e.message) console.log(e) job.status = C2DStatusNumber.AlgorithmFailed job.statusText = C2DStatusText.AlgorithmFailed - job.algologURL = String(e) + job.isRunning = false await this.db.updateJob(job) await this.cleanupJob(job) @@ -733,32 +823,41 @@ export class C2DEngineDocker extends C2DEngine { // - delete volume // - delete container - const container = await this.docker.getContainer(job.jobId + '-algoritm') try { - writeFileSync( - this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/algorithm.log', - await container.logs({ - stdout: true, - stderr: true, - follow: false - }) - ) + const container = await this.docker.getContainer(job.jobId + '-algoritm') + if (container) { + if (job.status !== C2DStatusNumber.AlgorithmFailed) { + writeFileSync( + this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/algorithm.log', + await container.logs({ + stdout: true, + stderr: true, + follow: false + }) + ) + } + await container.remove() + } + const volume = await 
this.docker.getVolume(job.jobId + '-volume') + if (volume) { + try { + await volume.remove() + } catch (e) { + console.log(e) + } + } + // remove folders + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', { + recursive: true, + force: true + }) + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations', { + recursive: true, + force: true + }) } catch (e) { console.log(e) } - - await container.remove() - const volume = await this.docker.getVolume(job.jobId + '-volume') - await volume.remove() - // remove folders - rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', { - recursive: true, - force: true - }) - rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations', { - recursive: true, - force: true - }) } private deleteOutputFolder(job: DBComputeJob) { @@ -796,54 +895,62 @@ export class C2DEngineDocker extends C2DEngine { this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/transformations/algorithm' try { let storage = null - // do we have a files object? - if (job.algorithm.fileObject) { - // is it unencrypted? - if (job.algorithm.fileObject.type) { - // we can get the storage directly - storage = Storage.getStorageClass(job.algorithm.fileObject, config) - } else { - // ok, maybe we have this encrypted instead - CORE_LOGGER.info('algorithm file object seems to be encrypted, checking it...') - // 1. Decrypt the files object - const decryptedFileObject = await decryptFilesObject(job.algorithm.fileObject) - console.log('decryptedFileObject: ', decryptedFileObject) - // 2. 
Get default storage settings - storage = Storage.getStorageClass(decryptedFileObject, config) - } + + if (job.algorithm.meta.rawcode && job.algorithm.meta.rawcode.length > 0) { + // we have the code, just write it + writeFileSync(fullAlgoPath, job.algorithm.meta.rawcode) } else { - // no files object, try to get information from documentId and serviceId - CORE_LOGGER.info( - 'algorithm file object seems to be missing, checking "serviceId" and "documentId"...' - ) - const { serviceId, documentId } = job.algorithm - // we can get it from this info - if (serviceId && documentId) { - const algoDdo = await new FindDdoHandler( - OceanNode.getInstance() - ).findAndFormatDdo(documentId) - console.log('algo ddo:', algoDdo) - // 1. Get the service - const service: Service = AssetUtils.getServiceById(algoDdo, serviceId) + // do we have a files object? + if (job.algorithm.fileObject) { + // is it unencrypted? + if (job.algorithm.fileObject.type) { + // we can get the storage directly + storage = Storage.getStorageClass(job.algorithm.fileObject, config) + } else { + // ok, maybe we have this encrypted instead + CORE_LOGGER.info( + 'algorithm file object seems to be encrypted, checking it...' + ) + // 1. Decrypt the files object + const decryptedFileObject = await decryptFilesObject(job.algorithm.fileObject) + console.log('decryptedFileObject: ', decryptedFileObject) + // 2. Get default storage settings + storage = Storage.getStorageClass(decryptedFileObject, config) + } + } else { + // no files object, try to get information from documentId and serviceId + CORE_LOGGER.info( + 'algorithm file object seems to be missing, checking "serviceId" and "documentId"...' + ) + const { serviceId, documentId } = job.algorithm + // we can get it from this info + if (serviceId && documentId) { + const algoDdo = await new FindDdoHandler( + OceanNode.getInstance() + ).findAndFormatDdo(documentId) + console.log('algo ddo:', algoDdo) + // 1. 
Get the service + const service: Service = AssetUtils.getServiceById(algoDdo, serviceId) - // 2. Decrypt the files object - const decryptedFileObject = await decryptFilesObject(service.files) - console.log('decryptedFileObject: ', decryptedFileObject) - // 4. Get default storage settings - storage = Storage.getStorageClass(decryptedFileObject, config) + // 2. Decrypt the files object + const decryptedFileObject = await decryptFilesObject(service.files) + console.log('decryptedFileObject: ', decryptedFileObject) + // 4. Get default storage settings + storage = Storage.getStorageClass(decryptedFileObject, config) + } } - } - if (storage) { - console.log('fullAlgoPath', fullAlgoPath) - await pipeline( - (await storage.getReadableStream()).stream, - createWriteStream(fullAlgoPath) - ) - } else { - CORE_LOGGER.info( - 'Could not extract any files object from the compute algorithm, skipping...' - ) + if (storage) { + console.log('fullAlgoPath', fullAlgoPath) + await pipeline( + (await storage.getReadableStream()).stream, + createWriteStream(fullAlgoPath) + ) + } else { + CORE_LOGGER.info( + 'Could not extract any files object from the compute algorithm, skipping...' 
+ ) + } } } catch (e) { CORE_LOGGER.error( @@ -932,36 +1039,47 @@ export class C2DEngineDocker extends C2DEngine { const folderToTar = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data' const destination = this.getC2DConfig().tempFolder + '/' + job.jobId + '/tarData/upload.tar.gz' - tar.create( - { - gzip: true, - file: destination, - sync: true, - C: folderToTar - }, - ['./'] - ) - // now, upload it to the container - const container = await this.docker.getContainer(job.jobId + '-algoritm') - - console.log('Start uploading') try { - // await container2.putArchive(destination, { - const stream = await container.putArchive(destination, { - path: '/data' - }) - console.log('PutArchive') - console.log(stream) + tar.create( + { + gzip: true, + file: destination, + sync: true, + C: folderToTar + }, + ['./'] + ) + // check if tar.gz actually exists + console.log('Start uploading') - console.log('Done uploading') - } catch (e) { - console.log('Data upload failed') - console.log(e) - return { - status: C2DStatusNumber.DataUploadFailed, - statusText: C2DStatusText.DataUploadFailed + if (existsSync(destination)) { + // now, upload it to the container + const container = await this.docker.getContainer(job.jobId + '-algoritm') + + try { + // await container2.putArchive(destination, { + const stream = await container.putArchive(destination, { + path: '/data' + }) + console.log('PutArchive') + console.log(stream) + + console.log('Done uploading') + } catch (e) { + console.log('Data upload failed') + console.log(e) + return { + status: C2DStatusNumber.DataUploadFailed, + statusText: C2DStatusText.DataUploadFailed + } + } + } else { + CORE_LOGGER.debug('No data to upload, empty tar.gz') } + } catch (e) { + CORE_LOGGER.debug(e.message) } + rmSync(this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/inputs', { recursive: true, force: true @@ -1026,135 +1144,6 @@ export class C2DEngineDocker extends C2DEngine { } // this uses the docker engine, but exposes only one env, 
the free one -export class C2DEngineDockerFree extends C2DEngineDocker { - public constructor(clusterConfig: C2DClusterInfo, db: C2DDatabase) { - // we remove envs, cause we have our own - const hash = create256Hash('free' + clusterConfig.hash) - const owerwrite = { - type: C2DClusterType.DOCKER, - hash, - connection: { - socketPath: clusterConfig.connection.socketPath, - protocol: clusterConfig.connection.protocol, - host: clusterConfig.connection.host, - port: clusterConfig.connection.port, - caPath: clusterConfig.connection.caPath, - certPath: clusterConfig.connection.certPath, - keyPath: clusterConfig.connection.keyPath, - freeComputeOptions: clusterConfig.connection.freeComputeOptions - }, - tempFolder: './c2d_storage/' + hash - } - super(owerwrite, db) - } - - // eslint-disable-next-line require-await - public override async getComputeEnvironments( - chainId?: number - ): Promise { - /** - * Returns all cluster's compute environments for a specific chainId. Env's id already contains the cluster hash - */ - // TO DO C2D - fill consts below - if (!this.docker) return [] - // const cpuType = '' - // const currentJobs = 0 - // const consumerAddress = '' - if (chainId) { - const config = await getConfiguration() - const supportedNetwork = config.supportedNetworks[chainId] - if (supportedNetwork) { - const blockchain = new Blockchain( - supportedNetwork.rpc, - supportedNetwork.network, - chainId, - supportedNetwork.fallbackRPCs - ) - - // write the consumer address (compute env address) - const consumerAddress = await blockchain.getWalletAddress() - const computeEnv: ComputeEnvironment = - this.getC2DConfig().connection?.freeComputeOptions - if ( - (computeEnv.fees && Object.hasOwn(computeEnv.fees, String(chainId))) || - computeEnv.chainId === chainId - ) { - // was: computeEnv.chainId === chainId - computeEnv.consumerAddress = consumerAddress - const envs: ComputeEnvironment[] = [computeEnv] - return envs - } - } - // no compute envs or network is not supported - 
CORE_LOGGER.error(`There are no free compute environments for network ${chainId}`) - return [] - } - // get them all - const envs: ComputeEnvironment[] = [ - this.getC2DConfig().connection?.freeComputeOptions - ] - return envs - } - - public override async startComputeJob( - assets: ComputeAsset[], - algorithm: ComputeAlgorithm, - output: ComputeOutput, - environment: string, - owner?: string, - validUntil?: number, - chainId?: number, - agreementId?: string - ): Promise { - // since it's a free job, we need to mangle some params - agreementId = create256Hash( - JSON.stringify({ - owner, - assets, - algorithm, - time: process.hrtime.bigint().toString() - }) - ) - chainId = 0 - const envs = await this.getComputeEnvironments() - if (envs.length < 1) { - // no free env ?? - throw new Error('No free env found') - } - validUntil = envs[0].maxJobDuration - return await super.startComputeJob( - assets, - algorithm, - output, - environment, - owner, - validUntil, - chainId, - agreementId - ) - } - - // eslint-disable-next-line require-await - public override async getComputeJobResult( - consumerAddress: string, - jobId: string, - index: number - ): Promise { - const result = await super.getComputeJobResult(consumerAddress, jobId, index) - if (result !== null) { - setTimeout(async () => { - const jobs: DBComputeJob[] = await this.db.getJob(jobId) - CORE_LOGGER.info( - 'Cleaning storage for free container, after retrieving results...' 
- ) - if (jobs.length === 1) { - this.cleanupExpiredStorage(jobs[0], true) // clean the storage, do not wait for it to expire - } - }, 5000) - } - return result - } -} export function getAlgorithmImage(algorithm: ComputeAlgorithm): string { if (!algorithm.meta || !algorithm.meta.container) { @@ -1166,7 +1155,7 @@ export function getAlgorithmImage(algorithm: ComputeAlgorithm): string { else if (algorithm.meta.container.tag) image = image + ':' + algorithm.meta.container.tag else image = image + ':latest' - console.log('Using image: ' + image) + // console.log('Using image: ' + image) return image } @@ -1182,34 +1171,3 @@ export function checkManifestPlatform( return false return true } -/** - * Helper function to build CPU constraints, also useful for testing purposes - * @param environment C2D environment - * @returns partial HostConfig object - */ -export async function buildCPUAndMemoryConstraints( - environment: ComputeEnvironment, - docker?: Dockerode -): Promise { - const hostConfig: HostConfig = {} - // CPU - const systemInfo = docker ? await docker.info() : null - const existingCPUs = systemInfo ? systemInfo.NCPU : os.cpus().length - const confCPUs = environment.maxCpu > 0 ? environment.maxCpu : 1 - // windows only - hostConfig.CpuCount = Math.min(confCPUs, existingCPUs) - // hostConfig.CpuShares = 1 / hostConfig.CpuCount - hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default - hostConfig.CpuQuota = (1 / hostConfig.CpuCount) * hostConfig.CpuPeriod - // if more than 1 CPU, Limit the specific CPUs or cores a container can use. - if (hostConfig.CpuCount > 1) { - hostConfig.CpusetCpus = `0-${hostConfig.CpuCount - 1}` - } - // MEM - const existingMem = systemInfo ? 
systemInfo.MemTotal : os.totalmem() - const configuredRam = 0 || convertGigabytesToBytes(environment.maxRam) - hostConfig.Memory = 0 || Math.min(existingMem, configuredRam) - // set swap to same memory value means no swap (otherwise it use like 2X mem) - hostConfig.MemorySwap = hostConfig.Memory - return hostConfig -} diff --git a/src/components/c2d/compute_engine_opf_k8.ts b/src/components/c2d/compute_engine_opf_k8.ts index 194dbeb60..a07484601 100644 --- a/src/components/c2d/compute_engine_opf_k8.ts +++ b/src/components/c2d/compute_engine_opf_k8.ts @@ -29,7 +29,7 @@ import { Storage } from '../storage/index.js' export class C2DEngineOPFK8 extends C2DEngine { // eslint-disable-next-line no-useless-constructor public constructor(clusterConfig: C2DClusterInfo) { - super(clusterConfig) + super(clusterConfig, null) } public override async getComputeEnvironments( diff --git a/src/components/c2d/compute_engines.ts b/src/components/c2d/compute_engines.ts index a119adaa3..dab515a66 100644 --- a/src/components/c2d/compute_engines.ts +++ b/src/components/c2d/compute_engines.ts @@ -1,7 +1,7 @@ import { C2DClusterType, ComputeEnvironment } from '../../@types/C2D/C2D.js' import { C2DEngine } from './compute_engine_base.js' import { C2DEngineOPFK8 } from './compute_engine_opf_k8.js' -import { C2DEngineDocker, C2DEngineDockerFree } from './compute_engine_docker.js' +import { C2DEngineDocker } from './compute_engine_docker.js' import { OceanNodeConfig } from '../../@types/OceanNode.js' import { C2DDatabase } from '../database/C2DDatabase.js' export class C2DEngines { @@ -10,7 +10,9 @@ export class C2DEngines { public constructor(config: OceanNodeConfig, db: C2DDatabase) { // let's see what engines do we have and initialize them one by one // for docker, we need to add the "free" - let haveFree = false + + // TO DO - check if we have multiple config.c2dClusters with the same host + // if yes, do not create multiple engines if (config && config.c2dClusters) { this.engines = [] for 
(const cluster of config.c2dClusters) { @@ -19,10 +21,6 @@ export class C2DEngines { } if (cluster.type === C2DClusterType.DOCKER) { this.engines.push(new C2DEngineDocker(cluster, db)) - if (!haveFree) { - this.engines.push(new C2DEngineDockerFree(cluster, db)) - haveFree = true - } } } } diff --git a/src/components/c2d/index.ts b/src/components/c2d/index.ts index cc7e898b6..94f5b6180 100644 --- a/src/components/c2d/index.ts +++ b/src/components/c2d/index.ts @@ -1,45 +1,11 @@ -import { OceanNode } from '../../OceanNode.js' -import { getConfiguration } from '../../utils/config.js' -import { ComputeGetEnvironmentsHandler } from '../core/compute/index.js' -import { PROTOCOL_COMMANDS } from '../../utils/constants.js' -import { - deleteKeysFromObject, - sanitizeServiceFiles, - streamToObject -} from '../../utils/util.js' -import { Readable } from 'stream' +import { deleteKeysFromObject, sanitizeServiceFiles } from '../../utils/util.js' + import { decrypt } from '../../utils/crypt.js' import { BaseFileObject, EncryptMethod } from '../../@types/fileObject.js' import { CORE_LOGGER } from '../../utils/logging/common.js' import { ComputeJob, DBComputeJob } from '../../@types/index.js' export { C2DEngine } from './compute_engine_base.js' -export async function checkC2DEnvExists( - envId: string, - oceanNode: OceanNode -): Promise { - const config = await getConfiguration() - const { supportedNetworks } = config - for (const supportedNetwork of Object.keys(supportedNetworks)) { - const getEnvironmentsTask = { - command: PROTOCOL_COMMANDS.COMPUTE_GET_ENVIRONMENTS, - chainId: parseInt(supportedNetwork) - } - const response = await new ComputeGetEnvironmentsHandler(oceanNode).handle( - getEnvironmentsTask - ) - if (response.status.httpStatus === 200) { - const computeEnvironments = await streamToObject(response.stream as Readable) - for (const computeEnvironment of computeEnvironments[parseInt(supportedNetwork)]) { - if (computeEnvironment.id === envId) { - return true - } - } - 
} - } - return false -} - export async function decryptFilesObject( serviceFiles: any ): Promise { diff --git a/src/components/core/compute/environments.ts b/src/components/core/compute/environments.ts index a07047e45..a123b8eeb 100644 --- a/src/components/core/compute/environments.ts +++ b/src/components/core/compute/environments.ts @@ -1,10 +1,8 @@ import { Readable } from 'stream' import { P2PCommandResponse } from '../../../@types/index.js' -import { ComputeEnvByChain } from '../../../@types/C2D/C2D.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { Handler } from '../handler/handler.js' import { ComputeGetEnvironmentsCommand } from '../../../@types/commands.js' -import { getConfiguration } from '../../../utils/config.js' import { ValidateParams, buildInvalidRequestMessage, @@ -27,14 +25,8 @@ export class ComputeGetEnvironmentsHandler extends Handler { return validationResponse } try { - const result: ComputeEnvByChain = {} const computeEngines = this.getOceanNode().getC2DEngines() - const config = await getConfiguration() - for (const chain of Object.keys(config.supportedNetworks)) { - const chainId = parseInt(chain) - if (task.chainId && task.chainId !== chainId) continue - result[chainId] = await computeEngines.fetchEnvironments(chainId) - } + const result = await computeEngines.fetchEnvironments(task.chainId) CORE_LOGGER.logMessage( 'ComputeGetEnvironmentsCommand Response: ' + JSON.stringify(result, null, 2), diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 1b07a4882..134442827 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -146,7 +146,7 @@ export class ComputeInitializeHandler extends Handler { .getExactComputeEnv(task.compute.env, ddo.chainId) const validation: ValidateParams = await C2DEngineDocker.checkDockerImage( algoImage, - env.platform && env.platform.length > 0 ? 
env.platform[0] : null + env.platform ) if (!validation.valid) { return { diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index d1065a0dc..ed6a21f34 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -71,6 +71,33 @@ export class ComputeStartHandler extends Handler { } } } + + try { + const env = await engine.getComputeEnvironment(null, task.environment) + if (!env) { + return { + stream: null, + status: { + httpStatus: 500, + error: 'Invalid C2D Environment' + } + } + } + task.resources = await engine.checkAndFillMissingResources( + task.resources, + env, + false + ) + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + return { + stream: null, + status: { + httpStatus: 400, + error: e + } + } + } const node = this.getOceanNode() const { algorithm } = task let foundValidCompute = null @@ -223,16 +250,6 @@ export class ComputeStartHandler extends Handler { result.chainId = ddo.chainId const env = await engine.getComputeEnvironment(ddo.chainId, task.environment) - if (env.free) { - const error = `Free Jobs cannot be started here, use startFreeCompute` - return { - stream: null, - status: { - httpStatus: 500, - error - } - } - } if (!('transferTxId' in elem) || !elem.transferTxId) { const error = `Missing transferTxId for DDO ${elem.documentId}` return { @@ -333,7 +350,8 @@ export class ComputeStartHandler extends Handler { task.consumerAddress, validUntil, chainId, - agreementId + agreementId, + task.resources ) CORE_LOGGER.logMessage( @@ -369,7 +387,8 @@ export class FreeComputeStartHandler extends Handler { 'datasets', 'consumerAddress', 'signature', - 'nonce' + 'nonce', + 'environment' ]) if (commandValidation.valid) { if (!isAddress(command.consumerAddress)) { @@ -386,32 +405,80 @@ export class FreeComputeStartHandler extends Handler { if (this.shouldDenyTaskHandling(validationResponse)) { return validationResponse } 
- let environment = null + let engine = null try { - // get all envs and see if we have a free one - const allEnvs = await this.getOceanNode().getC2DEngines().fetchEnvironments() - for (const env of allEnvs) { - if (env.free) { - environment = env + // split compute env (which is already in hash-envId format) and get the hash + // then get env which might contain dashes as well + const eIndex = task.environment.indexOf('-') + const hash = task.environment.slice(0, eIndex) + // const envId = task.environment.slice(eIndex + 1) + try { + engine = await this.getOceanNode().getC2DEngines().getC2DByHash(hash) + } catch (e) { + return { + stream: null, + status: { + httpStatus: 500, + error: 'Invalid C2D Environment' + } } } - if (!environment) + if (engine === null) { return { stream: null, status: { httpStatus: 500, - error: 'This node does not have a free compute env' + error: 'Invalid C2D Environment' + } + } + } + try { + const env = await engine.getComputeEnvironment(null, task.environment) + if (!env) { + return { + stream: null, + status: { + httpStatus: 500, + error: 'Invalid C2D Environment' + } + } + } + + task.resources = await engine.checkAndFillMissingResources( + task.resources, + env, + true + ) + await engine.checkIfResourcesAreAvailable(task.resources, env, true) + } catch (e) { + console.error(e) + return { + stream: null, + status: { + httpStatus: 400, + error: String(e) } } - const engine = await this.getOceanNode() - .getC2DEngines() - .getC2DByEnvId(environment.id) + } + // console.log(task.resources) + /* + return { + stream: null, + status: { + httpStatus: 200, + error: null + } + } */ const response = await engine.startComputeJob( task.datasets, task.algorithm, task.output, - environment.id, - task.consumerAddress + task.environment, + task.consumerAddress, + null, + null, + null, + task.resources ) CORE_LOGGER.logMessage( diff --git a/src/components/core/utils/feesHandler.ts b/src/components/core/utils/feesHandler.ts index dc03a4ce2..ade205587 
100644 --- a/src/components/core/utils/feesHandler.ts +++ b/src/components/core/utils/feesHandler.ts @@ -1,8 +1,7 @@ import type { ComputeEnvFees, ComputeEnvironment, - ComputeResourcesPricingInfo, - ComputeResourceType + ComputeResourcesPricingInfo } from '../../../@types/C2D/C2D.js' import { JsonRpcApiProvider, @@ -33,12 +32,12 @@ import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/template import { fetchEventFromTransaction } from '../../../utils/util.js' import { fetchTransactionReceipt } from './validateOrders.js' -export function getEnvironmentPriceSchemaForType( +export function getEnvironmentPriceSchemaForResource( prices: ComputeResourcesPricingInfo[], - type: ComputeResourceType + id: string ): number { for (const pr of prices) { - if (pr.type === type) { + if (pr.id === id) { return pr.price } } @@ -57,14 +56,14 @@ async function calculateProviderFeeAmount( if (computeEnv) { if (computeEnv.fees) { // get the fess for the asset chain - const feesForChain: ComputeEnvFees = computeEnv.fees[chainId] + const feesForChain: ComputeEnvFees = computeEnv.fees[chainId][0] if (feesForChain && feesForChain.prices.length > 0) { const price = // TODO: check this again // try to get the price from the SUM of the available types; 'cpu', 'memory' or 'storage' - getEnvironmentPriceSchemaForType(feesForChain.prices, 'cpu') + - getEnvironmentPriceSchemaForType(feesForChain.prices, 'memory') + - getEnvironmentPriceSchemaForType(feesForChain.prices, 'storage') + getEnvironmentPriceSchemaForResource(feesForChain.prices, 'cpu') + + getEnvironmentPriceSchemaForResource(feesForChain.prices, 'memory') + + getEnvironmentPriceSchemaForResource(feesForChain.prices, 'storage') // it's a compute provider fee providerFeeAmount = (seconds * parseFloat(String(price || 0))) / 60 // was: (seconds * parseFloat(String(computeEnv.priceMin))) / 60 } @@ -99,7 +98,7 @@ export async function createProviderFee( const providerFeeAddress: string = providerWallet.address let 
providerFeeAmount: number let providerFeeAmountFormatted: BigNumberish - + // TO DO - this will be overwritten with new escrow anyay let providerFeeToken: string if ( computeEnv && @@ -107,7 +106,7 @@ export async function createProviderFee( Object.hasOwn(computeEnv.fees, String(asset.chainId)) ) { // was: if (computeEnv) - providerFeeToken = computeEnv.fees[asset.chainId].feeToken // was: computeEnv.feeToken + providerFeeToken = computeEnv.fees[asset.chainId][0].feeToken // was: computeEnv.feeToken } else { // it's download, take it from config providerFeeToken = await getProviderFeeToken(asset.chainId) diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index f92f7a0f9..4095998ef 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -33,7 +33,9 @@ function getInternalStructure(job: DBComputeJob): any { assets: job.assets, isRunning: job.isRunning, isStarted: job.isStarted, - containerImage: job.containerImage + containerImage: job.containerImage, + resources: job.resources, + isFree: job.isFree } return internalBlob } diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts index 76c95d64c..9569c8109 100644 --- a/src/components/httpRoutes/compute.ts +++ b/src/components/httpRoutes/compute.ts @@ -12,7 +12,8 @@ import { import type { ComputeAlgorithm, ComputeAsset, - ComputeOutput + ComputeOutput, + ComputeResourceRequest } from '../../@types/C2D/C2D.js' import type { ComputeStartCommand, @@ -28,29 +29,9 @@ import { PROTOCOL_COMMANDS, SERVICES_API_BASE_PATH } from '../../utils/constants import { Readable } from 'stream' import { HTTP_LOGGER } from '../../utils/logging/common.js' import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js' -import { getConfiguration } from '../../utils/index.js' export const computeRoutes = express.Router() -async function areEmpty(computeEnvs: any, requestChainId?: any): Promise { - if (requestChainId) { 
- return computeEnvs[parseInt(requestChainId)].length === 0 - } else { - const config = await getConfiguration() - let isEmpty: number = 0 - const supportedNetworks = Object.keys(config.supportedNetworks) - for (const supportedNetwork of supportedNetworks) { - if (computeEnvs[supportedNetwork].length === 0) { - isEmpty++ - } - } - if (isEmpty === supportedNetworks.length) { - return true - } - return false - } -} - computeRoutes.get(`${SERVICES_API_BASE_PATH}/computeEnvironments`, async (req, res) => { try { HTTP_LOGGER.logMessage( @@ -67,16 +48,8 @@ computeRoutes.get(`${SERVICES_API_BASE_PATH}/computeEnvironments`, async (req, r ) // get compute environments const computeEnvironments = await streamToObject(response.stream as Readable) - // check if computeEnvironments is a valid json object and not empty - if ( - computeEnvironments && - !(await areEmpty(computeEnvironments, req.query.chainId)) - ) { - res.json(computeEnvironments) - } else { - HTTP_LOGGER.logMessage(`Compute environments not found`, true) - res.status(404).send('Compute environments not found') - } + // always return the array, even if it's empty + res.json(computeEnvironments) } catch (error) { HTTP_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error: ${error}`) res.status(500).send('Internal Server Error') @@ -99,7 +72,8 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/compute`, async (req, res) => { nonce: (req.body.nonce as string) || null, environment: (req.body.environment as string) || null, algorithm: (req.body.algorithm as ComputeAlgorithm) || null, - datasets: (req.body.datasets as unknown as ComputeAsset[]) || null + datasets: (req.body.datasets as unknown as ComputeAsset[]) || null, + resources: (req.body.resources as unknown as ComputeResourceRequest[]) || null } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -135,8 +109,10 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => consumerAddress: (req.body.consumerAddress as 
string) || null, signature: (req.body.signature as string) || null, nonce: (req.body.nonce as string) || null, + environment: (req.body.environment as string) || null, algorithm: (req.body.algorithm as ComputeAlgorithm) || null, - datasets: (req.body.datasets as unknown as ComputeAsset[]) || null + datasets: (req.body.datasets as unknown as ComputeAsset[]) || null, + resources: (req.body.resources as unknown as ComputeResourceRequest[]) || null } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput diff --git a/src/test/data/assets.ts b/src/test/data/assets.ts index 632394f5f..caf55bc03 100644 --- a/src/test/data/assets.ts +++ b/src/test/data/assets.ts @@ -253,6 +253,7 @@ export const algoAsset = { files: [ { type: 'url', + method: 'GET', url: 'https://raw.githubusercontent.com/oceanprotocol/test-algorithm/master/javascript/algo.js', contentType: 'text/js', encoding: 'UTF-8' diff --git a/src/test/data/commands.ts b/src/test/data/commands.ts index 3b75a2225..b9a0156da 100644 --- a/src/test/data/commands.ts +++ b/src/test/data/commands.ts @@ -1,6 +1,7 @@ export const freeComputeStartPayload = { command: 'freeStartCompute', consumerAddress: '0xC7EC1970B09224B317c52d92f37F5e1E4fF6B687', + environment: '', nonce: '1', signature: '0x123', datasets: [ diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 393b65e58..b1120a93d 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -1,22 +1,22 @@ import { expect, assert } from 'chai' import { ComputeGetEnvironmentsHandler, - ComputeStartHandler, + // ComputeStartHandler, ComputeStopHandler, ComputeGetStatusHandler, - ComputeInitializeHandler, + // ComputeInitializeHandler, FreeComputeStartHandler } from '../../components/core/compute/index.js' import type { ComputeStartCommand, ComputeStopCommand, ComputeGetStatusCommand, - ComputeInitializeCommand, + // ComputeInitializeCommand, FreeComputeStartCommand } from 
'../../@types/commands.js' import type { - ComputeAsset, - ComputeAlgorithm, + // ComputeAsset, + // ComputeAlgorithm, ComputeEnvironment } from '../../@types/C2D/C2D.js' import { @@ -42,7 +42,8 @@ import { Signer, ZeroAddress } from 'ethers' -import { publishAsset, orderAsset } from '../utils/assets.js' +// import { publishAsset, orderAsset } from '../utils/assets.js' +import { publishAsset } from '../utils/assets.js' import { computeAsset, algoAsset } from '../data/assets.js' import { RPCS } from '../../@types/blockchain.js' import { @@ -55,7 +56,7 @@ import { tearDownEnvironment } from '../utils/utils.js' -import { ProviderFees } from '../../@types/Fees.js' +// import { ProviderFees } from '../../@types/Fees.js' import { homedir } from 'os' import { publishAlgoDDO, publishDatasetDDO } from '../data/ddo.js' import { DEVELOPMENT_CHAIN_ID, getOceanArtifactsAdresses } from '../../utils/address.js' @@ -64,7 +65,6 @@ import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templat import { createHash } from 'crypto' import { encrypt } from '../../utils/crypt.js' import { EncryptMethod } from '../../@types/fileObject.js' -import { checkC2DEnvExists } from '../../components/c2d/index.js' import { getAlgoChecksums, validateAlgoForDataset @@ -79,18 +79,18 @@ describe('Compute', () => { let oceanNode: OceanNode let provider: any let publisherAccount: any - let consumerAccount: any + // let consumerAccount: any let computeEnvironments: any let publishedComputeDataset: any let publishedAlgoDataset: any let jobId: string let datasetOrderTxId: any let algoOrderTxId: any - let providerFeesComputeDataset: ProviderFees - let providerFeesComputeAlgo: ProviderFees + // let providerFeesComputeDataset: ProviderFees + // let providerFeesComputeAlgo: ProviderFees let indexer: OceanIndexer - const now = new Date().getTime() / 1000 - const computeJobValidUntil = now + 60 * 15 // 15 minutes from now should be enough + // const now = new Date().getTime() / 1000 + // const 
computeJobValidUntil = now + 60 * 15 // 15 minutes from now should be enough let firstEnv: ComputeEnvironment const wallet = new ethers.Wallet( @@ -98,7 +98,7 @@ describe('Compute', () => { ) // const chainId = DEVELOPMENT_CHAIN_ID const mockSupportedNetworks: RPCS = getMockSupportedNetworks() - const chainId = 8996 + const chainId = DEVELOPMENT_CHAIN_ID // randomly use a set of trusted algos or empty arrays // should validate if set and match, invalidate otherwise const setTrustedAlgosEmpty: boolean = Math.random() <= 0.5 @@ -118,19 +118,17 @@ describe('Compute', () => { ENVIRONMENT_VARIABLES.PRIVATE_KEY, ENVIRONMENT_VARIABLES.AUTHORIZED_DECRYPTERS, ENVIRONMENT_VARIABLES.ADDRESS_FILE, - // ENVIRONMENT_VARIABLES.OPERATOR_SERVICE_URL, - ENVIRONMENT_VARIABLES.DOCKER_SOCKET_PATH - // ENVIRONMENT_VARIABLES.DB_TYPE + ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS ], [ JSON.stringify(mockSupportedNetworks), - JSON.stringify([8996]), + JSON.stringify([DEVELOPMENT_CHAIN_ID]), '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', JSON.stringify(['0xe2DD09d719Da89e5a3D0F2549c7E24566e947260']), `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, - // JSON.stringify(['http://localhost:31000']), - '/var/run/docker.sock' - // DB_TYPES.ELASTIC_SEARCH + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"' + + DEVELOPMENT_CHAIN_ID + + '":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' ] ) ) @@ -143,7 +141,7 @@ describe('Compute', () => { provider = new JsonRpcProvider('http://127.0.0.1:8545') publisherAccount = (await provider.getSigner(0)) as Signer - consumerAccount = (await provider.getSigner(1)) as Signer + // consumerAccount = (await provider.getSigner(1)) as Signer const artifactsAddresses = 
getOceanArtifactsAdresses() publisherAddress = await publisherAccount.getAddress() @@ -259,28 +257,25 @@ describe('Compute', () => { computeEnvironments = await streamToObject(response.stream as Readable) console.log('existing envs: ', computeEnvironments) // expect 1 OR + envs (1 if only docker free env is available) - assert(computeEnvironments[DEVELOPMENT_CHAIN_ID].length >= 1, 'incorrect length') - for (const computeEnvironment of computeEnvironments[DEVELOPMENT_CHAIN_ID]) { + assert(computeEnvironments.length >= 1, 'Not enough compute envs') + for (const computeEnvironment of computeEnvironments) { assert(computeEnvironment.id, 'id missing in computeEnvironments') + assert(computeEnvironment.fees, 'fees missing in computeEnvironments') assert( computeEnvironment.consumerAddress, 'consumerAddress missing in computeEnvironments' ) assert(computeEnvironment.id.startsWith('0x'), 'id should start with 0x') - - // new structure - assert(computeEnvironment.totalCpu > 0, 'totalCpu missing in computeEnvironments') - assert(computeEnvironment.totalRam > 0, 'totalRam missing in computeEnvironments') - assert(computeEnvironment.maxDisk > 0, 'maxDisk missing in computeEnvironments') + assert(computeEnvironment.resources.length > 2, 'Missing resources') assert( computeEnvironment.maxJobDuration > 0, 'maxJobDuration missing in computeEnvironments' ) } - firstEnv = computeEnvironments[DEVELOPMENT_CHAIN_ID][0] + firstEnv = computeEnvironments[0] }) - + /* it('Initialize compute without transaction IDs', async () => { const dataset: ComputeAsset = { documentId: publishedComputeDataset.ddo.id, @@ -297,7 +292,7 @@ describe('Compute', () => { getEnvironmentsTask ) computeEnvironments = await streamToObject(response.stream as Readable) - firstEnv = computeEnvironments[DEVELOPMENT_CHAIN_ID][0] + firstEnv = computeEnvironments[0] const initializeComputeTask: ComputeInitializeCommand = { datasets: [dataset], @@ -312,12 +307,14 @@ describe('Compute', () => { const resp = await new 
ComputeInitializeHandler(oceanNode).handle( initializeComputeTask ) + console.log(resp) assert(resp, 'Failed to get response') assert(resp.status.httpStatus === 200, 'Failed to get 200 response') assert(resp.stream, 'Failed to get stream') expect(resp.stream).to.be.instanceOf(Readable) const result: any = await streamToObject(resp.stream as Readable) + console.log(result) assert(result.algorithm, 'algorithm does not exist') expect(result.algorithm.datatoken?.toLowerCase()).to.be.equal( publishedAlgoDataset.datatokenAddress?.toLowerCase() @@ -579,8 +576,6 @@ describe('Compute', () => { it('should start a compute job', async () => { // first need to check the existing envs // If only FREE envs than start a free compute job instead of a regular/payed one - const hasOnlyFreeEnv = - computeEnvironments[DEVELOPMENT_CHAIN_ID].length === 1 && firstEnv.free const nonce = Date.now().toString() const message = String(nonce) @@ -592,9 +587,7 @@ describe('Compute', () => { const messageHashBytes = ethers.toBeArray(consumerMessage) const signature = await wallet.signMessage(messageHashBytes) const startComputeTask: ComputeStartCommand = { - command: hasOnlyFreeEnv - ? PROTOCOL_COMMANDS.FREE_COMPUTE_START - : PROTOCOL_COMMANDS.COMPUTE_START, + command: PROTOCOL_COMMANDS.COMPUTE_START, consumerAddress: await wallet.getAddress(), signature, nonce, @@ -616,9 +609,7 @@ describe('Compute', () => { // additionalDatasets?: ComputeAsset[] // output?: ComputeOutput } - const response = hasOnlyFreeEnv - ? 
await new FreeComputeStartHandler(oceanNode).handle(startComputeTask) - : await new ComputeStartHandler(oceanNode).handle(startComputeTask) + const response = await new ComputeStartHandler(oceanNode).handle(startComputeTask) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -628,7 +619,7 @@ describe('Compute', () => { // eslint-disable-next-line prefer-destructuring jobId = jobs[0].jobId }) - + */ it('should start a free docker compute job', async () => { const nonce = Date.now().toString() const message = String(nonce) @@ -671,32 +662,9 @@ describe('Compute', () => { expect(response.stream).to.be.instanceOf(Readable) const jobs = await streamToObject(response.stream as Readable) - // eslint-disable-next-line prefer-destructuring assert(jobs[0].jobId, 'failed to got job id') - }) - - it('should stop a compute job', async () => { - const nonce = Date.now().toString() - const message = String(nonce) - // sign message/nonce - const consumerMessage = ethers.solidityPackedKeccak256( - ['bytes'], - [ethers.hexlify(ethers.toUtf8Bytes(message))] - ) - const messageHashBytes = ethers.toBeArray(consumerMessage) - const signature = await wallet.signMessage(messageHashBytes) - const stopComputeTask: ComputeStopCommand = { - command: PROTOCOL_COMMANDS.COMPUTE_STOP, - consumerAddress: await wallet.getAddress(), - signature, - nonce, - jobId - } - const response = await new ComputeStopHandler(oceanNode).handle(stopComputeTask) - assert(response, 'Failed to get response') - assert(response.status.httpStatus === 200, 'Failed to get 200 response') - assert(response.stream, 'Failed to get stream') - expect(response.stream).to.be.instanceOf(Readable) + // eslint-disable-next-line prefer-destructuring + jobId = jobs[0].jobId }) it('should get job status by jobId', async () => { @@ -734,8 +702,31 @@ describe('Compute', () => { const jobs = await 
streamToObject(response.stream as Readable) console.log(jobs) }) - + it('should stop a compute job', async () => { + const nonce = Date.now().toString() + const message = String(nonce) + // sign message/nonce + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await wallet.signMessage(messageHashBytes) + const stopComputeTask: ComputeStopCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_STOP, + consumerAddress: await wallet.getAddress(), + signature, + nonce, + jobId + } + const response = await new ComputeStopHandler(oceanNode).handle(stopComputeTask) + assert(response, 'Failed to get response') + assert(response.status.httpStatus === 200, 'Failed to get 200 response') + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + }) it('should deny the Free job due to bad container image (directCommand payload)', async function () { + freeComputeStartPayload.environment = firstEnv.id const command: FreeComputeStartCommand = freeComputeStartPayload const handler = new FreeComputeStartHandler(oceanNode) const response = await handler.handle(command) @@ -749,12 +740,6 @@ describe('Compute', () => { ) }) - it('should checkC2DEnvExists', async () => { - const envId = '0x123' - const result = await checkC2DEnvExists(envId, oceanNode) - expect(result).to.equal(false) - }) - // algo and checksums related describe('C2D algo and checksums related', () => { it('should publish AlgoDDO', async () => { diff --git a/src/test/unit/compute.test.ts b/src/test/unit/compute.test.ts index 40dcfa0cc..83f209017 100644 --- a/src/test/unit/compute.test.ts +++ b/src/test/unit/compute.test.ts @@ -1,12 +1,13 @@ import { C2DDatabase } from '../../components/database/C2DDatabase.js' -import { existsEnvironmentVariable, getConfiguration } from '../../utils/config.js' +// import { 
existsEnvironmentVariable, getConfiguration } from '../../utils/config.js' +import { getConfiguration } from '../../utils/config.js' import { typesenseSchemas } from '../../components/database/TypesenseSchemas.js' import { C2DStatusNumber, C2DStatusText, ComputeAlgorithm, ComputeAsset, - ComputeEnvironment, + // ComputeEnvironment, ComputeJob, DBComputeJob, RunningPlatform @@ -29,12 +30,7 @@ import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' import { completeDBComputeJob, dockerImageManifest } from '../data/assets.js' import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js' import os from 'os' -import Dockerode from 'dockerode' -import { - buildCPUAndMemoryConstraints, - checkManifestPlatform -} from '../../components/c2d/compute_engine_docker.js' -import type { HostConfig } from 'dockerode' +import { checkManifestPlatform } from '../../components/c2d/compute_engine_docker.js' describe('Compute Jobs Database', () => { let envOverrides: OverrideEnvConfig[] @@ -52,29 +48,16 @@ describe('Compute Jobs Database', () => { } before(async () => { envOverrides = buildEnvOverrideConfig( - [ENVIRONMENT_VARIABLES.DOCKER_SOCKET_PATH], - ['/var/lib/docker'] + [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], + [ + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":1000000000}],"storageExpiry":604800,"maxJobDuration":3600,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1000000000},{"id":"disk","max":1000000000}]}}]' + ] ) envOverrides = await setupEnvironment(null, envOverrides) config = await getConfiguration(true) db = await new C2DDatabase(config.dbConfig, typesenseSchemas.c2dSchemas) }) - it('should have at least a free docker compute environment', () => { - let size = 1 - if (existsEnvironmentVariable(ENVIRONMENT_VARIABLES.OPERATOR_SERVICE_URL, false)) { - 
expect(config.c2dClusters.length).to.be.at.least(2) - size = 2 - } else { - expect(config.c2dClusters.length).to.be.at.least(1) - } - const dockerConfig = config.c2dClusters[size - 1].connection - const freeEnv: ComputeEnvironment = dockerConfig.freeComputeOptions - expect(freeEnv.description).to.be.equal('Free') - expect(freeEnv.free).to.be.equal(true) - expect(freeEnv.id).to.be.equal(config.c2dClusters[size - 1].hash + '-free') - }) - it('should create a new C2D Job', async () => { const job: DBComputeJob = { owner: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260', @@ -98,7 +81,9 @@ describe('Compute Jobs Database', () => { assets: [dataset], isRunning: false, isStarted: false, - containerImage: 'some container image' + containerImage: 'some container image', + resources: [], + isFree: false } jobId = await db.newJob(job) @@ -154,7 +139,9 @@ describe('Compute Jobs Database', () => { assets: [dataset], isRunning: false, isStarted: false, - containerImage: 'another container image' + containerImage: 'another container image', + resources: [], + isFree: false } const jobId = await db.newJob(job) @@ -236,20 +223,11 @@ describe('Compute Jobs Database', () => { expect(checkManifestPlatform(null, env)).to.be.equal(true) }) - it('should check cpu constraints on c2d docker env', async function () { - const size = config.c2dClusters.length - const dockerConfig = config.c2dClusters[size - 1].connection - const freeEnv: ComputeEnvironment = dockerConfig.freeComputeOptions - const cpus = os.cpus() - freeEnv.maxCpu = cpus.length + 1 // should be capped to cpus.length - const docker = new Dockerode({ socketPath: '/var/run/docker.sock' }) - let hostConfig: HostConfig = await buildCPUAndMemoryConstraints(freeEnv, docker) - expect(hostConfig.CpuCount).to.be.equal(cpus.length) - freeEnv.maxCpu = -1 - hostConfig = await buildCPUAndMemoryConstraints(freeEnv) - expect(hostConfig.CpuCount).to.be.equal(1) - const ram = os.totalmem() - expect(hostConfig.Memory).to.be.lessThanOrEqual(ram) + 
it('testing checkAndFillMissingResources', async function () { + // TO DO + }) + it('testing checkIfResourcesAreAvailable', async function () { + // TO DO }) after(async () => { diff --git a/src/utils/config.ts b/src/utils/config.ts index 39c2ed6aa..d0235dc85 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -1,11 +1,6 @@ import type { DenyList, OceanNodeConfig, OceanNodeKeys } from '../@types/OceanNode' import { dhtFilterMethod } from '../@types/OceanNode.js' -import type { - C2DClusterInfo, - ComputeEnvironment, - C2DDockerConfig, - ComputeEnvironmentBaseConfig -} from '../@types/C2D/C2D.js' +import type { C2DClusterInfo, C2DDockerConfig } from '../@types/C2D/C2D.js' import { C2DClusterType } from '../@types/C2D/C2D.js' import { createFromPrivKey } from '@libp2p/peer-id-factory' import { keys } from '@libp2p/crypto' @@ -24,7 +19,7 @@ import { import { LOG_LEVELS_STR, GENERIC_EMOJIS, getLoggerLevelEmoji } from './logging/Logger.js' import { RPCS } from '../@types/blockchain' -import { getAddress, Wallet, ZeroAddress } from 'ethers' +import { getAddress, Wallet } from 'ethers' import { FeeAmount, FeeStrategy, FeeTokens } from '../@types/Fees' import { getOceanArtifactsAdresses, @@ -32,8 +27,7 @@ import { } from '../utils/address.js' import { CONFIG_LOGGER } from './logging/common.js' import { create256Hash } from './crypt.js' -import { convertGigabytesToBytes, isDefined } from './util.js' -import os from 'os' +import { isDefined } from './util.js' import { fileURLToPath } from 'url' import path from 'path' @@ -359,96 +353,23 @@ function getC2DClusterEnvironment(isStartup?: boolean): C2DClusterInfo[] { ) } } - // docker clusters - const dockerConfig: C2DDockerConfig = { - socketPath: getEnvValue(process.env.DOCKER_SOCKET_PATH, null), - protocol: getEnvValue(process.env.DOCKER_PROTOCOL, null), - host: getEnvValue(process.env.DOCKER_HOST, null), - port: getIntEnvValue(process.env.DOCKER_PORT, 0), - caPath: getEnvValue(process.env.DOCKER_CA_PATH, null), - 
certPath: getEnvValue(process.env.DOCKER_CERT_PATH, null), - keyPath: getEnvValue(process.env.DOCKER_KEY_PATH, null), - environments: getDockerComputeEnvironments(isStartup) - } - - if (dockerConfig.socketPath || dockerConfig.host) { - const hash = create256Hash(JSON.stringify(dockerConfig)) - // get env values - dockerConfig.freeComputeOptions = getDockerFreeComputeOptions(hash, isStartup) - clusters.push({ - connection: dockerConfig, - hash, - type: C2DClusterType.DOCKER, - tempFolder: './c2d_storage/' + hash - }) + const dockerC2Ds = getDockerComputeEnvironments(isStartup) + for (const dockerC2d of dockerC2Ds) { + if (dockerC2d.socketPath || dockerC2d.host) { + const hash = create256Hash(JSON.stringify(dockerC2d)) + // get env values + clusters.push({ + connection: dockerC2d, + hash, + type: C2DClusterType.DOCKER, + tempFolder: './c2d_storage/' + hash + }) + } } return clusters } -// TODO C2D v2.0 -// eslint-disable-next-line no-unused-vars -function getDockerFreeComputeOptions( - clusterHash: string, - isStartup?: boolean -): ComputeEnvironment { - const defaultOptions: ComputeEnvironment = { - id: `${clusterHash}-free`, - maxCpu: 1, - // cpuType: '', - // gpuNumber: 0, - totalRam: convertGigabytesToBytes(os.totalmem()), - totalCpu: os.cpus().length, - maxRam: 1, // 1GB - maxDisk: 1, // 1GB - description: 'Free', - currentJobs: 0, - // maxJobs: 1, - consumerAddress: '', - storageExpiry: 600, - maxJobDuration: 600, // 10 minutes - // feeToken: ZeroAddress, - // chainId: 8996, - fees: { - 8996: { - feeToken: ZeroAddress, - prices: [ - { type: 'cpu', price: 0 }, - { type: 'memory', price: 0 }, - { type: 'storage', price: 0 } - ] - } - }, - free: true, - platform: [{ architecture: os.machine(), os: os.platform() }] - } - - if (existsEnvironmentVariable(ENVIRONMENT_VARIABLES.DOCKER_FREE_COMPUTE, isStartup)) { - try { - const options: ComputeEnvironmentBaseConfig = JSON.parse( - process.env.DOCKER_FREE_COMPUTE - ) as ComputeEnvironmentBaseConfig - 
doComputeEnvChecks([options]) - const env = { ...options } as ComputeEnvironment - env.platform = [{ architecture: os.machine(), os: os.platform() }] - return env - } catch (error) { - CONFIG_LOGGER.logMessageWithEmoji( - `Invalid "${ENVIRONMENT_VARIABLES.DOCKER_FREE_COMPUTE.name}" env variable => ${process.env.DOCKER_FREE_COMPUTE}...`, - true, - GENERIC_EMOJIS.EMOJI_CROSS_MARK, - LOG_LEVELS_STR.LEVEL_ERROR - ) - } - } else if (isStartup) { - // avoid logging to much times - CONFIG_LOGGER.warn( - `No options for ${ENVIRONMENT_VARIABLES.DOCKER_FREE_COMPUTE.name} were specified, using defaults.` - ) - } - return defaultOptions -} - /** * Reads a partial ComputeEnvironment setting (array of) * @param isStartup for logging purposes @@ -468,7 +389,8 @@ function getDockerFreeComputeOptions( "priceMin": 1 }, */ -function getDockerComputeEnvironments(isStartup?: boolean): ComputeEnvironment[] { +function getDockerComputeEnvironments(isStartup?: boolean): C2DDockerConfig[] { + const dockerC2Ds: C2DDockerConfig[] = [] if ( existsEnvironmentVariable( ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS, @@ -476,15 +398,41 @@ function getDockerComputeEnvironments(isStartup?: boolean): ComputeEnvironment[] ) ) { try { - const options: ComputeEnvironmentBaseConfig[] = JSON.parse( + const configs: C2DDockerConfig[] = JSON.parse( process.env.DOCKER_COMPUTE_ENVIRONMENTS - ) as ComputeEnvironmentBaseConfig[] - doComputeEnvChecks(options) - const envs = { ...options } as ComputeEnvironment[] - envs.forEach((env) => { - env.platform = [{ architecture: os.machine(), os: os.platform() }] - }) - return envs + ) as C2DDockerConfig[] + + for (const config of configs) { + let errors = '' + if (!isDefined(config.fees)) { + errors += ' There is no fees configuration!' + } + + if (config.storageExpiry < config.maxJobDuration) { + errors += ' "storageExpiry" should be greater than "maxJobDuration"! 
' } + // for docker there is no way of getting storage space + let foundDisk = false + if ('resources' in config) { + for (const resource of config.resources) { + if (resource.id === 'disk' && resource.total) foundDisk = true + } + } + if (!foundDisk) { + errors += ' There is no "disk" resource configured. This is mandatory ' + } + if (errors.length > 1) { + CONFIG_LOGGER.error( + 'Please check your compute env settings: ' + + errors + + 'for env: ' + + JSON.stringify(config) + ) + } else { + dockerC2Ds.push(config) + } + } + return dockerC2Ds } catch (error) { CONFIG_LOGGER.logMessageWithEmoji( `Invalid "${ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS.name}" env variable => ${process.env.DOCKER_COMPUTE_ENVIRONMENTS}...`, @@ -492,42 +440,14 @@ function getDockerComputeEnvironments(isStartup?: boolean): ComputeEnvironment[] GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) + console.log(error) } } else if (isStartup) { CONFIG_LOGGER.warn( `No options for ${ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS.name} were specified.` ) } - return null -} - -function doComputeEnvChecks(configEnv: ComputeEnvironmentBaseConfig[]): boolean { - for (const config of configEnv) { - if (!isDefined(config.fees)) { - CONFIG_LOGGER.error( - 'Please check your compute env settings: There is no fees configuration!' - ) - return false - } - // if (config.fees && !isDefined(config.fees)) { - // CONFIG_LOGGER.error( - // "Please check your compute env settings: We have a fee token but we don't have a price!" - // ) - // return false - // } - // was: if (isDefined(config.pricePerCpu) && !isDefined(config.fees)) { - // CONFIG_LOGGER.error( - // "Please check your compute env settings: We have a price but we don't have a fee token!" - // ) - // return false - // } - if (config.storageExpiry < config.maxJobDuration) { - CONFIG_LOGGER.error( - 'Please check your compute env settings: "storageExpiry" should be greater than "maxJobDuration"!'
- ) - return false - } - } + return [] } // connect interfaces (p2p or/and http) diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 73dd27437..2f2ee7a75 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -343,11 +343,6 @@ export const ENVIRONMENT_VARIABLES: Record = { value: process.env.DOCKER_COMPUTE_ENVIRONMENTS, required: false }, - DOCKER_FREE_COMPUTE: { - name: 'DOCKER_FREE_COMPUTE', - value: process.env.DOCKER_FREE_COMPUTE, - required: false - }, DOCKER_SOCKET_PATH: { name: 'DOCKER_SOCKET_PATH', value: process.env.DOCKER_SOCKET_PATH,