Acquiring Training Data with the myController Exoskeleton

A hands-on guide to using the myController S570 to record training data for LeRobot’s learning framework.

LeRobot

LeRobot is an open-source, modular robotics platform built on ROS, designed for fast prototyping and real-world robot control. It supports easy integration with various hardware and AI models.

Project link: huggingface/lerobot

Make sure to use a GPU device with CUDA cores; otherwise, the environment cannot be installed successfully.

myController S570

The myController S570 is highly compatible , easily adapting to the widely used collaborative robots in industrial applications. Its lightweight exoskeleton ensures easy portability, while supporting up to 14 degrees of freedom(DOF ). Equipped with 2 joysticks and 2 buttons , it is well-suited for remote operation , research on unmanned tasks , and data acquisition , making it an ideal tool for industrial automation and workstation setup .

Before installation, you need to prepare:

Python 3.8+

● Miniconda for environment management

● Git and pip

1.1 Install Git Tool

sudo apt update sudo apt install git wget -y

1.2 Install Miniconda

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.shbash Miniconda3-latest-Linux-x86_64.sh

When prompted, select ‘yes’ to initialize Conda.

Close and reopen your terminal or run:

source ~/.bashrc

1.3 Create a Conda Environment

conda create -n lerobot python=3.8 -yconda activate lerobot

1.4 Clone the LeRobot Repository

git clone git@github.com:lerobot/lerobot.gitcd lerobotpip install .

1.5 Install Python Dependencies

pip install empy==3.4pip install pymycobot pip install  conda install -y -c conda-forge ffmpegpip uninstall -y opencv-pythonconda install -y -c conda-forge "opencv>=4.10.0"

2. Install ROS Package of myController S570

2.1 Build ROS Workspace

cd ~ mkdir myController_ws && mkdir myController_ws/src && cd myController_ws/src (You need to download the correct branch)git clone -b mycontroller_s570 https://github.com/elephantrobotics/mycobot_ros.gitcd ..catkin_make

3 Put the Data Collection Script to Flolder

Create a new file, copy the code into it, and save it as “data_record.py”.

