Skip to content

Commit

Permalink
Merge pull request #6 from converged-computing/bug-missing-image
Browse files Browse the repository at this point in the history
fix: bug that image is not generated
  • Loading branch information
vsoch authored Mar 18, 2024
2 parents 8bec25d + ba90c6e commit 6f25ba1
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 125 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ build-ppc: $(LOCALBIN)

.PHONY: worker
worker: build
$(LOCALBIN)/fractal worker --metrics --quiet
$(LOCALBIN)/fractal worker --quiet

.PHONY: leader
leader: build
Expand Down
3 changes: 0 additions & 3 deletions api/v1/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ service NodeService {
rpc ReportStatus(Request) returns (Response){};
rpc ReportResult(WorkResponse) returns (Response){};

// Functions to request the worker node to print metrics
rpc RequestMetrics(Request) returns (Response){};

// We are streaming the work request to the workers
rpc AssignTask(Request) returns (stream WorkRequest){};
}
4 changes: 2 additions & 2 deletions cmd/fractal/fractal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
// Shared values
host := parser.String("", "host", &argparse.Options{Default: "localhost:50051", Help: "Leader address (host:port)"})
quiet := parser.Flag("q", "quiet", &argparse.Options{Help: "Suppress additional output"})
metrics := parser.Flag("m", "metrics", &argparse.Options{Help: "Output metrics"})
metrics := leaderCmd.Flag("m", "metrics", &argparse.Options{Help: "Output metrics"})

// Leader arguments (for image generation)
colorStep := leaderCmd.Int("", "step", &argparse.Options{Default: 6000, Help: "Color smooth step (greater than iteration count, defaults to 6000)"})
Expand Down Expand Up @@ -66,7 +66,7 @@ func main() {

// TODO add error handling here
if workerCmd.Happened() {
worker := core.GetWorkerNode(*host, *retries, *quiet, *metrics)
worker := core.GetWorkerNode(*host, *retries, *quiet)
err := worker.Start()
if err != nil {
log.Fatalf("Issue with starting worker: %s", err)
Expand Down
Binary file modified mandelbrot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
44 changes: 18 additions & 26 deletions pkg/api/v1/node.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 0 additions & 38 deletions pkg/api/v1/node_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pkg/core/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,14 @@ func (n *Leader) Init() (err error) {
ticker.Stop()
totalTime := time.Since(start)
output, err := os.Create(n.outfile)
png.Encode(output, n.image)
if n.metrics {
fmt.Printf("METRICS LEADER time: %v\n", totalTime)
metrics.ReportMetrics("METRICS LEADER")

// Request subset of workers to output metrics
n.nodeSvr.MetricsRequestChannel <- true
}
if err != nil {
fmt.Printf("Warning: error creating image file: %s\n", err)
}
png.Encode(output, n.image)
fmt.Printf("\n\nMandelbrot set rendered into `%s`\n", n.outfile)
if n.forceExit {
panic("Image generation complete, force exited.")
Expand Down
33 changes: 4 additions & 29 deletions pkg/core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ import (
"context"
)

var (
// metricsDesired records whether a metrics request has already been
// observed. It starts out false; RequestMetrics sets it to true the first
// time it receives a true value from the metrics request channel, and
// answers "yes" on every later call.
metricsDesired = false
)

type NodeServiceGrpcServer struct {
pb.UnimplementedNodeServiceServer
MetricsRequestChannel chan bool
ResultChannel chan IterationResult
WorkChannel chan MandelIteration
ResultChannel chan IterationResult
WorkChannel chan MandelIteration
}

// ReportStatus reports a worker status
Expand All @@ -26,25 +21,6 @@ func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *pb.Req
return &pb.Response{Data: "ok"}, nil
}

// RequestMetrics checks the metrics request channel to see if metrics are desired.
//
// It responds with Data "yes" when metrics have been requested and "no"
// otherwise. The returned error is always nil. Neither ctx nor result is
// read by this method.
func (n NodeServiceGrpcServer) RequestMetrics(ctx context.Context, result *pb.Request) (*pb.Response, error) {

// If we've cached it (and another worker received it)
// NOTE(review): metricsDesired is a package-level variable that is read and
// written here with no locking visible in this block; confirm whether
// concurrent calls are possible.
if metricsDesired {
return &pb.Response{Data: "yes"}, nil
}
// Non-blocking receive: the default case means this call never waits for a
// value to arrive on the channel.
select {
case yes, ok := <-n.MetricsRequestChannel:
// Only a true value from a channel that is still open counts as a request;
// remember it so that later calls answer "yes" without touching the channel.
if ok && yes {
metricsDesired = true
return &pb.Response{Data: "yes"}, nil
}
default:
// Nothing pending on the channel; leave the select and answer "no" below.
break
}
return &pb.Response{Data: "no"}, nil
}

// Report a result to the leader
func (n NodeServiceGrpcServer) ReportResult(ctx context.Context, result *pb.WorkResponse) (*pb.Response, error) {

Expand Down Expand Up @@ -92,9 +68,8 @@ var server *NodeServiceGrpcServer
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
if server == nil {
server = &NodeServiceGrpcServer{
WorkChannel: make(chan MandelIteration),
ResultChannel: make(chan IterationResult),
MetricsRequestChannel: make(chan bool),
WorkChannel: make(chan MandelIteration),
ResultChannel: make(chan IterationResult),
}
}
return server
Expand Down
25 changes: 3 additions & 22 deletions pkg/core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ type WorkerNode struct {
leaderHost string
retries int
quiet bool
metrics bool
counts map[string]int32

// We ask the worker to report metrics close to the end
// There is maybe a better way to do this
metricsReported bool
}

func (n *WorkerNode) Init() (err error) {
Expand Down Expand Up @@ -62,9 +57,6 @@ func (n *WorkerNode) ConnectStream() (pb.NodeService_AssignTaskClient, error) {

// recordMetrics takes a count of calculations, etc.
func (n *WorkerNode) recordMetrics() {
if !n.metrics {
return
}
_, ok := n.counts["tasks"]
if !ok {
n.counts["tasks"] = 0
Expand Down Expand Up @@ -119,23 +111,14 @@ func (n *WorkerNode) Start() error {
if err != nil {
return err
}

// If we want metrics, check if the request is in (meaning we are done)
if n.metrics && !n.metricsReported {
n.reportMetrics()
}
}
}

// Report metrics for the worker
// This is currently a bit of a hack, done before the end
// This is currently not used.
func (n *WorkerNode) reportMetrics() {

response, err := n.client.RequestMetrics(context.Background(), &pb.Request{})
if err != nil || response.Data != "yes" {
return
}

// TODO some trigger here to know they are requested
// Add the worker hostname to the prefix
prefix := "WORKER"
hostname, err := os.Hostname()
Expand All @@ -148,12 +131,11 @@ func (n *WorkerNode) reportMetrics() {

// If we report metrics, do so based on hostname
metrics.ReportMetrics(prefix)
n.metricsReported = true
}

var workerNode *WorkerNode

func GetWorkerNode(host string, retries int, quiet, metrics bool) *WorkerNode {
func GetWorkerNode(host string, retries int, quiet bool) *WorkerNode {
if retries == 0 {
retries = 10
}
Expand All @@ -162,7 +144,6 @@ func GetWorkerNode(host string, retries int, quiet, metrics bool) *WorkerNode {
leaderHost: host,
retries: retries,
quiet: quiet,
metrics: metrics,
counts: map[string]int32{},
}
if err := workerNode.Init(); err != nil {
Expand Down

0 comments on commit 6f25ba1

Please sign in to comment.