Commit 52b4135f authored by Max Mirko Polzin's avatar Max Mirko Polzin
Browse files

Add filter and controller.

parent 04091ead
......@@ -26,7 +26,7 @@ electra_msgs::State rawMsg;
ros::Publisher encoderRawPublisher("/electra/encoder/raw", &rawMsg);
void commandCallback( const std_msgs::Int16& commandMsg);
ros::Subscriber<std_msgs::Int16> commandSubscriber("/electra/command", &commandCallback );
ros::Subscriber<std_msgs::Int16> commandSubscriber("/electra/command/raw", &commandCallback );
void setup() {
nh.initNode();
......
#!/usr/bin/python3
from __future__ import annotations
import collections
import rospy
from std_msgs.msg import Float32
import electra_msgs.msg
import std_msgs.msg
class EncoderMeasurement:
def __init__(self, _time: rospy.Time, _position: float):
self.time = _time
self.position = _position
# todo[max] Filter with time stamp
class MovingAverageFilter:
def __init__(self, _maxlen: int):
self.data = collections.deque(maxlen=_maxlen)
def __sub__(self, other: EncoderMeasurement) -> float:
return (self.position - other.position) / (self.time - other.time).to_sec()
def output(self):
return sum(self.data) / len(self.data) if self.data else 0.0
def insert(self, sample):
self.data.append(sample)
class ElectraReel:
def __init__(self) -> None:
self.encoder_velocity_publisher = rospy.Publisher(
self.velocity_filter = MovingAverageFilter(100)
self.encoder_state_publisher = rospy.Publisher(
"/electra/encoder/stamped", electra_msgs.msg.StateStamped, queue_size=1
)
self.last_encoder_measurement = EncoderMeasurement(rospy.Time.now(), 0.0)
self.encoder_raw_subscriber = rospy.Subscriber(
"/electra/encoder/raw",
electra_msgs.msg.State,
self.encoder_raw_callback,
self.encoder_callback,
queue_size=1,
)
self.motor_command_raw_publisher = rospy.Publisher(
"/electra/command/raw",
std_msgs.msg.Int16,
queue_size=1,
)
self.velocity_setpoint = 0.0
self.motor_command_subscriber = rospy.Subscriber(
"/electra/command/velocity",
std_msgs.msg.Float32,
self.motor_command_callback,
queue_size=1,
)
def encoder_raw_callback(self, msg: electra_msgs.msg.State) -> None:
self.controller_timer = rospy.Timer(
rospy.Duration(0.02),
self.controller_callback,
)
def encoder_callback(self, msg: electra_msgs.msg.State) -> None:
msg_stamped = electra_msgs.msg.StateStamped()
msg_stamped.position = msg.position
msg_stamped.velocity = msg.velocity
msg_stamped.header.stamp = rospy.Time.now()
# current_encoder_measurement = EncoderMeasurement(rospy.Time.now(), msg.data)
# velocity = current_encoder_measurement - self.last_encoder_measurement
self.encoder_velocity_publisher.publish(msg_stamped)
# self.last_encoder_measurement = current_encoder_measurement
self.velocity_filter.insert(msg.velocity)
msg_stamped.velocity = self.velocity_filter.output()
msg_stamped.header.stamp = rospy.Time.now()
# add subscriber to set_vel
# publish command = p*(set_vel - filtered_vel)
self.encoder_state_publisher.publish(msg_stamped)
# publish filtered&raw velocity instead of raw velocity
def motor_command_callback(self, msg: std_msgs.msg.Float32) -> None:
self.velocity_setpoint = msg.data
# def set_dop_name(self, req: setStringRequest) -> setStringResponse:
def controller_callback(self, event: rospy.TimerEvent):
p_gain = 0.1
velocity_error = self.velocity_setpoint - self.velocity_filter.output()
input = p_gain * velocity_error
command = std_msgs.msg.Int16(data=int(input))
self.motor_command_raw_publisher.publish(command)
if __name__ == "__main__":
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment