A hands-on guide to using the myController S570 to record training data for LeRobot’s learning framework.
LeRobot
LeRobot is an open-source, modular robotics platform built on ROS, designed for fast prototyping and real-world robot control. It supports easy integration with various hardware and AI models.
Project link: huggingface/lerobot
Make sure to use a GPU device with CUDA cores; otherwise, the environment cannot be installed successfully.
myController S570
The myController S570 is highly compatible , easily adapting to the widely used collaborative robots in industrial applications. Its lightweight exoskeleton ensures easy portability, while supporting up to 14 degrees of freedom(DOF ). Equipped with 2 joysticks and 2 buttons , it is well-suited for remote operation , research on unmanned tasks , and data acquisition , making it an ideal tool for industrial automation and workstation setup .
Before installation, you need to prepare:
● Python 3.8+
● Miniconda for environment management
● Git and pip
1.1 Install Git Tool
sudo apt update sudo apt install git wget -y
1.2 Install Miniconda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.shbash Miniconda3-latest-Linux-x86_64.sh
When prompted, select ‘yes’ to initialize Conda.
Close and reopen your terminal or run:
source ~/.bashrc
1.3 Create a Conda Environment
conda create -n lerobot python=3.8 -yconda activate lerobot
1.4 Clone the LeRobot Repository
git clone git@github.com:lerobot/lerobot.gitcd lerobotpip install .
1.5 Install Python Dependencies
pip install empy==3.4pip install pymycobot pip install conda install -y -c conda-forge ffmpegpip uninstall -y opencv-pythonconda install -y -c conda-forge "opencv>=4.10.0"
2. Install ROS Package of myController S570
2.1 Build ROS Workspace
cd ~ mkdir myController_ws && mkdir myController_ws/src && cd myController_ws/src (You need to download the correct branch)git clone -b mycontroller_s570 https://github.com/elephantrobotics/mycobot_ros.gitcd ..catkin_make
3 Put the Data Collection Script to Flolder
Create a new file, copy the code into it, and save it as “data_record.py”.
#!/usr/bin/env python3"""LeRobot Data Collection ScriptCollects robot joint states and camera data, and saves them in LeRobot-compatible format.""" import rospyimport cv2import numpy as npimport jsonimport osimport datetimefrom sensor_msgs.msg import JointState, Imagefrom cv_bridge import CvBridgeimport threadingimport timefrom collections import dequeimport argparse class LeRobotDataCollector: def __init__(self, output_dir="lerobot_dataset", buffer_size=1000): """ Initialize data collector Args: output_dir: Directory to save data buffer_size: Size of the data buffer """ rospy.init_node('lerobot_data_collector', anonymous=True) self.output_dir = output_dir self.bridge = CvBridge() # Create output directories self.setup_directories() # Data buffers self.joint_buffer = deque(maxlen=buffer_size) self.image_buffer = deque(maxlen=buffer_size) # Control flags self.collecting = False self.episode_count = 0 self.frame_count = 0 # Data lock self.data_lock = threading.Lock() # ROS subscribers self.joint_sub = rospy.Subscriber('/joint_states', JointState, self.joint_callback) self.image_sub = rospy.Subscriber('/camera/image_raw', Image, self.image_callback) # Camera self.camera_available = False self.try_camera_sources() # Current data self.current_joint_data = None self.current_image = None self.last_joint_time = None self.last_image_time = None rospy.loginfo("LeRobot Data Collector initialized") def setup_directories(self): """Create required directory structure""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") self.session_dir = os.path.join(self.output_dir, f"session_{timestamp}") os.makedirs(self.session_dir, exist_ok=True) os.makedirs(os.path.join(self.session_dir, "images"), exist_ok=True) os.makedirs(os.path.join(self.session_dir, "episodes"), exist_ok=True) rospy.loginfo(f"Data will be saved in: {self.session_dir}") def try_camera_sources(self): """Try available USB camera sources""" for i in range(3): try: cap = cv2.VideoCapture(i) if cap.isOpened(): ret, frame = cap.read() if ret: self.camera_source = i self.camera_available = True cap.release() rospy.loginfo(f"Found USB camera: /dev/video{i}") return cap.release() except: continue rospy.logwarn("No usable USB camera found. Will use ROS image topic only.") def joint_callback(self, msg): """Joint state callback""" with self.data_lock: if len(msg.position) >= 14: joint_data = { 'timestamp': msg.header.stamp.to_sec(), 'joint_names': list(msg.name[:14]), 'joint_positions': list(msg.position[:14]), 'joint_velocities': list(msg.velocity[:14]) if len(msg.velocity) >= 14 else [0.0] * 14, 'joint_efforts': list(msg.effort[:14]) if len(msg.effort) >= 14 else [0.0] * 14 } self.current_joint_data = joint_data self.last_joint_time = rospy.Time.now() if self.collecting: self.joint_buffer.append(joint_data) def image_callback(self, msg): """ROS image callback""" try: cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") with self.data_lock: self.current_image = cv_image self.last_image_time = rospy.Time.now() if self.collecting: image_data = { 'timestamp': msg.header.stamp.to_sec(), 'image': cv_image, 'frame_id': self.frame_count } self.image_buffer.append(image_data) except Exception as e: rospy.logerr(f"Image conversion error: {e}") def capture_usb_camera(self): """Capture from USB camera""" if not self.camera_available: return None try: cap = cv2.VideoCapture(self.camera_source) ret, frame = cap.read() cap.release() if ret: return frame except: pass return None def sync_and_save_data(self): """Synchronize and save collected data""" if not self.joint_buffer or not self.image_buffer: rospy.logwarn("Buffers are empty. Cannot save episode.") return episode_data = { 'episode_id': self.episode_count, 'timestamp': datetime.datetime.now().isoformat(), 'total_frames': len(self.joint_buffer), 'data': [] } joint_list = list(self.joint_buffer) image_list = list(self.image_buffer) min_length = min(len(joint_list), len(image_list)) for i in range(min_length): joint_data = joint_list[i] image_data = image_list[i] image_filename = f"episode_{self.episode_count:04d}_frame_{i:06d}.jpg" image_path = os.path.join(self.session_dir, "images", image_filename) cv2.imwrite(image_path, image_data['image']) data_record = { 'frame_id': i, 'timestamp': joint_data['timestamp'], 'joint_data': { 'names': joint_data['joint_names'], 'positions': joint_data['joint_positions'], 'velocities': joint_data['joint_velocities'], 'efforts': joint_data['joint_efforts'] }, 'image_path': image_filename, 'image_timestamp': image_data['timestamp'] } episode_data['data'].append(data_record) episode_filename = f"episode_{self.episode_count:04d}.json" episode_path = os.path.join(self.session_dir, "episodes", episode_filename) with open(episode_path, 'w') as f: json.dump(episode_data, f, indent=2) rospy.loginfo(f"Episode {self.episode_count} saved: {min_length} frames") self.joint_buffer.clear() self.image_buffer.clear() self.episode_count += 1 def start_collection(self): """Start data collection""" with self.data_lock: self.collecting = True self.frame_count = 0 rospy.loginfo("Data collection started.") def stop_collection(self): """Stop data collection and save""" with self.data_lock: if self.collecting: self.collecting = False rospy.loginfo("Data collection stopped.") self.sync_and_save_data() def display_status(self): """Display current status""" joint_status = "✓" if self.current_joint_data else "✗" image_status = "✓" if self.current_image is not None else "✗" joint_time_diff = (rospy.Time.now() - self.last_joint_time).to_sec() if self.last_joint_time else float('inf') image_time_diff = (rospy.Time.now() - self.last_image_time).to_sec() if self.last_image_time else float('inf') status = f"""--- System Status ---Joint Data Received: {joint_status} | Age: {joint_time_diff:.2f}sImage Data Received: {image_status} | Age: {image_time_diff:.2f}sEpisodes Collected: {self.episode_count}Collecting Now: {self.collecting}---------------------""" print(status) if self.current_joint_data: print("Current Joint Positions:") for i, (name, pos) in enumerate(zip(self.current_joint_data['joint_names'], self.current_joint_data['joint_positions'])): print(f" {name}: {pos:.4f}") def run_interactive(self): """Run in interactive terminal mode""" print("""=== LeRobot Data Collector ===Press 's' to start episodePress 'e' to end and save episodePress 'status' to show current statePress 'q' to quitPress 'h' for help """) rate = rospy.Rate(10) # 10Hz while not rospy.is_shutdown(): try: import select import sys if select.select([sys.stdin], [], [], 0.1)[0]: command = sys.stdin.readline().strip().lower() if command == 's': self.start_collection() elif command == 'e': self.stop_collection() elif command == 'q': if self.collecting: self.stop_collection() break elif command == 'status': self.display_status() elif command == 'h': print("""Commands: 's' - start episode 'e' - stop episode and save 'q' - quit 'status' - show current status 'h' - help """) if self.current_image is not None: display_img = cv2.resize(self.current_image, (640, 480)) status_text = f"Episode: {self.episode_count} | Collecting: {self.collecting}" cv2.putText(display_img, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) cv2.imshow('LeRobot Data Collector', display_img) cv2.waitKey(1) rate.sleep() except KeyboardInterrupt: break cv2.destroyAllWindows() if self.collecting: self.stop_collection() self.save_session_summary() def save_session_summary(self): """Save session metadata summary""" summary = { 'session_info': { 'total_episodes': self.episode_count, 'session_directory': self.session_dir, 'collection_date': datetime.datetime.now().isoformat() }, 'data_format': { 'joint_names': self.current_joint_data['joint_names'] if self.current_joint_data else [], 'description': 'LeRobot compatible dataset format' } } summary_path = os.path.join(self.session_dir, "session_summary.json") with open(summary_path, 'w') as f: json.dump(summary, f, indent=2) rospy.loginfo(f"Session summary saved: {summary_path}") def main(): parser = argparse.ArgumentParser(description='LeRobot data collection') parser.add_argument('--output', '-o', default='lerobot_dataset', help='Output directory (default: lerobot_dataset)') parser.add_argument('--buffer-size', '-b', type=int, default=1000, help='Buffer size (default: 1000)') args = parser.parse_args() try: collector = LeRobotDataCollector( output_dir=args.output, buffer_size=args.buffer_size ) rospy.loginfo("Initializing...") rospy.sleep(2) collector.run_interactive() except rospy.ROSInterruptException: rospy.loginfo("ROS interrupted.") except Exception as e: rospy.logerr(f"Error: {e}") if __name__ == '__main__': main()
4. Run Data Acquisition
Run the RViz process for myController within the ROS environment, and then execute the data collection script. The script will automatically subscribe to the robot arm joint states and external camera data from the ROS topics, process the data accordingly, and save it to files.
Summary
This article provided a brief overview of how to use myController S570 to generate training data for LeRobot. In the next phase, we will integrate a real robotic arm myCobot 280 for data acquisition and model training, aiming to achieve robotic arm control via the LeRobot framework. We also look forward to more researchers exploring the potential of exoskeletons in advanced scientific applications.