본문 바로가기
Robot/ROS

킴 테스트3 토픽 주고 받기

by 9루트 2022. 3. 8.

아주아주 기본형태

1. 기본적인 publisher 코드

#! /usr/bin/env python

import rospy
from std_msgs.msg import String

class SimplePubNode:
	def __init__(self):
		self.simple_pub = rospy.Publisher(
			"/msg",
			String,
			queue_size = 5)
		print("init node")


	def pubMsg(self):
		self.simple_pub.publish("Hello world!")

def run():
	rospy.init_node("simple_pub")
	spn = SimplePubNode() # Instance
	rate = rospy.Rate(5)

	while not rospy.is_shutdown():
		spn.pubMsg()
		rate.sleep()

if __name__ == "__main__":
	run()

1-2. Int32 메세지를 발행한다면?

#! /usr/bin/env python

import rospy
from std_msgs.msg import Int32

class SortingPubNode:
	def __init__(self):
		self.sorting_pub = rospy.Publisher(
			"/msg",
			Int32,
			queue_size = 5)
		print("write integer number : ")


	def pubMsg(self):	
		n = input()
		if n != -1:
			self.sorting_pub.publish(n)
			return True
		else:
			self.sorting_pub.publish(n)
			return False


def run():
	rospy.init_node("sorting_pub")
	spn = SortingPubNode() # Instance
	rate = rospy.Rate(5)

	state = True

	while state and not rospy.is_shutdown():
		state = spn.pubMsg()
		rate.sleep()

if __name__ == "__main__":
	run()
	print("topic finish")

2. 기본적인 subscriber 코드

#! /usr/bin/env python

import rospy
from std_msgs.msg import String

class SimpleSubNode:
	def __init__(self):
		self.simple_pub = rospy.Subscriber(
			"/msg",
			String,
			self.callback
			)
	def callback(self, _msg):
		print(_msg.data)

def run():
	rospy.init_node("simple_sub")
	ssn = SimpleSubNode()
	rospy.spin()

if __name__ == "__main__":
	run()

2-1 Int32 메세지를 구독한다면?

#! /usr/bin/env python

import rospy
from std_msgs.msg import Int32

class SortingSubNode:
	def __init__(self):
		self.sorting_sub = rospy.Subscriber(
			"/msg",
			Int32,
			self.callback
			)
		self.lst = []
	def callback(self, _msg):
		if _msg.data == -1:
			self.lst.sort()
			print("sorted list: ",self.lst)
			self.lst = []
		else:
			self.lst.append(_msg.data)
		print(self.lst)


def run():
	rospy.init_node("sorting_sub")
	ssn = SortingSubNode()
	rospy.spin()

if __name__ == "__main__":
	run()

 


 

1. 발행 형태

예1) scout_sim_launch > scripts > publisher.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-
import rospy
from geometry_msgs.msg import Twist
from geometry_msgs.msg import PolygonStamped
from geometry_msgs.msg import Point32
from visualization_msgs.msg import Marker
from visualization_msgs.msg import MarkerArray


NODE_NAME = "publisher_tester"
TOPIC_NAME = "cmd_vel1"
#MSG_TYPE = Twist
#MSG_TYPE = PolygonStamped
MSG_TYPE = Marker
#MSG_TYPE = MarkerArray
PUBLISH_HZ = 10.0

class Topic_publisher():
	def __init__(self):
		rospy.init_node(NODE_NAME)
		self.publisher1 = rospy.Publisher(TOPIC_NAME, MSG_TYPE, queue_size=1)
		rospy.Timer(rospy.Duration(1.0/PUBLISH_HZ), self.timer_CB3)
		rospy.spin()


	def timer_CB3(self, event):
		# visualization)msgs/Marker
		publishing_data_marker = Marker()
		publishing_data_marker.header.stamp = rospy.Time.now()
		publishing_data_marker.header.frame_id = "object"
		publishing_data_marker.type = 3
		publishing_data_marker.action = 0
		publishing_data_marker.pose.position.x = 1.0
		publishing_data_marker.pose.position.y = 1.0
		publishing_data_marker.pose.position.z = 3.0
		publishing_data_marker.pose.orientation.x = 1.0
		publishing_data_marker.pose.orientation.y = 1.0
		publishing_data_marker.pose.orientation.z = 0.0
		#publishing_data_marker.scale = Vector3(1.0, 1.0, 0.0)
		publishing_data_marker.scale.x = 1.0
		publishing_data_marker.scale.y = 1.0
		publishing_data_marker.scale.z = 1.0
		publishing_data_marker.color.r = 0
		publishing_data_marker.color.g = 141
		publishing_data_marker.color.b = 98
		publishing_data_marker.color.a = 0.8
		self.publisher1.publish(publishing_data_marker)

	def timer_CB2(self, event):
		publishing_data_Poly = PolygonStamped()
		#publising_data의 내부를 채우는 과정
		#publishing_data_Poly.header.seq = 0
		publishing_data_Poly.header.stamp = rospy.Time.now()
		# publishing_data_Poly.header.stamp = rospy.Time.from_sec(0)
		publishing_data_Poly.header.frame_id = "triangle"
		# publishing_data_Poly.polygon.points = Point32() (X)
		publishing_data_Poly.polygon.points = [Point32(x=0.0, y=0.0, z=0.0),Point32(x=3.0, y=3.0, z=0.0), Point32(x=6.0, y=0.0, z=0.0)]
		self.publisher1.publish(publishing_data_Poly)

	def timer_CB(self, event):
		publishing_data = Twist()
		#publising_data의 내부를 채우는 과정
		publishing_data.linear = Vector3(1.0, 2.0, 3.0)
		# publishing_data.linear.x = 1.0 식으로 넣어줘도 된다.
		publishing_data.angular = Vector3(0, 0, 1.0)
		self.publisher1.publish(publish_data)
		"""
		geometry_msgs/Vector3 linear
			  float64 x
			  float64 y
			  float64 z
			geometry_msgs/Vector3 angular
			  float64 x
			  float64 y
			  float64 z
		"""
		#publishing_data.linear.x
		#print(dir(Twist))
		#print(dir(publishing_data))
		#print(publishing_data.deserialize)
		#print(publishing_data.linear)
		


if __name__=="__main__":
	Topic_publisher()

예2) scout_sim_launch > scripts > twist_pub.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import rospy
from geometry_msgs.msg import Twist

NODE_NAME = "turtle_mover"
TOPIC_NAME = "cmd_vel"
MSG_TYPE = Twist
PUBLISH_HZ = 10.0


class topic_publisher():
	def __init__(self):
		rospy.init_node(NODE_NAME)
		self.publisher1 = rospy.Publisher(TOPIC_NAME, MSG_TYPE, queue_size=1)
		rospy.Timer(rospy.Duration(1.0 / PUBLISH_HZ), self.timer_CB)
		rospy.spin()

	def timer_CB(self, event):
		publishing_data = Twist()

		publishing_data.linear.x = rospy.get_param("~linear_speed", 1.0) # nodename/linear_speed
		publishing_data.linear.y = 0.0
		publishing_data.linear.z = 0.0

		publishing_data.angular.x = 0
		publishing_data.angular.y = 0
		publishing_data.angular.z = rospy.get_param("~angular_speed", 1.0) # angular_speed

		self.publisher1.publish(publishing_data)

if __name__ == '__main__':
	topic_publisher()

 


구독 형태

예1) scout_sim_launch > scripts > imu_sub.py

#!/usr/bin/env python

## imu subscriber
## print angular z 

import rospy
from sensor_msgs.msg import Imu
from scout_msgs.msg import ScoutStatus
## scout_msgs/ScoutStatus
from nav_msgs.msg import Odometry
## nav_msgs/Odometry --> Publish
import math
from tf.transformations import quaternion_from_euler

