From 991a8c000991cbf227ae1895279c86eb6d154f0c Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Wed, 25 Dec 2024 14:17:39 +0100 Subject: [PATCH 1/9] feat(agevar): Provide a ROS2/Python implementation - Refactor movement calculations and integrate time-based updates --- reseq_ros2/reseq_ros2/agevar.py | 84 ++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 16 deletions(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index 677110b..b369a38 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -1,6 +1,8 @@ import traceback from math import cos, pi, sin +from time import time +import numpy as np import rclpy from geometry_msgs.msg import Twist from rclpy.node import Node @@ -9,6 +11,11 @@ import reseq_ros2.constants as rc from reseq_interfaces.msg import Motors + +def Rotz(th): + return np.array([[cos(th), -sin(th), 0], [sin(th), cos(th), 0], [0, 0, 1]]) + + """ROS node with control algorithm for snake-like movement It receives data from the remote over a ROS topic and publishes to the ROS topics @@ -35,7 +42,11 @@ def __init__(self): ) self.n_mod = len(self.modules) - self.yaw_angles = [0] * self.n_mod + self.eta = [[0.0, 0.0, 0.0] for _ in range(self.n_mod)] + self.etad = [[0.0, 0.0, 0.0] for _ in range(self.n_mod)] + + self.init_conditions() + self.previous_time = 0 # subscribe to remote (parsed by teleop_twist_joy) self.create_subscription( @@ -61,8 +72,13 @@ def __init__(self): self.joint_subs.append(s) # create publisher for the motor topics - p = self.create_publisher(Motors, f'reseq/module{address}/motor/setpoint', 10) - self.motors_pubs.append(p) + p2 = self.create_publisher(Motors, f'reseq/module{address}/motor/setpoint', 10) + self.motors_pubs.append(p2) + + def init_conditions(self): + for m in range(self.n_mod): + self.eta[m] = [0.0, (m - 1) * (-self.a - self.b), 0.0] + self.etad[m] = [0.0, 0.0, 0.0] def remote_callback(self, msg: Twist): # extract information from ROS Twist message @@ -76,9 +92,52 @@ def remote_callback(self, msg: Twist): linear_vel = -linear_vel angular_vel = -angular_vel + # time step + t = time() + dt = t - self.previous_time + self.previous_time = t + for mod_id in modules: - # get velocity of left and right motor - w_right, w_left = self.vel_motors(linear_vel, angular_vel, sign) + # For the first module + if mod_id == modules[0]: + # Update angular velocity (yaw rate) + self.etad[mod_id][0] = angular_vel + + # Integrate angular velocity to get new yaw angle + self.eta[mod_id][0] += dt * self.etad[mod_id][0] + + # Calculate linear velocities in the local frame and transform to global frame + vs = Rotz(self.eta[mod_id][0]) @ [linear_vel, 0.0, 0.0] + self.etad[mod_id][1:3] = vs[0:2] + else: + # Compute relative angle between the current and previous module + th = self.eta[mod_id - 1][0] - self.eta[mod_id][0] + + # Compute output linear and angular velocities + linear_out, angular_out = self.kinematic(linear_vel, angular_vel, th) + + # Update angular velocity (yaw rate) + self.etad[mod_id][0] = angular_out + + # Integrate angular velocity to get new yaw angle + self.eta[mod_id][0] += dt * self.etad[mod_id][0] + + # Calculate linear velocities in the local frame and transform to global frame + vs = Rotz(self.eta[mod_id][0]) @ [linear_out, 0.0, 0.0] + self.etad[mod_id][1:3] = vs[0:2] + + # Integrate linear velocities to get new position + self.eta[mod_id][1] += dt * self.etad[mod_id][1] + self.eta[mod_id][2] += dt * self.etad[mod_id][2] + + # Calculate linear velocity magnitude for vel_motors + lin_vel = np.linalg.norm(self.etad[mod_id][1:3]) + + # Use angular velocity directly for vel_motors + ang_vel = self.etad[mod_id][0] + + # Compute motor velocities for each module + w_right, w_left = self.vel_motors(lin_vel, ang_vel, sign) # publish to ROS motor topics m = Motors() @@ -86,14 +145,7 @@ def remote_callback(self, msg: Twist): m.left = w_left self.motors_pubs[mod_id].publish(m) - if mod_id != modules[-1]: # for every module except the last one - # invert yaw angle if going backwards - yaw_angle = (1 if sign else -1) * self.yaw_angles[mod_id] - - # compute linear and angular velocity of the following module - linear_vel, angular_vel = self.kinematic(linear_vel, angular_vel, yaw_angle) - - self.get_logger().debug(f'Output lin:{linear_vel}, ang:{angular_vel}, sign:{sign}') + self.get_logger().debug(f'Output lin:{linear_vel}, ang:{angular_vel}, sign:{sign}') # update yaw angle of a joint def yaw_feedback_callback(self, msg, module_num): @@ -103,8 +155,8 @@ def yaw_feedback_callback(self, msg, module_num): if angle >= 180: angle -= 360 - # store the angle in radiants - self.yaw_angles[module_num - 17] = angle * pi / 180.0 + # store the angle in radians + self.eta[module_num - 17][0] = angle * pi / 180.0 # given data of a module, compute linear and angular velocities of the next one def kinematic(self, linear_vel, angular_vel, yaw_angle): @@ -123,7 +175,7 @@ def vel_motors(self, lin_vel, ang_vel, sign): if sign == 0: # backwards w_left, w_right = -w_left, -w_right - # from radiants to rpm + # from radians to rpm w_right = w_right * rc.rads2rpm w_left = w_left * rc.rads2rpm From 333cd808ece3d68a6824c87d333ac1d1c2c39bb2 Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Wed, 25 Dec 2024 14:19:04 +0100 Subject: [PATCH 2/9] feat(agevar): Add publisher for joint yaw setpoint and publish yaw angle --- reseq_ros2/reseq_ros2/agevar.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index b369a38..e85a100 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -58,6 +58,7 @@ def __init__(self): self.joint_subs = [] self.motors_pubs = [] + self.yaw_pubs = [] for i in range(self.n_mod): address = self.modules[i] @@ -71,6 +72,12 @@ def __init__(self): ) self.joint_subs.append(s) + # create publisher for the joint yaw setpoint topics + p1 = self.create_publisher( + Float32, f'reseq/module{address}/joint/yaw/setpoint', 10 + ) + self.yaw_pubs.append(p1) + # create publisher for the motor topics p2 = self.create_publisher(Motors, f'reseq/module{address}/motor/setpoint', 10) self.motors_pubs.append(p2) @@ -145,6 +152,12 @@ def remote_callback(self, msg: Twist): m.left = w_left self.motors_pubs[mod_id].publish(m) + # Publish yaw angle to joint yaw setpoint topic + if mod_id in self.joints: + yaw_msg = Float32() + yaw_msg.data = self.eta[mod_id][0] + self.yaw_pubs[mod_id].publish(yaw_msg) + self.get_logger().debug(f'Output lin:{linear_vel}, ang:{angular_vel}, sign:{sign}') # update yaw angle of a joint From 978ded7a69ddf72808628b6e054086af3e6760da Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Thu, 26 Dec 2024 14:20:51 +0100 Subject: [PATCH 3/9] test(agevar): Refactor and add tests for init_conditions function --- .../test_agevar/test_agevar_functional.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py index dbbe098..b6e51bc 100644 --- a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py +++ b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py @@ -41,7 +41,18 @@ def tearDownClass(cls): cls.agevar.destroy_node() rclpy.shutdown() - def test_4_vel_motors(self): + def test_4_init_conditions(self): + node = self.agevar + + node.a = self.expected_params['a'] + node.b = self.expected_params['b'] + + node.init_conditions() + for m in range(node.n_mod): + self.assertEqual(node.eta[m], [0.0, (m - 1) * (-node.a - node.b), 0.0]) + self.assertEqual(node.etad[m], [0.0, 0.0, 0.0]) + + def test_5_vel_motors(self): """Test the vel_motors function.""" node = self.agevar @@ -66,7 +77,7 @@ def test_4_vel_motors(self): self.assertAlmostEqual(w_right, expected_w_right, places=5) self.assertAlmostEqual(w_left, expected_w_left, places=5) - def test_5_kinematic(self): + def test_6_kinematic(self): """Test the kinematic function.""" node = self.agevar @@ -91,7 +102,7 @@ def test_5_kinematic(self): self.assertAlmostEqual(linear_out, expected_linear_out, places=5) self.assertAlmostEqual(angular_out, expected_angular_out, places=5) - def test_6_yaw_feedback_callback(self): + def test_7_yaw_feedback_callback(self): """Test the yaw_feedback_callback function.""" node = self.agevar @@ -106,7 +117,7 @@ def test_6_yaw_feedback_callback(self): # Verify constrained values expected_angle = pi / 4 - self.assertAlmostEqual(node.yaw_angles[module_num - 17], expected_angle) + self.assertAlmostEqual(node.eta[module_num - 17][0], expected_angle) # Test case: values not within constraints angle_msg.data = 180.0 @@ -116,9 +127,9 @@ def test_6_yaw_feedback_callback(self): # Verify constrained values expected_angle = -pi - self.assertAlmostEqual(node.yaw_angles[module_num - 17], expected_angle) + self.assertAlmostEqual(node.eta[module_num - 17][0], expected_angle) - def test_7_remote_callback(self): + def test_8_remote_callback(self): """Test the remote_callback function.""" node = self.agevar From 0c8dc62302acb7b05546ae8a65ed9d5fc52c351e Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Thu, 26 Dec 2024 14:23:13 +0100 Subject: [PATCH 4/9] test(agevar): Add unit tests for Rotz function with various angles - Integrate Rotz function into the Agevar node. --- reseq_ros2/reseq_ros2/agevar.py | 13 +++---- .../test_agevar/test_agevar_functional.py | 37 ++++++++++++++++++- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index e85a100..3170d89 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -11,11 +11,6 @@ import reseq_ros2.constants as rc from reseq_interfaces.msg import Motors - -def Rotz(th): - return np.array([[cos(th), -sin(th), 0], [sin(th), cos(th), 0], [0, 0, 1]]) - - """ROS node with control algorithm for snake-like movement It receives data from the remote over a ROS topic and publishes to the ROS topics @@ -114,7 +109,7 @@ def remote_callback(self, msg: Twist): self.eta[mod_id][0] += dt * self.etad[mod_id][0] # Calculate linear velocities in the local frame and transform to global frame - vs = Rotz(self.eta[mod_id][0]) @ [linear_vel, 0.0, 0.0] + vs = self.Rotz(self.eta[mod_id][0]) @ [linear_vel, 0.0, 0.0] self.etad[mod_id][1:3] = vs[0:2] else: # Compute relative angle between the current and previous module @@ -130,7 +125,7 @@ def remote_callback(self, msg: Twist): self.eta[mod_id][0] += dt * self.etad[mod_id][0] # Calculate linear velocities in the local frame and transform to global frame - vs = Rotz(self.eta[mod_id][0]) @ [linear_out, 0.0, 0.0] + vs = self.Rotz(self.eta[mod_id][0]) @ [linear_out, 0.0, 0.0] self.etad[mod_id][1:3] = vs[0:2] # Integrate linear velocities to get new position @@ -194,6 +189,10 @@ def vel_motors(self, lin_vel, ang_vel, sign): return w_right, w_left + # rotation matrix around z axis + def Rotz(self, th): + return np.array([[cos(th), -sin(th), 0], [sin(th), cos(th), 0], [0, 0, 1]]) + def main(args=None): rclpy.init(args=args) diff --git a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py index b6e51bc..ca17980 100644 --- a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py +++ b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py @@ -1,6 +1,7 @@ import unittest from math import cos, pi, sin +import numpy as np import rclpy from geometry_msgs.msg import Twist from rclpy.executors import SingleThreadedExecutor @@ -129,7 +130,41 @@ def test_7_yaw_feedback_callback(self): expected_angle = -pi self.assertAlmostEqual(node.eta[module_num - 17][0], expected_angle) - def test_8_remote_callback(self): + def test_8_rotz(self): + """Test the Rotz function.""" + node = self.agevar + + # Test case: 90 degrees (pi/2 radians) + theta = pi / 2 + expected_matrix = np.array([[0, -1, 0], [1, 0, 0], [0, 0, 1]]) + result_matrix = node.Rotz(theta) + np.testing.assert_array_almost_equal(result_matrix, expected_matrix, decimal=5) + + # Test case: -90 degrees (-pi/2 radians) + theta = -pi / 2 + expected_matrix = np.array([[0, 1, 0], [-1, 0, 0], [0, 0, 1]]) + result_matrix = node.Rotz(theta) + np.testing.assert_array_almost_equal(result_matrix, expected_matrix, decimal=5) + + # Test case: 180 degrees (pi radians) + theta = pi + expected_matrix = np.array([[-1, 0, 0], [0, -1, 0], [0, 0, 1]]) + result_matrix = node.Rotz(theta) + np.testing.assert_array_almost_equal(result_matrix, expected_matrix, decimal=5) + + # Test case: -180 degrees (-pi radians) + theta = -pi + expected_matrix = np.array([[-1, 0, 0], [0, -1, 0], [0, 0, 1]]) + result_matrix = node.Rotz(theta) + np.testing.assert_array_almost_equal(result_matrix, expected_matrix, decimal=5) + + # Test case: 0 degrees (0 radians) + theta = 0 + expected_matrix = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]) + result_matrix = node.Rotz(theta) + np.testing.assert_array_almost_equal(result_matrix, expected_matrix, decimal=5) + + def test_9_remote_callback(self): """Test the remote_callback function.""" node = self.agevar From 2e00aa5f50cc39bbdb752e18a19d13b9e13ab25c Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Thu, 26 Dec 2024 17:22:28 +0100 Subject: [PATCH 5/9] fix(agevar): Update previous_time initialization to use current time --- reseq_ros2/reseq_ros2/agevar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index 3170d89..3ba509e 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -41,7 +41,7 @@ def __init__(self): self.etad = [[0.0, 0.0, 0.0] for _ in range(self.n_mod)] self.init_conditions() - self.previous_time = 0 + self.previous_time = time() # subscribe to remote (parsed by teleop_twist_joy) self.create_subscription( From f18fc9a6539c9843011da3a96f02d3f9d0a11697 Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Thu, 26 Dec 2024 17:50:51 +0100 Subject: [PATCH 6/9] test(agevar): Add new Agevar functional tests - Verify yaw angle updates and motor velocities --- .../test_agevar/test_agevar_functional.py | 62 +++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py index ca17980..299efca 100644 --- a/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py +++ b/reseq_ros2/test/test_nodes/test_agevar/test_agevar_functional.py @@ -1,5 +1,6 @@ import unittest from math import cos, pi, sin +from time import time import numpy as np import rclpy @@ -213,19 +214,32 @@ def test_9_remote_callback(self): # Capture published messages self.motors_msgs = [] + self.yaw_msgs = [] def motors_callback(msg): self.motors_msgs.append(msg) + def yaw_callback(msg): + self.yaw_msgs.append(msg) + for module in node.modules: node.create_subscription( Motors, f'reseq/module{module}/motor/setpoint', motors_callback, 10 ) + if module in node.joints: + node.create_subscription( + Float32, f'reseq/module{module}/joint/yaw/setpoint', yaw_callback, 10 + ) # Test all cases for edge_case_msg in edge_case_twist_msgs: # Clear the messages list for each test case self.motors_msgs.clear() + self.yaw_msgs.clear() + + # Simulate time passage + initial_time = time() + self.previous_time = initial_time # Call the remote_callback node.remote_callback(edge_case_msg) @@ -233,7 +247,9 @@ def motors_callback(msg): # Spin the node to process the publishing and subscription for _ in range(10): rclpy.spin_once(node, timeout_sec=0.1) - if len(self.motors_msgs) >= len(node.modules): + if (len(self.motors_msgs) >= len(node.modules)) and ( + len(self.yaw_msgs) >= len(node.joints) + ): break # Verify motors messages @@ -245,10 +261,46 @@ def motors_callback(msg): if not sign: # Handle the backward case by reversing the velocities lin_vel, ang_vel = -lin_vel, -ang_vel - for msg in self.motors_msgs: - expected_w_right, expected_w_left = node.vel_motors(lin_vel, ang_vel, sign) - self.assertAlmostEqual(msg.right, expected_w_right, places=4) - self.assertAlmostEqual(msg.left, expected_w_left, places=4) + # Capture the initial state for comparison + initial_eta = [node.eta[m].copy() for m in range(node.n_mod)] + + # Capture the time difference + current_time = time() + dt = current_time - self.previous_time + self.previous_time = current_time + + for mod_id in range(node.n_mod): + # Calculate expected eta and etad values + if mod_id == 0: + expected_angular_velocity = ang_vel + expected_eta = initial_eta[mod_id][0] + expected_angular_velocity * dt + vs = node.Rotz(expected_eta) @ [lin_vel, 0.0, 0.0] + else: + th = initial_eta[mod_id - 1][0] - initial_eta[mod_id][0] + linear_out, expected_angular_velocity = node.kinematic(lin_vel, ang_vel, th) + expected_eta = initial_eta[mod_id][0] + expected_angular_velocity * dt + vs = node.Rotz(expected_eta) @ [linear_out, 0.0, 0.0] + + expected_x = initial_eta[mod_id][1] + vs[0] * dt + expected_y = initial_eta[mod_id][2] + vs[1] * dt + + # Verify the yaw angle updates + for msg in self.yaw_msgs: + self.assertAlmostEqual(msg.data, expected_eta, places=3) + + # Verify the 2D position updates + self.assertAlmostEqual(node.eta[mod_id][1], expected_x, places=3) + self.assertAlmostEqual(node.eta[mod_id][2], expected_y, places=3) + + # Compute motor velocities for each module + lin_vel = np.linalg.norm(vs[0:2]) + ang_vel = expected_angular_velocity + w_right, w_left = node.vel_motors(lin_vel, ang_vel, sign) + + # Verify motor velocities + for msg in self.motors_msgs: + self.assertAlmostEqual(msg.right, w_right, places=4) + self.assertAlmostEqual(msg.left, w_left, places=4) if __name__ == '__main__': From d23405b6f9e95d787e8d43a70555e5a67d9cff68 Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Sun, 29 Dec 2024 16:07:15 +0100 Subject: [PATCH 7/9] refactor(agevar): Simplify data structures for position and velocity management - Modify code to fix inaccurate indexing of modules - Change variable names to improve readability --- reseq_ros2/reseq_ros2/agevar.py | 99 +++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index 3ba509e..9c62329 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -6,17 +6,11 @@ import rclpy from geometry_msgs.msg import Twist from rclpy.node import Node -from std_msgs.msg import Float32 # deprecated? +from std_msgs.msg import Float32 import reseq_ros2.constants as rc from reseq_interfaces.msg import Motors -"""ROS node with control algorithm for snake-like movement - -It receives data from the remote over a ROS topic and publishes to the ROS topics -used by the Communication node to set motors velocities. -""" - class Agevar(Node): def __init__(self): @@ -36,9 +30,12 @@ def __init__(self): self.declare_parameter('end_effector', 0).get_parameter_value().integer_value ) - self.n_mod = len(self.modules) - self.eta = [[0.0, 0.0, 0.0] for _ in range(self.n_mod)] - self.etad = [[0.0, 0.0, 0.0] for _ in range(self.n_mod)] + self.position_orientation = { + m: {'yaw_angle': 0.0, 'x_coord': 0.0, 'y_coord': 0.0} for m in self.modules + } + self.velocities = { + m: {'angular_vel': 0.0, 'x_vel': 0.0, 'y_vel': 0.0} for m in self.modules + } self.init_conditions() self.previous_time = time() @@ -51,12 +48,10 @@ def __init__(self): 10, ) - self.joint_subs = [] - self.motors_pubs = [] - self.yaw_pubs = [] - for i in range(self.n_mod): - address = self.modules[i] - + self.joint_subs = {} + self.motors_pubs = {} + self.yaw_pubs = {} + for address in self.modules: # subscribe to feedback from joints if address in self.joints: s = self.create_subscription( @@ -65,22 +60,26 @@ def __init__(self): lambda msg, x=address: self.yaw_feedback_callback(msg, x), 10, ) - self.joint_subs.append(s) + self.joint_subs[address] = s # create publisher for the joint yaw setpoint topics p1 = self.create_publisher( Float32, f'reseq/module{address}/joint/yaw/setpoint', 10 ) - self.yaw_pubs.append(p1) + self.yaw_pubs[address] = p1 # create publisher for the motor topics p2 = self.create_publisher(Motors, f'reseq/module{address}/motor/setpoint', 10) - self.motors_pubs.append(p2) + self.motors_pubs[address] = p2 def init_conditions(self): - for m in range(self.n_mod): - self.eta[m] = [0.0, (m - 1) * (-self.a - self.b), 0.0] - self.etad[m] = [0.0, 0.0, 0.0] + for m in self.modules: + self.position_orientation[m] = { + 'yaw_angle': 0.0, + 'x_coord': (m - 17) * (-self.a - self.b), + 'y_coord': 0.0, + } + self.velocities[m] = {'angular_vel': 0.0, 'x_vel': 0.0, 'y_vel': 0.0} def remote_callback(self, msg: Twist): # extract information from ROS Twist message @@ -88,55 +87,73 @@ def remote_callback(self, msg: Twist): angular_vel = msg.angular.z sign = linear_vel > 0 - modules = list(range(self.n_mod)) if sign == 0: # going backwards - modules.reverse() + modules = self.modules[::-1] linear_vel = -linear_vel angular_vel = -angular_vel + else: + modules = self.modules # time step t = time() dt = t - self.previous_time self.previous_time = t - for mod_id in modules: + for idx, mod_id in enumerate(modules): # For the first module - if mod_id == modules[0]: + if idx == 0: # Update angular velocity (yaw rate) - self.etad[mod_id][0] = angular_vel + self.velocities[mod_id]['angular_vel'] = angular_vel # Integrate angular velocity to get new yaw angle - self.eta[mod_id][0] += dt * self.etad[mod_id][0] + self.position_orientation[mod_id]['yaw_angle'] += ( + dt * self.velocities[mod_id]['angular_vel'] + ) # Calculate linear velocities in the local frame and transform to global frame - vs = self.Rotz(self.eta[mod_id][0]) @ [linear_vel, 0.0, 0.0] - self.etad[mod_id][1:3] = vs[0:2] + vs = self.Rotz(self.position_orientation[mod_id]['yaw_angle']) @ [ + linear_vel, + 0.0, + 0.0, + ] + self.velocities[mod_id]['x_vel'], self.velocities[mod_id]['y_vel'] = vs[0], vs[1] else: # Compute relative angle between the current and previous module - th = self.eta[mod_id - 1][0] - self.eta[mod_id][0] + th = ( + self.position_orientation[modules[idx - 1]]['yaw_angle'] + - self.position_orientation[mod_id]['yaw_angle'] + ) # Compute output linear and angular velocities linear_out, angular_out = self.kinematic(linear_vel, angular_vel, th) # Update angular velocity (yaw rate) - self.etad[mod_id][0] = angular_out + self.velocities[mod_id]['angular_vel'] = angular_out # Integrate angular velocity to get new yaw angle - self.eta[mod_id][0] += dt * self.etad[mod_id][0] + self.position_orientation[mod_id]['yaw_angle'] += ( + dt * self.velocities[mod_id]['angular_vel'] + ) # Calculate linear velocities in the local frame and transform to global frame - vs = self.Rotz(self.eta[mod_id][0]) @ [linear_out, 0.0, 0.0] - self.etad[mod_id][1:3] = vs[0:2] + vs = self.Rotz(self.position_orientation[mod_id]['yaw_angle']) @ [ + linear_out, + 0.0, + 0.0, + ] + self.velocities[mod_id]['x_vel'], self.velocities[mod_id]['y_vel'] = vs[0], vs[1] # Integrate linear velocities to get new position - self.eta[mod_id][1] += dt * self.etad[mod_id][1] - self.eta[mod_id][2] += dt * self.etad[mod_id][2] + self.position_orientation[mod_id]['x_coord'] += dt * self.velocities[mod_id]['x_vel'] + self.position_orientation[mod_id]['y_coord'] += dt * self.velocities[mod_id]['y_vel'] # Calculate linear velocity magnitude for vel_motors - lin_vel = np.linalg.norm(self.etad[mod_id][1:3]) + lin_vel = np.linalg.norm( + [self.velocities[mod_id]['x_vel'], self.velocities[mod_id]['y_vel']] + ) # Use angular velocity directly for vel_motors - ang_vel = self.etad[mod_id][0] + ang_vel = self.velocities[mod_id]['angular_vel'] # Compute motor velocities for each module w_right, w_left = self.vel_motors(lin_vel, ang_vel, sign) @@ -150,7 +167,7 @@ def remote_callback(self, msg: Twist): # Publish yaw angle to joint yaw setpoint topic if mod_id in self.joints: yaw_msg = Float32() - yaw_msg.data = self.eta[mod_id][0] + yaw_msg.data = self.position_orientation[mod_id]['yaw_angle'] * 180 / pi self.yaw_pubs[mod_id].publish(yaw_msg) self.get_logger().debug(f'Output lin:{linear_vel}, ang:{angular_vel}, sign:{sign}') @@ -164,7 +181,7 @@ def yaw_feedback_callback(self, msg, module_num): angle -= 360 # store the angle in radians - self.eta[module_num - 17][0] = angle * pi / 180.0 + self.position_orientation[module_num]['yaw_angle'] = angle * pi / 180.0 # given data of a module, compute linear and angular velocities of the next one def kinematic(self, linear_vel, angular_vel, yaw_angle): From 614e577ca52abdbec2ed8e584fb515fa3c81a120 Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Sun, 5 Jan 2025 08:24:40 +0100 Subject: [PATCH 8/9] feat(agevar): Implement PD controller for yaw angle adjustment --- reseq_ros2/reseq_ros2/agevar.py | 47 ++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index 9c62329..ebe80f0 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -37,7 +37,14 @@ def __init__(self): m: {'angular_vel': 0.0, 'x_vel': 0.0, 'y_vel': 0.0} for m in self.modules } + # PD controller parameters + self.kp = 0.7 # Proportional gain + self.kd = 0.01 # Derivative gain + self.previous_error = {m: 0.0 for m in self.modules} + self.init_conditions() + + # time variables self.previous_time = time() # subscribe to remote (parsed by teleop_twist_joy) @@ -80,6 +87,7 @@ def init_conditions(self): 'y_coord': 0.0, } self.velocities[m] = {'angular_vel': 0.0, 'x_vel': 0.0, 'y_vel': 0.0} + self.previous_error[m] = 0.0 def remote_callback(self, msg: Twist): # extract information from ROS Twist message @@ -105,11 +113,21 @@ def remote_callback(self, msg: Twist): # Update angular velocity (yaw rate) self.velocities[mod_id]['angular_vel'] = angular_vel - # Integrate angular velocity to get new yaw angle - self.position_orientation[mod_id]['yaw_angle'] += ( - dt * self.velocities[mod_id]['angular_vel'] + # Calculate the desired yaw angle directly + desired_yaw_angle = -np.arctan2( + self.b * angular_vel, + linear_vel + (self.a + self.b) * angular_vel * np.cos(0), ) + # PD controller for yaw angle adjustment + error = desired_yaw_angle - self.position_orientation[mod_id]['yaw_angle'] + derivative = (error - self.previous_error[mod_id]) / dt + correction = self.kp * error + self.kd * derivative + self.previous_error[mod_id] = error + + # Integrate angular velocity to get new yaw angle + self.position_orientation[mod_id]['yaw_angle'] += correction * dt + # Calculate linear velocities in the local frame and transform to global frame vs = self.Rotz(self.position_orientation[mod_id]['yaw_angle']) @ [ linear_vel, @@ -119,10 +137,9 @@ def remote_callback(self, msg: Twist): self.velocities[mod_id]['x_vel'], self.velocities[mod_id]['y_vel'] = vs[0], vs[1] else: # Compute relative angle between the current and previous module - th = ( - self.position_orientation[modules[idx - 1]]['yaw_angle'] - - self.position_orientation[mod_id]['yaw_angle'] - ) + prev_yaw = self.position_orientation[modules[idx - 1]]['yaw_angle'] + current_yaw = self.position_orientation[mod_id]['yaw_angle'] + th = prev_yaw - current_yaw # Compute output linear and angular velocities linear_out, angular_out = self.kinematic(linear_vel, angular_vel, th) @@ -130,11 +147,21 @@ def remote_callback(self, msg: Twist): # Update angular velocity (yaw rate) self.velocities[mod_id]['angular_vel'] = angular_out - # Integrate angular velocity to get new yaw angle - self.position_orientation[mod_id]['yaw_angle'] += ( - dt * self.velocities[mod_id]['angular_vel'] + # Calculate the desired yaw angle directly + desired_yaw_angle = -np.arctan2( + self.b * angular_vel + (self.a + self.b) * linear_vel * np.sin(th), + linear_vel + (self.a + self.b) * angular_vel * np.cos(th), ) + # PD controller for yaw angle adjustment + error = desired_yaw_angle - self.position_orientation[mod_id]['yaw_angle'] + derivative = (error - self.previous_error[mod_id]) / dt + correction = self.kp * error + self.kd * derivative + self.previous_error[mod_id] = error + + # Integrate angular velocity to get new yaw angle + self.position_orientation[mod_id]['yaw_angle'] += correction * dt + # Calculate linear velocities in the local frame and transform to global frame vs = self.Rotz(self.position_orientation[mod_id]['yaw_angle']) @ [ linear_out, From 9474ae753324363d255f2a1f60301e983dffac02 Mon Sep 17 00:00:00 2001 From: Yusuf-Serdar Date: Fri, 10 Jan 2025 07:59:30 +0100 Subject: [PATCH 9/9] feat(agevar): Enhance PD controller with improved parameters and velocity scaling --- reseq_ros2/reseq_ros2/agevar.py | 61 ++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/reseq_ros2/reseq_ros2/agevar.py b/reseq_ros2/reseq_ros2/agevar.py index ebe80f0..69b155d 100644 --- a/reseq_ros2/reseq_ros2/agevar.py +++ b/reseq_ros2/reseq_ros2/agevar.py @@ -38,14 +38,16 @@ def __init__(self): } # PD controller parameters - self.kp = 0.7 # Proportional gain + self.kp = 1.0 # Proportional gain self.kd = 0.01 # Derivative gain + self.kv = 100 # Linear velocity scaling factor self.previous_error = {m: 0.0 for m in self.modules} self.init_conditions() # time variables self.previous_time = time() + self.max_dt = 0.1 # Maximum allowable time step in seconds # subscribe to remote (parsed by teleop_twist_joy) self.create_subscription( @@ -90,21 +92,23 @@ def init_conditions(self): self.previous_error[m] = 0.0 def remote_callback(self, msg: Twist): - # extract information from ROS Twist message + # Extract information from ROS Twist message linear_vel = msg.linear.x angular_vel = msg.angular.z - sign = linear_vel > 0 + sign = 1 if linear_vel > 0 else -1 + yaw_sign = -1 if angular_vel > 0 else 1 - if sign == 0: # going backwards + if sign == -1: # Going backwards modules = self.modules[::-1] linear_vel = -linear_vel angular_vel = -angular_vel else: modules = self.modules - # time step + # Time step t = time() dt = t - self.previous_time + dt = min(dt, self.max_dt) # Limit dt to max_dt self.previous_time = t for idx, mod_id in enumerate(modules): @@ -113,16 +117,23 @@ def remote_callback(self, msg: Twist): # Update angular velocity (yaw rate) self.velocities[mod_id]['angular_vel'] = angular_vel + kp = self.kp + kd = self.kd + kv = self.kv + # Calculate the desired yaw angle directly - desired_yaw_angle = -np.arctan2( - self.b * angular_vel, - linear_vel + (self.a + self.b) * angular_vel * np.cos(0), + desired_yaw_angle = ( + -np.arctan2( + self.b * angular_vel, + linear_vel + (self.a + self.b) * angular_vel * np.cos(0) * yaw_sign * sign, + ) + * sign ) # PD controller for yaw angle adjustment error = desired_yaw_angle - self.position_orientation[mod_id]['yaw_angle'] derivative = (error - self.previous_error[mod_id]) / dt - correction = self.kp * error + self.kd * derivative + correction = (kp * error + kd * derivative) * linear_vel * kv self.previous_error[mod_id] = error # Integrate angular velocity to get new yaw angle @@ -147,20 +158,8 @@ def remote_callback(self, msg: Twist): # Update angular velocity (yaw rate) self.velocities[mod_id]['angular_vel'] = angular_out - # Calculate the desired yaw angle directly - desired_yaw_angle = -np.arctan2( - self.b * angular_vel + (self.a + self.b) * linear_vel * np.sin(th), - linear_vel + (self.a + self.b) * angular_vel * np.cos(th), - ) - - # PD controller for yaw angle adjustment - error = desired_yaw_angle - self.position_orientation[mod_id]['yaw_angle'] - derivative = (error - self.previous_error[mod_id]) / dt - correction = self.kp * error + self.kd * derivative - self.previous_error[mod_id] = error - - # Integrate angular velocity to get new yaw angle - self.position_orientation[mod_id]['yaw_angle'] += correction * dt + # Integrate the relative angle to get the new yaw angle + self.position_orientation[mod_id]['yaw_angle'] += th # Calculate linear velocities in the local frame and transform to global frame vs = self.Rotz(self.position_orientation[mod_id]['yaw_angle']) @ [ @@ -170,9 +169,13 @@ def remote_callback(self, msg: Twist): ] self.velocities[mod_id]['x_vel'], self.velocities[mod_id]['y_vel'] = vs[0], vs[1] - # Integrate linear velocities to get new position - self.position_orientation[mod_id]['x_coord'] += dt * self.velocities[mod_id]['x_vel'] - self.position_orientation[mod_id]['y_coord'] += dt * self.velocities[mod_id]['y_vel'] + # Update position coordinates + self.position_orientation[mod_id]['x_coord'] += ( + self.velocities[mod_id]['x_vel'] * dt * 180 / pi + ) + self.position_orientation[mod_id]['y_coord'] += ( + self.velocities[mod_id]['y_vel'] * dt * 180 / pi + ) # Calculate linear velocity magnitude for vel_motors lin_vel = np.linalg.norm( @@ -185,7 +188,7 @@ def remote_callback(self, msg: Twist): # Compute motor velocities for each module w_right, w_left = self.vel_motors(lin_vel, ang_vel, sign) - # publish to ROS motor topics + # Publish to ROS motor topics m = Motors() m.right = w_right m.left = w_left @@ -206,6 +209,8 @@ def yaw_feedback_callback(self, msg, module_num): # keep the angle between -180 and +180 if angle >= 180: angle -= 360 + elif angle < -180: + angle += 360 # store the angle in radians self.position_orientation[module_num]['yaw_angle'] = angle * pi / 180.0 @@ -224,7 +229,7 @@ def vel_motors(self, lin_vel, ang_vel, sign): w_right = (lin_vel + ang_vel * self.d / 2) / self.r_eq w_left = (lin_vel - ang_vel * self.d / 2) / self.r_eq - if sign == 0: # backwards + if sign == -1: # backwards w_left, w_right = -w_left, -w_right # from radians to rpm