#!/usr/bin/env python3"""LeRobot Data Collection ScriptCollects robot joint states and camera data, and saves them in LeRobot-compatible format.""" import rospyimport cv2import numpy as npimport jsonimport osimport datetimefrom sensor_msgs.msg import JointState, Imagefrom cv_bridge import CvBridgeimport threadingimport timefrom collections import dequeimport argparse class LeRobotDataCollector:    def __init__(self, output_dir="lerobot_dataset", buffer_size=1000):        """        Initialize data collector         Args:            output_dir: Directory to save data            buffer_size: Size of the data buffer        """        rospy.init_node('lerobot_data_collector', anonymous=True)                self.output_dir = output_dir        self.bridge = CvBridge()                # Create output directories        self.setup_directories()                # Data buffers        self.joint_buffer = deque(maxlen=buffer_size)        self.image_buffer = deque(maxlen=buffer_size)                # Control flags        self.collecting = False        self.episode_count = 0        self.frame_count = 0                # Data lock        self.data_lock = threading.Lock()                # ROS subscribers        self.joint_sub = rospy.Subscriber('/joint_states', JointState, self.joint_callback)        self.image_sub = rospy.Subscriber('/camera/image_raw', Image, self.image_callback)                # Camera        self.camera_available = False        self.try_camera_sources()                # Current data        self.current_joint_data = None        self.current_image = None        self.last_joint_time = None        self.last_image_time = None                rospy.loginfo("LeRobot Data Collector initialized")            def setup_directories(self):        """Create required directory structure"""        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")        self.session_dir = os.path.join(self.output_dir, f"session_{timestamp}")                os.makedirs(self.session_dir, exist_ok=True)        os.makedirs(os.path.join(self.session_dir, "images"), exist_ok=True)        os.makedirs(os.path.join(self.session_dir, "episodes"), exist_ok=True)                rospy.loginfo(f"Data will be saved in: {self.session_dir}")        def try_camera_sources(self):        """Try available USB camera sources"""        for i in range(3):            try:                cap = cv2.VideoCapture(i)                if cap.isOpened():                    ret, frame = cap.read()                    if ret:                        self.camera_source = i                        self.camera_available = True                        cap.release()                        rospy.loginfo(f"Found USB camera: /dev/video{i}")                        return                cap.release()            except:                continue                rospy.logwarn("No usable USB camera found. Will use ROS image topic only.")        def joint_callback(self, msg):        """Joint state callback"""        with self.data_lock:            if len(msg.position) >= 14:                joint_data = {                    'timestamp': msg.header.stamp.to_sec(),                    'joint_names': list(msg.name[:14]),                    'joint_positions': list(msg.position[:14]),                    'joint_velocities': list(msg.velocity[:14]) if len(msg.velocity) >= 14 else [0.0] * 14,                    'joint_efforts': list(msg.effort[:14]) if len(msg.effort) >= 14 else [0.0] * 14                }                                self.current_joint_data = joint_data                self.last_joint_time = rospy.Time.now()                                if self.collecting:                    self.joint_buffer.append(joint_data)        def image_callback(self, msg):        """ROS image callback"""        try:            cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")            with self.data_lock:                self.current_image = cv_image                self.last_image_time = rospy.Time.now()                                if self.collecting:                    image_data = {                        'timestamp': msg.header.stamp.to_sec(),                        'image': cv_image,                        'frame_id': self.frame_count                    }                    self.image_buffer.append(image_data)                            except Exception as e:            rospy.logerr(f"Image conversion error: {e}")        def capture_usb_camera(self):        """Capture from USB camera"""        if not self.camera_available:            return None                    try:            cap = cv2.VideoCapture(self.camera_source)            ret, frame = cap.read()            cap.release()                        if ret:                return frame        except:            pass        return None        def sync_and_save_data(self):        """Synchronize and save collected data"""        if not self.joint_buffer or not self.image_buffer:            rospy.logwarn("Buffers are empty. Cannot save episode.")            return                    episode_data = {            'episode_id': self.episode_count,            'timestamp': datetime.datetime.now().isoformat(),            'total_frames': len(self.joint_buffer),            'data': []        }                joint_list = list(self.joint_buffer)        image_list = list(self.image_buffer)                min_length = min(len(joint_list), len(image_list))                for i in range(min_length):            joint_data = joint_list[i]            image_data = image_list[i]                        image_filename = f"episode_{self.episode_count:04d}_frame_{i:06d}.jpg"            image_path = os.path.join(self.session_dir, "images", image_filename)            cv2.imwrite(image_path, image_data['image'])                        data_record = {                'frame_id': i,                'timestamp': joint_data['timestamp'],                'joint_data': {                    'names': joint_data['joint_names'],                    'positions': joint_data['joint_positions'],                    'velocities': joint_data['joint_velocities'],                    'efforts': joint_data['joint_efforts']                },                'image_path': image_filename,                'image_timestamp': image_data['timestamp']            }                        episode_data['data'].append(data_record)                episode_filename = f"episode_{self.episode_count:04d}.json"        episode_path = os.path.join(self.session_dir, "episodes", episode_filename)                with open(episode_path, 'w') as f:            json.dump(episode_data, f, indent=2)                rospy.loginfo(f"Episode {self.episode_count} saved: {min_length} frames")                self.joint_buffer.clear()        self.image_buffer.clear()                self.episode_count += 1        def start_collection(self):        """Start data collection"""        with self.data_lock:            self.collecting = True            self.frame_count = 0            rospy.loginfo("Data collection started.")        def stop_collection(self):        """Stop data collection and save"""        with self.data_lock:            if self.collecting:                self.collecting = False                rospy.loginfo("Data collection stopped.")                self.sync_and_save_data()        def display_status(self):        """Display current status"""        joint_status = "✓" if self.current_joint_data else "✗"        image_status = "✓" if self.current_image is not None else "✗"                joint_time_diff = (rospy.Time.now() - self.last_joint_time).to_sec() if self.last_joint_time else float('inf')        image_time_diff = (rospy.Time.now() - self.last_image_time).to_sec() if self.last_image_time else float('inf')                status = f"""--- System Status ---Joint Data Received: {joint_status} | Age: {joint_time_diff:.2f}sImage Data Received: {image_status} | Age: {image_time_diff:.2f}sEpisodes Collected: {self.episode_count}Collecting Now: {self.collecting}---------------------"""        print(status)                if self.current_joint_data:            print("Current Joint Positions:")            for i, (name, pos) in enumerate(zip(self.current_joint_data['joint_names'],                                                 self.current_joint_data['joint_positions'])):                print(f"  {name}: {pos:.4f}")        def run_interactive(self):        """Run in interactive terminal mode"""        print("""=== LeRobot Data Collector ===Press 's' to start episodePress 'e' to end and save episodePress 'status' to show current statePress 'q' to quitPress 'h' for help        """)                rate = rospy.Rate(10)  # 10Hz                while not rospy.is_shutdown():            try:                import select                import sys                                if select.select([sys.stdin], [], [], 0.1)[0]:                    command = sys.stdin.readline().strip().lower()                                        if command == 's':                        self.start_collection()                    elif command == 'e':                        self.stop_collection()                    elif command == 'q':                        if self.collecting:                            self.stop_collection()                        break                    elif command == 'status':                        self.display_status()                    elif command == 'h':                        print("""Commands:  's' - start episode  'e' - stop episode and save  'q' - quit  'status' - show current status  'h' - help                        """)                                if self.current_image is not None:                    display_img = cv2.resize(self.current_image, (640, 480))                    status_text = f"Episode: {self.episode_count} | Collecting: {self.collecting}"                    cv2.putText(display_img, status_text, (10, 30),                                 cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)                    cv2.imshow('LeRobot Data Collector', display_img)                    cv2.waitKey(1)                                rate.sleep()                            except KeyboardInterrupt:                break                cv2.destroyAllWindows()        if self.collecting:            self.stop_collection()                self.save_session_summary()        def save_session_summary(self):        """Save session metadata summary"""        summary = {            'session_info': {                'total_episodes': self.episode_count,                'session_directory': self.session_dir,                'collection_date': datetime.datetime.now().isoformat()            },            'data_format': {                'joint_names': self.current_joint_data['joint_names'] if self.current_joint_data else [],                'description': 'LeRobot compatible dataset format'            }        }                summary_path = os.path.join(self.session_dir, "session_summary.json")        with open(summary_path, 'w') as f:            json.dump(summary, f, indent=2)                rospy.loginfo(f"Session summary saved: {summary_path}") def main():    parser = argparse.ArgumentParser(description='LeRobot data collection')    parser.add_argument('--output', '-o', default='lerobot_dataset',                         help='Output directory (default: lerobot_dataset)')    parser.add_argument('--buffer-size', '-b', type=int, default=1000,                        help='Buffer size (default: 1000)')        args = parser.parse_args()        try:        collector = LeRobotDataCollector(            output_dir=args.output,            buffer_size=args.buffer_size        )                rospy.loginfo("Initializing...")        rospy.sleep(2)                collector.run_interactive()            except rospy.ROSInterruptException:        rospy.loginfo("ROS interrupted.")    except Exception as e:        rospy.logerr(f"Error: {e}") if __name__ == '__main__':    main()

4. Run Data Acquisition

Run the RViz process for myController within the ROS environment, and then execute the data collection script. The script will automatically subscribe to the robot arm joint states and external camera data from the ROS topics, process the data accordingly, and save it to files.

Summary

This article provided a brief overview of how to use myController S570 to generate training data for LeRobot. In the next phase, we will integrate a real robotic arm myCobot 280 for data acquisition and model training, aiming to achieve robotic arm control via the LeRobot framework. We also look forward to more researchers exploring the potential of exoskeletons in advanced scientific applications.