아주아주 기본형태
1. 기본적인 publisher 코드
#! /usr/bin/env python
import rospy
from std_msgs.msg import String
class SimplePubNode:
def __init__(self):
self.simple_pub = rospy.Publisher(
"/msg",
String,
queue_size = 5)
print("init node")
def pubMsg(self):
self.simple_pub.publish("Hello world!")
def run():
rospy.init_node("simple_pub")
spn = SimplePubNode() # Instance
rate = rospy.Rate(5)
while not rospy.is_shutdown():
spn.pubMsg()
rate.sleep()
if __name__ == "__main__":
run()
1-2. Int32 메세지를 발행한다면?
#! /usr/bin/env python
import rospy
from std_msgs.msg import Int32
class SortingPubNode:
def __init__(self):
self.sorting_pub = rospy.Publisher(
"/msg",
Int32,
queue_size = 5)
print("write integer number : ")
def pubMsg(self):
n = input()
if n != -1:
self.sorting_pub.publish(n)
return True
else:
self.sorting_pub.publish(n)
return False
def run():
rospy.init_node("sorting_pub")
spn = SortingPubNode() # Instance
rate = rospy.Rate(5)
state = True
while state and not rospy.is_shutdown():
state = spn.pubMsg()
rate.sleep()
if __name__ == "__main__":
run()
print("topic finish")
2. 기본적인 subscriber 코드
#! /usr/bin/env python
import rospy
from std_msgs.msg import String
class SimpleSubNode:
def __init__(self):
self.simple_pub = rospy.Subscriber(
"/msg",
String,
self.callback
)
def callback(self, _msg):
print(_msg.data)
def run():
rospy.init_node("simple_sub")
ssn = SimpleSubNode()
rospy.spin()
if __name__ == "__main__":
run()
2-1 Int32 메세지를 구독한다면?
#! /usr/bin/env python
import rospy
from std_msgs.msg import Int32
class SortingSubNode:
def __init__(self):
self.sorting_sub = rospy.Subscriber(
"/msg",
Int32,
self.callback
)
self.lst = []
def callback(self, _msg):
if _msg.data == -1:
self.lst.sort()
print("sorted list: ",self.lst)
self.lst = []
else:
self.lst.append(_msg.data)
print(self.lst)
def run():
rospy.init_node("sorting_sub")
ssn = SortingSubNode()
rospy.spin()
if __name__ == "__main__":
run()
1. 발행 형태
예1) scout_sim_launch > scripts > publisher.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import rospy
from geometry_msgs.msg import Twist
from geometry_msgs.msg import PolygonStamped
from geometry_msgs.msg import Point32
from visualization_msgs.msg import Marker
from visualization_msgs.msg import MarkerArray
NODE_NAME = "publisher_tester"
TOPIC_NAME = "cmd_vel1"
#MSG_TYPE = Twist
#MSG_TYPE = PolygonStamped
MSG_TYPE = Marker
#MSG_TYPE = MarkerArray
PUBLISH_HZ = 10.0
class Topic_publisher():
def __init__(self):
rospy.init_node(NODE_NAME)
self.publisher1 = rospy.Publisher(TOPIC_NAME, MSG_TYPE, queue_size=1)
rospy.Timer(rospy.Duration(1.0/PUBLISH_HZ), self.timer_CB3)
rospy.spin()
def timer_CB3(self, event):
# visualization)msgs/Marker
publishing_data_marker = Marker()
publishing_data_marker.header.stamp = rospy.Time.now()
publishing_data_marker.header.frame_id = "object"
publishing_data_marker.type = 3
publishing_data_marker.action = 0
publishing_data_marker.pose.position.x = 1.0
publishing_data_marker.pose.position.y = 1.0
publishing_data_marker.pose.position.z = 3.0
publishing_data_marker.pose.orientation.x = 1.0
publishing_data_marker.pose.orientation.y = 1.0
publishing_data_marker.pose.orientation.z = 0.0
#publishing_data_marker.scale = Vector3(1.0, 1.0, 0.0)
publishing_data_marker.scale.x = 1.0
publishing_data_marker.scale.y = 1.0
publishing_data_marker.scale.z = 1.0
publishing_data_marker.color.r = 0
publishing_data_marker.color.g = 141
publishing_data_marker.color.b = 98
publishing_data_marker.color.a = 0.8
self.publisher1.publish(publishing_data_marker)
def timer_CB2(self, event):
publishing_data_Poly = PolygonStamped()
#publising_data의 내부를 채우는 과정
#publishing_data_Poly.header.seq = 0
publishing_data_Poly.header.stamp = rospy.Time.now()
# publishing_data_Poly.header.stamp = rospy.Time.from_sec(0)
publishing_data_Poly.header.frame_id = "triangle"
# publishing_data_Poly.polygon.points = Point32() (X)
publishing_data_Poly.polygon.points = [Point32(x=0.0, y=0.0, z=0.0),Point32(x=3.0, y=3.0, z=0.0), Point32(x=6.0, y=0.0, z=0.0)]
self.publisher1.publish(publishing_data_Poly)
def timer_CB(self, event):
publishing_data = Twist()
#publising_data의 내부를 채우는 과정
publishing_data.linear = Vector3(1.0, 2.0, 3.0)
# publishing_data.linear.x = 1.0 식으로 넣어줘도 된다.
publishing_data.angular = Vector3(0, 0, 1.0)
self.publisher1.publish(publish_data)
"""
geometry_msgs/Vector3 linear
float64 x
float64 y
float64 z
geometry_msgs/Vector3 angular
float64 x
float64 y
float64 z
"""
#publishing_data.linear.x
#print(dir(Twist))
#print(dir(publishing_data))
#print(publishing_data.deserialize)
#print(publishing_data.linear)
if __name__=="__main__":
Topic_publisher()
예2) scout_sim_launch > scripts > twist_pub.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import rospy
from geometry_msgs.msg import Twist
NODE_NAME = "turtle_mover"
TOPIC_NAME = "cmd_vel"
MSG_TYPE = Twist
PUBLISH_HZ = 10.0
class topic_publisher():
def __init__(self):
rospy.init_node(NODE_NAME)
self.publisher1 = rospy.Publisher(TOPIC_NAME, MSG_TYPE, queue_size=1)
rospy.Timer(rospy.Duration(1.0 / PUBLISH_HZ), self.timer_CB)
rospy.spin()
def timer_CB(self, event):
publishing_data = Twist()
publishing_data.linear.x = rospy.get_param("~linear_speed", 1.0) # nodename/linear_speed
publishing_data.linear.y = 0.0
publishing_data.linear.z = 0.0
publishing_data.angular.x = 0
publishing_data.angular.y = 0
publishing_data.angular.z = rospy.get_param("~angular_speed", 1.0) # angular_speed
self.publisher1.publish(publishing_data)
if __name__ == '__main__':
topic_publisher()
구독 형태
예1) scout_sim_launch > scripts > imu_sub.py
#!/usr/bin/env python
## imu subscriber
## print angular z
import rospy
from sensor_msgs.msg import Imu
from scout_msgs.msg import ScoutStatus
## scout_msgs/ScoutStatus
from nav_msgs.msg import Odometry
## nav_msgs/Odometry --> Publish
import math
from tf.transformations import quaternion_from_euler
class imu_sub():
def __init__(self):
rospy.init_node("imu_sub")
rospy.Subscriber("imu", Imu, self.Imu_callback)
rospy.Subscriber("scout_status", ScoutStatus, self.Status_CB)
self.odom_pub = rospy.Publisher("scout_odom", Odometry, queue_size=1)
self.yaw = 0
self.x = 0
self.y = 0
self.old_v = 0
self.old_phi_dot = 0
self.old_time = 0
self.status_old_time = 0
self.old_x_dot = 0
self.old_y_dot = 0
rospy.Timer(rospy.Duration(1.0 / 30), self.timer_CB) ## 30hz
rospy.spin()
def timer_CB(self, event):
publish_data = Odometry()
#rospy.loginfo("(x_dot, y_dot, yaw_dot) = ({:.4f}, {:.4f}, {:.4f})".format(self.old_x_dot, self.old_y_dot, self.old_phi_dot))
#rospy.loginfo("(x, y, yaw) = ({:.4f}, {:.4f}, {:.4f})".format(self.x, self.y, self.yaw))
publish_data.header.stamp = rospy.Time.now()
publish_data.header.frame_id = "odom"
publish_data.child_frame_id = 'base_link'
publish_data.pose.pose.position.x = self.x
publish_data.pose.pose.position.y = self.y
publish_data.pose.pose.position.z = 0
(qx, qy, qz, qw) = quaternion_from_euler(0, 0, self.yaw)
publish_data.pose.pose.orientation.x = qx
publish_data.pose.pose.orientation.y = qy
publish_data.pose.pose.orientation.z = qz
publish_data.pose.pose.orientation.w = qw
publish_data.twist.twist.linear.x = self.old_v
publish_data.twist.twist.linear.y = 0
publish_data.twist.twist.linear.z = 0
publish_data.twist.twist.angular.x = 0
publish_data.twist.twist.angular.y = 0
publish_data.twist.twist.angular.z = self.old_phi_dot
"""
pose
pose
position
x --> self.x
y --> self.y
z --> 0
orientation
x --> qx
y --> qy
z --> qw
w --> tf code: (qx, qy, qz, qw)= quaternion_from_euler(roll,pitch,yaw)
(0, 0, self.yaw)
covaricance
twist
linear
x --> self.old_v
y --> 0
z --> 0
angular
x --> 0
y --> 0
z --> self.old_phi_dot
"""
self.odom_pub.publish(publish_data)
def Status_CB(self, data):
self.v = data.linear_velocity
self.status_current_time = rospy.Time.now().to_sec()
if abs(self.v) < 0.001:
self.v = 0
self.x_dot = self.v * math.cos(self.yaw)
self.y_dot = self.v * math.sin(self.yaw)
# rospy.loginfo("{}, {}".format(self.v, self.status_current_time))
if self.status_old_time != 0:
time_diff = self.status_current_time - self.status_old_time
self.x += self.old_x_dot * time_diff
self.y += self.old_y_dot * time_diff
self.status_old_time = self.status_current_time
self.old_x_dot = self.x_dot
self.old_y_dot = self.y_dot
def Imu_callback(self, data):
phi_dot = data.angular_velocity.z
self.current_time = data.header.stamp.to_sec()
if abs(phi_dot) < 0.001:
phi_dot = 0
if self.old_time != 0:
time_diff = self.current_time - self.old_time
self.yaw += self.old_phi_dot * time_diff
# rospy.loginfo("angular_z = {}".format(phi_dot))
# rospy.loginfo("yaw = {}".format(self.yaw))
# rospy.loginfo("yaw_deg = {}".format(self.yaw * 180 / math.pi))
self.old_time = self.current_time
self.old_phi_dot = phi_dot
if __name__ == '__main__':
imu_sub()
'Robot > ROS' 카테고리의 다른 글
킴 [이미지 처리] (0) | 2022.03.10 |
---|---|
킴 [OpenCV] (0) | 2022.03.10 |
킴 테스트3 토픽 주고 받기 (0) | 2022.03.08 |
킴 테스트3 토픽 주고 받기 (0) | 2022.03.08 |
킴 테스트2 IMU 센서 (0) | 2022.03.08 |