more logging for debug in xparl #1051

Open · wants to merge 7 commits into base: develop
13 changes: 8 additions & 5 deletions parl/remote/status.py → parl/remote/job_pool.py
@@ -17,8 +17,8 @@
import os


class WorkerStatus(object):
"""Maintain worker's information in a worker node.
class JobPool(object):
"""Record information of the jobs created by the worker.

Attributes:
cpu_num(int): The number of CPUs to be used in this worker.
@@ -51,19 +51,19 @@ def remove_job(self, killed_job):
try:
os.kill(pid, signal.SIGTERM)
except OSError:
logger.warning("job:{} has been killed before".format(pid))
logger.warning("[Worker] job:{} has been killed before".format(pid))
logger.info("[Worker] kills a job:{}".format(killed_job))
self._lock.release()
return ret

def clear(self):
"""Remove all the jobs"""
"""Remove all the jobs and kill the corresponding process"""
self._lock.acquire()
for job in self.jobs.values():
try:
os.kill(job.pid, signal.SIGTERM)
except OSError:
logger.warning("job:{} has been killed before".format(job.pid))
logger.warning("[Worker] job:{} has been killed before".format(job.pid))
logger.info("[Worker] kills a job:{}".format(job.pid))
self.jobs = dict()
self._lock.release()
@@ -76,5 +76,8 @@ def add_job(self, new_job):
"""
self._lock.acquire()
self.jobs[new_job.job_address] = new_job
if len(self.jobs) > self.cpu_num:
logger.error("[Worker] The number of jobs exceeds the maximum limit. {} vs {}".format(
len(self.jobs), self.cpu_num))
assert len(self.jobs) <= self.cpu_num
self._lock.release()
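
For context, here is a minimal sketch of how the renamed JobPool is exercised, based only on the constructor signatures that appear in the new job_pool_test.py further down; the addresses, pid, and cpu_num are placeholder values, not real endpoints or processes.

from parl.remote.job_pool import JobPool
from parl.remote.message import InitializedJob

# Two placeholder jobs; the addresses and pid below are illustrative only.
jobs = [
    InitializedJob(
        job_address='127.0.0.1:{}'.format(1234 + i),
        worker_heartbeat_address='127.0.0.1:48724',
        ping_heartbeat_address='127.0.0.1:48726',
        worker_address='127.0.0.1:8001',
        pid=1234) for i in range(2)
]

# A pool that may hold at most two jobs (cpu_num=2), seeded with one job.
job_pool = JobPool(
    worker_address='127.0.0.1:8001', initialized_jobs=jobs[:1], cpu_num=2)

job_pool.add_job(jobs[1])  # the pool is now full: len(job_pool.jobs) == 2
# Adding a job beyond cpu_num logs the new "[Worker] ... exceeds the maximum limit"
# error and trips the assertion in add_job, as covered by test_add_job below.
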
22 changes: 8 additions & 14 deletions parl/remote/master.py
@@ -86,7 +86,7 @@ def _get_status(self):
def _print_workers(self):
"""Display `worker_pool` infomation."""
logger.info(
"Master connects to {} workers and has {} vacant CPUs.\n".format(
"[Master] connects to {} workers and has {} vacant CPUs.\n".format(
self.worker_num, self.cpu_num))

@property
@@ -106,7 +106,7 @@ def _receive_message(self):
tag = message[0]

# a new worker connects to the master
if tag == remote_constants.WORKER_CONNECT_TAG:
if tag == remote_constants.WORKER_PING_TAG:
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

elif tag == remote_constants.MONITOR_TAG:
@@ -128,17 +128,16 @@ def _receive_message(self):
hostname = self.job_center.get_hostname(worker_address)
total_cpus = self.job_center.get_total_cpu(worker_address)
self.cluster_monitor.add_worker_status(worker_address, hostname, total_cpus)
logger.info("A new worker {} is added, ".format(worker_address) +
"the cluster has {} CPUs.\n".format(self.cpu_num))
logger.info("[Master ] A new worker with {} CPUs is added.".format(worker_address, total_cpus) +
"The cluster has {} CPUs.\n".format(self.cpu_num))

def heartbeat_exit_callback_func(worker_address):
self.job_center.drop_worker(worker_address)
self.cluster_monitor.drop_worker_status(worker_address)
logger.warning("\n[Master] Cannot connect to the worker " +
"{}. ".format(worker_address) +
"Worker_pool will drop this worker.")
"The cluster will erase information of the worker.")
self._print_workers()
logger.warning("Exit worker monitor from master.")

# a thread for sending heartbeat signals to the client
thread = HeartbeatClientThread(
@@ -161,18 +160,14 @@ def heartbeat_exit_callback_func(worker_address):
client_id = to_str(message[3])
self.client_hostname[client_heartbeat_address] = client_hostname
logger.info(
"Client {} is connected.".format(client_heartbeat_address))
"[Master] Client from {} is connected.".format(client_heartbeat_address))

def heartbeat_exit_callback_func(client_heartbeat_address):
self.cluster_monitor.drop_client_status(
client_heartbeat_address)
logger.warning("[Master] cannot connect to the client " +
"{}. ".format(client_heartbeat_address) +
"Please check if it is still alive.")
logger.info(
"Master connects to {} workers and have {} vacant CPUs.\n".
format(self.worker_num, self.cpu_num))

# a thread for sending heartbeat signals to the client
thread = HeartbeatClientThread(
client_heartbeat_address,
@@ -201,14 +196,14 @@ def heartbeat_exit_callback_func(client_heartbeat_address):
elif tag == remote_constants.CLIENT_SUBMIT_TAG:
# check available CPU resources
if self.cpu_num:
logger.info("Submitting job...")
job = self.job_center.request_job()
self.client_socket.send_multipart([
remote_constants.NORMAL_TAG,
to_byte(job.job_address),
to_byte(job.ping_heartbeat_address),
])
client_id = to_str(message[2])
logger.info("[Master] receives request of 1 CPU from the client: {}".format(client_id))
job_info = {job.job_id: job.log_server_address}
self.cluster_monitor.add_client_job(client_id, job_info)
self._print_workers()
@@ -223,8 +218,7 @@ def heartbeat_exit_callback_func(client_heartbeat_address):
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
self.job_center.update_job(last_job_address, initialized_job,
initialized_job.worker_address)
logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))

logger.info("[Master] receives a new job from the worker:{}".format(initialized_job.worker_address))
self._print_workers()

# client update status periodically
2 changes: 1 addition & 1 deletion parl/remote/remote_constants.py
@@ -19,7 +19,7 @@
MONITOR_TAG = b'[MONITOR]'
STATUS_TAG = b'[STATUS]'

WORKER_CONNECT_TAG = b'[WORKER_CONNECT]'
WORKER_PING_TAG = b'[WORKER_PING]'
WORKER_INITIALIZED_TAG = b'[WORKER_INITIALIZED]'
WORKER_STATUS_UPDATE_TAG = b'[WORKER_STATUS_UPDATE_TAG]'
CLIENT_CONNECT_TAG = b'[CLIENT_CONNECT]'
61 changes: 61 additions & 0 deletions parl/remote/tests/job_pool_test.py
@@ -0,0 +1,61 @@
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from parl.remote.job_pool import JobPool
from parl.remote.message import InitializedJob


class JobPoolTest(unittest.TestCase):
def setUp(self):
self.jobs_0 = []
for i in range(5):
job = InitializedJob(
job_address='172.18.182.39:{}'.format(1234 + i),
worker_heartbeat_address='172.18.182.39:48724',
ping_heartbeat_address='172.18.182.39:48726',
worker_address='172.18.182.39:8001',
pid=1234)
self.jobs_0.append(job)

self.jobs_1 = []
for i in range(5):
job = InitializedJob(
job_address='172.18.182.39:{}'.format(2234 + i),
worker_heartbeat_address='172.18.182.39:48724',
ping_heartbeat_address='172.18.182.39:48726',
worker_address='172.18.182.39:8002',
pid=1234)
self.jobs_1.append(job)

def test_remove_job(self):
job_pool = JobPool(worker_address='172.18.182.39:8005',
initialized_jobs=self.jobs_0, cpu_num=5)
job_pool.remove_job('172.18.182.39:1234')
self.assertEqual(len(job_pool.jobs), 4)

def test_add_job(self):
job_pool = JobPool(worker_address='172.18.182.39:8005',
initialized_jobs=self.jobs_0[:3], cpu_num=5)
self.assertEqual(len(job_pool.jobs), 3)
job_pool.add_job(self.jobs_0[3])
job_pool.add_job(self.jobs_0[4])
self.assertEqual(len(job_pool.jobs), 5)
with self.assertRaises(AssertionError):
job_pool.add_job(self.jobs_1[0])


if __name__ == '__main__':
unittest.main()
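
If it helps reviewers, the new tests can also be run programmatically through unittest (a sketch; it assumes a checkout of this branch is on the import path):

import unittest

# Load and run the new JobPool tests; verbosity=2 prints each test name.
suite = unittest.defaultTestLoader.loadTestsFromName('parl.remote.tests.job_pool_test')
unittest.TextTestRunner(verbosity=2).run(suite)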