class imu_sub():
	def __init__(self):
		rospy.init_node("imu_sub")
		rospy.Subscriber("imu", Imu, self.Imu_callback)
		rospy.Subscriber("scout_status", ScoutStatus, self.Status_CB)


		self.odom_pub = rospy.Publisher("scout_odom", Odometry, queue_size=1)
		self.yaw = 0
		self.x = 0
		self.y = 0
		self.old_v = 0
		self.old_phi_dot = 0
		self.old_time = 0
		self.status_old_time = 0
		self.old_x_dot = 0
		self.old_y_dot = 0
		rospy.Timer(rospy.Duration(1.0 / 30), self.timer_CB) ## 30hz
		rospy.spin()

	def timer_CB(self, event):
		publish_data = Odometry()
		#rospy.loginfo("(x_dot, y_dot, yaw_dot) = ({:.4f}, {:.4f}, {:.4f})".format(self.old_x_dot, self.old_y_dot, self.old_phi_dot))
		#rospy.loginfo("(x, y, yaw) = ({:.4f}, {:.4f}, {:.4f})".format(self.x, self.y, self.yaw))
		
		publish_data.header.stamp = rospy.Time.now()
		publish_data.header.frame_id = "odom"
		publish_data.child_frame_id = 'base_link'

		publish_data.pose.pose.position.x = self.x
		publish_data.pose.pose.position.y = self.y
		publish_data.pose.pose.position.z = 0

		(qx, qy, qz, qw) = quaternion_from_euler(0, 0, self.yaw)
		publish_data.pose.pose.orientation.x = qx
		publish_data.pose.pose.orientation.y = qy
		publish_data.pose.pose.orientation.z = qz
		publish_data.pose.pose.orientation.w = qw


		publish_data.twist.twist.linear.x = self.old_v
		publish_data.twist.twist.linear.y = 0
		publish_data.twist.twist.linear.z = 0

		publish_data.twist.twist.angular.x = 0
		publish_data.twist.twist.angular.y = 0
		publish_data.twist.twist.angular.z = self.old_phi_dot

		"""
		pose
			pose
				position
					 x --> self.x
					 y --> self.y
					 z --> 0
				orientation
					x --> qx
					y --> qy
					z --> qw
					w --> tf code: (qx, qy, qz, qw)= quaternion_from_euler(roll,pitch,yaw)
					(0, 0, self.yaw)
				covaricance

				twist
					linear
						x --> self.old_v
						y --> 0
						z --> 0
					angular
						x --> 0
						y --> 0
						z --> self.old_phi_dot
		"""

		self.odom_pub.publish(publish_data)

	def Status_CB(self, data):
		self.v = data.linear_velocity

		self.status_current_time = rospy.Time.now().to_sec()
		if abs(self.v) < 0.001:
			self.v = 0

		self.x_dot = self.v * math.cos(self.yaw)
		self.y_dot = self.v * math.sin(self.yaw)
		# rospy.loginfo("{}, {}".format(self.v, self.status_current_time))

		if self.status_old_time != 0:
			time_diff = self.status_current_time - self.status_old_time
			self.x += self.old_x_dot * time_diff
			self.y += self.old_y_dot * time_diff
		self.status_old_time = self.status_current_time
		self.old_x_dot = self.x_dot
		self.old_y_dot = self.y_dot



	def Imu_callback(self, data):
		phi_dot = data.angular_velocity.z
		self.current_time = data.header.stamp.to_sec()
		if abs(phi_dot) < 0.001:
			phi_dot = 0

		if self.old_time != 0:
			time_diff = self.current_time - self.old_time
			self.yaw += self.old_phi_dot * time_diff

		# rospy.loginfo("angular_z = {}".format(phi_dot))
		# rospy.loginfo("yaw = {}".format(self.yaw))
		# rospy.loginfo("yaw_deg = {}".format(self.yaw * 180 / math.pi))
		self.old_time = self.current_time
		self.old_phi_dot = phi_dot

if __name__ == '__main__':
	imu_sub()

'Robot > ROS' 카테고리의 다른 글

킴 [OpenCV]  (0) 2022.03.10
킴 테스트3 토픽 주고 받기  (0) 2022.03.09
킴 테스트3 토픽 주고 받기  (0) 2022.03.08
킴 테스트2 IMU 센서  (0) 2022.03.08
킴 테스트1 PointClouds로 RVIZ화면 구현  (0) 2022.03.08