diff options
author | Greg Brown <gmb60@cam.ac.uk> | 2022-01-30 23:50:36 +0000 |
---|---|---|
committer | Greg Brown <gmb60@cam.ac.uk> | 2022-01-30 23:51:00 +0000 |
commit | f6ee0e36c9cc075e5f007c44e95dc3aaa7736a57 (patch) | |
tree | 0fb6437bf04884ca7da38299d30b37ad6f9b4ee3 /murl/sensors.py |
Initial commit
Diffstat (limited to 'murl/sensors.py')
-rw-r--r-- | murl/sensors.py | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/murl/sensors.py b/murl/sensors.py new file mode 100644 index 0000000..700db35 --- /dev/null +++ b/murl/sensors.py @@ -0,0 +1,180 @@ +import numpy as np +import rospy + +from gazebo_msgs.msg import ModelStates +from nav_msgs.msg import Odometry +from sensor_msgs.msg import LaserScan +from tf.transformations import euler_from_quaternion + +SCAN_ANGLES = [0., np.pi / 4., -np.pi / 4., np.pi / 2., -np.pi / 2.] +CONE_WIDTH = np.pi / 180. * 3.1 # 3.1 degrees cone of view (3 rays). + +class SimpleLaser: + def __init__(self, robot): + rospy.Subscriber('/robot{}/scan'.format(robot), LaserScan, self.callback) + self._angles = SCAN_ANGLES + self._width = CONE_WIDTH + self._measurements = [float('inf')] * len(self._angles) + self._indices = None + + def callback(self, msg): + # Helper for angles. + def _within(x, a, b): + pi2 = np.pi * 2. + x %= pi2 + a %= pi2 + b %= pi2 + if a < b: + return a <= x and x <= b + return a <= x or x <= b; + + # Compute indices the first time. + if self._indices is None: + self._indices = [[] for _ in range(len(self._angles))] + for i, d in enumerate(msg.ranges): + angle = msg.angle_min + i * msg.angle_increment + for j, center_angle in enumerate(self._angles): + if _within(angle, center_angle - self._width / 2., center_angle + self._width / 2.): + self._indices[j].append(i) + + ranges = np.array(msg.ranges) + for i, idx in enumerate(self._indices): + # We do not take the minimum range of the cone but the 10-th percentile for robustness. + self._measurements[i] = np.percentile(ranges[idx], 10) + + @property + def ready(self): + return not np.isnan(self._measurements[0]) + + @property + def measurements(self): + return self._measurements + + +class Motion: + def __init__(self, robot): + self._previous_time = None + self._delta_pose = np.array([0., 0., 0.], dtype=np.float32) + rospy.Subscriber('/robot{}/odom'.format(robot), Odometry, self.callback) + + def callback(self, msg): + u = msg.twist.twist.linear.x + w = msg.twist.twist.angular.z + if self._previous_time is None: + self._previous_time = msg.header.stamp + current_time = msg.header.stamp + dt = (current_time - self._previous_time).to_sec() + self._delta_pose[0] += u * dt + self._delta_pose[1] += 0. + self._delta_pose[2] += w * dt + self._previous_time = current_time + + @property + def ready(self): + return True + + @property + def delta_pose(self): + ret = self._delta_pose.copy() + self._delta_pose[:] = 0 + return ret + + +class SimpleLaser: + def __init__(self, robot): + rospy.Subscriber('/robot{}/scan'.format(robot), LaserScan, self.callback) + self._angles = SCAN_ANGLES + self._width = CONE_WIDTH + self._measurements = [float('inf')] * len(self._angles) + self._indices = None + + def callback(self, msg): + # Helper for angles. + def _within(x, a, b): + pi2 = np.pi * 2. + x %= pi2 + a %= pi2 + b %= pi2 + if a < b: + return a <= x and x <= b + return a <= x or x <= b; + + # Compute indices the first time. + if self._indices is None: + self._indices = [[] for _ in range(len(self._angles))] + for i, d in enumerate(msg.ranges): + angle = msg.angle_min + i * msg.angle_increment + for j, center_angle in enumerate(self._angles): + if _within(angle, center_angle - self._width / 2., center_angle + self._width / 2.): + self._indices[j].append(i) + + ranges = np.array(msg.ranges) + for i, idx in enumerate(self._indices): + # We do not take the minimum range of the cone but the 10-th percentile for robustness. + self._measurements[i] = np.percentile(ranges[idx], 10) + + @property + def ready(self): + return not np.isnan(self._measurements[0]) + + @property + def measurements(self): + return self._measurements + + +class Motion(object): + def __init__(self, robot): + self._previous_time = None + self._delta_pose = np.array([0., 0., 0.], dtype=np.float32) + rospy.Subscriber('/robot{}/odom'.format(robot), Odometry, self.callback) + + def callback(self, msg): + u = msg.twist.twist.linear.x + w = msg.twist.twist.angular.z + if self._previous_time is None: + self._previous_time = msg.header.stamp + current_time = msg.header.stamp + dt = (current_time - self._previous_time).to_sec() + self._delta_pose[0] += u * dt + self._delta_pose[1] += 0. + self._delta_pose[2] += w * dt + self._previous_time = current_time + + @property + def ready(self): + return True + + @property + def delta_pose(self): + ret = self._delta_pose.copy() + self._delta_pose[:] = 0 + return ret + + +class GroundtruthPose: + def __init__(self, robot): + rospy.Subscriber('/gazebo/model_states', ModelStates, self.callback) + self._pose = np.array([np.nan, np.nan, np.nan], dtype=np.float32) + self._name = 'Robot{}'.format(robot) + + def callback(self, msg): + idx = [i for i, n in enumerate(msg.name) if n == self._name] + if not idx: + raise ValueError('Specified name "{}" does not exist.'.format(self._name)) + idx = idx[0] + self._pose[0] = msg.pose[idx].position.x + self._pose[1] = msg.pose[idx].position.y + _, _, yaw = euler_from_quaternion([ + msg.pose[idx].orientation.x, + msg.pose[idx].orientation.y, + msg.pose[idx].orientation.z, + msg.pose[idx].orientation.w]) + self._pose[2] = yaw + + @property + def ready(self): + return not np.isnan(self._pose[0]) + + @property + def pose(self): + return self._pose |