Skip to content
Snippets Groups Projects
Commit 389435df authored by Kai Junge's avatar Kai Junge
Browse files

New code to command translation with gamepad.

parent fe42b561
No related branches found
No related tags found
No related merge requests found
......@@ -28,31 +28,30 @@ class ArmTeleopNode(Node):
# Main loop
self.mainloop = self.create_timer(0.001, self.mainloop_callback)
# Translation loop
self.translation_loop = self.create_timer(0.01, self.translation_callback)
# Publisher
self.franka_pose_publisher = self.create_publisher(Float64MultiArray, '/franka/pose_demand', 10)
self.wrist_servo_pos_publisher = self.create_publisher(Float64MultiArray, '/wrist/servo_demand', 10)
# Subscriber
self.franka_cartesian_pose_subscriber = self.create_subscription(Float64MultiArray, '/franka/pose', self.cart_pos_callback, 10)
self.target_translation_subscriber = self.create_subscription(Float64MultiArray, '/franka/translation_demand', self.translation_callback, 10)
self.target_orientation_subscriber = self.create_subscription(Float64MultiArray, '/franka/orientation_demand',self.orientation_callback, 10)
self.hand_wrist_servo_position_subscriber = self.create_subscription(Float64MultiArray, '/hand_wrist/servo_positions',self.hand_wrist_servo_callback, 10)
# Variables
self.translation_demand = None
self.orientation_demand = None
self.current_franka_pose = None
self.current_hand_wrist_servo_pos = None
self.last_translation_update = cp(time.time())
self.last_orientation_update = cp(time.time())
# Flags
self.is_active = False
self.translation_delta = [0.0, 0.0, 0.0]
def check_timer(self, which_axis, threshold = 0.5):
if which_axis == "translation":
which_timer = self.last_translation_update
pass
else:
which_timer = self.last_orientation_update
......@@ -61,9 +60,17 @@ class ArmTeleopNode(Node):
else:
return False
def translation_callback(self, msg:Float64MultiArray):
self.translation_demand = msg
self.last_translation_update = cp(time.time())
def translation_callback(self):
delta = [0.0, 0.0, 0.0]
scale = 0.001
if abs(self.gp.axis_data["Ly"]) > 0.3:
delta[2] = self.gp.axis_data["Ly"] * scale
if abs(self.gp.axis_data["Lx"]) > 0.3:
delta[1] = self.gp.axis_data["Lx"] * scale
self.translation_delta = cp(delta)
def orientation_callback(self, msg:Float64MultiArray):
self.orientation_demand = msg
......@@ -82,63 +89,52 @@ class ArmTeleopNode(Node):
if (self.current_franka_pose is None) or (self.current_hand_wrist_servo_pos is None):
return
if self.gp.button_data["L1"] == 1 and self.is_active == False:
self.initial_pose = list(cp(self.current_franka_pose.data))
self.initial_pose_wrist = [self.current_hand_wrist_servo_pos.data[13], self.current_hand_wrist_servo_pos.data[14]]
self.is_active = True
self.waiting_timer = cp(time.time())
if self.gp.button_data["L1"] == 0:
self.is_active = False
if self.is_active and (time.time() - self.waiting_timer > 0.5):
target_pose = cp(self.initial_pose)
target_wrist = cp(self.initial_pose_wrist)
print_msg = "\n"
if (self.translation_demand is not None) and self.check_timer("translation"):
print_msg += "Translation "
for i in range(3):
target_pose[i] = cp(self.translation_demand[i])
if (self.orientation_demand is not None) and self.check_timer("orientation"):
print_msg += "Orientation "
for i in range(4):
target_pose[i+3] = cp(self.orientation_demand.data[i+3])
for i in range(3):
target_pose[i] += self.orientation_demand.data[i]
# Wrist!
# Combine motor commands to pitch + yaw
pitch, yaw = self.hand_regulator.wrist_motor_pos_to_pitch_yaw(target_wrist[0], target_wrist[1])
target_pose = list(cp(self.current_franka_pose.data))
target_wrist = [self.current_hand_wrist_servo_pos.data[13], self.current_hand_wrist_servo_pos.data[14]]
print_msg = "\n"
# Add translation velocity from gamepad
for i in range(3):
target_pose[i] += self.translation_delta[i]
# If we recieve orientation data
if (self.orientation_demand is not None) and self.check_timer("orientation") and self.gp.button_data["L1"] == 1:
print_msg += "Orientation "
for i in range(4):
target_pose[i+3] = cp(self.orientation_demand.data[i+3])
pitch += cp(self.orientation_demand.data[7])
yaw += cp(self.orientation_demand.data[8])
left, right = self.hand_regulator.pitch_yaw_to_wrist_motor_pos(pitch, yaw)
# Wrist
wrist_msg = Float64MultiArray()
wrist_msg.data = [left, right]
self.wrist_servo_pos_publisher.publish(wrist_msg)
print_msg += "Publish! "
# Arm
arm_msg = Float64MultiArray()
arm_msg.data = target_pose
self.franka_pose_publisher.publish(arm_msg)
for t in target_pose:
print(round(t, 2), end = ", ")
# print([self.current_hand_wrist_servo_pos.data[13], self.current_hand_wrist_servo_pos.data[14]])
# print(self.orientation_demand.data[7], self.orientation_demand.data[8])
print(left, right)
print("")
# print_msg += ("\n" + posestamp_message_as_str(target_pose))
# self.get_logger().info(print_msg, throttle_duration_sec = 0.1)
for i in range(3):
target_pose[i] += self.orientation_demand.data[i]
# Wrist!
# Combine motor commands to pitch + yaw
pitch, yaw = self.hand_regulator.wrist_motor_pos_to_pitch_yaw(target_wrist[0], target_wrist[1])
pitch += cp(self.orientation_demand.data[7])
yaw += cp(self.orientation_demand.data[8])
left, right = self.hand_regulator.pitch_yaw_to_wrist_motor_pos(pitch, yaw)
# Wrist
wrist_msg = Float64MultiArray()
wrist_msg.data = [left, right]
self.wrist_servo_pos_publisher.publish(wrist_msg)
print_msg += "Publish! "
# Arm
arm_msg = Float64MultiArray()
arm_msg.data = target_pose
self.franka_pose_publisher.publish(arm_msg)
for t in target_pose:
print(round(t, 2), end = ", ")
# print([self.current_hand_wrist_servo_pos.data[13], self.current_hand_wrist_servo_pos.data[14]])
# print(self.orientation_demand.data[7], self.orientation_demand.data[8])
# print(left, right)
# print("")
# print_msg += ("\n" + posestamp_message_as_str(target_pose))
# self.get_logger().info(print_msg, throttle_duration_sec = 0.1)
def main():
rclpy.init()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment